Sqoop是一款开源的数据传输工具,将数据从存储空间导入HDFS,Hive , HBase等,完成数据接入工作。Sqoop本身调用的是MapReduce, 但只有MapTask, 且具有并行机制
1. 全量导入——将数据从MySQL(关系型数据库)导入HDFS
bin/sqoop import \
--connect jdbc:mysql://localhost:3306/test \
--username root \
--password 123456789 \
--delete-target-dir \
--target-dir /test \
--fields-terminated-by ',' \
--split-by 'XX' \
--table emp --m 2
- 连接mysql数据库种的DB, 利用用户名密码登陆
- 删除原导入目的路径
- 设立新的导入目的路径(HDFS中某个文件夹)
- table YYY 代表需要导入的表(test数据库中)
- fields-terminated-by ' Y ' 指明分隔符,默认按' , ' 分隔
- --m Z 代表并行度为Z ,指定了有多少个MapTask并行执行导入
- --split-by 'XX' 表示按字段切分,在--m 的参数2以上时,且被导入的表未指定主键时,必须使用该语句来指明根据哪个字段来切分,否则报错
2. 全量导入 —— 将数据从MySQL(关系型数据库)导入Hive
这里直接列出 直接复制表结构到Hive的方法,不是先导入表结构再导入数据
bin/sqoop import \
--connect jdbc:mysql://localhost:3306/test \
--username root \
--password **** \
--table XX \
--hive-import \
--m 2 \
--hive-database YY
其中 --hive-import 指的就是导入hive表,--hive-databse YY 代表将表XX导入Hive的数据库 YY,且在 Hive中创建的表名就是关系型数据库中的表名
3. 表子集导入 —— where过滤
全量导入时,只导入一部分,使用where语句完成过滤
bin/sqoop import \
--connect jdbc:mysql//localhost:3306/test \
--username root \
--password **** \
--where " AA = '' " \
--table YY \ --m 2 \
4. 表子集导入 —— query查询
使用 query ' ' 语句将查询的表子集导入
bin/sqoop import \
--connect jdbc:mysql//localhost:3306/test \
--username root \
--password **** \
--delete-target-dir \
--target-dir \
--query ' select XX,YY from XXX where YYY and $CONDITIONS '
--m 2
需要注意的是
- query 语句中必须 where 条件
- query 语句必须使用单引号而非双引号
- query 语句中的 where 条件必须存在,如果无条件则用 1=1,即条件永远成立来实现 ,同时必须带一个$CONDITION 字符串
- query查询时,不能使用 --table
5. 增量同步
Sqoop中完成增量同步有两种模式 append 和 lastmodified
--incremental append
--check-column XX
--last-value YY
--incremental 指定增量方式,是 append 还是 lastmodified
--check-column XX 指定增量字段,对于append模式,采用数值型字段(id, uuid这种),对于lastmodifed 采用时间类型字段(一般都是时间戳)
--last-value 指定上次同步到的位置,注意:--incremental append 模式下,新增从大于last-value的数据开始(--last-value对应数据的下一行,即不包含边界),--incremental lastmodified 模式下,新增从--last-value这一行本身开始,即包含边界
对于 lastmodified 模式,需要注意:有两种模式 --append XX 和 --merge-key XX
lastmodified + --append 是只更新增量
lastmodifed + --merge-key 是既更新增量又更新对原有数据的修改