成本与效率:作业帮数据治理全方位解析
在数字化时代,数据指标已成为企业最重要的指南针。有效的数据治理对于确保“数据指南针”持续稳定工作至关重要。
本文整理自作业帮大数据资深研发工程师贺祺在 QCon 2024 北京 的分享 “成本与效率:作业帮数据治理全方位解析”。本次演讲围绕提升数据指标的 ROI 以 及易用性展开,首先介绍了作业帮数据治理的背景和框架,然后从数据使用成本和数据使用效率的角度详细说明每一个模块的治理方案,最后总结并阐述了对未来数据治理的展望。
会上我们也设置了【下一代 Data for AI 技术架构】专题,将从 LLM、智能 Agent、RAG 等不同的热点领域方向,来探讨新一代 Data 技术的突破方向与思路。了解更多内容,可访问大会官网:https://qcon.infoq.cn/2024/shanghai/track
左边是整体的数据流转图:流量日志和业务 binlog 是主要数据源。中台数仓主要负责各数据源的集成以及公共层数据的构建,业务数仓在中台数仓构建的数据基础上,继续构建各自业务的数据集市,中台数仓也会基于中间层继续构建相关数据主题域的上层应用。按照时效性,数仓会提供实时、小时、离线级别数据,虽然整体是 lamda 架构,但是它们之间还会有些依赖和互补,这个后边会讲到。数据服务侧,除了定制化的核心数据和实时大盘看板数据,也支持用户自助配置报表和自己写 sql 做灵活数据分析。
接下来看一下在数据治理上面临的问题:随着业务的发展,我们发现数据量和产品线数量都在急剧膨胀。这就带来了两方面的问题:
一个是资源利用率低,这里的资源就是计算和存储资源。计算资源不足一方面是由于整体数据量激增,另一方面是因为我们自己构造的加工链路过于复杂。存储资源浪费主要表现在缺乏体系化数据治理,每次都是被动的、手动的运动式治理,导致失效、无用数据长期占用资源。
另一个问题是用户的用数成本很高。主要表现在数不好找——没有寻数工具,只能靠查 wiki 或问人,找数效率低;数不好用——就算是找到了数,数据也不能直接拿来用,需要自己写 SQL 获取,遇到同名不同义、同义不同名的指标还需要辨别应该用哪个;最后就是由于缺少质量信息和血缘信息,用户看不到数据就绪时间、怎么产生的、谁产生和谁负责的,数用起来心里没底,导致就算知道用哪个数也不敢用。
针对以上问题,我们的数据治理方案是:以数据 ROI 为抓手,降低数据生产成本,提升数据使用效能。这里数据 ROI 等于数据效能价值除以数据生产成本。降低成本主要就是计算成本和存储成本的优化,提升数据使用效能主要是通过建设数据治理和开发工具平台来达到目的。最终我们数据治理的目标就是,在成本可控的前提下,保障数据使用效能持续稳定输出。
在优化计算资源之前,我们会先按每份数据最终产出指标的重要程度,将数据分为核心数据、高优数据和普通数据,不同的分层对应不同的治理策略。
核心数据一般指的是影响经营决策数据,用户基本上是经营决策者,这类数据要求就绪时间比较早,一般要求至少在上班前能看到数,我们会给这类数据分配高优计算队列,并且会对它们的计算链路做定向优化。
高优数据和普通数据的使用者一般都是运营和数据分析人员,他们的区别是前者的时效性要求比后者更高一些,所以前者产出数据的任务会被安排优先调度。高优数据和普通数据的任务量众多,开发人员不可能对每个任务做专项优化,在集群资源达到瓶颈的时候,这部分数据就绪时间也没法得到保障,我们采取的治理手段就是进行整体技术栈迭代升级,从而释放宝贵的计算资源,让这些数据的就绪时间可以大幅提前。
对于核心数据链路上的任务,除了分配高优队列,还会额外构建几条计算链路以保障数据稳定产出。因为很多情况下核心数据和普通数据在生产过程中存在计算资源和基础组件共用的情况,普通数据生产中的异常情况会容易传导到核心数据的生产过程,核心数据生产时效性难以保证。
这里拿流量类的活跃用户计算链路举例,因为流量数据计算资源消耗较大,构造逻辑也稍复杂,可以说明一些问题。
右边是活跃用户相关指标构建链路图,原始日志按小时粒度采集到数仓,数仓基于小时粒度数据表构建天级数仓表。然后基于天级数仓表会分出两条链路,一条用于构造核心维度指标(链路①),比如 DAU、激活、留存等,这条链路会在高优队列中跑。另一条按星型模型,关联上用户画像维度,比如年级、地域这些,用于后续多维分析使用,这条链路由于涉及一些用户画像挖掘算法且维度组合较多,会消耗大量的计算资源,不能占用核心任务队列资源,所以不能保障就绪时间。
两条链路都在 hive 中完成预聚合,算好各自维度组合结果数据,然后将结果数据直接灌入 OLAP 计算引擎。
考虑到计算链路较长,想到的可以采取的优化措施就是缩短链路。可以直接将采集流连接到 OLAP 引擎,并在 OLAP 引擎中使用聚合模型获取计算结果。这种优化方式使报表能够以最快的速度获取数据。
那是不是就能只用最短链路就行了?答案是不行,原来的链路还是要保留,因为下游还是需要用这些数仓表,并且实时链路由于字符串 hash 冲突,无法做到精确去重,会有 2% 左右的误差,看板数据和数仓表数据不一致会对依赖数仓表做分析的人造成困扰。但是这条链路可以在离线链路出问题时,充当灾备使用,这样可以让经营管理者至少能看到一个大数和趋势,不影响关键决策。
最后考虑是否可以两者结合,既减少对离线系统的依赖,又能产出准确的结果?答案是可以的。
看下具体怎么做:
链路①就是刚说过的常规的离线数仓构建思路,这条链路的优点是构建的中间数仓可以被其他业务数仓或数据分析人员使用,缺点就是链路长,没有充分利用上 OLAP 计算引擎,只把 OLAP 当 KV 查询引擎用。
链路②就是综合了离线、实时链路构造的一条保障链路:将小时级明细数据关联全局字典灌入 OLAP 引擎,在计算引擎中做聚合。这条链路的优点是构建链路短,能够做到精确去重,缺点是要额外构建一个全局字典,会有额外的开发成本和计算成本,并且还是会依赖一部分离线数据。
链路③复用实时数据,并不额外占用资源,缺点就是刚刚提到的无法精确去重,优点是可以和离线完全隔离,在离线链路完全不可用的情况下充当灾备。
构造好这 3 条链路之后,就可以做到数据准确性和及时性兼顾了,然后在这 3 条链路之上构建视图,屏蔽应用侧选择链路的逻辑(优先级①>②>③)。最后还要对这三条链路建立数据质量监控,让他们之间做交叉验证,保障数据一致性。
完成链路治理后,可以看到活跃类核心指标执行时长由原 160 分钟左右,缩短至 35 分钟左右,节省超 2 个小时执行时长;留存类核心指标执行时长由原 600 分钟左右,缩短至 60 分钟左右,节省超 9 个小时执行时长;其中某日由于集群资源占满导致的数仓表大规模延迟的情况下,保障链路和灾备链路都发挥了作用,很好的保障了 SLA。
以上是核心数据的优化策略。接下来说一下非核心数据的优化。这部分数据消耗了大部分的资源,在资源充足的情况下没有任何问题,但是随着业数据快速增长,可以看到资源使用率基本都到 100% 了,他们的就绪时间也逐渐达不到要求了,各种优化手段都用上也不能解决问题,优化已经到了瓶颈。
如何让这边分数据的就绪时间提前?最简单的方法就是加资源,但是这个与我们控制成本的目标不符,这条路行不通。另外一个办法就是升级底层技术栈,让计算资源更省,计算时长更短,这就是接下来要讲的部分——Spark+Iceberg 迁移。
我们分析了一些要高优就绪时间的任务,发现就绪晚的原因主要有:
ODS 数据贴源层就绪时间晚,留给上层数仓构建的时间余量小
整个数据流基于 Hive on Spark 构建,各种优化已经基本到极限,无法进一步减小计算时长
HiveServer2 超负荷,不合理的 SQL 和 UDF 会导致服务 FullGC,甚至宕机,导致整个集群不稳定,数据大规模延迟
总结起来就是:现有技术栈已经无法解决数据增长与有限资源之间的矛盾,需要升级到新的技术架构。
那为什么选择 Spark+Iceberg?原因如下:
Spark 不依赖 HiveServer2,所有操作均在 Driver 执行,不合理的 SQL 只会导致当前任务失败,不影响其他。有优秀的内存管理,查询效率高,自带 tungsten、AQE 等自动调优组件,能有效提升资源利用率
Iceberg 不依赖 HMS,对象存储友好,避免 filelist 开销,使用 manifest 文件记录表、文件粒度元数据,可以快速过滤无用数据
下边说下 Spark+Iceberg 整体的迁移方案:
图中红色箭头部分为迁移前的数据构建链路,我们会逐步过渡到绿色链路。
迁移前数据由 Flume 采集到 COS 对象存储,然后会用 Flink 再读取 COS 文件做文件合并后写入 Hive ODS 层数据表中,下游数仓在 ODS 表就绪后开始做 ETL 加工处理。
迁移后数据直接由 Kafka 通过 Flink 写入 Iceberg 表,这里不再做合并,下游接收到小时分区就绪标记即可开始处理当前小时分区的数据,提前了 ODS 就绪时间。
在迁移过程中,由于下游还是大量的使用 Hive on Spark 做 ETL,而 Hive 引擎读取 Iceberg 表问题比较多,架构改造成本大,所以采用过度方案是:将 Hive 表的存储路径映射到 Iceberg 表上,这样做的好处是不用往 Iceberg 和 hive 写两份数据,不会造成计算和存储资源的浪费,且可以在迁移过程中让下游无感知。
当然这么做也有弊端,在做正常的增量生产情况下,没有任何问题,但是一旦遇到数据问题需要修复,重刷 Iceberg 表就会导致 Hive 表数据脏读(数据 double)。采取的应对手段就是使用存储过程将过期的分区数据和快照信息删除,然而这么做又带来了另外的一些问题,具体内容我们在后续介绍。
迁移前,需要对比线上 Hive on Spark 读取 Hive 表和用 spark-sql 读取 Iceberg 表的查询性能和资源消耗情况,以确保迁移后是有收益的。测试用例是使用两个不同的计算引擎,读取文件数和文件大小差不多的 Hive 表和 Iceberg 表。两表数据量为 2.9 亿,大小约 27G,测试了 3 种场景下的两组平均耗时和资源消耗情况,以下是我们的测试结论:
在 join+distribute by 生成多轮 shuffle 场景下,sparksql+iceberg 的查询时间比 hive on spark 节省 75%,内存占用节省 90%;
在使用 where 条件过滤数据的场景下,sparksql+iceberg 的查询消耗时间比 hive on spark 节省 63%,内存占用节省 87%;
在按查询条件字段构建索引后,通过索引字段过滤查询时间节省 91%,内存占用节省 81%。
查询性能收益较大,存储性能也需要做测试,防止计算资源节省了但是存储资源却膨胀过大。测试了不同压缩格式下 Hive 表和 Iceberg 表存储的变化情况,使用 Spark 计算引擎读取 1.5 亿数据,使用相同的逻辑写入压缩格式为 orc+zlib 和 parquet+gzip 的 Hive 表和 Iceberg 表,测试结论是:
相同数据量下,Iceberg 表的存储大小会略大于 Hive 表
按各字段对比,发现复杂 KV 字段和较长的 String 类型字段都会膨胀较多
分析发现,主要是数据分布影响了压缩效率,写 Iceberg 表时会按照分区字段排序,而写 Hive 可以直接按指定字段排序,压缩效果会比 Iceberg 好。这个问题会在后续迁移工作中继续解决。
总之,测试的结论就是:存储增长范围可接受,计算资源收益较大,利用数据重排也可以节省一部分存储,所以可以开始进行逐步迁移。
接下来说下之前提到的为了解决重刷数据导致 Hive 脏读而构造存储过程删除过期快照产生的问题。如右图,假设现在有个小时级分区的 Iceberg 表,在迁移双跑期间发现,在同时写入 00 分区和 01 分区的时候,会有偶发的 00 分区会因找不到元数据而报错的情况。
这种问题是怎么造成的呢?假设 00 分区由于上游就绪时间延迟,导致任务调启时间延后,就有可能导致 00 分区和 01 分区任务会同时对表进行写操作,每个任务写入成功后会生成一个快照文件,为了兼容 Hive 脏读问题会立马提交一个过期快照,解绑元数据和过期文件存储路径关系,并同时删除没有引用的孤寡文件。
这里就存在一个并发的问题,假设 01 分区任务写入成功生成 76meta.json 快照信息,并提交生成过期快照文件 77meta.json,这时 00 分区也开始写入了,并成功生成了 78、79 两个快照文件,但是这个时候 01 分区过期快照删除开始执行了,在 01 分区看来,它只知道有 76、77 两个快照信息,并不知道有 78、79 两个快照,所以他就会把这两个快照直接删除了。过期快照执行完成后,程序会根据最新的信息去找到没有引用的孤寡文件并进行删除,但此时 00 分区任务发现 79meta.json 快照文件已经找不到了,所以就会报错。由于最新的快照信息缺失,就会导致 Iceberg 表读取元数据失败。
知道了造成问题的原因,那问题就好解决了。每次分区写入成功后,在生成的 Metadata.json 中保留该次写入的 Snapshot,同时写入完成后删除本分区的孤寡文件,但是并不实际删除“历史快照”(因为单次写入看到的历史快照可能是其他分区正在写入创建的,并不一定是真正的历史快照),只标记过期,这样保证了只要单分区内只有一个写入方就不会出现互相影响的问题,然后对于无用的快照文件,做延期删除,比如小时级分区表都就绪后,统一按天做过期快照删除,这样就完全避免了多分区同时写入的时候相互影响的问题。
最后说一下迁移后的收益:
ODS 平均就绪时间提前半个小时,基本可以保证 1 点前全部就绪
迁移后读写数据量有所增长,但是就绪时间却提前了
迁移后双跑期间,对比资源消耗情况,迁移后的任务在内存消耗上可以节省 50%+
当然还有一些遗留的问题要解决,比如前面提到的存储膨胀,还有我们发现 Sparksql 写 Iceberg 生成的碎文件也比较多,这对还在使用 hive 查询的用户影响比较大(因为 list 文件消耗时间较多),这个 Spark 提供的 AQE 可以在写数据的时候对小文件自动合并,后边存储治理要讲的数据重排方法也可以解决部分问题。
接下来讲下存储方面的治理。我们按照使用数据的 ROI,将每个表天级分区数据分成四部分:无用数据,冷数据,温数据,热数据,对不同部分的数据同样也是会应用不同的生命周期存储策略和数据组织策略。
数据 ROI 的计算方法如图所示。上层用户使用数据主要通过三种途径:通过报表看数,自己写 SQL 查数以及写 ETL 加工数,我们通过解析 SQL 获取被使用的表和字段,然后通过血缘串联统计整体表字段使用情况。另外再将每个表对应的产出任务消耗的计算资源和这个表所消耗的存储资源做等价转换,计算得到资源消耗成本,最后根据数据的使用情况和成本消耗情况进行 ROI 计算。
对于 ROI 为 0 无用数据,我们会建议用户清理数据,并在数据地图上标记废弃,然后等待系统自动删除。如果是 ROI 基本为 0 但是又不能删的冷数据,我们会建议用户修改数据生命周期,对数据做冷备归档。对于使用频次不高的温数据,我们就以存储效率优先,做一些数据的重排处理,进一步压缩存储,对于使用频次较高的热数据,我们就以查询效率优先,会按照经常用来过滤的条件做排序构建索引,进一步提高下游查询效率。
表和字段的清理可以通过数据地图平台工具配置生命周期处理,流量点位日志数据治理比较特殊,无法直接通过平台标记废弃来删除对应的分区或表,而是需要标记表里无效点位所在的行,所以这里对低 ROI 点位的治理方案单独说明一下。
点位的使用情况要用元数据仓库数据,解析上游使用 SQL,获取每个点位的使用情况。然后通过统计实际上报上来的埋点数据,分析得到虽然一直在上报,但是从来没被用过的点位数据,我们发现近 4 个月未使用点位占到了新增存储的近三分之一。对于这部分点位会在点位管理平台标记,并且在对外提供的视图表中将这些点位过滤掉,三个月后没人反馈则在管理平台配置下线,ETL 加工任务会读取配置,直接将无用的点位在物理层面过滤掉,无用点位黑名单也会同步给采集侧,在埋点上报及采集侧做进一步治理。
对于温数据和热数据,两种数据的治理都采用了同一种技术——通过数据在表中不同的组织形式来达到加速查询或压缩存储的目的。对热数据,按照经常查询的列进行排序,可以加速下游查询效率。对温数据,按照特征列进行数据重排,可以进一步节省存储。
构建索引加速查询的原理如左图。查询 id=13 的数据,对于无序的数据,需要扫描 4 份文件,对于全局有序的数据,查询引擎根据存储引擎的元数据能更好的做 Data Skipping,只扫描 1 个文件就可以了。
数据重排是通过空闲的计算资源换取长期的存储资源节省收益,他的原理如右图,总的来说就是找到一些可以代表一份表数据的特征列,按照这些列排序,可以将相似、相同的数据排列在一起,从而达到更好的压缩效果。这里列了几个影响重排效果的因素:
列冗余度,越高越好,也就是相同的值越多,有利于压缩算法
列宽度,越高越好,列的长度越长,那压缩的空间就越大
列相关性,约高越好,也就是通过这列排序,其他列也会跟着有序
最后通过这 3 个参数算出一个压缩效率系数,压缩系数越高越好
还有一个要考虑的是重排时可能产生数据倾斜,可以计算一个倾斜系数,倾斜系数高的就不重排
流量点位数据的典型查询场景就是按照点位名来过滤统计分析用户行为,比如看一下访问了首页的用户有哪些,点击了下单按钮的用户有哪些。可以按照点位排序构建索引,将特定行为的数据快速过滤出来。
但是按照点位构建索引会有个问题:如上图点位 PV 分布所示,点位数据分布不均匀,比如首页的曝光数据量会明显的多于其他点位的数据量,也就是 20% 的点位可能占有了 80% 的数据量,在按照点位进行排序的时候需要考虑数据倾斜问题。
可以按照点位 pv 进行分桶,使每个点位进入到固定的桶内,然后按照桶 id 进行排序就可以缓解数据倾斜问题。具体桶分配逻辑如伪代码:
首先加载昨日点位 pv 量分布数据,因为点位分布一般每天变化都不大。
然后根据总 pv 数和 reducetask 数计算一个步长,用步长和点 pv 总量计算得到桶 id 的上下边界,可以看到 pv 量大的点位,分配到的桶个数就多,而 pv 小的点位可能会被分配到同一个桶内。
对于新出现的点位会被统一分配到最大桶 id 上。
最后排序后的实际效果如图所示,由此就达到了数据分布基本均匀,并且按照桶 id 排序后全局还有序的目的。
我们分别用 Presto、Spark 和 MR 三种计算引擎读取一张百亿级别的 Hive 表,对比原始数据和构建完索引后数据的查询效率,由右图可知:
按点位过滤 Spark 和 MR 引擎查询效率有两倍左右的提升
Presto 查询提升效率约有 40 倍的提升,基本可以秒级内返回
数据重排原理基本和构建索引一样,都是通过数据的组织来达到优化的目的,就不做过多的介绍。这里只说下重排后的收益。分别测试了交易、教学和流量方向 4 张表,按照特征列重排数据后在存储压缩效率上会有较大收益。对于集中度较高的数据集,重排后收益较为明显,可以节省 30%~50% 的存储。其中流量日志数据量大,设备列和其他列相关性高,压缩效果更好,压缩率可以达到 60% 左右。
最后介绍一下我们上层数据应用和产品工具的建设。
如图,从数仓规划、建模到最终数据的生产和应用,这是一个完整的数据建设流程。数据开发人员可以先在 DataRing 上做规划以及标准的建立,这些数据会作为建模平台的数据字典,建模平台完成建模后,会同步元数据到 DataMap 数据地图和 UDA 开发平台,研发可以基于逻辑模型继续构建物理模型,上层的应用平台也能继续使用这些元数据提供数据服务。下面详细介绍几个平台。
主要用于做数仓规划和逻辑建模。平台提供数仓分层、数据域划分、业务过程管理,字段和枚举值标准统一等功能。数据模型与数仓规划建立关系后,可以有全局视角管理能力,方便后续进行数据治理并提供了数据查找能力。规划做完后,可以做事实、维度表逻辑建模,挂载表到具体数据域和业务线上。基于逻辑模型设计环节沉淀的元数据,以此来驱动和约束实际的物理模型,约束对应物理模型的 DDL,在数据加工时,防止因缺乏有效约束带来的“烟囱式”开发。最后,通过构建原子指标 + 时间周期 + 修饰词的方式体系化构建指标模型,将指标关联到前边建设的数据模型上,模型上线前,自动完成业务定义与物理实现一致性验证,确保 DML 实现的正确性。
提供管理表的增、删、改操作功能,以及表的生命周期和安全级别等治理元数据管理功能,新建表直接从逻辑模型选择,完成从逻辑模型到物理模型的转化。另外数据地图很重要的功能就是提供了数据寻址,方便用户快速找到想要的数据,在这里可以查看每个数仓表字段和枚举值的定义、血缘信息以及表使用说明等,同时还提供数据预览功能,节省使用人员数据探查成本。
蓝鲸报表平台提供了两种数据模型作为报表数据源:一种是通过建模平台同步过来的数据模型,这种模型可以支持用户通过拖拽快速生成数据报表。对于较为复杂的报表,系统也支持用户直接写 SQL 建模灵活配置。目前平台已经和多个数据源打通,支持如 Mysql、Hive 表、Starrocks、Excel 等数据源。
通过拖拽的方式构建数据模型,系统能自动识别数据的维度、指标,用户可以在页面配置表和表之间的关联关系,完成二次建模。
完成数据建模后就可以配置报表了。支持多种图表类型,可以将要展现的图表拖入到页面中,自行调整位置和大小,选择配置好的数据模型,通过拖拽配置要分析的指标和维度,如果数据有问题,可以通过调试页面查看模型生成的 SQL,然后重新构建模型。
对于较为复杂的数据展现逻辑,比如分组排名、复杂关联和多层嵌套查询之类,就可以通过自己写 SQL 完成报表配置。蓝鲸平台支持用户通过条件语句动态生成 SQL,并且支持语法扩充,操作简单方便,基本上 5 分钟内就可以完成创建一份数据报表。
本次分享通过成本控制和数据使用效能提升两个方面介绍了一些成本优化的方法,和提升数据可用性的工具平台。有了这些治理手段加上工具,可以让数据 ROI 最大化,达到我们在成本可控的前提下,使数据使用效能持续稳定输出的目的。
未来数据治理方向也希望能用上 AI 辅助。比如智能化监控报警,目前都是简单的规则报警,后续希望能通过 AI 自动识别指标波动,并可通过归因分析发现隐藏在结果背后的根本原因,从而预测潜在的业务问题。还有就是能构建一个智能指标中台,通过自然语言提问搜索与分析业务指标,并进一步基于语境生成数据总结报告,且给出行动建议,这样可以进一步提高数据的使用效能。
InfoQ 将于 10 月 18-19 日在上海举办 QCon 全球软件开发大会 ,覆盖前后端 / 算法工程师、技术管理者、创业者、投资人等泛开发者群体,内容涵盖当下热点(AI Agent、AI Infra、RAG 等)和传统经典(架构、稳定性、云原生等),侧重实操性和可借鉴性。现在大会已开始正式报名,可以享受 8 折优惠,单张门票立省 960 元(原价 4800 元),详情可联系票务经理 17310043226 咨询。
微信扫码关注该文公众号作者