大数据之电商分析系统(一)

大数据之电商分析系统(一)

一:项目介绍

​ 本项目来源于企业级电商网站的大数据统计分析平台, 该平台以 Spark 框架为核心, 对电商网站的日志进行离线和实时分析。该大数据分析平台对电商网站的各种用户行为( 访问行为、购物行为、广告点击行为等)进行分析,根据平台统计出来的数据, 辅助公司中的 PM(产品经理)、数据分析师以及管理人员分析现有产品的情况, 并根据用户行为分析结果持续改进产品的设计,以及调整公司的战略和业务。最终达到用大数据技术来帮助提升公司的业绩、营业额以及市场占有率的目标。

​ 本项目使用了 Spark 技术生态栈中最常用的三个技术框架, Spark Core、SparkSQL 和 Spark Streaming,进行离线计算和实时计算业务模块的开发。实现了包括用户访问 session 分析、页面单跳转化率统计、热门商品离线统计、广告流量实时统计4 个业务模块。通过合理的将实际业务模块进行技术整合与改造, 该项目几乎完全涵盖了 Spark Core、Spark SQL 和 Spark Streaming 这三个技术框架中大部分的功能点、知识点,学员对于 Spark 技术框架的理解将会在本项目中得到很大的提高。

二:项目框架

​ 1. 项目整体框架

在这里插入图片描述
​ 本项目分为离线分析系统与实时分析系统两大模块。

​ 在离线分析系统中,我们将模拟业务数据写入 Hive 表中,离线分析系统从Hive 中获取数据,并根据实际需求(用户访问 Session 分析、页面单跳转化率分析、各区域热门商品统计) 对数据进行处理,最终将分析完毕的统计数据存储到MySQL 的对应表格中。
​ 在实时分析系统中,我们将模拟业务数据写入 Kafka 集群中, 实时分析系统从 Kafka broker 中获取数据, 通过 Spark Streaming 的流式处理对广告点击流量进行实时分析,最终将统计结果存储到 MySQL 的对应表格中。

  1. 离线日志采集流程

    在这里插入图片描述

  2. 实时日志采集流程

在这里插入图片描述

  1. 离线/实时日志采集框架

    在这里插入图片描述

三:数据分析

  1. 离线数据分析

    用户访问行为表(user_visit_action) —>存放网站或者 APP每天的点击流数据,就是用户对网站/APP 每点击一下, 就会产生一条存放在这个表里面的数据。

    用户基本信息表(user_info) ----->是一张普通的用户基本信息表;这张表中存放了网站/APP 所有注册用户的基本信息.

    商品信息表(product_info) ----->是一张普通的商品基本信息表; 这张表中存放了网站/APP所有商品的基本信息。

  2. 在线数据分析

    程序每 5 秒向 Kafka 集群写入数据

    格式 : timestamp province city userid adid

四、项目需求分析

  1. 用户访问session统计

    ​ 用户在电商网站上, 通常会有很多的访问行为,通常都是进入首页, 然后可能点击首页上的一些商品,点击首页上的一些品类,也可能随时在搜索框里面搜索关键词,还可能将一些商品加入购物车,对购物车中的多个商品下订单,最后对订单中的多个商品进行支付。用户的每一次操作,其实可以理解为一个 action,在本项目中,我们关注点击、搜索、 下单、 支付这四个用户行为。
    ​ 用户 session, 是在电商平台的角度定义的会话概念, 指的就是, 从用户第一次进入首页, session 就开始了。 然后在一定时间范围内, 直到最后操作完( 可能做了几十次、甚至上百次操作),离开网站, 关闭浏览器,或者长时间没有做操作, 那么 session 就结束了。以上用户在网站内的访问过程, 就称之为一次 session。 简单理解, session就是某一天某一个时间段内, 某个用户对网站从打开/进入, 到做了大量操作,到最后关闭浏览器。的过程,就叫做 session。session 实际上就是一个电商网站中最基本的数据和大数据。那么面向消费者/用户端的大数据分析( C 端), 最基本的就是面向用户访问行为/用户访问 session 的分析。该模块主要是对用户访问 session 进行统计分析,包括 session 聚合指标计算、 按时间比例随机抽取 session、 获取每天点击、 下单和购买排名前 10 的品类、 并获取 top10 品类中排名前 10的 session。该模块可以让产品经理、数据分析师以及企业管理层形象地看到各种条件下的具体用户行为以及统计指标,从而对公司的产品设计以及业务发展战略做出调整。主要使用 Spark Core 实现。

    1. 页面单跳转换率统计

    ​ 该模块主要是计算关键页面之间的单步跳转转化率,涉及到页面切片算法以及页面流匹配算法。该模块可以让产品经理、数据分析师以及企业管理层看到各个关键页面之间的转化率,从而对网页布局,进行更好的优化设计。主要使用SparkCore实现。

  2. 区域热门统计

    ​ 该模块主要实现每天统计出各个区域的 top3 热门商品。 该模块可以让企业管理层看到电商平台在不同区域出售的商品的整体情况, 从而对公司的商品相关的战略进行调整。主要使用 Spark SQL 实现。

  3. 广告流量实时统计

    ​ 网站 / app 中经常会给第三方平台做广告,这也是一些互联网公司的核心收入来源;当广告位招商完成后,广告会在网站 / app 的某个广告位发布出去,当用户访问网站 / app 的时候, 会看到相应位置的广告, 此时, 有些用户可能就会去点击那个广告。我们要获取用户点击广告的行为,并针对这一行为进行计算和统计。
    ​ 用户每次点击一个广告以后,会产生相应的埋点日志;在大数据实时统计系统中,会通过某些方式将数据写入到分布式消息队列中( Kafka)。日志发送给后台 web 服务器( nginx), nginx 将日志数据负载均衡到多个Tomcat 服务器上, Tomcat 服务器会不断将日志数据写入 Tomcat 日志文件中,写入后,就会被日志采集客户端(比如 flume agent)所采集,随后写入到消息队列中( kafka),我们的实时计算程序会从消息队列中( kafka)去实时地拉取数据,然后对数据进行实时的计算和统计。
    ​ 这个模块的意义在于, 让产品经理、高管可以实时地掌握到公司打的各种广告的投放效果。以便于后期持续地对公司的广告投放相关的战略和策略,进行调整和优化;以期望获得最好的广告收益。该模块负责实时统计公司的广告流量, 包括广告展现流量和广告点击流量。 实现动态黑名单机制, 以及黑名单过滤; 实现滑动窗口内的各城市的广告展现流量和广告点击流量的统计; 实现每个区域每个广告的点击流量实时统计;实现每个区域 top3 点击量的广告的统计。主要使用 Spark Streaming 实现。

