1. Description
By reading the SQL statements stored in a script file, business processing can be carried out on the big-data platform. For colleagues with no Java (or other programming-language) background, this wrapper makes it possible to implement the business logic by writing nothing but SQL.
2. Platform environment
Spark: spark-2.2.1-bin-hadoop2.7
3. Approach
Read the SQL script file from HDFS (it could just as well sit on the local Linux file system), parse it to obtain what Spark SQL needs: the source tables, the target table, the target table's column names, and the query SQL. Then run the query with SparkSession.sql and save the result set to the target table.
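Condensed into a few lines, the driver flow looks roughly like the sketch below; this is only an outline of the full driver and helper classes (SparkSQLUtils, SQLAnalysisUtils, SparkHBaseUtils) listed in section 4:

// Outline only; the full implementation of these helpers follows in section 4.
Map confInfoMap = SparkSQLUtils.getSQLConfig("deal", "org");            // read + parse the script
List<String> srcTables  = (List<String>) confInfoMap.get("srcTableName");
List<String> tgtColumns = (List<String>) confInfoMap.get("tgtTableColumn");
String selSQL = (String) confInfoMap.get("SQL");

SparkSession spark = SparkSQLUtils.getSparkSQL(srcTables, tgtColumns);  // source tables -> temp views
Dataset<Row> result = spark.sql(selSQL);                                // run the business SQL
// the rows in `result` are then written to the target table (HBase in this post)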
4. Implementation
4.1 Reading the SQL script file
The SQL is parsed with jsqlparser; add it to the Maven dependencies:
<!-- SQL parser -->
<dependency>
    <groupId>com.github.jsqlparser</groupId>
    <artifactId>jsqlparser</artifactId>
    <version>1.1</version>
</dependency>
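As a quick illustration of what jsqlparser provides (a hypothetical standalone snippet, not part of the original project), an INSERT ... SELECT statement can be parsed into an Insert object that exposes the target table, the column list and the SELECT body:

import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.insert.Insert;
import java.io.StringReader;

public class JsqlparserDemo {
    public static void main(String[] args) throws Exception {
        CCJSqlParserManager pm = new CCJSqlParserManager();
        Insert ins = (Insert) pm.parse(new StringReader(
                "INSERT INTO org_table(id, ygbh) SELECT a.ygbh AS id, a.ygbh FROM yuan_gong_biao a"));
        System.out.println(ins.getTable().getName()); // org_table
        System.out.println(ins.getColumns());         // the INSERT column list: [id, ygbh]
        System.out.println(ins.getSelect());          // the SELECT part of the statement
    }
}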
A sample SQL script:
INSERT INTO org_table(id, ygbh, ygzh, zwxm, zsxm, bmmc, xjsj)
SELECT DISTINCT
       a.ygbh                    AS id
      ,a.ygbh                    AS ygbh /** employee number **/
      ,a.ygzh                    AS ygzh /** account **/
      ,a.zwxm                    AS zwxm /** Chinese name **/
      ,concat(a.ygbh,'-',a.zsxm) AS zsxm /** real name **/
      ,c.bmmc                    AS bmmc /** department name **/
      ,current_timestamp()       AS xjsj /** creation time **/
  FROM yuan_gong_biao a
 INNER JOIN (SELECT b.bmid AS bmid, b.bmmc AS bmmc FROM bu_men_biao b
             UNION ALL
             SELECT b.bmid AS bmid, b.bmmc AS bmmc FROM bu_men_biao b) c
    ON a.bmid = c.bmid
 WHERE a.ygbh LIKE '81%'
The method that reads the SQL script file:
/**
 * Read a file from HDFS line by line, stripping comments as it goes.
 * @param filePath HDFS path of the SQL script
 * @return the comment-free content of each line
 */
private static List<String> getFileValue(String filePath){
    List<String> fileValueList = new ArrayList<>();
    Configuration conf = new Configuration();
    BufferedReader confBuff = null;
    try {
        // Obtain the HDFS file system
        FileSystem fs = FileSystem.get(conf);
        // Path of the script file
        Path confPath = new Path(filePath);
        // Open an input stream on the file
        InputStream confIn = fs.open(confPath);
        // Wrap it in a buffered reader so it can be read line by line
        confBuff = new BufferedReader(new InputStreamReader(confIn));
        String confStr = null;
        String keyOneStr = null;
        String keyTwoStr = null;
        while ((confStr = confBuff.readLine()) != null) {
            // Strip comments
            if (confStr.trim().length() != 0) {
                if (confStr.trim().length() >= 3) {
                    keyOneStr = confStr.trim().substring(0, 3);
                    keyTwoStr = confStr.trim().substring(0, 2);
                    // Skip lines that consist of nothing but a comment
                    if (!keyOneStr.equals("/**") && !keyTwoStr.equals("--")) {
                        // No comment on this line
                        if (confStr.indexOf("/**") == -1 && confStr.indexOf("--") == -1) {
                            fileValueList.add(confStr);
                        }
                        // Both markers appear, /** first: keep the text before /**
                        if ((confStr.indexOf("/**") > -1 && confStr.indexOf("--") > -1) && (confStr.indexOf("/**") < confStr.indexOf("--"))) {
                            fileValueList.add(confStr.substring(0, confStr.indexOf("/**")) + " ");
                        }
                        // Both markers appear, -- first: keep the text before --
                        if ((confStr.indexOf("/**") > -1 && confStr.indexOf("--") > -1) && (confStr.indexOf("/**") > confStr.indexOf("--"))) {
                            fileValueList.add(confStr.substring(0, confStr.indexOf("--")) + " ");
                        }
                        // Only a /** comment: keep the text before it
                        if (confStr.indexOf("/**") > -1 && confStr.indexOf("--") == -1) {
                            fileValueList.add(confStr.substring(0, confStr.indexOf("/**")) + " ");
                        }
                        // Only a -- comment: keep the text before it
                        if (confStr.indexOf("/**") == -1 && confStr.indexOf("--") > -1) {
                            fileValueList.add(confStr.substring(0, confStr.indexOf("--")) + " ");
                        }
                    }
                } else {
                    fileValueList.add(confStr + " ");
                }
            }
        }
        confBuff.close();
        confIn.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return fileValueList;
}
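Usage example (the path mirrors the one built later in getSQLConfig; the file name is hypothetical): the cleaned lines are simply concatenated back into a single statement before parsing.

// Read the script from HDFS and glue the comment-free lines back together
List<String> lines = getFileValue("/user/app/hbase_table/sysconfig/sql/deal/org.sql");
StringBuilder sql = new StringBuilder();
for (String line : lines) {
    sql.append(line);
}
System.out.println(sql.toString()); // the INSERT ... SELECT with comments removed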
The method that parses the SQL script:
/**
 * Parse an INSERT ... SELECT statement and extract the target table name and the inserted columns.
 * @param sqlStr
 * @return
 * @throws JSQLParserException
 */
public static Map getSQLPares(String sqlStr) throws JSQLParserException {
    // Everything extracted from the SQL is collected in this map
    Map sqlPareMap = new HashMap();
    // Create the parser
    CCJSqlParserManager pm = new CCJSqlParserManager();
    System.out.println("sqlStr ================ " + sqlStr);
    // Parse the statement into an Insert object
    Insert insertStatement = (Insert) pm.parse(new StringReader(sqlStr));
    // Name of the target table of the INSERT
    String insertTableName = insertStatement.getTable().getName();
    // Store it in the map
    sqlPareMap.put("tgtTableName", insertTableName);
    // Fetch the column names of the target table
    List<String> tgtTableColumnList = HBaseUtils.getTableColumn(insertTableName);
    // If the target table has no columns yet, take the column names from the SQL statement instead
    if (tgtTableColumnList == null || tgtTableColumnList.size() == 0) {
        tgtTableColumnList = getColumnName(insertStatement);
    }
    // Store the column name list
    sqlPareMap.put("tgtTableColumn", tgtTableColumnList);
    // Store the SELECT statement that follows the INSERT
    sqlPareMap.put("SQL", insertStatement.getSelect().toString());
    // Parse that SELECT into a Select object
    Select selectStatement = (Select) pm.parse(new StringReader(insertStatement.getSelect().toString()));
    TablesNamesFinder tablesNamesFinder = new TablesNamesFinder();
    // Collect the names of the source tables referenced by the SELECT
    List<String> tableNameList = tablesNamesFinder.getTableList(selectStatement);
    // Store them in the map
    sqlPareMap.put("srcTableName", tableNameList);
    return sqlPareMap;
}
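For the sample script above, the returned map would contain roughly the following (assuming org_table does not yet exist in HBase, so the column list falls back to the columns named in the INSERT):

Map sqlInfo = getSQLPares(sqlText); // sqlText = the sample INSERT ... SELECT as a single string
// sqlInfo.get("tgtTableName")   -> "org_table"
// sqlInfo.get("tgtTableColumn") -> [id, ygbh, ygzh, zwxm, zsxm, bmmc, xjsj]
// sqlInfo.get("SQL")            -> "SELECT DISTINCT a.ygbh AS id, ... WHERE a.ygbh LIKE '81%'"
// sqlInfo.get("srcTableName")   -> [yuan_gong_biao, bu_men_biao]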
Putting it together, the driver program:
public static void main(String[] args) {
    // SQL script: sub-directory "deal", file "org.sql"
    Map confInfoMap = SparkSQLUtils.getSQLConfig("deal", "org");
    // Fixed key: list of source table names
    List<String> tableNameList = (ArrayList<String>) confInfoMap.get("srcTableName");
    for (String srcTableName : tableNameList) {
        System.out.println("=src===============" + srcTableName);
    }
    // Fixed key: the SELECT statement that implements the business logic
    String selSQL = (String) confInfoMap.get("SQL");
    System.out.println("=sql===============" + selSQL);
    // Fixed key: name of the target table
    String tgtTable = (String) confInfoMap.get("tgtTableName");
    System.out.println("=tgt===============" + tgtTable);
    // Fixed key: column names of the target table
    List<String> columnList = (List<String>) confInfoMap.get("tgtTableColumn");
    // Truncate the target table first
    // HBaseUtils.truncateData(tgtTable);
    SparkSession sparkSession = SparkSQLUtils.getSparkSQL(tableNameList, columnList);
    Dataset<Row> nameDf = sparkSession.sql(selSQL);
    // Write the result back to HBase
    // List<Row> recordList = nameDf.collectAsList();
    // List<Put> putList = new ArrayList<>();
    // for (Row record : recordList) {
    //     Put recordPut = new Put(Bytes.toBytes(record.getAs("id").toString()));
    //     for (String columnName : columnList) {
    //         recordPut.addColumn(Bytes.toBytes("data"), Bytes.toBytes(columnName), Bytes.toBytes(record.getAs(columnName).toString()));
    //     }
    //     putList.add(recordPut);
    // }
    // HBaseUtils.insertData(tgtTable, putList);
    nameDf.show();
}
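A side note on the commented-out write-back: collectAsList pulls the whole result set to the driver, which only works for small results. The sketch below is a hedged alternative (not from the original post) that writes each partition directly to HBase using the SparkHBaseUtils helper shown later; it additionally needs the Connection, Table, TableName, Put and Bytes imports from the HBase client.

// Alternative write-back sketch: one HBase batch per Spark partition
nameDf.toJavaRDD().foreachPartition(rows -> {
    Connection conn = SparkHBaseUtils.getHConnection();
    Table table = conn.getTable(TableName.valueOf(tgtTable));
    List<Put> puts = new ArrayList<>();
    while (rows.hasNext()) {
        Row record = rows.next();
        Put put = new Put(Bytes.toBytes(record.getAs("id").toString()));
        for (String columnName : columnList) {
            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes(columnName),
                    Bytes.toBytes(record.getAs(columnName).toString()));
        }
        puts.add(put);
    }
    table.put(puts);
    table.close();
});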
The Spark SQL wrapper class:
import net.sf.jsqlparser.JSQLParserException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SparkSQLUtils {

    public static SparkSession getSparkSQL(List<String> tableNameList, List<String> columnList) {
        // Create a SparkSession
        SparkSession sparkSQL = getSparkSession();
        // Wrap its SparkContext in a JavaSparkContext
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSQL.sparkContext());
        // HBase configuration
        Configuration hbaseConfig = SparkHBaseUtils.getConfiguration();
        // Register every source table as a Spark temp view
        for (String tableNameStr : tableNameList) {
            hbaseConfig.set(TableInputFormat.INPUT_TABLE, tableNameStr);
            JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = javaSparkContext.newAPIHadoopRDD(hbaseConfig, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
            // RDD of column (qualifier) names
            JavaRDD<List<String>> recordColumnRDD = hbaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, List<String>>() {
                public List<String> call(Tuple2<ImmutableBytesWritable, Result> tuple) {
                    List<String> recordColumnList = new ArrayList<>();
                    Result result = tuple._2;
                    Cell[] cells = result.rawCells();
                    for (Cell cell : cells) {
                        recordColumnList.add(new String(CellUtil.cloneQualifier(cell)));
                    }
                    return recordColumnList;
                }
            });
            // RDD of row values
            JavaRDD<Row> recordValuesRDD = hbaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Row>() {
                public Row call(Tuple2<ImmutableBytesWritable, Result> tuple) {
                    List<String> recordList = new ArrayList<>();
                    Result result = tuple._2;
                    Cell[] cells = result.rawCells();
                    for (Cell cell : cells) {
                        recordList.add(new String(CellUtil.cloneValue(cell)));
                    }
                    return (Row) RowFactory.create(recordList.toArray());
                }
            });
            // Build the schema fields
            List<StructField> structFields = new ArrayList<StructField>();
            // if (columnList != null) {
            //     for (String columnStr : columnList) {
            //         structFields.add(DataTypes.createStructField(columnStr, DataTypes.StringType, true));
            //     }
            // } else {
            List<String> fieldsList = recordColumnRDD.first();
            for (String columnStr : fieldsList) {
                structFields.add(DataTypes.createStructField(columnStr, DataTypes.StringType, true));
            }
            // }
            // Create the schema
            StructType schema = DataTypes.createStructType(structFields);
            Dataset employeesDataset = sparkSQL.createDataFrame(recordValuesRDD, schema);
            employeesDataset.printSchema();
            // Register the table as a Spark temp view
            employeesDataset.createOrReplaceTempView(tableNameStr);
        }
        return sparkSQL;
    }

    public static Map getSQLConfig(String path, String fileName) {
        Map tableConfigMap = null;
        List<String> tableList = new ArrayList<String>();
        // Build the path of the SQL file
        String sqlPath = null;
        if (path == null) {
            sqlPath = "/user/app/hbase_table/sysconfig/sql/" + fileName + ".sql";
        } else {
            sqlPath = "/user/app/hbase_table/sysconfig/sql/" + path + "/" + fileName + ".sql";
        }
        List<String> fileValueList = getFileValue(sqlPath);
        String sqlValueStr = "";
        for (String lineStr : fileValueList) {
            sqlValueStr = sqlValueStr + lineStr;
        }
        try {
            tableConfigMap = SQLAnalysisUtils.getSQLPares(sqlValueStr);
            // tableConfigMap.put("SQL", sqlValueStr);
        } catch (JSQLParserException e) {
            e.printStackTrace();
        }
        return tableConfigMap;
    }

    /**
     * Read a file from HDFS line by line, stripping comments as it goes.
     * @param filePath HDFS path of the SQL script
     * @return the comment-free content of each line
     */
    private static List<String> getFileValue(String filePath) {
        List<String> fileValueList = new ArrayList<>();
        Configuration conf = new Configuration();
        BufferedReader confBuff = null;
        try {
            // Obtain the HDFS file system
            FileSystem fs = FileSystem.get(conf);
            // Path of the script file
            Path confPath = new Path(filePath);
            // Open an input stream on the file
            InputStream confIn = fs.open(confPath);
            // Wrap it in a buffered reader so it can be read line by line
            confBuff = new BufferedReader(new InputStreamReader(confIn));
            String confStr = null;
            String keyOneStr = null;
            String keyTwoStr = null;
            while ((confStr = confBuff.readLine()) != null) {
                // Strip comments
                if (confStr.trim().length() != 0) {
                    if (confStr.trim().length() >= 3) {
                        keyOneStr = confStr.trim().substring(0, 3);
                        keyTwoStr = confStr.trim().substring(0, 2);
                        // Skip lines that consist of nothing but a comment
                        if (!keyOneStr.equals("/**") && !keyTwoStr.equals("--")) {
                            // No comment on this line
                            if (confStr.indexOf("/**") == -1 && confStr.indexOf("--") == -1) {
                                fileValueList.add(confStr);
                            }
                            // Both markers appear, /** first: keep the text before /**
                            if ((confStr.indexOf("/**") > -1 && confStr.indexOf("--") > -1) && (confStr.indexOf("/**") < confStr.indexOf("--"))) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("/**")) + " ");
                            }
                            // Both markers appear, -- first: keep the text before --
                            if ((confStr.indexOf("/**") > -1 && confStr.indexOf("--") > -1) && (confStr.indexOf("/**") > confStr.indexOf("--"))) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("--")) + " ");
                            }
                            // Only a /** comment: keep the text before it
                            if (confStr.indexOf("/**") > -1 && confStr.indexOf("--") == -1) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("/**")) + " ");
                            }
                            // Only a -- comment: keep the text before it
                            if (confStr.indexOf("/**") == -1 && confStr.indexOf("--") > -1) {
                                fileValueList.add(confStr.substring(0, confStr.indexOf("--")) + " ");
                            }
                        }
                    } else {
                        fileValueList.add(confStr + " ");
                    }
                }
            }
            confBuff.close();
            confIn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return fileValueList;
    }

    private static SparkSession getSparkSession() {
        SparkSession spark = SparkSession.builder()
                .appName("SparkApp")
                .master("local[2]")
                .getOrCreate();
        return spark;
    }
}
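A minimal usage sketch of the wrapper (table names taken from the sample SQL; it assumes the HBase tables exist and that the local[2] master hard-coded in getSparkSession is acceptable for a test run):

// Register the two source tables as temp views, then query one of them
List<String> srcTables = java.util.Arrays.asList("yuan_gong_biao", "bu_men_biao");
SparkSession spark = SparkSQLUtils.getSparkSQL(srcTables, null);
spark.sql("SELECT count(*) FROM yuan_gong_biao").show();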
The SQL parsing class:
import hbase.comm.HBaseUtils;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.statement.insert.Insert;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.util.TablesNamesFinder;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SQLAnalysisUtils {

    /**
     * Parse an INSERT ... SELECT statement and extract the target table name and the inserted columns.
     * @param sqlStr
     * @return
     * @throws JSQLParserException
     */
    public static Map getSQLPares(String sqlStr) throws JSQLParserException {
        // Everything extracted from the SQL is collected in this map
        Map sqlPareMap = new HashMap();
        // Create the parser
        CCJSqlParserManager pm = new CCJSqlParserManager();
        System.out.println("sqlStr ================ " + sqlStr);
        // Parse the statement into an Insert object
        Insert insertStatement = (Insert) pm.parse(new StringReader(sqlStr));
        // Name of the target table of the INSERT
        String insertTableName = insertStatement.getTable().getName();
        // Store it in the map
        sqlPareMap.put("tgtTableName", insertTableName);
        // Fetch the column names of the target table
        List<String> tgtTableColumnList = HBaseUtils.getTableColumn(insertTableName);
        // If the target table has no columns yet, take the column names from the SQL statement instead
        if (tgtTableColumnList == null || tgtTableColumnList.size() == 0) {
            tgtTableColumnList = getColumnName(insertStatement);
        }
        // Store the column name list
        sqlPareMap.put("tgtTableColumn", tgtTableColumnList);
        // Store the SELECT statement that follows the INSERT
        sqlPareMap.put("SQL", insertStatement.getSelect().toString());
        // Parse that SELECT into a Select object
        Select selectStatement = (Select) pm.parse(new StringReader(insertStatement.getSelect().toString()));
        TablesNamesFinder tablesNamesFinder = new TablesNamesFinder();
        // Collect the names of the source tables referenced by the SELECT
        List<String> tableNameList = tablesNamesFinder.getTableList(selectStatement);
        // Store them in the map
        sqlPareMap.put("srcTableName", tableNameList);
        return sqlPareMap;
    }

    /**
     * Return the column names listed after INSERT INTO in the statement.
     * @param insertStatement
     * @return
     * @throws JSQLParserException
     */
    public static List<String> getColumnName(Insert insertStatement) throws JSQLParserException {
        List<String> columnNameList = new ArrayList<String>();
        List<Column> columns = insertStatement.getColumns();
        for (Column column : columns) {
            System.out.println("tableColumn==============" + column.getColumnName());
            columnNameList.add(column.getColumnName());
        }
        return columnNameList;
    }
}
The HBase helper class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
public class SparkHBaseUtils {
    private static final String QUORUM = "192.168.0.100";
    private static final String CLIENTPORT = "2181";
    private static Configuration conf = null;
    private static Connection conn = null;
    private static JavaSparkContext context = null;

    /**
     * Return a JavaSparkContext
     * @return
     */
    public static synchronized JavaSparkContext getJavaSparkContext() {
        if (context == null) {
            SparkConf sparkConf = new SparkConf();
            sparkConf.setAppName("SparkApp");
            sparkConf.setMaster("local[5]");
            context = new JavaSparkContext(sparkConf);
        }
        return context;
    }

    /**
     * Return the single shared Configuration instance
     * @return
     */
    public static synchronized Configuration getConfiguration() {
        if (conf == null) {
            conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", QUORUM);
            conf.set("hbase.zookeeper.property.clientPort", CLIENTPORT);
            conf.addResource("hbase-site.xml");
            conf.addResource("core-site.xml");
        }
        return conf;
    }

    /**
     * Return the single shared Connection instance
     * @return
     */
    public static synchronized Connection getHConnection() {
        if (conf == null) {
            getConfiguration();
        }
        if (conn == null) {
            /*
             * Usage pattern:
             *   Connection connection = ConnectionFactory.createConnection(conf);
             *   Table table = connection.getTable(TableName.valueOf("mytable"));
             *   table.get(...); ...
             *   table.close();
             *   connection.close();
             */
            try {
                conn = ConnectionFactory.createConnection(conf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return conn;
    }

    // Close the connection
    public static void connClose() {
        try {
            if (null != conn)
                conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
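A short usage sketch of the helper (table name and row key are examples only, not from the original post):

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class SparkHBaseUtilsDemo {
    public static void main(String[] args) throws Exception {
        // Fetch a single row from an HBase table through the shared connection
        Connection conn = SparkHBaseUtils.getHConnection();
        Table table = conn.getTable(TableName.valueOf("org_table"));
        Result row = table.get(new Get(Bytes.toBytes("810001")));
        for (Cell cell : row.rawCells()) {
            System.out.println(new String(CellUtil.cloneQualifier(cell)) + " = "
                    + new String(CellUtil.cloneValue(cell)));
        }
        table.close();
        SparkHBaseUtils.connClose();
    }
}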
Copyright notice: This is an original article by chsong888, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.