package com.test.log.makefriends

import java.nio.charset.StandardCharsets

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format.DateTimeFormat
import redis.clients.jedis.Jedis

import scala.collection.mutable.ArrayBuffer

/**
 * Daily Spark job: for each processed day it reads comma-separated records from HDFS, looks
 * each record's first field up in Redis, and counts how many `}`-delimited segments of the
 * returned value carry the record's third field as `user_gid` together with `contactsType` 2.
 *
 * @Author: jxx
 * @Date: 2018/3/23 17:48
 */
object MakeFriendsWithRedis extends Serializable {

  // Set the JVM-wide Joda-Time default zone to Shanghai time.
  DateTimeZone.setDefault(DateTimeZone.forID("Asia/Shanghai"))

  /** Captures the 32-character value of a `"user_gid":"..."` pair in group 1. */
  private val UserGidPattern = "\"user_gid\":\"([0-9A-Z]{32})\"".r

  /** Matches `"contactsType":2` exactly (the lookahead rejects e.g. `"contactsType":21`). */
  private val ContactsType2Pattern = "\"contactsType\":2(?!\\d)".r

  /**
   * Entry point.
   *
   * @param args `<CYCLETIMES> <BEGINTIME>`, both integers; forwarded to [[newTask]] as
   *             `mouthDay` and `beforeDay` respectively
   * @throws IllegalArgumentException if the argument count is not exactly two
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      // "$0" is printed literally (shell-style placeholder), not interpolated.
      println("Usage: $0 <CYCLETIMES> <BEGINTIME>")
      println("Descriptions:\n You need a args of cycles.\n And You need a return date.")
      throw new IllegalArgumentException("Only accept two arguments!")
    }
    val cycleTimes = args(0).toInt
    val beginTime = args(1).toInt

    val sc = new SparkContext(new SparkConf())
    // Always release the Spark context, even when a day's processing fails.
    try newTask(sc, cycleTimes, beginTime)
    finally sc.stop()
  }

  /**
   * Runs the job once per day for `mouthDay + 1` consecutive days.
   *
   * Day `i` (0-based) is 2018-03-01 shifted by `i - beforeDay` days, formatted as `yyyyMMdd`.
   * For each day the job does the following:
   *  - reads `hdfs://test1/<day>/part-*` and keeps lines with exactly six comma-separated
   *    fields (`(` is stripped from field 0);
   *  - looks field 0 up in Redis;
   *  - writes `(field0, field2, n)` to `hdfs://test/<day>` for every record with
   *    `n = countContactMatches(field2, redisValue) > 0`.
   *
   * Records whose field 0, field 2 or Redis value is empty are dropped.
   *
   * @param sc        active Spark context
   * @param mouthDay  last day offset to process; inclusive, so `mouthDay + 1` days run
   * @param beforeDay number of days before 2018-03-01 at which the first processed day lies
   */
  def newTask(sc: SparkContext, mouthDay: Int, beforeDay: Int): Unit = {
    // Loop-invariant: parse the fixed base day and build the formatters once.
    val inputFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    val dayFormat = DateTimeFormat.forPattern("yyyyMMdd")
    val baseDay = DateTime.parse("2018-03-01 01:00:00", inputFormat).withTimeAtStartOfDay()

    // NOTE(review): `to` is inclusive, so mouthDay + 1 days are processed — confirm intended.
    for (i <- 0 to mouthDay) {
      val begin = dayFormat.print(baseDay.plusDays(i - beforeDay))
      println(begin)

      val inputUrl1 = "hdfs://test1/" + begin + "/part-*"
      println(inputUrl1)

      // (field0 without '(', field2); the other fields are not used downstream.
      val keyedRecords: RDD[(String, String)] = sc.textFile(inputUrl1)
        .map(_.split(","))
        .filter(_.length == 6)
        .map(fields => (fields(0).replaceAll("\\(", ""), fields(2)))

      // Keep (key, gid, redisValue) as a tuple end-to-end: re-splitting a ';'-joined string
      // would silently drop every record whose Redis value contains ';'.
      val joined: RDD[(String, String, String)] = keyedRecords
        .mapPartitions(records => joinWithRedis(records))
        .filter { case (key, gid, value) => key.nonEmpty && gid.nonEmpty && value.nonEmpty }

      joined
        .map { case (key, gid, value) => (key, gid, countContactMatches(gid, value)) }
        .filter(_._3 > 0)
        .saveAsTextFile("hdfs://test/" + begin)
    }
  }

  /**
   * Looks up all record keys of one partition with a single `RedisUtil.redisUtil` call and
   * pairs each record with its value (`""` when Redis returned null).
   *
   * NOTE(review): assumes `RedisUtil.redisUtil` returns one entry per key, in key order, with
   * each non-null entry being an `Array[Byte]` — confirm against RedisUtil.
   */
  private def joinWithRedis(records: Iterator[(String, String)]): Iterator[(String, String, String)] = {
    val pending = records.toVector
    if (pending.isEmpty) {
      // Nothing to look up; don't issue a lookup with zero keys.
      Iterator.empty
    } else {
      // Explicit UTF-8 so encoding does not depend on the executor JVM's default charset.
      val keys = ArrayBuffer[Array[Byte]](
        pending.map { case (key, _) => key.getBytes(StandardCharsets.UTF_8) }: _*)
      val values = RedisUtil.redisUtil(keys)
      (0 until values.size()).iterator.map { idx =>
        val raw = values.get(idx)
        val value =
          if (raw == null) "" else new String(raw.asInstanceOf[Array[Byte]], StandardCharsets.UTF_8)
        val (key, gid) = pending(idx)
        (key, gid, value)
      }
    }
  }

  /**
   * Counts the `}`-delimited segments of `json` that have the bare `user_gid` value (first
   * occurrence in the segment) equal to `gid`, and that also contain `"contactsType":2`.
   */
  private[makefriends] def countContactMatches(gid: String, json: String): Int =
    json.split("}").count { segment =>
      // Compare the captured bare value, not the full `"user_gid":"..."` match text.
      UserGidPattern.findFirstMatchIn(segment).exists(_.group(1) == gid) &&
        ContactsType2Pattern.findFirstIn(segment).isDefined
    }
}
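// 版权声明：本文为jxx4903049原创文章，遵循CC 4.0 BY-SA版权协议，转载请附上原文出处链接和本声明。
// (Copyright notice: original article by jxx4903049, licensed under CC 4.0 BY-SA; when reproducing, include a link to the original source and this notice.)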