数据倾斜:row_number over优化(top数据倾斜)

场景

求每个域名的top url,存在某几个域名的url数量过多(亿级别),导致整个任务的row_number() over 排序阶段出现长尾。

优化思路

row_number over 归根到底就是在域名分组的前提下进行全局排序,那么对该场景的问题进行抽象就是 全局排序如何优化。
如果能在全局排序前先做一次局部排序,筛选出符合候选条件的候选项,再进行全局排序,将能有效的避免长尾。这让我想到了一个场景就是比赛机制,筛选出金银铜牌,从本质上是类似的,所以看看比赛的机制下,是如何做的,emm,其实就是分组,只是每个人作为独立的个体,只能分配到一个小组里。那么top url场景中,url就是比赛的最细粒度的个体,只需要先分组候选出top3,之后再全局top3,就能解决此问题。
所以重点就是:对每个url生成一个group_id, 如1000以内的随机数,让其分配到一个组中去,局部row_number 后,筛选出候选项,再全局row_number一次。这个场景也适合全局sort 求 top场景。

示例代码

对所有域名做了统一处理,也可以挑出热点key, 单独处理

DROP TABLE IF EXISTS ti.tmp_topurl_${date}_${hour};
DROP TABLE IF EXISTS ti.tmp_rank_${date}_${hour};

--Map 端部分聚合,相当于Combiner
SET hive.map.aggr = TRUE;
--有数据倾斜的时候进行负载均衡
SET hive.groupby.skewindata = TRUE;

-- 查询:日期、域名、URL、bytes_sent
-- 域名、URL分组聚合
CREATE  TABLE ti.tmp_topurl_${date}_${hour} AS
SELECT  `datetime`,
        host,
        uri,
        pmod(abs(hash(uri)), 1000) AS group_id, -- 分组使用,稳定分组,随机分组效果是一样的
        count(1) AS request,
        SUM(bytes_bigint) AS bytes
FROM    (
            SELECT  FROM_UNIXTIME(UNIX_TIMESTAMP(ds_req_ts, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd HH') AS `datetime`,
                    ds_http_domain AS host,
                    ds_http_uri_rewrite AS uri,
                    CAST(ds_http_resp_size AS BIGINT) AS bytes_bigint
            FROM    ti.data_table
            WHERE   `pt_d` = "${date}"
            AND     `pt_h` = "${hour}"
        ) AS t
GROUP BY
        `datetime`,
        host,
        uri;

-- 分组域名-请求数Rank
CREATE  TABLE ti.tmp_rank_${date}_${hour} AS
SELECT  *
FROM    (
            SELECT  UNIX_TIMESTAMP(`datetime`, 'yyyy-MM-dd HH') AS `ts`,
                    host,
                    uri,
                    request,
                    bytes,
                    row_number() OVER(
                        PARTITION BY
                                `datetime`,
                                host
                        ORDER BY
                                request DESC
                    ) AS host_request_rank,
                    row_number() OVER(
                        PARTITION BY
                                `datetime`,
                                host
                        ORDER BY
                                bytes DESC
                    ) AS host_bytes_rank
            FROM    (
                        -- 全局rank 1000 一定在局部rank 1000
                        SELECT  `datetime`,
                                host,
                                uri,
                                request,
                                bytes,
                                row_number() OVER(
                                    PARTITION BY
                                            `datetime`,
                                            host,
                                            group_id
                                    ORDER BY
                                            request DESC
                                ) AS request_rank,
                                row_number() OVER(
                                    PARTITION BY
                                            `datetime`,
                                            host,
                                            group_id
                                    ORDER BY
                                            bytes DESC
                                ) AS bytes_rank
                        FROM    ti.tmp_topurl_${date}_${hour}
                    ) it1
            WHERE   it1.request_rank <= 1000
            OR      it1.bytes_rank <= 1000
        ) AS t1
WHERE   t1.host_request_rank <= 1000
OR      t1.host_bytes_rank <= 1000;

版权声明:本文为u013668852原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。