五:项目需求具体实现

  1. Session各范围访问步长、访问时长占比统计

    ​ 需求一要统计出符合筛选条件的 session 中,访问时长在 1s3s、4s6s、7s9s、10s30s、30s60s、1m3m、3m10m、10m30m、30m 以上各个范围内的 session占比;访问步长在 13、46、79、1030、30~60、60 以上各个范围内的 session占比,并将结果保存到 MySQL 数据库中。

    ​ 在计算之前需要根据查询条件筛选 session,查询条件比如搜索过某些关键词的用户、访问时间在某个时间段内的用户、年龄在某个范围内的用户、职业在某个范围内的用户、所在某个城市的用户,发起的 session。找到对应的这些用户的 session,并进行统计, 之所以需要有筛选主要是可以让使用者, 对感兴趣的和关系的用户群体,进行后续各种复杂业务逻辑的统计和分析,那么拿到的结果数据,就是只是针对特殊用户群体的分析结果;而不是对所有用户进行分析的泛泛的分析结果。比如说,现在某个企业高层,就是想看到用户群体中, 28~35 岁的,老师职业的群体, 对应的一些统计和分析的结果数据,从而辅助高管进行公司战略上的决策制定。

    ​ session 访问时长,也就是说一个 session 对应的开始的 action,到结束的 action,之间的时间范围;还有,就是访问步长,指的是,一个 session 执行期间内,依次点击过多少个页面,比如说,一次 session,维持了 1 分钟, 那么访问时长就是 1m,然后在这 1 分钟内,点击了 10 个页面, 那么 session 的访问步长,就是 10.

    ​ 比如说,符合第一步筛选出来的 session 的数量大概是有 1000 万个。那么里面,我们要计算出,访问时长在 1s~3s 内的 session 的数量,并除以符合条件的总 session数量( 比如 1000 万),比如是 100 万/1000 万,那么 1s~3s 内的 session 占比就是 10%。依次类推,这里说的统计,就是这个意思。

    ​ 这个功能可以让人从全局的角度看到,符合某些条件的用户群体,使用我们的产品的一些习惯。比如大多数人,到底是会在产品中停留多长时间, 大多数人,会在一次使用产品的过程中,访问多少个页面。那么对于使用者来说, 有一个全局和清晰的认识。

  2. Session随机抽取

    ​ 在符合条件的 session 中,按照时间比例随机抽取 1000 个 session这个按照时间比例是什么意思呢?随机抽取本身是很简单的,但是按照时间比例,就很复杂了。比如说,这一天总共有 1000 万的 session。那么我现在总共要从这 1000 万 session 中,随机抽取出来 1000 个 session。但是这个随机不是那么简单的。需要做到如下几点要求:首先,如果这一天的 12:00~13:00 的 session 数量是 100万,那么这个小时的 session 占比就是 1/10,那么这个小时中的 100 万的 session,我们就要抽取 1/10 * 1000 = 100 个。然后再从这个小时的 100 万 session 中,随机抽取出 100 个 session。以此类推,其他小时的抽取也是这样做。

    ​ 这个功能的作用,是说,可以让使用者,能够对于符合条件的 session,按照时间比例均匀的随机采样出 1000 个 session,然后观察每个 session 具体的点击流/行为, 比如先进入了首页、然后点击了食品品类、然后点击了雨润火腿肠商品、然后搜索了火腿肠罐头的关键词、接着对王中王火腿肠下了订单、最后对订单做了支付。

    之所以要做到按时间比例随机采用抽取,就是要做到,观察样本的公平性。

    抽取完毕之后,需要将 Session 的相关信息和详细信息保存到 MySQL 数据库中。

    数据源解析:

    Session聚合数据 :AggrInfo 和 Session用户访问数据 UserVisitAction

  3. Top10热门品类

    在符合条件的 session 中,获取点击、下单和支付数量排名前 10 的品类。

    ​ 数据中的每个 session 可能都会对一些品类的商品进行点击、下单和支付等等行为, 那么现在就需要获取这些 session 点击、下单和支付数量排名前 10 的最热门的品类。也就是说,要计算出所有这些 session 对各个品类的点击、下单和支付的次数, 然后按照这三个属性进行排序,获取前 10 个品类。

    ​ 这个功能,很重要,就可以让我们明白, 就是符合条件的用户, 他最感兴趣的商品是什么种类。这个可以让公司里的人, 清晰地了解到不同层次、不同类型的用户的心理和喜好。计算完成之后, 将数据保存到 MySQL 数据库中。

    数据源解析:

    用户访问数据表: UserVisitAction

  4. Top10热门品类Top10活跃Session****统计

    对于排名前 10 的品类,分别获取其点击次数排名前 10 的 session。

    ​ 这个就是说, 对于 top10 的品类, 每一个都要获取对它点击次数排名前 10 的session。

    这个功能,可以让我们看到,对某个用户群体最感兴趣的品类, 各个品类最感兴趣最典型的用户的 session 的行为。计算完成之后,将数据保存到 MySQL 数据库中。

    数据源解析:

    用户访问数据表: UserVisitAction

需求1-4的具体代码实现
UserVisitSessionAnalyze
package com.ityouxin.session

import java.util.{Date, UUID}

