一种基于布隆过滤器的大表计算优化方法

问题背景

在大数据行业内,尤其是数仓建设中,一直有一个绕不开的难题,就是大表的分析计算(这里的大表指亿级以上)。特别是大表之间的 Join 分析,对任何公司数据部门都是一个挑战!

主要有以下挑战:

  • 由于数据量大,分析计算时会耗费更多 CPU、内存和 IO,占用大量的集群资源。
  • 由于数据量大,分析计算过程缓慢,挤占其它任务资源使用,从而影响数仓整体任务产出时间。
  • 由于数据量大,长时间占用资源,会造成该任务在时间、资源和财务各方面成本巨大。

当前业内流行的优化方案

1.增加集群资源

优点:简单粗暴,对业务和数据开发人员友好,不用调整。

缺点:费钱,看你公司是否有钱。

2.采用增量计算

优点:可以在不大幅增加计算集群成本的情况下,完成日常计算任务。

缺点:对数据和业务都有一定要求,数据一般要求是日志类数据。或者具有一定的生命周期数据(历史数据可归档)。

问题场景和 Spark 算法分析

Spark 经典算法 SortMergeJoin(以大表间的 Join 分析为例)。

  • 对两张表分别进行 Shuffle 重分区,之后将相同Key的记录分到对应分区,每个分区内的数据在 Join 之前都要进行排序,这一步对应 Exchange 节点和 Sort 节点。也就是 Spark 的 Sort Merge Shuffle 过程。
  • 遍历流式表,对每条记录都采用顺序查找的方式从查找表中搜索,每遇到一条相同的 Key 就进行 Join 关联。每次处理完一条记录,只需从上一次结束的位置开始继续查找。

该算法也可以简化流程为: Map 一> Shuffle 一> Sort 一> Merge 一> Reduce

一种基于布隆过滤器的大表计算优化方法

该算法的性能瓶颈主要在 Sort Merge Shuffle 阶段(红色流程部分),数据量越大,资源要求越高,性能越低。

大表问题思考

大数据计算优化思路,核心无非就三条:增加计算资源;减少被计算数据量;优化计算算法。其中前两条是我们普通人最常用的方法。

两个大表的 Join ,是不是真的每天都有大量的数据有变更呢?如果是的话,那我们的业务就应该思考一下是否合理了。

其实在我们的日常实践场景中,大部分是两个表里面的数据每天只有少量(十万百万至千万级)数据随机变化,大部分数据是不变的。

说到这里,很多人的第一想法是,我们增加分区,按数据是否有变化进行区分,计算有变化的(今日有更新的业务数据),合并未变化的(昨日计算完成的历史数据),不就可以解决问题了。其实这个想法存在以下问题:

  • 由于每个表的数据是随机变化的,那就存在,第一个表中变化的数据在第二个表中是未变的,反之亦然(见图片示例)。并且可能后续计算还有第三个表、第四个表等等呢?这种分区是难以构建的。
  • 变化的数据如果是百万至千万级,那这里也是一个较大规模的数据量了,既要关联计算变化的,也要关联计算未变化的,这里的计算成本也很大。

一种基于布隆过滤器的大表计算优化方法图片

问题读到这里,如果我们分别把表 A、表 B 的有变化记录的关联主键取出来合并在一起,形成一个数组变量。计算的时候用这个变量分别从表 A 和表 B 中过滤出有变化的数据进行计算,并从未变化的表(昨日计算完成的历史数据)中过滤出不存在的(即未变化历史结果数据)。这样两份数据简单合并到一起,不就是表 A 和表 B 全量 Join 计算的结果了吗!

那什么样的数组可以轻易的存下这百万千万级的数据量呢?我们第一个想到的答案: 布隆过滤器!

使用布隆过滤器的优化方案

  1. 构建布隆过滤器:分别读取表 A 和表 B 中有变化的数据的关联主键。
  2. 使用布隆过滤器:分别过滤表 A 和表 B 中的数据(即关联主键命中布隆过滤器),然后进行 join 分析。
  3. 使用布隆过滤器:从未变化的表(昨日计算完成的历史数据)中过滤出数据(即没有命中布隆过滤器)。
  4. 合并 2、 3 步骤的数据结果。

也许这里有人会有疑惑,不是说布隆过滤器是命中并不代表一定存在,不命中才代表一定不存在!其实这个命中不代表一定存在,是一个极少量概率问题,即极少量没有更新的数据也会命中布隆过滤器,从而参与了接下来的数据计算,实际上只要所有变化的数据能命中即可。这个不影响它已经帮我买过滤了绝大部分不需要计算的数据。

回看我们的 Spark 经典算法 SortMergeJoin,我们可以看出,该方案是在 Map 阶段就过滤了数据,大大减少了数据量的,提升了计算效率,减少了计算资源使用!

Spark 函数 Java 代码实现

