In the previous post I described an approach that stores Hive analysis results in a second Hive table and then uses Sqoop to import that table into MySQL. I believe that approach pays off when the data volume is very large, but the result sets of typical Hive analyses, queries, and statistics are fairly small. For those cases I tried a different route: connect to Hive through the Hive JDBC driver, read the query result set, and write it straight into the database through the MySQL JDBC driver. It worked, and it was considerably faster than the Sqoop approach.
一、Start the Hive metastore service
[hadoopUser@secondmgt ~]$ hive --service metastore
Starting Hive Metastore Server
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.min.split.size.per.node is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.node
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.min.split.size.per.rack is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.rack
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.committer.job.setup.cleanup.needed is deprecated. Instead, use mapreduce.job.committer.setup.cleanup.needed
二、Start the HiveServer2 service
[hadoopUser@secondmgt ~]$ hive --service hiveserver2
Starting HiveServer2
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.min.split.size.per.node is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.node
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.min.split.size.per.rack is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.rack
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.committer.job.setup.cleanup.needed is deprecated. Instead, use mapreduce.job.committer.setup.cleanup.needed
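HiveServer2 listens for JDBC clients on port 10000 by default, which is the port the connection URL below uses. Before writing any code it is worth confirming that the service really accepts connections; one quick check (a sketch, assuming the beeline shell that ships with Hive is available on this node) is to connect with the same URL and user the Java code will use:

[hadoopUser@secondmgt ~]$ beeline -u jdbc:hive2://192.168.2.133:10000/hive -n hadoopUser

If beeline gets a prompt, the HiveSqlHelper class in section 五 should be able to connect as well.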
三、Query the Hive table mapped onto HBase
A sample query against the HBase-backed Hive table is shown below; the HBase table currently holds 6,649 records.
hive> select * from transjtxx_hbase;
32108800000000004620140317000817 02 03 苏K22F91 0.00 3 1 0 0
32108800000000004620140317000820 02 03 苏HP062H 0.00 6 1 0 0
32108800000000004620140317000823 02 03 苏KHD687 0.00 7 1 0 0
32108800000000004620140317001011 02 03 苏K42F88 0.00 4 1 0 0
32108800000000004620140317001158 02 03 苏KHD529 0.00 4 1 0 0
32108800000000004620140317001228 02 02 苏KPV670 0.00 2 1 0 0
32108800000000004620140317001349 02 03 苏K0Y787 0.00 4 1 0 0
32108800000000004620140317001357 02 03 苏KT9531 0.00 0 1 0 0
32108800000000004620140317001422 02 02 苏KK8682 0.00 1 1 0 0
32108800000000004620140317001423 02 02 苏KK8682 0.00 5 1 0 0
32108800000000004620140317001539 02 03 苏K8X833 0.00 0 1 0 0
32108800000000004620140317001542 02 03 苏KHP581 0.00 6 1 0 0
hive> select count(*) from transjtxx_hbase;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1428394594787_0026, Tracking URL = http://secondmgt:8088/proxy/application_1428394594787_0026/
Kill Command = /home/hadoopUser/cloud/hadoop/programs/hadoop-2.2.0/bin/hadoop job -kill job_1428394594787_0026
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2015-04-22 18:31:00,659 Stage-1 map = 0%, reduce = 0%
2015-04-22 18:31:17,216 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 13.09 sec
2015-04-22 18:31:29,679 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 16.54 sec
MapReduce Total cumulative CPU time: 16 seconds 540 msec
Ended Job = job_1428394594787_0026
MapReduce Jobs Launched:
Job 0: Map: 1 Reduce: 1 Cumulative CPU: 16.54 sec HDFS Read: 256 HDFS Write: 5 SUCCESS
Total MapReduce CPU Time Spent: 16 seconds 540 msec
OK
6649
Time taken: 45.464 seconds, Fetched: 1 row(s)
四、Create the target database and table
Create the MySQL target database transport and, inside it, the table jtxx.
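I created them with DDL along these lines (a minimal sketch: the column names match the model class in section 五 and the SELECT output in section 六, but the VARCHAR types and lengths are just reasonable guesses):

-- Hypothetical DDL; adjust types and lengths to the real data.
CREATE DATABASE transport;
USE transport;
CREATE TABLE jtxx (
    clxxbh VARCHAR(64),  -- record number (the HBase row key)
    xsfx   VARCHAR(8),
    cdbh   VARCHAR(8),
    hphm   VARCHAR(16),  -- license-plate number
    clsd   VARCHAR(16),
    cllx   VARCHAR(8),
    clbj   VARCHAR(8),
    cllb   VARCHAR(8),
    wflx   VARCHAR(8)
);

show tables then confirms the table is in place: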
mysql> show tables;
+---------------------+
| Tables_in_transport |
+---------------------+
| jtxx |
+---------------------+
1 row in set (0.00 sec)
五、The code
1、HiveSqlHelper: the Hive JDBC connection class
package com.gxnzx.hive.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class HiveSqlHelper {
    private static String hiveDriverName = "org.apache.hive.jdbc.HiveDriver";
    private static String hiveUrl = "jdbc:hive2://192.168.2.133:10000/hive";
    private static String hiveUserName = "hadoopUser";
    private static String hivePassword = "";
    private static Connection conn = null;

    // Load the HiveServer2 JDBC driver once, when the class is first used
    static {
        try {
            Class.forName(hiveDriverName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    // Open a connection to HiveServer2 (no password is configured on this cluster)
    public static Connection getConn() {
        try {
            conn = DriverManager.getConnection(hiveUrl, hiveUserName, hivePassword);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }
}
2、DBSqlHelper: the MySQL JDBC connection class

package com.gxnzx.hive.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;

import com.gxnzx.hive.model.TransInfo;

public class DBSqlHelper {
    private static String driverName = "com.mysql.jdbc.Driver";
    private static String url = "jdbc:mysql://192.168.2.133:3306/transport";
    private static String userName = "hive";
    private static String password = "hive";
    private static Connection conn = null;
    private static PreparedStatement ps = null;

    static {
        try {
            Class.forName(driverName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public static Connection getConn() {
        try {
            conn = DriverManager.getConnection(url, userName, password);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }

    // Batch insert: sql is a parameterized INSERT such as
    // "insert into jtxx values (?,?,?,?,?,?,?,?,?)"
    public static boolean addBatch(String sql, ArrayList<TransInfo> list) {
        boolean flag = false;
        try {
            conn = DBSqlHelper.getConn();     // open the MySQL connection
            conn.setAutoCommit(false);        // disable auto-commit so the whole batch is one transaction
            ps = conn.prepareStatement(sql);  // java.sql.PreparedStatement -- no driver-specific cast needed
            for (int i = 0; i < list.size(); i++) {
                TransInfo trans = list.get(i);
                ps.setString(1, trans.getClxxbh());
                ps.setString(2, trans.getXsfx());
                ps.setString(3, trans.getCdbh());
                ps.setString(4, trans.getHphm());
                ps.setString(5, trans.getClsd());
                ps.setString(6, trans.getCllx());
                ps.setString(7, trans.getClbj());
                ps.setString(8, trans.getCllb());
                ps.setString(9, trans.getWflx());
                ps.addBatch();                // queue one row
            }
            ps.executeBatch();                // send the whole batch to MySQL
            conn.commit();                    // commit the transaction
            flag = true;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                ps.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        return flag;
    }
}
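One design note on the batch insert: by default, MySQL Connector/J still sends a JDBC batch to the server one INSERT at a time. If throughput ever becomes a concern, appending rewriteBatchedStatements=true to the connection URL (jdbc:mysql://192.168.2.133:3306/transport?rewriteBatchedStatements=true) lets the driver rewrite the batch into multi-row INSERT statements, which is usually much faster.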
3、TransInfo: the basic traffic-information model class

package com.gxnzx.hive.model;

// One traffic record; the field names abbreviate the original Chinese
// column names (e.g. hphm = license-plate number, clsd = vehicle speed).
public class TransInfo {
    private String clxxbh;
    private String xsfx;
    private String cdbh;
    private String hphm;
    private String clsd;
    private String cllx;
    private String clbj;
    private String cllb;
    private String wflx;

    public String getClxxbh() { return clxxbh; }
    public void setClxxbh(String clxxbh) { this.clxxbh = clxxbh; }
    public String getXsfx() { return xsfx; }
    public void setXsfx(String xsfx) { this.xsfx = xsfx; }
    public String getCdbh() { return cdbh; }
    public void setCdbh(String cdbh) { this.cdbh = cdbh; }
    public String getHphm() { return hphm; }
    public void setHphm(String hphm) { this.hphm = hphm; }
    public String getClsd() { return clsd; }
    public void setClsd(String clsd) { this.clsd = clsd; }
    public String getCllx() { return cllx; }
    public void setCllx(String cllx) { this.cllx = cllx; }
    public String getClbj() { return clbj; }
    public void setClbj(String clbj) { this.clbj = clbj; }
    public String getCllb() { return cllb; }
    public void setCllb(String cllb) { this.cllb = cllb; }
    public String getWflx() { return wflx; }
    public void setWflx(String wflx) { this.wflx = wflx; }
}
4、The main driver

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;

import com.gxnzx.hive.model.TransInfo;
import com.gxnzx.hive.util.DBSqlHelper;
import com.gxnzx.hive.util.HiveSqlHelper;

public class HiveServer2 {
    private static Connection conn = null;

    public static void main(String[] args) {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS");
            System.out.println(sdf.format(new Date()));    // start time

            conn = HiveSqlHelper.getConn();                // connect to HiveServer2
            Statement st = conn.createStatement();
            String sql1 = "select * from transjtxx_hbase"; // the Hive query to export
            ResultSet rs = st.executeQuery(sql1);

            // Copy the Hive result set into a list of model objects
            ArrayList<TransInfo> list = new ArrayList<TransInfo>();
            while (rs.next()) {
                TransInfo info = new TransInfo();
                info.setClxxbh(rs.getString(1));
                info.setXsfx(rs.getString(2));
                info.setCdbh(rs.getString(3));
                info.setHphm(rs.getString(4));
                info.setClsd(rs.getString(5));
                info.setCllx(rs.getString(6));
                info.setClbj(rs.getString(7));
                info.setCllb(rs.getString(8));
                info.setWflx(rs.getString(9));
                list.add(info);
            }

            // Batch-insert the list into MySQL
            if (DBSqlHelper.addBatch("insert into jtxx values (?,?,?,?,?,?,?,?,?)", list)) {
                System.out.println("success");
            } else {
                System.out.println("error");
            }
            System.out.println(sdf.format(new Date()));    // end time
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
5、Check the run
Judging from the run, reading all 6,649 records out of Hive (a query that goes through the Hadoop cluster) and loading them into MySQL took a little over 3 seconds, while the Sqoop approach from the previous post took far longer. Of course, that may simply be because the data set is small.
六、Check the import results in MySQL
mysql> select count(*) from jtxx;
+----------+
| count(*) |
+----------+
| 6649 |
+----------+
1 row in set (0.00 sec)
mysql> select * from jtxx limit 10;
+----------------------------------+------+------+-------------+------+------+------+------+------+
| clxxbh | xsfx | cdbh | hphm | clsd | cllx | clbj | cllb | wflx |
+----------------------------------+------+------+-------------+------+------+------+------+------+
| 32100017000000000220140317000015 | 03 | 02 | 鲁Q58182 | 0.00 | 0 | 1 | 0 | 0 |
| 32100017000000000220140317000016 | 02 | 03 | 鲁QV4662 | 0.00 | 6 | 1 | 0 | 0 |
| 32100017000000000220140317000019 | 03 | 01 | 苏LL8128 | 0.00 | 1 | 1 | 0 | 0 |
| 32100017000000000220140317000020 | 02 | 02 | 苏CAH367 | 0.00 | 1 | 1 | 0 | 0 |
| 32100017000000000220140317000023 | 02 | 03 | 鲁Q7899W | 0.00 | 7 | 1 | 0 | 0 |
| 32100017000000000220140317000029 | 03 | 02 | 苏HN3819 | 0.00 | 0 | 1 | 0 | 0 |
| 32100017000000000220140317000038 | 03 | 02 | 鲁C01576 | 0.00 | 0 | 1 | 0 | 0 |
| 32100017000000000220140317000044 | 03 | 02 | 苏DT9178 | 0.00 | 6 | 1 | 0 | 0 |
| 32100017000000000220140317000049 | 03 | 01 | 苏LZ1112 | 0.00 | 7 | 1 | 0 | 0 |
| 32100017000000000220140317000052 | 03 | 01 | 苏K9795警 | 0.00 | 2 | 1 | 0 | 0 |
+----------------------------------+------+------+-------------+------+------+------+------+------+
10 rows in set (0.00 sec)
Recommended reading: Hive用户接口(二)—使用Hive JDBC驱动连接Hive操作实例 (Hive user interfaces, part 2: connecting to Hive through the Hive JDBC driver).
Copyright notice: this is an original article by NIITYZU, released under the CC 4.0 BY-SA license; please include a link to the original and this notice when reposting.