import com.ityouxin.commons.conf.ConfigurationManager
import com.ityouxin.commons.constant.Constants
import com.ityouxin.commons.model.{UserInfo, UserVisitAction}
import com.ityouxin.commons.utils.{DateUtils, NumberUtils, ParamUtils, StringUtils, ValidUtils}
import com.ityouxin.session.DataModel.{SessionAggrState, SessionDetail, Top10Category, Top10Session}
import com.ityouxin.session.UserVisitSessionAnalyze.{getClickCategoryIdCountRDD, getOrderCategoryIdCountRDD, getPayCategoryIdCountRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
import net.sf.json.JSONObject
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.Random


object UserVisitSessionAnalyze {



  def main(args: Array[String]): Unit = {
    //初始化sc
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("UserVisitSessionAnalyze")
    //初始化SparkSession
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    //获取sc
    val sc: SparkContext = spark.sparkContext
    //根据配置工具类ConfigurationManager来获取config,获取任务配置
    val jsonStr = ConfigurationManager.config.getString("task.params.json")
    //将获取到的配置String转换成json格式,便于传递
    val taskParm: JSONObject = JSONObject.fromObject(jsonStr)

    //查询user_visit_action表中的数据(按照日期范围)
    val userVisitActionRDD: RDD[UserVisitAction] = getActionRDDByDateRange(spark,taskParm)
    //println(userVisitActionRDD.collect().mkString("/r/n"))
    //将用户行为信息转换为k-v元组
    val sessionidActionRDD: RDD[(String, UserVisitAction)] = userVisitActionRDD.map(uva => {
      (uva.session_id, uva)
    })
    //缓存数据
    sessionidActionRDD.persist(StorageLevel.MEMORY_ONLY)
    //聚合解析写个方法进行分组、计算等操作
    val sessionAggrInfoRDD: RDD[(String, String)] = aggregateBySession(spark,sessionidActionRDD)
    //println(sessionidActionRDD.collect().mkString("\r\n"))
    /*
    进行下一步操作,计算步长和时长,累加更新session访问数、session访问时长范围、session访问步长范围,需要用到累加器进行对数据得叠加
    */
    //创建一个累加器,进行计算每个excultor中的数据计算
    val sessionAggrStateAccumulator = new SessionAggrStateAccumulator
    //累加器需要注册,才能使用
    sc.register(sessionAggrStateAccumulator,"sessionAggrStateAccumulator")
    //根据查询条件过滤用户行为数据,并且同时累加时长范围和步长访问的数据
    val filteredSessionAggrInfoRDD: RDD[(String, String)] = filterSessionAndAggrState(sessionAggrInfoRDD:RDD[(String,String)],taskParm,sessionAggrStateAccumulator)
    //缓存
    filteredSessionAggrInfoRDD.persist(StorageLevel.MEMORY_ONLY)
    //打印过滤后的数据
    filteredSessionAggrInfoRDD.foreach(println)

    //统计各个时长范围和步长范围ide占比
    val taskUUID = UUID.randomUUID().toString
    //需求一:计算所有符合条件的数据各个范围的占比,保存到mysql数据库中
    sessionAggrStateAccumulator.value
    calculateAndPersistAggrState(spark,sessionAggrStateAccumulator.value,taskUUID)


    val sessionDetailRDD: RDD[(String, UserVisitAction)] = getSessionDetailRDD(sessionidActionRDD,filteredSessionAggrInfoRDD)
    sessionDetailRDD.persist(StorageLevel.MEMORY_ONLY)
   // println(sessionDetailRDD.collect().mkString("\r\n"))

    //需求二:随机均匀抽取样本session
    randRomExtractSession(spark,taskUUID,filteredSessionAggrInfoRDD,sessionDetailRDD)

    //需求三:求Top的热门品类
     val top10Categories: Array[(CategorySortKey, String)] = getTop10Category(spark,taskUUID,sessionDetailRDD)
    println(top10Categories.mkString("\r\n"))

    //需求四:获取top10品类的活跃top10的session
    getTop10CategoryToTop10Session(spark,taskUUID,top10Categories,sessionDetailRDD)


    //释放资源
    spark.close()
  }

  //需求四:统计top10中每个品类的前10的session
  def getTop10CategoryToTop10Session(spark: SparkSession,
                                     taskUUID: String,
                                     top10Categories: Array[(CategorySortKey, String)],
                                     sessionDetailRDD: RDD[(String, UserVisitAction)]) = {
    //对top10的品类进行数据处理   将其转换为(caregoryid,categoryid)
    val top10CategoryIds: Array[(Long, Long)] = top10Categories.map {
      case (csk, line) =>
        //获取categoryid
        val categoryid = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CATEGORY_ID)
        (categoryid.toLong, categoryid.toLong)
    }
    top10CategoryIds
    //获取到top10的品类的RDD
    val top10CategoryIdRDD: RDD[(Long, Long)] = spark.sparkContext.makeRDD(top10CategoryIds)

    //将sessionDetail行为数据,按照sessionid进行聚合
    val sessionActionsRDD: RDD[(String, Iterable[UserVisitAction])] = sessionDetailRDD.groupByKey()

    //统计每个品类categoeyid下的每个session的点击次数
    val categorySessionCountRDD: RDD[(Long, String)] = sessionActionsRDD.flatMap {
      case (sessionid, uvas) =>
        val categoryIdCountMap = new mutable.HashMap[Long, Long]()
        //遍历Iterable[UserVisitAction])  迭代器  中的数据,取出用户的行为数据
        for (uva <- uvas) {
          //判断容器中的点击品类id是否为空,是否存在点击行为
          if (uva.click_category_id != null && uva.click_category_id != -1 && !categoryIdCountMap.contains(uva.click_category_id))
          //不为空并且不存在行为数据,则将点击品类id值为0
            categoryIdCountMap.put(uva.click_category_id, 0)
          if (uva.click_category_id != null && uva.click_category_id != -1) {
            //不为空  存在则更新容器中的id
            categoryIdCountMap.update(uva.click_category_id, categoryIdCountMap(uva.click_category_id) + 1)
          }
        }
        //对categoryCountMap数据进行格式转换
        //遍历map容器中的数据 拼接出需要的字符串 yield 注意 在for循环中使用
        /*
        yield的用法总结
           针对每一次 for 循环的迭代, yield 会产生一个值,被循环记录下来 (内部实现上,像是一个缓冲区).
          当循环结束后, 会返回所有 yield 的值组成的集合.
          返回集合的类型与被遍历的集合类型是一致的.
        */

        for ((cid, count) <- categoryIdCountMap)
          yield (cid, sessionid + "," + count)
    }
    //统计所有的品类所有的session下的点击次数 categorySessionCountRDD
    //将两个RDD进行join操作 取导我们想要的RDD【(categoryid,sessionid+ “,” + count)】
    val top10CategoryJoinRDD: RDD[(Long, (Long, String))] = top10CategoryIdRDD.join(categorySessionCountRDD)
    //统计top10热门品类的每个品类的每个session的点击次数
    val top10CategorySessionCountRDD: RDD[(Long, String)] = top10CategoryJoinRDD.map {
      case (cid, (cid2, line)) =>
        (cid, line)
    }
    //根据品类id进行分组
    val top10CategorySessionsCountRDD: RDD[(Long, Iterable[String])] = top10CategorySessionCountRDD.groupByKey()

    //对分组后的数据进行压平,然后进行排序  拼接
    val top10SessionObjRDD: RDD[Top10Session] = top10CategorySessionsCountRDD.flatMap {
      case (cid, clicks) =>
        //处理sessionid +"," + count  这个字符串
       val top10Session = clicks.toList.sortWith(
          //转换为List后  可以使用sortwith进行排序  sortWith(排序函数)
          //_.split(“,”)(1) > _.split(",")(1)
          (x: String, y: String) => {
            x.split(",")(1) > y.split(",")(1)
          }
        ).take(10)

        top10Session.map{
          case line =>
            //得到sessionid和count
            val sessionid = line.split(",")(0)
            val count = line.split(",")(1).toLong
            //封装样例类
            Top10Session(taskUUID,cid,sessionid,count)
        }
    }
    import spark.implicits._
    //将RDD 转换形成DF
    top10SessionObjRDD.toDF().write
      .format("jdbc")
      .option("url",ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user",ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password",ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable","top10_session")
      .mode(SaveMode.Append)
      .save()

    //将top10Session对应的用户行为详情写入到数据库
    //将join后的RDD转换格式
    val top10SessionRDD: RDD[(String, String)] = top10SessionObjRDD.map {
      item => (item.sessionid, item.sessionid)
    }
    //与用户session详情RDD与转换后的top10Session join操作
    val top10SessionDetailActionRDD: RDD[(String, (String, UserVisitAction))] = top10SessionRDD.join(sessionDetailRDD)
    val top10SessionDetail: RDD[SessionDetail] = top10SessionDetailActionRDD.map {
      case (sid, (sid2, uva)) =>
        SessionDetail(taskUUID, uva.user_id, uva.session_id, uva.page_id, uva.action_time,
          uva.search_keyword, uva.click_category_id, uva.click_product_id, uva.order_category_ids,
          uva.order_product_ids, uva.pay_category_ids, uva.pay_product_ids)
    }
    top10SessionDetail.toDF().write
      .format("jdbc")
      .option("url",ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user",ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password",ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable","session_detail")
      .mode(SaveMode.Append)
      .save()
  }



  //计算点击类别的次数  需要进行筛选  将null和-1 去掉
  def getClickCategoryIdCountRDD(sessionDetailRDD: RDD[(String, UserVisitAction)]) = {
    val clickActionRDD: RDD[(String, UserVisitAction)] = sessionDetailRDD.filter {
      case (sessionid, uva) =>
        uva.click_category_id != null && uva.click_category_id != -1
    }
    val clickCategoryIdRDD: RDD[(Long, Long)] = clickActionRDD.map {
      case (sessionid, uva) =>
        (uva.click_category_id, 1L)
    }
    clickCategoryIdRDD.reduceByKey(_+_)
  }
  //计算每个品类的下单次数  也需要进行过滤掉空值
  def getOrderCategoryIdCountRDD(sessionDetailRDD: RDD[(String, UserVisitAction)]) = {
    val ordderAction: RDD[(String, UserVisitAction)] = sessionDetailRDD.filter {

      case (sessionid, uva) =>
        uva.order_category_ids != null

    }
    //对符合条件的下单进行扁平处理,组成一个新的RDD
    val orderCategoryIdRDD: RDD[(Long, Long)] = ordderAction.flatMap {

      case (sessionid, uva) =>

        uva.order_category_ids.split(",").map( item=>(item.toLong, 1L))
    }
    //+
    orderCategoryIdRDD.reduceByKey(_+_)
  }

    //支付次数,与上述类似
  def getPayCategoryIdCountRDD(sessionDetailRDD: RDD[(String, UserVisitAction)]) = {
    val payAction: RDD[(String, UserVisitAction)] = sessionDetailRDD.filter {
      case (sessionid, uva) =>
        uva.pay_category_ids != null
    }

    val payCategoryIdRDD: RDD[(Long, Long)] = payAction.flatMap {

      case (sessionid, uva) =>
        uva.pay_category_ids.split(",").map(item=>(item.toLong, 1L))

    }
    payCategoryIdRDD.reduceByKey(_+_)
  }
  //进行join处理,为排序做准备
  def joinCategoryAndDate(distinctCategoryIddRDD: RDD[(Long, Long)],
                          clickCategoryIdCountRDD: RDD[(Long, Long)],
                          orderCategoryIdCountRDD: RDD[(Long, Long)],
                          payCategoryIdCountRDD: RDD[(Long, Long)]) = {
    //join操作后,拼接需要的RDD
    val clickJoinRDD: RDD[(Long, String)] = distinctCategoryIddRDD.leftOuterJoin(clickCategoryIdCountRDD).map {
      case (categroyid, (cid, optionValue)) =>
        val clickCount = if (optionValue.isDefined) optionValue.get else 0L
        val value = Constants.FIELD_CATEGORY_ID + "=" + categroyid + "|" + Constants.FIELD_CLICK_COUNT + "=" + clickCount
        (categroyid, value)
    }
    //拼接出orderJoinRDD
    val orderJoinRDD: RDD[(Long, String)] = clickJoinRDD.leftOuterJoin(orderCategoryIdCountRDD).map {
      case (categroyid, (oldvalue, optionValue)) =>
        val orderCount = if (optionValue.isDefined) optionValue.get else 0L
        val value = oldvalue+ "|" + Constants.FIELD_CATEGORY_ID + "=" + categroyid + "|" + Constants.FIELD_ORDER_COUNT + "=" + orderCount
        (categroyid, value)
    }

    //拼接支付Join后的RDD
    val payJoinRDD: RDD[(Long, String)] = orderJoinRDD.leftOuterJoin(payCategoryIdCountRDD).map {
      case (categroyid, (oldvalue, optionValue)) =>
        val payCount = if (optionValue.isDefined) optionValue.get else 0L
        val value = oldvalue+ "|"+ Constants.FIELD_CATEGORY_ID + "=" + categroyid + "|" + Constants.FIELD_PAY_COUNT + "=" + payCount
        (categroyid, value)

    }
    payJoinRDD
  }

  //需求三:获取Top10的人们品类  点击量、下单量、支付量
  def getTop10Category(spark: SparkSession, taskUUID: String, sessionDetailRDD: RDD[(String, UserVisitAction)]) = {
    //获取所有产生点击 下单 支付行为的品类
  val categoryidRDD: RDD[(Long, Long)] = sessionDetailRDD.flatMap {
    case (sessionid, uva) =>
      val list = ArrayBuffer[(Long, Long)]()
      //点击行为的品类id
      if (uva.click_category_id != null && uva.click_category_id != -1) {
        list += ((uva.click_category_id, uva.click_category_id))
      }
      //下单行为的品类id  用户可能对应多个订单,ids  需要遍历
      if (uva.order_category_ids != null) {
        for (orderCid <- uva.order_category_ids.split(",")) {
          //将符合条件的id添加到容器list中
          list += ((orderCid.toLong, orderCid.toLong))
        }
      }
      //支付行为的品类id
      if (uva.pay_category_ids != null) {
        for (payCid <- uva.pay_category_ids.split(",")) {
          list += ((payCid.toLong, payCid.toLong))
        }
      }
      list
  }
    //一个用户可能多次访问一个类别,所以统计热门数量需要去重
    val distinctCategoryIddRDD = categoryidRDD.distinct()
    //计算每个品类的点击次数
    val clickCategoryIdCountRDD: RDD[(Long, Long)] = getClickCategoryIdCountRDD(sessionDetailRDD)
    //计算每个品类的下单次数
    val orderCategoryIdCountRDD: RDD[(Long, Long)] = getOrderCategoryIdCountRDD(sessionDetailRDD)
    //计算每个品类的支付次数
    val payCategoryIdCountRDD: RDD[(Long, Long)] = getPayCategoryIdCountRDD(sessionDetailRDD)

    //对计算后的点击次数 下单次数 支付次数 进行排序
    val joinCategoryRDD: RDD[(Long, String)] = joinCategoryAndDate(distinctCategoryIddRDD,clickCategoryIdCountRDD,orderCategoryIdCountRDD,payCategoryIdCountRDD)
    println(joinCategoryRDD.collect().mkString("\r\n"))

    //排序操作前准备
    val sortKeyCountRDD: RDD[(CategorySortKey, String)] = joinCategoryRDD.map {
      case (cid, line) =>
        //拼接出排序所需要的字段
        println(line)
        val clickCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CLICK_COUNT).toLong
        val orderCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_ORDER_COUNT).toLong
        val payCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_PAY_COUNT).toLong
        //封装排序样例类,进行处理数据
        (CategorySortKey(clickCount, orderCount, payCount), line)
    }
    //进行排序算法  降序
    val sortedKeyCountRDD: RDD[(CategorySortKey, String)] = sortKeyCountRDD.sortByKey(false)
    //前10行
    val top10Category: Array[(CategorySortKey, String)] = sortedKeyCountRDD.take(10)
    //排序后,再次拼接出符合保存mysql数据库的数据格式
    val top10CategoryArray: Array[Top10Category] = top10Category.map {
      case (categorySortKey, line) =>
        val categoryid = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CATEGORY_ID).toLong
        val clickCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_CLICK_COUNT).toLong
        val orderCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_ORDER_COUNT).toLong
        val payCount = StringUtils.getFieldFromConcatString(line, "\\|", Constants.FIELD_PAY_COUNT).toLong
        //样例类  进行数据拼接
        Top10Category(taskUUID, categoryid, clickCount, orderCount, payCount)
    }
    //转RDD
    val top10CategoryRDD = spark.sparkContext.parallelize(top10CategoryArray)
    import spark.implicits._
    //保存到数据库msyql
    top10CategoryRDD.toDF().write
      .format("jdbc")
      .option("url",ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user",ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password",ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable","top10_category")
      .mode(SaveMode.Append)
      .save()
    top10Category
  }

  //需求二:1  获取过滤后的用户行为详细数据
  def getSessionDetailRDD(sessionidActionRDD: RDD[(String, UserVisitAction)], filteredSessionAggrInfoRDD: RDD[(String, String)]) = {
    val tempRDD: RDD[(String, (String, UserVisitAction))] = filteredSessionAggrInfoRDD.join(sessionidActionRDD)
    tempRDD.map(item => (item._1,item._2._2))
  }


  //2. 随机均匀抽取样本
  def randRomExtractSession(spark: SparkSession,
                            taskUUID: String,
                            filteredSessionAggrInfoRDD: RDD[(String, String)],
                            sessionDetailRDD: RDD[(String, UserVisitAction)]) = {
    //第一步  求每天每小时的session
    val timeSessionRDD: RDD[(String, String)] = filteredSessionAggrInfoRDD.map {
      case (sessionid, aggrInfo) =>
        val startTime = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_START_TIME)
        println(startTime)
        //将开始时间的格式转换为yyyy-MM-dd_HH
        val dateHour = DateUtils.getDateHour(startTime)
        (dateHour, aggrInfo)
    }

    //计算每天每个小时的session数量,通过调用countByKey方法来统计
    val countMap: collection.Map[String, Long] = timeSessionRDD.countByKey()
    //转换格式,将countMap转换为Map【date,Map【Hour,Count】】
    //创建一个hashMap用来转换数据
    val dateHourCountMap = mutable.HashMap[String,mutable.HashMap[String,Long]]()
    //遍历上面的ccountMap,取出天和小时
    for ((dateHour,count) <- countMap){
      //yyyy-MM-dd_HH  将key拆分,出来天和小时
      val date = dateHour.split("_")(0)
      val hour = dateHour.split("_")(1)
      //从hashMap中取出date天,然后进行模式匹配,判断天
      dateHourCountMap.get(date) match {
        case None =>
          //当天数为none时,n新建一个hashMap
          dateHourCountMap(date) = new mutable.HashMap[String,Long]()
          dateHourCountMap(date) += (hour->count)
        case Some(hourCountMap)=>
          hourCountMap+=(hour->count)
      }
    }
    println(dateHourCountMap.mkString("\r\n"))
    //求一天需要抽取的数量,所有天数中的总的session数量就时hashMap的大小
    val extractNumberPerDay = 100 / dateHourCountMap.size
    //存放,每天每小时需要抽取的样本的索引值列表  格式为【天,【小时,索引列表】】
    val dateHourExtractMap = mutable.HashMap[String,mutable.HashMap[String,ListBuffer[Int]]]()

    //创建一个随机数
    val random = new Random()

    //根据每个小时应该抽取的数量,来随机产生索引值
    def hourExtractMapFunc(hourExtractMap: mutable.HashMap[String, ListBuffer[Int]],
                           hourCountMap: mutable.HashMap[String, Long],
                           sessionCount: Long) = {
      for ((hour,count) <- hourCountMap){
        //计算每天每小时需要抽取的session数量
        var hourExtractNumber:Int = ((count/sessionCount.toDouble) * extractNumberPerDay).round.toInt
        //判断需要抽取的数量和count的大小,如果需要的大于实际的,则就按照实际取
        if (hourExtractNumber>count){
          hourExtractNumber = count.toInt
        }
        hourExtractMap.get(hour) match {
          case None =>
            hourExtractMap(hour) = new mutable.ListBuffer[Int]()
            for (i <- 0 until hourExtractNumber){
              var extractIndex = random.nextInt(count.toInt)
              //索引值重复的情况,一旦索引值发生重复现象,则需要重新获取index
              while (hourExtractMap(hour).contains(extractIndex)){
                extractIndex = random.nextInt(count.toInt)
              }
              hourExtractMap(hour) += (extractIndex)
            }
          case Some(extractIndexList)=>
            for (i <- 0 until hourExtractNumber){
              var extractIndex = random.nextInt(count.toInt)
              while (hourExtractMap(hour).contains(extractIndex)){
                extractIndex = random.nextInt(count.toInt)
              }
              hourExtractMap(hour) += (extractIndex)
            }
        }
      }
    }
    for ((date,hourCountMap) <-dateHourCountMap){
      //计算出每天session的总数
      val sessionCount = hourCountMap.values.sum
      //依旧对索引值列表中的小时数进行匹配
      dateHourExtractMap.get(date) match {
        case None =>
          dateHourExtractMap(date) = new mutable.HashMap[String,mutable.ListBuffer[Int]]()
          //如果小时不存在,则进行更新到hashMap中
          hourExtractMapFunc(dateHourExtractMap(date),hourCountMap,sessionCount)
        case Some(hourExtractMap) =>
          hourExtractMapFunc(hourExtractMap,hourCountMap,sessionCount)
      }
    }
  }



  //计算时长、步长的各个范围的占比,并进行持久化
  def calculateAndPersistAggrState(spark: SparkSession, value: mutable.HashMap[String, Int], taskUUID: String) = {
    //获取到当前总的sessioncount,
    val session_count = value(Constants.SESSION_COUNT).toDouble

    val visit_length_1s_3s = value.getOrElse(Constants.TIME_PERIOD_1s_3s,0).toDouble
    val visit_length_4s_6s = value.getOrElse(Constants.TIME_PERIOD_4s_6s,0).toDouble
    val visit_length_7s_9s = value.getOrElse(Constants.TIME_PERIOD_7s_9s,0).toDouble
    val visit_length_10s_30s = value.getOrElse(Constants.TIME_PERIOD_10s_30s,0).toDouble
    val visit_length_30s_60s = value.getOrElse(Constants.TIME_PERIOD_30s_60s,0).toDouble
    val visit_length_1m_3m = value.getOrElse(Constants.TIME_PERIOD_1m_3m,0).toDouble
    val visit_length_3m_10m = value.getOrElse(Constants.TIME_PERIOD_3m_10m,0).toDouble
    val visit_length_10m_30m = value.getOrElse(Constants.TIME_PERIOD_10m_30m,0).toDouble
    val visit_length_30m = value.getOrElse(Constants.TIME_PERIOD_30m,0).toDouble

    val step_length_1_3 = value.getOrElse(Constants.STEP_PERIOD_1_3,0).toDouble
    val step_length_4_6 = value.getOrElse(Constants.STEP_PERIOD_4_6,0).toDouble
    val step_length_7_9 = value.getOrElse(Constants.STEP_PERIOD_7_9,0).toDouble
    val step_length_10_30 = value.getOrElse(Constants.STEP_PERIOD_10_30,0).toDouble
    val step_length_30_60 = value.getOrElse(Constants.STEP_PERIOD_30_60,0).toDouble
    val step_length_60 = value.getOrElse(Constants.STEP_PERIOD_60,0).toDouble
    //时长各个范围占比
    val visit_length_1s_3s_ratio = NumberUtils.formatDouble(visit_length_1s_3s/session_count,2)
    val visit_length_4s_6s_ratio = NumberUtils.formatDouble(visit_length_4s_6s/session_count,2)
    val visit_length_7s_9s_ratio = NumberUtils.formatDouble(visit_length_7s_9s/session_count,2)
    val visit_length_10s_30s_ratio =  NumberUtils.formatDouble(visit_length_10s_30s/session_count,2)
    val visit_length_30s_60s_ratio =  NumberUtils.formatDouble(visit_length_30s_60s/session_count,2)
    val visit_length_1m_3m_ratio = NumberUtils.formatDouble(visit_length_1m_3m/session_count,2)
    val visit_length_3m_10m_ratio =NumberUtils.formatDouble(visit_length_3m_10m/session_count,2)
    val visit_length_10m_30m_ratio =NumberUtils.formatDouble(visit_length_10m_30m/session_count,2)
    val visit_length_30m_ratio = NumberUtils.formatDouble(visit_length_30m/session_count,2)
    步长各个范围占比
    val step_length_1_3_ratio = NumberUtils.formatDouble(step_length_1_3/session_count,2)
    val step_length_4_6_ratio = NumberUtils.formatDouble(step_length_4_6/session_count,2)
    val step_length_7_9_ratio = NumberUtils.formatDouble(step_length_7_9/session_count,2)
    val step_length_10_30_ratio = NumberUtils.formatDouble(step_length_10_30/session_count,2)
    val step_length_30_60_ratio = NumberUtils.formatDouble(step_length_30_60/session_count,2)
    val step_length_60_ratio = NumberUtils.formatDouble(step_length_60/session_count,2)

    //得到数据库中的字段
    val sessionAggrStat = SessionAggrState(taskUUID,
      session_count.toInt,
      visit_length_1s_3s_ratio,
      visit_length_4s_6s_ratio,
      visit_length_7s_9s_ratio,
      visit_length_10s_30s_ratio,
      visit_length_30s_60s_ratio,
      visit_length_1m_3m_ratio,
      visit_length_3m_10m_ratio,
      visit_length_10m_30m_ratio,
      visit_length_30m_ratio,
      step_length_1_3_ratio,
      step_length_4_6_ratio,
      step_length_7_9_ratio,
      step_length_10_30_ratio,
      step_length_30_60_ratio,
      step_length_60_ratio
    )
    import spark.implicits._
    //写入到mysql数据库中
    val sessionAggrStateRDD: RDD[SessionAggrState] = spark.sparkContext.makeRDD(Array(sessionAggrStat))
    //获取到配置jdbc的环境,连接数据库,保存数据
    sessionAggrStateRDD.toDF().write.format("jdbc")
      .option("url",ConfigurationManager.config.getString(Constants.JDBC_URL))
      .option("user",ConfigurationManager.config.getString(Constants.JDBC_USER))
      .option("password",ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
      .option("dbtable","session_aggr_stat")
      .mode(SaveMode.Append)
      .save()
  }

  //过滤累加个范围的统计值,时长和步长
  def filterSessionAndAggrState(sessionAggrInfoRDD: RDD[(String,String)], taskParm: JSONObject, sessionAggrStateAccumulator: SessionAggrStateAccumulator) = {
    //获取任务的查询条件配置
    //年龄开始时间和截至时间
    val startAge: String = ParamUtils.getParam(taskParm,Constants.PARAM_START_AGE)
    println(startAge)
    val endAge: String = ParamUtils.getParam(taskParm,Constants.PARAM_END_AGE)
    //条件:职业、城市、性别、密码、类别
    val professional = ParamUtils.getParam(taskParm,Constants.PARAM_PROFESSIONALS)
    val cites: String = ParamUtils.getParam(taskParm,Constants.PARAM_CITIES)
    val sex = ParamUtils.getParam(taskParm,Constants.PARAM_SEX)
    val keywords = ParamUtils.getParam(taskParm,Constants.PARAM_KEYWORDS)
    val categoryIds = ParamUtils.getParam(taskParm,Constants.PARAM_CATEGORY_IDS)
    //判断获取到的条件、拼接字符串
    var _parmeter=(
          (if (startAge !=null)Constants.PARAM_START_AGE + "=" + startAge + "|" else "") +
          (if (endAge != null)Constants.PARAM_END_AGE + "=" + endAge + "|" else "") +
          (if (professional != null) Constants.PARAM_PROFESSIONALS + "=" + professional + "|" else "") +
          (if (cites!=null)Constants.PARAM_CITIES + "=" + cites + "|" else "") +
          (if (sex!=null)Constants.PARAM_SEX + "=" + cites + "|" else "") +
          (if (keywords!=null)Constants.PARAM_KEYWORDS + "=" + cites + "|" else "") +
          (if (categoryIds!=null)Constants.PARAM_CATEGORY_IDS + "=" + cites + "|" else "")
      )
    //去掉末尾的|字符
    if (_parmeter.endsWith("|")){
      _parmeter = _parmeter.substring(0,_parmeter.length()-1)
    }
    //将字符串拼接好之后,执行过滤算子
    val filterSessionAggrInfoRDD: RDD[(String, String)] = sessionAggrInfoRDD.filter {
      case (sessionid, aggrInfo) =>
        //判断年龄、职位、性别、城市
        var success = true
        if (!ValidUtils.between(aggrInfo, Constants.FIELD_AGE, _parmeter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {
          success = false
        }
        if (!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL, _parmeter, Constants.PARAM_PROFESSIONALS)) {
          success = false
        }
        if (!ValidUtils.in(aggrInfo, Constants.FIELD_CITY, _parmeter, Constants.PARAM_CITIES)) {
          success = false
        }
        if (!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX, _parmeter, Constants.PARAM_SEX)) {
          success = false
        }
        if (!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS, _parmeter, Constants.PARAM_KEYWORDS)) {
          success = false
        }
        if (!ValidUtils.in(aggrInfo, Constants.FIELD_CATEGORY_ID, _parmeter, Constants.PARAM_CATEGORY_IDS)) {
          success = false
        }
        //保留的数据,累加计算
        if (success) {
          sessionAggrStateAccumulator.add(Constants.SESSION_COUNT)

          //计算访问时长的范围
          def calculateVisitLength(visitLength: Long) = {
            if (visitLength >= 1 && visitLength <= 3) {
              sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_1s_3s)
            } else if (visitLength >= 4 && visitLength <= 6) {
              sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_4s_6s)
            } else if (visitLength >= 7 && visitLength <= 9) {
              sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_7s_9s)
            } else if (visitLength >= 10 && visitLength <= 30) {
              sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_10s_30s)
            } else if (visitLength > 30 && visitLength <= 60) {
              sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_30s_60s)
            } else if (visitLength > 60 && visitLength <= 180) {
              sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_1m_3m)
            } else if (visitLength > 180 && visitLength <= 600) {
              sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_3m_10m)
            } else if (visitLength > 600 && visitLength <= 1800) {
              sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_10m_30m)
            } else if (visitLength > 1800) {
              sessionAggrStateAccumulator.add(Constants.TIME_PERIOD_30m)
            }
          }

          //使用字符串工具类获取到当前的用户时长
          val visitLength: Long = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong
          //调用时长判断方法,判断其所在区间
          calculateVisitLength(visitLength)

          //计算用户的步长
          def calculateStepLength(stepLength: Long) = {
            //判断范围
            if (stepLength >= 1 && stepLength <= 3) {
              sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_1_3)
            } else if (stepLength >= 4 && stepLength <= 6) {
              sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_4_6)
            } else if (stepLength >= 7 && stepLength <= 9) {
              sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_7_9)
            } else if (stepLength >= 10 && stepLength <= 30) {
              sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_10_30)
            } else if (stepLength > 30 && stepLength <= 60) {
              sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_30_60)
            } else if (stepLength > 60) {
              sessionAggrStateAccumulator.add(Constants.STEP_PERIOD_60)
            }
          }

          //获取当前用户步长
          val stepLength: String = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_STEP_LENGTH)
          //调用方法,判断范围
          calculateStepLength(stepLength.toLong)
        }
        success
    }
    filterSessionAggrInfoRDD
  }

  //查询数据,从user_visit_action
  def getActionRDDByDateRange(spark: SparkSession, taskParm: JSONObject) = {
    //获取开始时间
    val startDate: String = ParamUtils.getParam(taskParm,Constants.PARAM_START_DATE)
    //获取结束时间
    val endDate = ParamUtils.getParam(taskParm,Constants.PARAM_END_DATE)
    //导入spark的隐式转换
    import spark.implicits._
    //在表中查询,注意变量需要用单引号引入
    val SQLQuery = "select * from user_visit_action where date >= '"+startDate+"' and date <= '" +endDate+"'"
    //将查询出的数据转换成dataSet,再转换成RDD
    val ds: Dataset[UserVisitAction] = spark.sql(SQLQuery).as[UserVisitAction]
    ds.rdd
  }
  //聚合解析,分组、计算步长、时长等将其session行为数据聚合成partAggInfo(Sessionid | 查询词集合 |点击物品类别集合 | Session访问时长 | Session访问步长 | Session访问时间)
  def aggregateBySession(spark: SparkSession, sessionidActionRDD: RDD[(String, UserVisitAction)]) = {
    //对行为数据按照session粒度Sessionid 进行分组
     val sessionActionsRDD: RDD[(String, Iterable[UserVisitAction])] = sessionidActionRDD.groupByKey()
    //对每个session聚合操作
    val partAggInfoRDD = sessionActionsRDD.map {
      case (sessionid,userVisitActions)=>
        //定义buffer用来存储搜索word和行为的id
        val searchKeyWordsBuffer = new StringBuffer("")
        val clickCategoryIdsBuffer = new StringBuffer("")
        var userid = -1L
        //session的起始时间和结束时间
        var startTime:Date = null
        var endTime:Date = null
        //行为步长
        var stepLength = 0
        userVisitActions.foreach(uva=>{
          //判断用户名的空值
          if (userid == -1L){
            userid = uva.user_id
          }
          val searchKeyWord = uva.search_keyword
          val click_category_id:Long = uva.click_category_id
          if (StringUtils.isNotEmpty(searchKeyWord)){
            if (!searchKeyWordsBuffer.toString.contains(searchKeyWord)){
              clickCategoryIdsBuffer.append(click_category_id + ",")
            }
          }
          if (click_category_id !=null && click_category_id != -1L){
            if (!clickCategoryIdsBuffer.toString.contains(click_category_id.toString)){
              clickCategoryIdsBuffer.append(click_category_id + ",")
            }
          }
          //获取行为时间
          val actionTime:Date = DateUtils.parseTime(uva.action_time)
          //获取startTime
          if (startTime == null || actionTime.before(startTime)){
            startTime=actionTime
            println(startTime)
          }
          //获取结束时间
          if (endTime == null || actionTime.before(endTime)){
            endTime=actionTime
          }
          //行为步长
          stepLength += 1
        })//end userVisitActions froeach
        val searchKeyWords:String=StringUtils.trimComma(searchKeyWordsBuffer.toString)
        val clickCategoryIds:String = StringUtils.trimComma(clickCategoryIdsBuffer.toString)
        //计算时长(秒)
        val visitLength:Long=(endTime.getTime - startTime.getTime)/1000
        // 拼装字符串  k=v | k=v
        val partAggrInfo:String= Constants.FIELD_SESSION_ID + "=" + sessionid + "|" +
          Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeyWords + "|" +
          Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|" +
          Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
          Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
          Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime)
        (userid,partAggrInfo)
    }
    //println(partAggInfoRDD.collect().mkString("\r\n"))
    //TODO 和userInfo join操作
    //查询用户信息表,然后和用户行为表做join操作
    import spark.implicits._
    //从表中查询出数据,然后将其转换成ds
    val ds: Dataset[UserInfo] = spark.sql("select * from user_info").as[UserInfo]
    val userInfoRDD: RDD[UserInfo] = ds.rdd
    //将userInfoRDD转换为元组(userid,user)
    val useridUserInfoRDD: RDD[(Long, UserInfo)] = userInfoRDD.map(user => (user.user_id,user))
    //useridUserInfo   join partAggrInfoRDD
    val userIdFullInfoRDD = partAggInfoRDD.join(useridUserInfoRDD)
    //按照用户id join 在一起得数据,对这些数据进行聚合成一个RDD【String,String】
    val sessionFullAggrInfoRDD: RDD[(String, String)] = userIdFullInfoRDD.map {
      //使用模式匹配,对数据进行拼接成需要得类型userId,(partAggrInfo,userInfo)
      case (userId, (partAggrInfo, userInfo)) => {
        //使用StringUtils工具类来获取到partAggrInfo中的sessionid
        val sessionid: String = StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.FIELD_SESSION_ID)
        //拼接业务流程中需要得字符串
        val fullAggrInfo: String = partAggrInfo + "|" +
          Constants.FIELD_AGE + "=" + userInfo.age + "|" +
          Constants.FIELD_PROFESSIONAL + "=" + userInfo.professional + "|" +
          Constants.FIELD_CITY + "=" + userInfo.city + "|" +
          Constants.FIELD_SEX + "=" + userInfo.sex
        (sessionid, fullAggrInfo)
      }
    }
    sessionFullAggrInfoRDD
  }
}
DataModel数据模型样例类
package com.ityouxin.session

