MaxCompute 近实时增全量处理一体化新架构和使用场景介绍
阿里妹导读
业务背景和现状
方案一,只使用单一的MaxCompute离线批处理解决方案,对于近实时链路或者增量处理链路通常需要转化成T+1的批处理链路,会一定程度上增加业务逻辑复杂度,且时效性也较差,存储成本也可能较高。方案二,只使用单一的实时引擎,那资源成本会较高,性价比较低,且对于大规模数据批处理链路的稳定性和灵活性也存在一些瓶颈。方案三,使用典型的Lambda架构,全量批处理使用MaxCompute链路,时效性要求比较高的增量处理使用实时引擎链路,但该架构也存在大家所熟知的一些固有缺陷,比如多套处理和存储引擎引发的数据不一致问题,多份数据冗余存储和计算引入的额外成本,架构复杂以及开发周期长等问题。这些解决方案在成本,易用性,低延时,高吞吐等方面互相制约,很难同时具备较好的效果,这也驱动着MaxCompute有必要开发新的架构既能满足这些业务场景需求,也能提供较低的成本和较好的用户体验。
近几年在大数据开源生态中,针对这些问题已经形成了一些典型的解决方案,最流行的就是Spark/Flink/Trino开源数据处理引擎,深度集成Hudi / Delta Lake / Iceberg / Paimon开源数据湖,践行开放统一的计算引擎和统一的数据存储思想来提供解决方案,解决Lamdba架构带来的一系列问题。同时MaxCompute近一年多在离线批处理计算引擎架构上,自研设计了离线&近实时数仓一体化架构,在保持经济高效的批处理优势下,同时具备分钟级的增量数据读写和处理的业务需求,另外,还可提供Upsert,Time travel等一系列实用功能来扩展业务场景,可有效地节省数据计算,存储和迁移成本,切实提高用户体验。
离线&近实时增全量一体化业务架构
业务场景实践
表存储和数据治理优化
建表
createtable dt (pk bigint notnullprimarykey, val string) tblproperties ("transactional"="true");
createtable par_dt (pk bigint notnullprimarykey, val string)
partitioned by (pt string) tblproperties ("transactional"="true");
关键的表属性配置
createtable dt (pk bigint notnullprimarykey, val string)
tblproperties ("transactional"="true", "write.bucket.num" = "32", "acid.data.retain.hours"="48");
表属性: write.bucket.num
此属性非常重要,表示每个partition或者非分区表的分桶数量,默认值为16,所有写入的记录会根据PK值对数据进行分桶存储,相同PK值的记录会落在同一个桶中。非分区表不支持修改,分区表可修改,但只有新分区生效。
数据写入和查询的并发度可通过bucket数量来水平扩展,每个并发可至少处理一个桶数据。但桶数量并不是越多越好,对于每个数据文件只会归属一个桶,因此桶数量越多,越容易产生更多的小文件,进一步可能增加存储成本和压力,以及读取效率。因此需要结合数据写入的吞吐,延时,总数据的大小,分区数,以及读取延时来整体评估合理的桶数量。
此外,数据分桶存储也非常有助于提升点查场景性能,如果查询语句的过滤条件为具体的PK值,那查询时可进行高效的桶裁剪和数据文件裁剪,极大减少查询的数据量。
评估桶数量建议
对于非分区表,如果数据量小于1G,桶数量建议设置为4-16; 如果总数据量大于1G,建议按照128M-256M作为一个桶数据的大小,如果希望查询的并发度更多的话,可以进一步调小桶数据量大小; 如果总数据量大于1T,建议按照500M-1G作为一个桶数据的大小; 但目前能够设置的最大桶数量是4096,因此对于更大的数据量,单个桶的数据量也只能越来越大,会进一步影响查询效率,后续平台也会考虑是否可放开更大的限制。 对于分区表,设置的桶数量是针对每个分区的,并且每个分区的桶数量可以不同。每个分区的桶数量设置原则可以参考上面非分区表的配置建议。对于存在海量分区的表,并且每个分区的数据量又较小的话,比如在几十M以内,建议每个分区的桶数量尽可能少,配置在1-2个即可,避免产生过多的小文件。
表属性: acid.data.retain.hours
此属性也很重要,代表time travel查询时可以读取的历史数据实践范围,默认值是1天,最大支持7天。
建议用户按真实的业务场景需求来设置合理的时间周期,设置的时间越长,保存的历史数据越多,产生的存储费用就越多,而且也会一定程度上影响查询效率,如果用户不需要time travel查询历史数据,建议此属性值设置为0,代表关掉time travel功能,这样可以有效节省数据历史状态的存储成本。
Schema Evolution操作
altertable dt add columns (val2 string);
altertable dt drop columns val;
表数据自动治理优化
存在的问题
表数据组织格式
Delta Data File:每次事务写入或者小文件合并后生成的增量数据文件,会保存每行记录的中间历史状态,用于满足近实时增量读写需求。
Compacted Data File:Delta File经过Compact执行生成的数据文件,会消除数据记录的中间历史状态,PK值相同的记录只会保留一行,按照列式压缩存储,用来支撑高效的数据查询需求。
Delta CDC Log: 按照时序存储的CDC格式增量日志 (目前还未对外推出)。
数据自动治理优化
Auto Sort: 自动将实时写入的行存avro文件转换成aliorc列存文件,节省存储成本和提升读取效率。
Auto Merge: 自动合并小文件,解决小文件数量膨胀引发的各种问题。主要策略是周期性地根据数据文件大小/文件数量/写入时序等多个维度进行综合分析,进行分层次的合并。但它并不会消除任何一条记录的中间历史状态,主要用于time travel查询历史数据。
Auto Partial Compact: 自动合并文件并消除记录的历史状态,降低update/delete记录过多带来的额外存储成本,以及提升读取效率。主要策略是周期性地根据增量的数据大小/写入时序/time travel时间等多个维度进行综合分析来执行compact操作。该操作只针对超过time travel可查询时间范围的历史记录进行compact。
Auto Clean: 自动清理无效文件,节省存储成本。Auto Sort / Auto Merge / Auto Partial Compact操作执行完成后,会生成新的数据文件,所以老的数据文件其实没什么作用了,会被即时自动删除,及时节省存储成本。
set odps.merge.task.mode=service;
altertable dt compact major;
数据写入场景业务实践
分钟级近实时 Upsert 写入链路
基本大部份可融合flink生态的引擎或者工具都可通过flink任务,结合MaxCompute flink connector实时写入数据进 dt 表。
写入并发可以横向扩展,满足低延时高吞吐需求。写入流量吞吐跟flink sink并发数,Delta Table 桶数量等参数配置相关,可根据各自的业务场景进行合理配置。特别说明,针对 Delta Table 桶数量配置为Flink sink并发数的整数倍的场景,系统进行了高效优化,写入性能最佳。
满足数据分钟级可见,支持读写快照隔离
结合Flink的Checkpoint机制处理容错场景,保障exactly_once语义。
支持上千分区同时写入,满足海量分区并发写入场景需求。
流量吞吐上限可参考单个桶1MB/s的处理能力进行评估,不同环境不同配置都可能影响吞吐。如果对写入延时比较敏感,需要相对稳定的吞吐量,可考虑申请独享的数据传输资源[6],但需要额外收费。如果默认使用共享的公共数据传输服务资源组的话,在资源竞抢严重的情况下,可能保障不了稳定的写入吞吐量,并且可使用的资源量也有上限。
部分列增量更新链路
通过SQL Insert进行增量写入部分列:
createtable dt (pk bigint notnullprimarykey, val1 string, val2 string, val3 string) tblproperties ("transactional"="true");
insertinto dt (pk, val1) select pk, val1 from table1;
insertinto dt (pk, val2) select pk, val2 from table2;
insertinto dt (pk, val3) select pk, val3 from table3;
通过Flink Connector实时写入部分列。
SQL DML / Upsert 批处理链路
数据查询场景业务实践
Time travel查询
//查询指定时间戳的历史数据
select * from dt timestampasof'2024-04-01 01:00:00';
//查询5分钟之间的历史数据
select * from dt timestampasofcurrent_timestamp() - 300;
//查询截止到最近第二次Commit写入的历史数据
select * from dt timestampasof get_latest_timestamp('dt', 2);
Time travel查询处理过程简介
图中 dt Schema包含一个pk列和一个val列。左边图展示了数据变化过程,t1 - t5代表了5个事务的时间版本,分别执行了5次数据写入操作,生成了5个Delta file,在t2和t4时刻分别执行了Compact操作,生成了两个Compacted File: c1和c2,可见c1已经消除了中间状态历史记录(2,a),只保留最新状态的记录(2,b)。
如查询t1时刻的历史数据,只需读取Delta file (d1) 进行输出; 如查询t2时刻,只需读取Compacted file (c1) 输出其三条记录。如查询t3时刻,就会包含Compacted file (c1)以及Delta file (d3) 进行合并merge输出,可依此类推其他时刻的查询。可见,Compacted file文件虽可用来加速查询,但需要触发较重的Compact操作,用户需要结合自己的业务场景主动触发major compact,或者由后台系统自动触发compact操作。
Time travel查询设置的事务版本,支持时间版本和ID版本两种形态,SQL语法上除了可直接指定一些常量和常用函数外,还额外开发了get_latest_timestamp[10]和get_latest_version[10]两个函数,第二个参数代表它是最近第几次commit,方便用户获取MaxCompute内部的数据版本进行精准查询,提升用户体验。
增量查询
用户指定时间戳或者版本查询增量数据,详细语法参考官网[11],简单示例:
//查询2024-04-0101:00:00-01:10:00之间十分钟的增量数据
select * from dt timestampbetween'2024-04-01 01:00:00'and'2024-04-01 01:10:00';
//查询前10分钟到前5分钟之间的增量数据
select * from dt timestampbetweencurrent_timestamp() - 601andcurrent_timestamp() - 300;
//查询最近一次commit的增量数据
select * from dt timestampbetween get_latest_timestamp('dt', 2) and get_latest_timestamp('dt');
引擎自动管理数据版本查询增量数据,不需要用户手动指定查询版本, 非常适合周期性的增量计算链路 (功能灰度发布中,以官网发布为准)。简单示例:
//绑定一个stream对象到dt表上
create stream dt_stream ontable dt;
insertinto dt values (1, 'a'), (2, 'b');
//自动查询出来新增的两条记录(1, 'a'), (2, 'b'), 并把下一次的查询版本更新到最新的数据版本
insert overwrite dest select * from dt_stream;
insertinto dt values (3, 'c'), (4, 'd');
//自动查询出来新增的两条记录(3, 'c'), (4, 'd')
insert overwrite dest select * from dt_stream;
增量查询处理过程简介
图中表 dt Schema包含一个pk列和一个val列。左边图展示了数据变化过程,t1 - t5代表了5个事务的时间版本,分别执行了5次数据写入操作,生成了5个Delta file,在t2和t4时刻分别执行了Compact操作,生成了两个Compacted File: c1和c2。
在具体的查询示例中,例如,begin是t1-1,end是t1,只需读取t1时间段对应的Delta file (d1)进行输出; 如果end是t2,会读取两个Delta files (d1, d2);如果begin是t1,end是t2-1,即查询的时间范围为(t1, t2),这个时间段是没有任何增量数据插入的,会返回空行。
Compact / Merge服务生成的数据(c1, c2)不会作为新增数据重复输出。
PK 点查 DataSkipping 优化
先进行Bucket裁剪,只读取包含指定PK值的一个bucket即可;
在Bucket内部进行数据文件裁剪,只读取包含指定PK值的文件即可;
在文件内部进行Block裁剪,根据Block的PK值域范围进行过滤,只读取包含指定PK值的block即可。
select * from dt where pk = 1;
SQL查询分析Plan优化
Distinct的PK列本身具备的Unique属性,因此可以消除去重算子;
Join on key和PK列相同,因此直接使用Bucket Local Join即可,消除资源消耗很重的Shuffle过程;
由于每个桶读取出来的数据本身有序,因此可以直接使用MergeJoin算法,消除前置的Sort算子。
select * from (selectdistinct pk from dt_t1) t
join (selectdistinct pk from dt_t2) t2 on t.pk = t2.pk;
数据库整库实时同步写入 MaxCompute
如上图所示,左边流程是之前MaxCompute支持此类场景的典型ETL处理链路,按照小时/天级别读取数据库的变更记录写入到MaxCompute一张临时的增量表中,然后将临时表和存量的全量表进行Join Merge处理,生成新的全量数据。此链路较复杂,并且延时较长,也会消耗一定的计算和存储成本。
右边流程则是使用新架构支持该场景,直接按照分钟级别实时读取数据库的变更记录upsert写入到 dt 表即可。链路极简单,数据可见降低到分钟级,只需要一张 dt 表即可,计算和存储成本降到最低。
目前MaxCompute集成了两种方式支持该链路:
通过DataWorks数据集成的整库/单表增全量实时同步任务,在页面进行任务配置即可。
优势
用MaxCompute较低的成本来支持近实时以及增量链路,具备很高的性价比。
统一的存储、元数据、计算引擎一体化设计,做了非常深度和高效的集成,具备存储成本低,数据文件管理高效,查询效率高,并且Time travel / 增量查询可复用MaxCompute批量查询的大量优化规则等优势。
通用的全套SQL语法支持所有功能,非常便于用户使用。
深度定制优化的数据导入工具,高性能支持很多复杂的业务场景。
无缝衔接MaxCompute现有的业务场景,可以减少迁移、存储、计算成本。
表数据后台智能自动化治理和优化,保证更好的读写稳定性和性能,自动优化存储效率和成本。
基于MaxCompute平台完全托管,用户可以开箱即用,没有额外的接入成本,功能生效只需要创建一张 dt 表即可。
作为完全自研的架构,需求开发节奏完全自主可控。
生产现状和未来规划
支持CDC数据读写 支持增量物化视图 支持数据秒级可见 表数据服务智能治理深度优化以及查询性能优化
微信扫码关注该文公众号作者