package com.xxxx.report.service;
import com.google.common.collect.Lists;
import com.xx.report.config.Constants;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Calendar;
import java.util.List;
/**
 * Spark job that loads raw bicycle lock logs into Hive, one {@code dt} partition per day.
 *
 * <p>For each day of an inclusive date range it reads the text files two directory levels below
 * {@code Constants.PREFIX_BICYCLE_LOG_PATH_YangXin + yyyyMMdd}, parses every line into an
 * {@link EngineLockLog}, keeps only records that carry a user id and inserts them into the
 * {@code TABLE_NAME} Hive table under partition {@code dt=yyyy-MM-dd}.
 *
 * <p>The class is {@link Serializable} because the per-line parser is an instance method that is
 * captured by the closures Spark ships to executors.
 *
 * @author yangxin-ryan
 */
public class BicycleLog2hive implements Serializable {
    /** Class logger; static, so it is never part of a serialized closure. */
    private static final Logger LOG = LoggerFactory.getLogger(BicycleLog2hive.class);

    /**
     * Compact day format ({@code yyyyMMdd}) used for input paths. It resolves strictly, so
     * impossible dates such as {@code 20200230} are rejected, and it is immutable and thread-safe.
     */
    private static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;

    /** Temporary view each day's parsed records are registered under before the insert. */
    private static final String TMP_TABLE_NAME = "tableNameTemp";
    /** Target Hive table; rows are inserted into a {@code dt} partition. */
    private static final String TABLE_NAME = "tableName1";
    /** Spark application name. */
    private static final String APP_NAME = "xxxxx@yangxin";

    /**
     * Parses one raw log line into an {@link EngineLockLog}.
     *
     * <p>Any exception thrown while parsing is logged and swallowed, so that one bad line does not
     * fail the whole Spark job. The (possibly partially populated) object is still returned.
     * NOTE(review): the caller drops records whose user id is null; whether a failed parse always
     * leaves the user id null cannot be confirmed from the reviewed copy.
     *
     * @param line one line of the raw log file
     * @return the parsed record, never {@code null}
     */
    private EngineLockLog handleLine(String line) {
        EngineLockLog engineLockLog = new EngineLockLog();
        try {
            // Per-line tracing: debug level only, otherwise it floods executor output.
            LOG.debug("handleLine Function -> : {}", line);
            // NOTE(review): the parsing statements below are redacted in the reviewed copy.
            xxxxxxxxxxxxxxxxx
            xxxxxxxxxxxxxxx
            xxxxxxxxxxxx
        } catch (Exception error) {
            // Best effort: keep the job running, but keep the stack trace in the log.
            LOG.warn("{} | {}", error.getMessage(), line, error);
        }
        return engineLockLog;
    }

    /**
     * Loads every day from {@code startTime} to {@code endTime} (both inclusive) into Hive.
     *
     * <p>Days whose logs yield no record with a user id are skipped, so no insert is issued for
     * them.
     *
     * @param master    Spark master URL. NOTE(review): not applied to the session builder, so the
     *                  master presumably comes from the launch environment (e.g. spark-submit)
     * @param startTime first day as {@code yyyyMMdd}; {@code '-'} and {@code '_'} separators are
     *                  stripped, so {@code yyyy-MM-dd} and {@code yyyy_MM_dd} are accepted too
     * @param endTime   last day, same format as {@code startTime}
     * @throws IllegalArgumentException if either date is malformed or {@code endTime} is before
     *                                  {@code startTime}
     */
    public void run(String master, String startTime, String endTime) {
        long startTimestamp = System.currentTimeMillis();
        // Validate the whole range before starting Spark so bad arguments fail fast
        // (an end before the start used to spin forever).
        LocalDate firstDay = parseDay(startTime);
        LocalDate lastDay = parseDay(endTime);
        if (lastDay.isBefore(firstDay)) {
            throw new IllegalArgumentException(
                    "endTime " + endTime + " is before startTime " + startTime);
        }
        SparkSession spark = SparkSession.builder().appName(APP_NAME).enableHiveSupport().getOrCreate();
        for (LocalDate day = firstDay; !day.isAfter(lastDay); day = day.plusDays(1)) {
            loadDay(spark, day);
        }
        LOG.info("总耗时: -> {}ms", System.currentTimeMillis() - startTimestamp);
    }

    /**
     * Parses a command-line date argument.
     *
     * @param raw date as {@code yyyyMMdd}, optionally with {@code '-'} or {@code '_'} separators
     * @return the parsed day
     * @throws IllegalArgumentException if the value is not a valid calendar date in that format
     * @throws NullPointerException     if {@code raw} is {@code null}
     */
    private static LocalDate parseDay(String raw) {
        String digits = raw.replace("-", "").replace("_", "");
        try {
            return LocalDate.parse(digits, DAY_FORMAT);
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException(
                    "Invalid date '" + raw + "', expected yyyyMMdd, yyyy-MM-dd or yyyy_MM_dd", e);
        }
    }

    /**
     * Parses one day's logs and inserts the records that carry a user id into partition
     * {@code dt=yyyy-MM-dd} of {@code TABLE_NAME}. Does nothing if no such record exists.
     *
     * @param spark active Hive-enabled session
     * @param day   day to load
     */
    private void loadDay(SparkSession spark, LocalDate day) {
        String dayKey = day.format(DAY_FORMAT);
        LOG.info("日期:-> {}", dayKey);
        String path = Constants.PREFIX_BICYCLE_LOG_PATH_YangXin + dayKey + "/*/*";
        LOG.info("路径:-> {}", path);

        JavaRDD<EngineLockLog> records = spark.read().textFile(path)
                .javaRDD()
                .map(this::handleLine)
                // Keep only records that carry a user id.
                .filter(parsed -> parsed.getUser_id() != null);
        if (records.isEmpty()) {
            return;
        }

        Dataset<Row> frame = spark.createDataFrame(records, EngineLockLog.class);
        frame.createOrReplaceTempView(TMP_TABLE_NAME);
        // LocalDate.toString() is ISO yyyy-MM-dd built from a validated date, so only digits and
        // '-' ever reach the SQL text (no raw CLI input is concatenated).
        String insertSql = "insert into table " + TABLE_NAME + " partition(dt='" + day + "') "
                + "select xxxx,xxxxx,xxxxx from " + TMP_TABLE_NAME;
        spark.sql(insertSql);
    }

    /**
     * Command-line entry point.
     *
     * @param args {@code <master> <startTime> <endTime>}; see {@link #run} for the date format
     * @throws IllegalArgumentException if fewer than three arguments are supplied
     */
    public static void main(String[] args) {
        if (args.length < 3) {
            throw new IllegalArgumentException(
                    "Usage: BicycleLog2hive <master> <startTime yyyyMMdd> <endTime yyyyMMdd>");
        }
        String master = args[0];
        String startTime = args[1];
        String endTime = args[2];
        new BicycleLog2hive().run(master, startTime, endTime);
    }
}