object DataModel {
  /**
    * Session随机抽取表
    *
    * @param taskid             当前计算批次的ID
    * @param sessionid          抽取的Session的ID
    * @param startTime          Session的开始时间
    * @param searchKeywords     Session的查询字段
    * @param clickCategoryIds   Session点击的类别id集合
    */
  case class SessionRandomExtract(taskid:String,
                                  sessionid:String,
                                  startTime:String,
                                  searchKeywords:String,
                                  clickCategoryIds:String)

  /**
    * Session随机抽取详细表
    *
    * @param taskid            当前计算批次的ID
    * @param userid            用户的ID
    * @param sessionid         Session的ID
    * @param pageid            某个页面的ID
    * @param actionTime        点击行为的时间点
    * @param searchKeyword     用户搜索的关键词
    * @param clickCategoryId   某一个商品品类的ID
    * @param clickProductId    某一个商品的ID
    * @param orderCategoryIds  一次订单中所有品类的ID集合
    * @param orderProductIds   一次订单中所有商品的ID集合
    * @param payCategoryIds    一次支付中所有品类的ID集合
    * @param payProductIds     一次支付中所有商品的ID集合
    **/
  case class SessionDetail(taskid:String,
                           userid:Long,
                           sessionid:String,
                           pageid:Long,
                           actionTime:String,
                           searchKeyword:String,
                           clickCategoryId:Long,
                           clickProductId:Long,
                           orderCategoryIds:String,
                           orderProductIds:String,
                           payCategoryIds:String,
                           payProductIds:String)
  /**
    * 品类Top10表
    * @param taskid
    * @param categoryid
    * @param clickCount
    * @param orderCount
    * @param payCount
    */
  case class Top10Category(taskid:String,
                           categoryid:Long,
                           clickCount:Long,
                           orderCount:Long,
                           payCount:Long)