大家可以根据需要参考、修改和优化,有更好的实现方式欢迎大家分享交流。

程序流程图

一种基于布隆过滤器的大表计算优化方法图片

Spark 函数 Java 代码实现。

package org.example;

import org.apache.curator.shaded.com.google.common.hash.BloomFilter;
import org.apache.curator.shaded.com.google.common.hash.Funnels;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.api.java.*;
import org.apache.spark.SparkConf;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.util.RamUsageEstimator;

/**
 * add by chengwansheng
 */
class MyBloomFilter {
    private BloomFilter bloomFilter;

    public MyBloomFilter(BloomFilter b) {
        bloomFilter = b;
    }

    public BloomFilter getBloomFilter() {
        return bloomFilter;
    }
}

public class BloomUdf implements UDF2<Object, String, Boolean> {
    //最大记录限制,安全起见
    private static int maxSize = 50000000;

    //布隆过滤器是否开启配置, 1 开启,0 关闭
    private static int udfBloomFilterEnable;

    //布隆过滤器是否开启参数,默认开启
    private static String bloomFilterConfKey = "spark.myudf.bloom.enable";

    //加配置配置参数,目前不起作用?? 
    static {
        SparkConf sparkConf = new SparkConf();
        udfBloomFilterEnable = sparkConf.getInt(bloomFilterConfKey, 1);
        System.out.println("the spark.myudf.bloom.enable value " + udfBloomFilterEnable);
    }

    //布隆过滤器列表,支持多个布隆过滤器
    private static ConcurrentHashMap<String, MyBloomFilter> bloomFilterMap = new ConcurrentHashMap<>();

    /**
     * 布隆过滤器核心构建方法
     * 通过读取表的 hdfs 文件信息,构建布隆过滤器
     * 一个 jvm 只加载一次
     * @param key 
     * @param path 
     * @throws IOException 
     */
    private synchronized static void buildBloomFilter(String key, String path) throws IOException {
        if (!bloomFilterMap.containsKey(key)) {
            BloomFilter bloomFilter;
            Configuration cnotallow=new Configuration();
            FileSystem hdfs=FileSystem.get(conf);
            Path pathDf=new Path(path);
            FileStatus[] stats=hdfs.listStatus(pathDf);

            //获取记录总数
            long sum = 0;
            for (int i=0; i<stats.length; i++){
                InputStream inputStream=hdfs.open(stats[i].getPath());
                InputStreamReader inputStreamReader= new InputStreamReader(inputStream);
                BufferedReader reader=new BufferedReader(inputStreamReader);
                sum = sum + reader.lines().count();
            }

            if(sum > maxSize) {
                //如果数据量大于期望值,则将布隆过滤器置空(即布隆过滤器不起作用)
                System.out.println("the max number is " + maxSize + ", but target num is too big, the " + key + " bloom will be invalid");
                bloomFilter = null;
            } else {
                //默认 1000 W,超过取样本数据 2 倍的量。这里取 2 倍是为了提高布隆过滤器的效果, 2 倍是一个比较合适的值
                long exceptSize = sum*2>10000000?sum*2:10000000;
                bloomFilter = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), (int) exceptSize);
                for (int i=0; i<stats.length; i++){
                    //打印每个文件路径
                    System.out.println(stats[i].getPath().toString());
                    //读取每个文件
                    InputStream inputStream=hdfs.open(stats[i].getPath());
                    InputStreamReader inputStreamReader= new InputStreamReader(inputStream);
                    BufferedReader reader=new BufferedReader(inputStreamReader);
                    String line="";
                    while((line=reader.readLine())!=null){
                        bloomFilter.put(line);
                    }
                }
            }

            MyBloomFilter myBloomFilter = new MyBloomFilter(bloomFilter);
            bloomFilterMap.put(key, myBloomFilter);
            System.out.println("the bloom " + key + " size is " + RamUsageEstimator.humanSizeOf(bloomFilter) + " num " + sum);
        }
    }

    /**
     * 核心调用方法
     * 参数 s :被过滤的参数
     * 参数 key :需要构建的布隆过滤器,此处是库名 + 表名称,即 db_name.table_name 
     * @param s 
     * @param key 
     * @return 
     * @throws Exception  
     */
    @Override
    public Boolean call(Object s, String key) throws Exception {
        //如果 spark.myudf.bloom.enable 参数配置为 0 ,则布隆过滤器失效,直接返回 true 
        if (udfBloomFilterEnable == 0) {
            return true;
        }

        if (!bloomFilterMap.containsKey(key)) {
            String[] table_array = key.split("\.");
            if (table_array.length != 2) {
                String msg = "the key is invalid: " + key + ", must like db_name.table_name";
                System.out.println(msg);
                throw new IOException(msg);
            }
            String dbName = table_array[0];
            String tableName = table_array[1];
            String path = "/hive/" + dbName + ".db/" + tableName;
            System.out.println(path);
            //构建布隆过滤器
            buildBloomFilter(key, path);
        }

        if (!bloomFilterMap.containsKey(key)) {
            String msg = "not found bloom filter " + key;
            System.out.println(msg);
            throw new IOException(msg);
        }

        BloomFilter bloomFilter = bloomFilterMap.get(key).getBloomFilter();
        if (bloomFilter == null) {
            //如果数据量大于期望值,则直接返回真,即布隆过滤器不起作用
            return true;
        } else {
            return bloomFilter.mightContain(String.valueOf(s));
        }
    }
}

