操作手册2:建数仓,从ODS到DWD层——日志的清洗、转换、集成

需求:

用spark来进行数据ETL:

  • 清洗:

    • 去除json数据中的废弃字段
    • 过滤json格式不正确的脏数据
    • 过滤日志中的account及deviceid全为空的记录
    • 过滤日志中缺少关键字段(properties/eventid/sessionid缺一不可)的记录
    • 过滤日志中不符合时间段的记录(由于app上报日志可能的延迟,有数据延迟到达)
    • 对于web端日志,过滤爬虫请求数据(通过useragent标识来分析)
  • 转换:

    • 将json打平,解析成扁平结构,主要是里面json的事件日志
    • session分割
      • 对于web端日志,按天然session分割,不需要处理
      • 对app日志,由于使用了登录保持技术,导致app进入后台很长时间后,再恢复前台,依然是同一个session,不符合session分析定义,需要按事件间隔切割(业内通用:30分钟)
      • 对于wx小程序日志,与app类似,session有效期很长,需要按事件间隔时间切割
    • 数据规范处理
      • boolean字段,在数据中有使用1/0/-1标识的,也有使用true/false表示的,统一为Y/N/U
      • 字符串类型字段,在数据中有空串,有null值,统一为null值(这个很重要,会影响计算)
      • 日期格式统一:2020/9/2  2020-9-2 20200902等都统一变成YYYY-MM-dd
  • 集成:

    • gps坐标解析为省、市 、县信息,方便后续的地域维度分析
    • 若gps找不到的地域信息,使用ip进行解析
    • id_mapping:为每个用户生成一个全局唯一标识(给匿名访问,绑定到一个id上,漏斗、留存、session)

 

操作实现:

1、构建 :一个父工程、两个模块:datahouse数仓系统、usertag用户画像

父工程maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>pub.ryan</groupId>
    <artifactId>tm_sensys</artifactId>
    <packaging>pom</packaging>
    <version>1.01</version>
    <modules>
        <module>datahouse</module>
        <module>usertag</module>
    </modules>

    <properties>
        <spark.version>3.0.1</spark.version>
        <lang3.version>3.10</lang3.version>
        <fastjson.version>1.2.68</fastjson.version>
    </properties>

    <!-- 父工程中引入的依赖,所有子模块都会继承-->
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
    </dependencies>

    <!--依赖管理,并不会真正引入依赖,而是约束子模块中对这个依赖的引用-->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>${spark.version}</version>
                <!--<exclusions>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-client</artifactId>
                    </exclusion>
                </exclusions>-->
            </dependency>
        </dependencies>
    </dependencyManagement>


    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <layout>default</layout>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>ali-plugin</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </pluginRepository>
    </pluginRepositories>

    <build>
        <plugins>
            <!-- 指定编译java的插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- 指定编译scala的插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!--  把依赖jar中的用到的类,提取到自己的jar中 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>pub.ryan.dw.etl.DeviceIdAccountBind</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <!--下面是为了使用 mvn package命令,如果不加则使用mvn assembly-->
                <executions>
                    <execution>
                        <id>make-assemble</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

datahouse数仓pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>tm_sensys</artifactId>
        <groupId>pub.ryan</groupId>
        <version>1.01</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>datahouse</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.openx.data</groupId>
            <artifactId>json-serde</artifactId>
            <version>1.3.8</version>
        </dependency>
        <dependency>
            <groupId>ch.hsr</groupId>
            <artifactId>geohash</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.lionsoul</groupId>
            <artifactId>ip2region</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>
</project>

测试:

 

 


版权声明:本文为qq_36269641原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。