  case class Top10Session(taskid:String,
                          categoryid:Long,
                          sessionid:String,
                          count:Long)
  case class SessionAggrState(taskid:String,
                             session_count:Long,
                             visit_length_1s_3s_ratio:Double,
                             visit_length_4s_6s_ratio:Double,
                             visit_length_7s_9s_ratio:Double,
                             visit_length_10s_30s_ratio:Double,
                             visit_length_30s_60s_ratio:Double,
                             visit_length_1m_3m_ratio:Double,
                             visit_length_3m_10m_ratio:Double,
                             visit_length_10m_30m_ratio:Double,
                             visit_length_30m_ratio:Double,
                             step_length_1_3_ratio:Double,
                             step_length_4_6_ratio:Double,
                             step_length_7_9_ratio:Double,
                             step_length_10_30_ratio:Double,
                             step_length_30_60_ratio:Double,
                             step_length_60_ratio:Double
                            )
}
SessionAffrStateAccumulator  累加器类
package com.ityouxin.session

import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

class SessionAggrStateAccumulator extends AccumulatorV2[String,mutable.HashMap[String,Int]]{
  private val aggrStateMap = mutable.HashMap[String,Int]()
  override def isZero: Boolean = {
    aggrStateMap.isEmpty
  }

  override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
    val newAcc = new SessionAggrStateAccumulator
    aggrStateMap.synchronized{
      newAcc.aggrStateMap ++= this.aggrStateMap
    }
    newAcc
  }

  override def reset(): Unit = {
    aggrStateMap.clear()
  }