使用示例演示

表信息和数据准备。

--建表数据
create table default.A (
    item_id bigint comment '商品ID',
    item_name string comment '商品名称',
    item_price bigint comment '商品价格',
    create_time timestamp comment '创建时间',
    update_time timestamp comment '创建时间'
)

create table default.B (
    item_id bigint comment '商品ID',
    sku_id bigint comment 'skuID',
    sku_price bigint comment '商品价格',
    create_time timestamp comment '创建时间',
    update_time timestamp comment '创建时间'
)

create table default.ot (
    item_id bigint comment '商品ID',
    sku_id bigint comment 'skuID',
    sku_price bigint comment '商品价格',
    item_price bigint comment '商品价格'
) PARTITIONED BY (pt string COMMENT '分区字段') 

--准备数据
insert overwrite table default.A 
values
(1,'测试1',101,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(2,'测试2',102,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(3,'测试2',103,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(4,'测试2',104,'2023-03-25 08:00:00','2023-04-22 08:00:00'),
(5,'测试2',105,'2023-03-25 08:00:00','2023-04-22 08:00:00');

insert overwrite table default.B 
values 
(1,11,201,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(1,12,202,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(1,13,203,'2023-04-22 08:00:00','2023-04-22 08:00:00'),
(2,21,211,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(2,22,212,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(4,42,212,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(5,51,251,'2023-03-25 08:00:00','2023-03-25 08:00:00'),
(5,52,252,'2023-04-22 08:00:00','2023-04-22 08:00:00'),
(5,53,253,'2023-04-22 08:00:00','2023-04-22 08:00:00');

insert overwrite table default.ot partition(pt='20230421')
values 
(1,11,201,101),
(1,12,202,101),
(2,21,211,102),
(2,22,212,102),
(4,42,212,114),
(5,51,251,110);

原来处理的 SQL 语句。

insert overwrite table default.ot partition(pt='20230422')
select B.item_id 
,B.sku_id 
,B.sku_price 
,A.item_price
from B 
left join A on(A.item_id=B.item_id)

使用布隆过滤器的 SQL(Java 函数导入 Spark,函数名为 “bloom_filter”)。

--构建布隆过滤器
drop table if exists tmp.tmp_primary_key;
create table tmp.tmp_primary_key stored as TEXTFILE as 
select item_id
from (
    select item_id
    from default.A 
    where update_time>='2023-04-22'
    union all 
    select item_id
    from default.B 
    where update_time>='2023-04-22'
) where length(item_id)>0
group by item_id;

--增量数据计算
insert overwrite table default.ot partition(pt='20230422')
select B.item_id 
,B.sku_id 
,B.sku_price 
,A.item_price
from default.B 
left join default.A on(A.item_id=B.item_id and bloom_filter(A.item_id, "tmp.tmp_primary_key"))
where bloom_filter(B.item_id, "tmp.tmp_primary_key")
union all 
--合并历史未变更数据
select item_id
,sku_id
,sku_price
,item_price
from default.ot
where not bloom_filter(item_id, "tmp.tmp_primary_key")
and pt='20230421'

从上面代码可以看出,使用布隆过滤器的 SQL,核心业务逻辑代码只是在原来全量计算的逻辑中增加了过滤条件而已,使用起来还是比较方便的。

实测效果

以我司的 “dim.dim_itm_sku_info_detail_d” 和 “dim.dim_itm_info_detail_d” 任务为例,使用引擎 Spark2。

一种基于布隆过滤器的大表计算优化方法图片

总结

从理论分析和实测效果来看,使用布隆过滤器的解决方案可以大幅提升任务的性能,并减少集群资源的使用。

该方案不仅适用大表间 Join 分析计算,也适用大表相关的其它分析计算需求,核心思想就是计算有必要的数据,排除没必要数据,减小无效的计算损耗。

文章版权声明

 1 原创文章作者:水依家,如若转载,请注明出处: https://www.52hwl.com/32014.html

 2 温馨提示:软件侵权请联系469472785#qq.com(三天内删除相关链接)资源失效请留言反馈

 3 下载提示:如遇蓝奏云无法访问,请修改lanzous(把s修改成x)

 免责声明:本站为个人博客,所有软件信息均来自网络 修改版软件,加群广告提示为修改者自留,非本站信息,注意鉴别

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023年7月14日 上午12:00
下一篇 2023年7月15日