java spark读写hdfs_Spark读取HDFS写入Hive

package com.xxxx.report.service;

import com.google.common.collect.Lists;

import com.xx.report.config.Constants;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Calendar;
import java.util.List;

/**
 * Spark batch job that reads bicycle engine-lock log files one day at a time
 * and appends the parsed records to the Hive table {@code tableName1},
 * partitioned by {@code dt}.
 *
 * <p>For every day in the requested range the job reads the text files under
 * {@code Constants.PREFIX_BICYCLE_LOG_PATH_YangXin + yyyyMMdd + "/*&#47;*"},
 * parses each line with {@link #handleLine(String)}, drops records whose
 * {@code user_id} is {@code null}, and inserts the rest into the partition
 * {@code dt='yyyy-MM-dd'}.
 *
 * <p>The class is {@link Serializable} because {@code handleLine} is an
 * instance method used inside an RDD transformation, so the instance is
 * captured by the closure.
 *
 * @author yangxin-ryan
 */
public class BicycleLog2hive implements Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(BicycleLog2hive.class);

    /** Compact day format ({@code yyyyMMdd}) used in the input directory names. */
    private static final DateTimeFormatter DAY_FORMAT = DateTimeFormatter.BASIC_ISO_DATE;

    /** Day format ({@code yyyy-MM-dd}) used for the Hive {@code dt} partition value. */
    private static final DateTimeFormatter PARTITION_FORMAT = DateTimeFormatter.ISO_LOCAL_DATE;

    private static final String TMP_TABLE_NAME = "tableNameTemp";

    private static final String TABLE_NAME = "tableName1";

    private static final String APP_NAME = "xxxxx@yangxin";

    /**
     * Parses one raw log line into an {@link EngineLockLog}.
     *
     * <p>Parsing failures are logged and swallowed so that a single bad line
     * does not fail the whole job; the (possibly partially populated) record
     * is still returned. {@link #run} later discards records whose
     * {@code user_id} is {@code null}.
     *
     * @param line raw text line read from the input files
     * @return the record built from {@code line}; never {@code null}
     */
    private EngineLockLog handleLine(String line) {
        EngineLockLog engineLockLog = new EngineLockLog();
        try {
            // Per-record trace goes through the logger at debug level instead of
            // flooding executor stdout.
            LOG.debug("handleLine Function -> : {}", line);
            // NOTE(review): the field-extraction statements are redacted
            // placeholders in this copy of the file; they are kept verbatim.
            xxxxxxxxxxxxxxxxx
            xxxxxxxxxxxxxxx
            xxxxxxxxxxxx
        } catch (Exception error) {
            // Trailing throwable argument makes SLF4J log the stack trace.
            LOG.warn("{} | {}", error.getMessage(), line, error);
        }
        return engineLockLog;
    }

    /**
     * Loads every day from {@code startTime} to {@code endTime} (both
     * inclusive) into Hive and prints the total elapsed time.
     *
     * <p>Both bounds are validated before any Spark work starts. The Spark
     * session obtained for the run is stopped when the method exits, whether
     * it completes normally or with an exception.
     *
     * @param master    Spark master argument; it is not applied to the session
     *                  builder here (NOTE(review): presumably supplied by the
     *                  launcher instead, confirm before relying on it)
     * @param startTime first day to load; {@code -} and {@code _} separators
     *                  are ignored, the remaining text must be {@code yyyyMMdd}
     * @param endTime   last day to load, same format as {@code startTime}
     * @throws IllegalArgumentException if either bound is not a valid
     *                                  {@code yyyyMMdd} date or if
     *                                  {@code startTime} is after {@code endTime}
     */
    public void run(String master, String startTime, String endTime) {
        long startTimsStamp = System.currentTimeMillis();

        LocalDate firstDay = parseDay(startTime, "startTime");
        LocalDate lastDay = parseDay(endTime, "endTime");
        if (firstDay.isAfter(lastDay)) {
            // The previous string-equality loop never terminated in this case.
            throw new IllegalArgumentException(
                    "startTime " + firstDay + " is after endTime " + lastDay);
        }

        SparkSession spark = SparkSession.builder().appName(APP_NAME).enableHiveSupport().getOrCreate();
        try {
            for (LocalDate day = firstDay; !day.isAfter(lastDay); day = day.plusDays(1)) {
                loadDay(spark, day);
            }
        } finally {
            spark.stop();
        }

        long endTimeStamp = System.currentTimeMillis();
        System.out.println("总耗时: -> " + (endTimeStamp - startTimsStamp) + "ms");
    }

    /**
     * Strips {@code -} and {@code _} from {@code raw} and parses the rest as a
     * {@code yyyyMMdd} date.
     */
    private static LocalDate parseDay(String raw, String argumentName) {
        if (raw == null) {
            throw new IllegalArgumentException(argumentName + " must not be null");
        }
        String compact = raw.replace("-", "").replace("_", "");
        try {
            return LocalDate.parse(compact, DAY_FORMAT);
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException(
                    argumentName + " is not a valid yyyyMMdd date: " + raw, e);
        }
    }

    /** Reads, parses and inserts the log files of a single day. */
    private void loadDay(SparkSession spark, LocalDate day) {
        String compactDay = day.format(DAY_FORMAT);
        LOG.info("日期:-> {}", compactDay);

        StringBuilder path = new StringBuilder();
        path.append(Constants.PREFIX_BICYCLE_LOG_PATH_YangXin).append(compactDay).append("/*/*");
        LOG.info("路径:-> {}", path);

        JavaRDD<EngineLockLog> mapRDD = spark.read().textFile(path.toString())
                .javaRDD()
                .map(this::handleLine)
                .filter(engineLockLog -> engineLockLog.getUser_id() != null);

        // Skip the insert entirely when the day produced no usable records.
        if (mapRDD.isEmpty()) {
            return;
        }

        Dataset<Row> mapDF = spark.createDataFrame(mapRDD, EngineLockLog.class);
        mapDF.createOrReplaceTempView(TMP_TABLE_NAME);

        // The partition value comes from a parsed LocalDate, so it can only be
        // a well-formed yyyy-MM-dd string.
        String dayTemp = day.format(PARTITION_FORMAT);
        String insertSQL = "insert into table " + TABLE_NAME + " partition(dt='" + dayTemp + "') "
                + "select xxxx,xxxxx,xxxxx from " + TMP_TABLE_NAME;
        spark.sql(insertSQL);
    }

    /**
     * Command-line entry point.
     *
     * @param args {@code <master> <startTime> <endTime>}
     * @throws IllegalArgumentException if fewer than three arguments are given
     */
    public static void main(String[] args) {
        if (args == null || args.length < 3) {
            throw new IllegalArgumentException(
                    "Usage: BicycleLog2hive <master> <startTime> <endTime>");
        }
        String master = args[0];
        String startTime = args[1];
        String endTime = args[2];

        BicycleLog2hive bicycleLog2hive = new BicycleLog2hive();
        bicycleLog2hive.run(master, startTime, endTime);
    }

}


版权声明:本文为weixin_39672194原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。