//分区内的累加
  override def add(v: String): Unit = {
    if (!aggrStateMap.contains(v)){
      aggrStateMap += (v -> 1)
    }else{
      aggrStateMap.update(v,aggrStateMap(v) + 1)
    }
  }
  override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
    //将两个map类型的数据进行合并
    other match {
      case acc:SessionAggrStateAccumulator =>{
        (this.aggrStateMap /: acc.aggrStateMap){
          case (map,(k,v))=>{
            map +=(k->(v + map.getOrElse(k,0)))
          }
        }
      }
    }
  }

  override def value: mutable.HashMap[String, Int] = {
    this.aggrStateMap
  }
}

PageSplitCovertRate  页面转换率样例类
package com.ityouxin.session

case class PageSplitConvertRate(taskid:String ,convertRate: String)

val newAcc = new SessionAggrStateAccumulator
aggrStateMap.synchronized{
  newAcc.aggrStateMap ++= this.aggrStateMap
}
newAcc

}

override def reset(): Unit = {
aggrStateMap.clear()
}
//分区内的累加
override def add(v: String): Unit = {
if (!aggrStateMap.contains(v)){
aggrStateMap += (v -> 1)
}else{
aggrStateMap.update(v,aggrStateMap(v) + 1)
}
}
override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
//将两个map类型的数据进行合并
other match {
case acc:SessionAggrStateAccumulator =>{
(this.aggrStateMap /: acc.aggrStateMap){
case (map,(k,v))=>{
map +=(k->(v + map.getOrElse(k,0)))
}
}
}
}
}

override def value: mutable.HashMap[String, Int] = {
this.aggrStateMap
}
}


```scala
PageSplitCovertRate  页面转换率样例类
package com.ityouxin.session

case class PageSplitConvertRate(taskid:String ,convertRate: String)


版权声明:本文为weixin_38255444原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。