Importing Hive Analysis Results into a MySQL Table (Part 2): Using the Hive and MySQL JDBC Drivers

        In the previous post I showed one way to do this: write the Hive analysis results into a second Hive table, then use Sqoop to import that table into MySQL. I believe that approach may pay off when the data volume is very large, but in the common case the output of a Hive analysis, query, or aggregation is small. For that case I tried a different route: connect to Hive through the Hive JDBC driver, read the query result set, and insert it directly into the database through the MySQL JDBC driver. It worked, and it turned out to be much faster than the Sqoop approach.

I. Start the Hive Metastore Service

[hadoopUser@secondmgt ~]$ hive --service metastore
 Starting Hive Metastore Server
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.min.split.size.per.node is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.node
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.min.split.size.per.rack is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.rack
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
15/04/22 14:53:12 INFO Configuration.deprecation: mapred.committer.job.setup.cleanup.needed is deprecated. Instead, use mapreduce.job.committer.setup.cleanup.needed
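
        Clients find the metastore through hive-site.xml. For reference only (the configuration file is not shown in this environment), the relevant property usually looks like the following, assuming the metastore listens on its default port 9083:

<property>
  <name>hive.metastore.uris</name>
  <value>thrift://192.168.2.133:9083</value>
</property>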
II. Start the HiveServer2 Service

[hadoopUser@secondmgt ~]$ hive --service hiveserver2
Starting HiveServer2
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.min.split.size.per.node is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.node
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.input.dir.recursive is deprecated. Instead, use mapreduce.input.fileinputformat.input.dir.recursive
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.min.split.size.per.rack is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize.per.rack
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
15/04/22 14:58:22 INFO Configuration.deprecation: mapred.committer.job.setup.cleanup.needed is deprecated. Instead, use mapreduce.job.committer.setup.cleanup.needed
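
        Before writing any code, it is worth confirming that HiveServer2 is actually accepting JDBC connections. Beeline, which ships with Hive, works as a quick sanity check; the URL below is the same one the Java code uses later:

[hadoopUser@secondmgt ~]$ beeline -u jdbc:hive2://192.168.2.133:10000/hive -n hadoopUser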
III. The Hive Table Mapped onto HBase

        A sample of the data is shown below. The HBase table currently holds 6,649 rows.

hive> select  * from transjtxx_hbase;
32108800000000004620140317000817    02   03      苏K22F91        0.00    3       1       0       0
32108800000000004620140317000820    02   03      苏HP062H        0.00    6       1       0       0
32108800000000004620140317000823    02   03      苏KHD687        0.00    7       1       0       0
32108800000000004620140317001011    02   03      苏K42F88        0.00    4       1       0       0
32108800000000004620140317001158    02   03      苏KHD529        0.00    4       1       0       0
32108800000000004620140317001228    02   02      苏KPV670        0.00    2       1       0       0
32108800000000004620140317001349    02   03      苏K0Y787        0.00    4       1       0       0
32108800000000004620140317001357    02   03      苏KT9531        0.00    0       1       0       0
32108800000000004620140317001422    02   02      苏KK8682        0.00    1       1       0       0
32108800000000004620140317001423    02   02      苏KK8682        0.00    5       1       0       0
32108800000000004620140317001539    02   03      苏K8X833        0.00    0       1       0       0
32108800000000004620140317001542    02   03      苏KHP581        0.00    6       1       0       0

hive> select  count(*) from transjtxx_hbase;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1428394594787_0026, Tracking URL = http://secondmgt:8088/proxy/application_1428394594787_0026/
Kill Command = /home/hadoopUser/cloud/hadoop/programs/hadoop-2.2.0/bin/hadoop job  -kill job_1428394594787_0026
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2015-04-22 18:31:00,659 Stage-1 map = 0%,  reduce = 0%
2015-04-22 18:31:17,216 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 13.09 sec
2015-04-22 18:31:29,679 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.54 sec
MapReduce Total cumulative CPU time: 16 seconds 540 msec
Ended Job = job_1428394594787_0026
MapReduce Jobs Launched:
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 16.54 sec   HDFS Read: 256 HDFS Write: 5 SUCCESS
Total MapReduce CPU Time Spent: 16 seconds 540 msec
OK
6649
Time taken: 45.464 seconds, Fetched: 1 row(s)
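
        For context, transjtxx_hbase is a Hive external table backed by HBase through the HBase storage handler. The original DDL is not reproduced in this post; a sketch of what such a mapping typically looks like is below, where the HBase table name and the column family/qualifier names are placeholders, not values taken from this environment:

CREATE EXTERNAL TABLE transjtxx_hbase(
  clxxbh string, xsfx string, cdbh string, hphm string, clsd string,
  cllx string, clbj string, cllb string, wflx string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
  ":key,info:xsfx,info:cdbh,info:hphm,info:clsd,info:cllx,info:clbj,info:cllb,info:wflx")
TBLPROPERTIES ("hbase.table.name" = "transjtxx");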

IV. Create the Target MySQL Database and Table

       Create the target database transport and the table jtxx in MySQL:

mysql> show tables;
+---------------------+
| Tables_in_transport |
+---------------------+
| jtxx                |
+---------------------+
1 row in set (0.00 sec)
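
        The original CREATE statements are not shown above; a sketch that matches the nine columns used throughout this post would look something like the following (the column lengths are assumptions):

CREATE DATABASE transport;
USE transport;
CREATE TABLE jtxx (
    clxxbh VARCHAR(64),
    xsfx   VARCHAR(8),
    cdbh   VARCHAR(8),
    hphm   VARCHAR(16),
    clsd   VARCHAR(16),
    cllx   VARCHAR(8),
    clbj   VARCHAR(8),
    cllb   VARCHAR(8),
    wflx   VARCHAR(8)
);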
V. The Code

1. HiveSqlHelper, the Hive JDBC connection class

package com.gxnzx.hive.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class HiveSqlHelper {

        //HiveServer2 connection settings: the hive2 JDBC driver, the
        //server URL (host:port/database), and the login user
        private static String hiveDriverName="org.apache.hive.jdbc.HiveDriver";
        private static String hiveUrl="jdbc:hive2://192.168.2.133:10000/hive";
        private static String hiveUserName="hadoopUser";
        private static String hivePassword="";
        private static Connection conn=null;

        static{
                try {
                        Class.forName(hiveDriverName);  //load the Hive JDBC driver once
                } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                }
        }

        public static Connection getConn(){
                try {
                        conn=DriverManager.getConnection(hiveUrl, hiveUserName, hivePassword);
                } catch (SQLException e) {
                        e.printStackTrace();
                }
                return conn;
        }

}

2. DBSqlHelper, the MySQL JDBC connection class

package com.gxnzx.hive.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;

import com.gxnzx.hive.model.TransInfo;

public class DBSqlHelper {

        //MySQL connection settings for the target database transport
        private static String driverName="com.mysql.jdbc.Driver";
        private static String url="jdbc:mysql://192.168.2.133:3306/transport";
        private static String userName="hive";
        private static String password="hive";

        private static Connection conn=null;
        private static PreparedStatement ps=null;

        static{
                try {
                        Class.forName(driverName);  //load the MySQL JDBC driver once
                } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                }
        }

        public static Connection getConn(){
                try {
                        conn=DriverManager.getConnection(url, userName, password);
                } catch (SQLException e) {
                        e.printStackTrace();
                }
                return conn;
        }

        //Batch insert: sql must be a parameterized statement such as
        //"insert into jtxx values(?,?,?,?,?,?,?,?,?)"
        public static boolean addBatch(String sql, ArrayList<TransInfo> list){
                boolean flag=false;
                try{
                        conn=DBSqlHelper.getConn();  //open the database connection
                        conn.setAutoCommit(false);   //disable auto-commit so the whole batch is one transaction

                        ps=conn.prepareStatement(sql);

                        for(int i=0;i<list.size();i++){

                                TransInfo trans=list.get(i);

                                ps.setString(1, trans.getClxxbh());
                                ps.setString(2, trans.getXsfx());
                                ps.setString(3, trans.getCdbh());
                                ps.setString(4, trans.getHphm());
                                ps.setString(5, trans.getClsd());
                                ps.setString(6, trans.getCllx());
                                ps.setString(7, trans.getClbj());
                                ps.setString(8, trans.getCllb());
                                ps.setString(9, trans.getWflx());
                                ps.addBatch();  //queue this row for the batch
                        }

                        ps.executeBatch();  //send all queued rows in one round trip
                        conn.commit();      //commit the transaction
                        flag=true;

                }catch(Exception e){
                        e.printStackTrace();
                }finally{
                        try {
                                ps.close();
                        } catch (SQLException e) {
                                e.printStackTrace();
                        }
                }

                return flag;
        }
}
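
        One caveat worth noting: addBatch buffers every queued row in the driver until executeBatch runs, so the whole result set sits in memory twice (once in the list, once in the batch). That is harmless at 6,649 rows, but for much larger result sets it would be safer to flush periodically. A minimal sketch of that change inside the loop (the batch size of 1000 is arbitrary):

ps.addBatch();
if((i+1)%1000==0){
        ps.executeBatch();  //flush every 1000 rows to bound memory use
}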
3. TransInfo, the model class for a traffic record (the field names are pinyin abbreviations; the comments give their likely meanings)

package com.gxnzx.hive.model;

public class TransInfo {

        private String clxxbh;  //record ID (first column of the Hive table)
        private String xsfx;    //driving direction
        private String cdbh;    //lane number
        private String hphm;    //license plate number
        private String clsd;    //vehicle speed
        private String cllx;    //vehicle type
        private String clbj;    //vehicle flag
        private String cllb;    //vehicle category
        private String wflx;    //violation type


        public String getClxxbh() {
                return clxxbh;
        }
        public void setClxxbh(String clxxbh) {
                this.clxxbh = clxxbh;
        }
        public String getXsfx() {
                return xsfx;
        }
        public void setXsfx(String xsfx) {
                this.xsfx = xsfx;
        }
        public String getCdbh() {
                return cdbh;
        }
        public void setCdbh(String cdbh) {
                this.cdbh = cdbh;
        }
        public String getHphm() {
                return hphm;
        }
        public void setHphm(String hphm) {
                this.hphm = hphm;
        }
        public String getClsd() {
                return clsd;
        }
        public void setClsd(String clsd) {
                this.clsd = clsd;
        }
        public String getCllx() {
                return cllx;
        }
        public void setCllx(String cllx) {
                this.cllx = cllx;
        }
        public String getClbj() {
                return clbj;
        }
        public void setClbj(String clbj) {
                this.clbj = clbj;
        }
        public String getCllb() {
                return cllb;
        }
        public void setCllb(String cllb) {
                this.cllb = cllb;
        }
        public String getWflx() {
                return wflx;
        }
        public void setWflx(String wflx) {
                this.wflx = wflx;
        }
}
4. The main program

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;

import com.gxnzx.hive.model.TransInfo;
import com.gxnzx.hive.util.DBSqlHelper;
import com.gxnzx.hive.util.HiveSqlHelper;

public class HiveServer2 {

        private static Connection conn=null;

        public static void main(String args[]){

                try {
                        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS");

                        System.out.println(sdf.format(new Date()));  //start time

                        conn=HiveSqlHelper.getConn();  //connect to HiveServer2

                        Statement st=conn.createStatement();

                        String sql1="select * from transjtxx_hbase";  //the Hive query to run

                        ResultSet rs=st.executeQuery(sql1);

                        ArrayList<TransInfo> list=new ArrayList<TransInfo>();
                        //copy the result set into a list of model objects
                        while(rs.next()){

                                TransInfo info=new TransInfo();

                                info.setClxxbh(rs.getString(1));
                                info.setXsfx(rs.getString(2));
                                info.setCdbh(rs.getString(3));
                                info.setHphm(rs.getString(4));
                                info.setClsd(rs.getString(5));
                                info.setCllx(rs.getString(6));
                                info.setClbj(rs.getString(7));
                                info.setCllb(rs.getString(8));
                                info.setWflx(rs.getString(9));

                                list.add(info);
                        }
                        rs.close();
                        st.close();

                        //batch-insert the rows into MySQL
                        if(DBSqlHelper.addBatch("insert into jtxx values(?,?,?,?,?,?,?,?,?)", list)){
                                System.out.println("success");
                        }else{
                                System.out.println("error");
                        }

                        System.out.println(sdf.format(new Date()));  //end time

                } catch (SQLException e) {
                        e.printStackTrace();
                }
        }
}
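
        To compile and run this, both drivers have to be on the classpath: the Hive JDBC driver and the Hadoop/Hive client jars it depends on, plus the MySQL Connector/J jar. The exact jar list varies by Hive release, so it is easiest to let a build tool resolve the dependencies.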
5. Execution results

        Judging from the run, it took a little over 3 seconds to pull all 6,649 records out of Hive (queried through the Hadoop cluster) and load them into MySQL, whereas the Sqoop approach from the previous post took far longer. Admittedly, that may simply be because the data volume here is small.

VI. Check the Import Results in MySQL

mysql> select count(*) from jtxx;
+----------+
| count(*) |
+----------+
|     6649 |
+----------+
1 row in set (0.00 sec)
mysql> select  * from jtxx limit 10;
+----------------------------------+------+------+-------------+------+------+------+------+------+
| clxxbh                           | xsfx | cdbh | hphm        | clsd | cllx | clbj | cllb | wflx |
+----------------------------------+------+------+-------------+------+------+------+------+------+
| 32100017000000000220140317000015 | 03   | 02   | 鲁Q58182    | 0.00 | 0    | 1    | 0    | 0    |
| 32100017000000000220140317000016 | 02   | 03   | 鲁QV4662    | 0.00 | 6    | 1    | 0    | 0    |
| 32100017000000000220140317000019 | 03   | 01   | 苏LL8128    | 0.00 | 1    | 1    | 0    | 0    |
| 32100017000000000220140317000020 | 02   | 02   | 苏CAH367    | 0.00 | 1    | 1    | 0    | 0    |
| 32100017000000000220140317000023 | 02   | 03   | 鲁Q7899W    | 0.00 | 7    | 1    | 0    | 0    |
| 32100017000000000220140317000029 | 03   | 02   | 苏HN3819    | 0.00 | 0    | 1    | 0    | 0    |
| 32100017000000000220140317000038 | 03   | 02   | 鲁C01576    | 0.00 | 0    | 1    | 0    | 0    |
| 32100017000000000220140317000044 | 03   | 02   | 苏DT9178    | 0.00 | 6    | 1    | 0    | 0    |
| 32100017000000000220140317000049 | 03   | 01   | 苏LZ1112    | 0.00 | 7    | 1    | 0    | 0    |
| 32100017000000000220140317000052 | 03   | 01   | 苏K9795警   | 0.00 | 2    | 1    | 0    | 0    |
+----------------------------------+------+------+-------------+------+------+------+------+------+
10 rows in set (0.00 sec)
Recommended reading: Hive User Interfaces (Part 2): A Worked Example of Connecting to Hive with the Hive JDBC Driver
