Bendi新闻
>
大厂咋做多系统数据同步方案的?

大厂咋做多系统数据同步方案的?

9月前

作者JavaEdge在OSCHINA

链接:https://my.oschina.net/u/3494859/blog/10931835

1 背景

业务线与系统越来越多,系统或业务间数据同步需求也越频繁。当前互联网业务系统大多 MySQL 数据存储与处理方案:
  • 随信息时代爆炸,大数据量场景下慢慢凸显短板,如:需对大量数据全文检索,对大量数据组合查询,分库分表后的数据聚合查询

  • 自然想到如何使用其他更适合处理该类问题的数据组件(ES)

因此,公司亟需一套灵活易用的系统间数据同步与处理方案,让特定业务数据可很方便在其他业务或组件间流转,助推业务快速迭代。

2 方案选型

当前业界针对系统数据同步较常见的方案有同步双写、异步双写、侦听 binlog 等方式,各有优劣。本文以 MySQL 同步到 ES 案例讲解。

2.1 同步双写

最简单方案,在将数据写到 MySQL 时,同时将数据写到 ES,实现数据双写。

优点

  • 设计简单易懂

  • 实时性高

缺点

  • 硬编码,有需要写入 MySQL 的地方都要添加写 ES 的代码,导致业务强耦合

  • 存在双写可能失败导致数据丢失的风险,如:

    • ES 不可用

    • 应用系统和 ES 之间网络故障

    • 应用系统重启,导致系统来不及写入 ES

  • 对性能有较大影响,因为每次业务操作都要加个 ES 操作,若对数据有强一致性要求,还需事务处理

2.2 异步双写

在同步双写基础加个 MQ,实现异步写。

优点

  • 解决性能问题,MQ 的性能基本比 mysql 高出一个数量级

  • 不易出现数据丢失问题,主要基于 MQ 消息的消费保障机制,比如 ES 宕机或者写入失败,还能重新消费 MQ 消息

  • 通过异步的方式做到了系统解耦,多源写入之间相互隔离,便于扩展更多的数据源写入

缺点

  • 数据同步实时性,由于 MQ 消费网络链路增加,导致用户写入的数据不一定马上看到,有延时

  • 虽在系统逻辑做到解耦,但存在业务逻辑里依然需增加 MQ 代码耦合

  • 复杂度增加:多个 MQ 中间件维护

  • 硬编码问题,接入新的数据源需要实现新的消费逻辑

2.3 监听 binlog

第二种方案基础上,主要解决业务耦合问题,所以引入数据变动自动监测与处理机制。

优点

  • 无代码侵入,原有系统无需任何变化,无感知

  • 性能高,业务代码完全无需新增任何多余逻辑

  • 耦合度极低,完全无需关注原系统业务逻辑

缺点

  • 存在一定技术复杂度

  • 数据同步实时性可能有问题

基础组件的设计主要考虑尽量做到对业务无侵入,业务接入无感知,同时系统耦合度低,综上选型方案三,同时考虑该方案在可复用和可扩展还存在短板,所以在此基础又做优化。

3 整体方案设计

3.1 概述

需求数据源都是 MySQL,所以先考虑选择组件对 MySQL 数据变动做实时监听,业界成熟方案最熟悉的就是 [canal],功能完善度,社区活跃度,稳定性等都符合。所以,基于 canal 对方案三优化,以满足多系统数据同步,达到业务解耦、可复用、可扩展。

设计核心理念

通过统一的 “消息分发服务” 实现与 Canal Client 对接,并将消息按统一格式分发到不同 MQ 集群,通过统一的 “消息消费服务” 去消费消息并回调业务接口,业务系统无需关注数据流转,只需关注特定业务的数据处理和数据组装。
“消息分发服务” 和 “消息消费服务” 对各业务线,实现了数据流转过程中的功能复用。“消息消费服务” 中的可分发到不同的 MQ 集群,和 “消息消费服务” 中的配置指定数据源输出实现了功能扩展。

核心模块

  • canal:监听数据源的数据变动

  • 消息分发服务:对接 canal 客户端,拉取变化的数据,将消息解析为 JSON,按固定规则分发到 MQ,MQ 可根据业务配置指定到不同集群,实现横向扩展。由于变更数据可能批量,这里会将消息拆分为单条发送到 MQ 中,并且通过配置可以过滤掉一些业务上不需要的大字段,减少 mq 消息体

  • 消息消费服务:从配置表中加载 MQ 队列,消费 MQ 中的消息,通过队列、回调接口、ES 索引三者映射,将消息 POST 给业务回调接口,接收到业务回调接口返回的操作指令和 ES 文档后,写入对应的 ES 索引。写入失败时插入补偿表,等待补偿。这里 ES 索引可以根据业务配置指定到不同的集群,实现横向扩展

  • 任务调度系统:定时调用消息消费服务中的消息补偿等定时任务接口

  • 业务回调服务:接收消息消费服务 POST 过来的消息,根据消息中的指令和数据,结合数据库中的数据或下游服务接口返回的数据组装 ES 文档中所需要的数据,设置相应的操作指令返回给消息消费服务去写入 ES

  • 业务 ES 查询服务:通过 ES SDK 查询 ES 索引中的数据,通过接口返回给业务调用方

3.2 数据订阅消息分发服务

将数据的订阅与数据的消费通过 MQ 进行解耦,“数据订阅消息分发服务” 的职责是对接 Canal Client,解析数据变更消息,转换为常用的 JSON 格式的消息报文,按照业务配置规则分发到不同的 MQ 集群、路由。

3.2.1 基于 Canal 的数据变更监听机制

Canal 主要是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费:
  • MySQL master 将数据变更写入二进制日志(binary log,其中记录叫二进制日志事件 binary log events,可通过 show binlog events 查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

  • Canal server 和 client 端高可用依赖 zk,启动 canal server 和 client 时都会从 zk 读信息

  • Canal server 和 client 启动时都会去抢占 zk 对应的 running 节点,保证只有一个 server 和 client 在运行,而 server 和 client 高可用切换也是基于监听 running 节点

  • Canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动)

  • 创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态

  • 一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤 1 的操作,重新选出一个 canal server 启动 instance

  • Canal client 每次进行 connect 时,会首先向 zookeeper 询问当前是谁启动了 canal instance,然后和其建立链接,一旦链接不可用,会重新尝试 connect

3.2.2 基于 Canal Client 分摊设计提升系统处理效率

从 Canal 服务高可用设计可见,Canal Client 当有多个实例启动时,会保证只有一个实例在运行,消费 binlog 消息。而承载 Canal Client 的 "数据订阅消息分发服务" 会部署在多台服务器,由于服务发布时每台服务器启动时间不同,所有 Canal Client 活跃实例都会集中在先启动的那台服务器运行,消费 binlog 消息。
其余服务器运行的 Canal Client 都处备用状态,不能充分利用每台服务器资源。因此希望不同 destination 分摊在不同服务器执行,但所在服务器宕机时会自动转移到其他服务器执行,这样充分利用每一台服务器,提供 binlog 消息消费性能。
为此,引入 elasticjob-lite 组件,利用分片特性二次封装,实现侦听 destination 在某台服务器中上下线的变更事件。
elasticjob-lite 分片原理
ElasticJob 中任务分片项的概念,使任务可在分布式环境运行,每台任务服务器只运行分配给该服务器的分片。随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更。
若作业分 4 片,用两台服务器执行,则每个服务器分到 2 片:
新增 Job 服务器时,ElasticJob 会通过注册中心的临时节点的变化感知到新服务器,并在下次任务调度时重分片,新服务器会承载一部分作业分片:
当作业服务器在运行中宕机时,注册中心同样会通过临时节点感知,并将在下次运行时将分片转移至仍存活的服务器,以达到作业高可用。本次由于服务器宕机而未执行完的作业,则可以通过失效转移的方式继续执行。

3.2.3 资源隔离

该系统使用方包含公司各业务线,如何保障线上问题后,各业务不相互影响。
在 MQ 集群和队列级别都支持基于业务的资源隔离;将从 canal 中拉取出来的变更消息,按规则分发到不同 MQ 集群,设置统一路由键规则, 以便各业务在对接时申请自己业务的 MQ 队列,按需绑定对应 MQ 集群和消息路由。
  • MQ 集群路由

通过配置将不同的 destination 映射到不同的 MQ 集群和 ZK 集群,可达到性能横向扩展。
  • MQ 消息路由规则

canal 从 binlog 中获取消息后,将批量消息拆分成单条消息,进行分片规则运算后发送到指定 rabbitmq 交换机和路由键,以便根据不同业务场景,按不同业务规则绑定到不同队列,通过消费服务进行消息消费处理,同时会建立一个名为 “exchange.canal” 的 exchange,类型为 topic,路由键规则:key.canal.{destination}.{database}.{table}.{sharding},sharding 按 pkName-value 排序后的 hashcode 取模分片,队列命名规则约定:queue.canal.{appId}.{bizName} 如:
queue.canal.trade_center.order_search.0 绑定 key.canal.dev-instance.trade_order.order_item.0
queue.canal.trade_center.order_search.0 绑定 key.canal.dev-instance.trade_order.order_extend.0
...

3.3 数据订阅消息消费服务

为实现消息的消费与业务系统解耦,独立出 " 数据订阅消费服务”。消费从” 数据订阅消息分发服务 “中投递的数据变更 MQ 消息,根据业务配置回调指定的业务回调接口。业务回调接口负责接收数据变更消息,组装需要执行的 ES 文档信息,返回给消费服务进行 ES 数据操作。

3.3.1 执行指令

从 binlog 订阅的消息有 3 类操作:INSERT,UPDATE,DELETE,这里新增一个 SELECT 指令,作用是业务回调接口在收到该指令后,从数据库中重新获取最新的数据组装成需要执行的 ES 文档信息,返回给消费服务进行 ES 数据操作。

主要应用在全量同步,部分同步,文档刷新,消息补偿等场景。

3.3.2 增量同步

  • MQ 队列动态加载

新的业务功能上线时,会配置对应的队列绑定相关的路由键,订阅到业务场景需要的数据变更的消息。为避免每次有新业务接入需要重新更新消费服务代码,重新发布服务,需实现能定时加载配置表数据,实现动态添加 MQ 队列侦听的功能。

使用 SimpleMessageListenerContainer 容器设置消费队列的动态监听。为每个 MQ 集群创建一个 SimpleMessageListenerContainer 实例,并动态注册到 Spring 容器。

  • 业务队列绑定规则

一个业务通常对应一个 ES 索引,一或多个 MQ 队列(队列绑定路由键的规则见:MQ 消息分片规则):

  • MQ 消息顺序消费

一个 queue,有多个 consumer 去消费, 因为无法保证先读到消息的 consumer 一定先完成操作,所以可能导致顺序错乱。因为不同消息都发到了一个 queue,然后多个消费者又消费同一个 queue 的消息。为此,可创建多个 queue,每个消费者只消费一个 queue, 生产者按规则把消息放入同一 queue(见:3.4.4.2 MQ 消息分片规则),这样同一个消息就只会被同一个消费者顺序消费。

服务通常集群部署,天然每个 queue 就会有多个 consumer。为解决这问题引入 elasticjob-lite 对 MQ 分片,如有 2 个服务实例,5 个队列,可让实例 1 消费队列 1、2、3,让实例 2 消费队列 4、5。当其中有一个实例 1 挂掉时会自动将队列 1、2、3 的消费转移到实例 2 上,当实例 1 重启启动后队列 1、2、3 的消费会重新转移到实例 1。

RabbitMQ 消费顺序错乱原因通常是队列消费是单机多线程消费或消费者是集群部署,由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。如消费者 A 执行增加,消费者 B 执行修改,消费者 C 执行删除,但消费者 C 执行比消费者 B 快,消费者 B 又比消费者 A 快,导致消费 binlog 执行到 ES 时顺序错乱,本该增加、修改、删除,变成删除、修改、增加。

对此,可给 RabbitMQ 创建多个 queue,每个消费者单线程固定消费一个 queue 的消息,生产者发送消息的时候,同一个单号的消息发送到同一个 queue 中,由于同一个 queue 的消息有序,那同一单号的消息就只会被一个消费者顺序消费,从而保证消息顺序性:

但如何保证集群模式下,一个队列只在一台机器上进行单线程消费,若这台机器宕机如何进行故障转移。对此,引入 elasticjob-lite 对 MQ 分片,如有 2 个服务实例,5 个队列,我们可以让实例 1 消费队列 1、2、3,让实例 2 消费队列 4、5。当其中有一个实例 1 挂掉时会自动将队列 1、2、3 的消费转移到实例 2 上,当实例 1 重启启动后队列 1、2、3 的消费会重新转移到实例 1。

对消息顺序消费敏感的业务场景,通过队列分片提升整体并发度。对消息顺序消费不敏感业务场景也可配置成某队列集群消费或单机并发消费。针对不同的业务场景合理选择不同的配置方案,提升整体性能。

3.3.3 全量同步

通过 Canal 获取的变更消息只能满足增量订阅数据的业务场景,然而我们通常我们还需要进行一次全量的历史数据同步后增量数据的订阅才会有意义。对于业务数据表的 id 是自增模式时,可以通过给定一个最小 id 值,最大 id 值,然后进行切片,如 100 个一片,生成 MQ 报文,发送到 MQ 中。消费 MQ 消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。

3.3.4 部分同步

有的时候我们需要修复指定的数据,或业务表的 id 是非自增模式的,需要进行全量同步。可以通过部分同步的接口,指定一组需要同步的 id 列表,生成分片 MQ 报文,发送到 MQ 中。消费服务接收到同步 MQ 消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。

3.3.5 刷新文档

当我们 ES 索引中有大批量的数据异常,需要重新刷新 ES 索引数据时,可以通过生成一个全量同步的任务,分页获取指定 ES 索引的文档 ID 列表,模拟生成部分同步消息报文,发送到 MQ 中。消费 MQ 消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。

3.3.6 消息补偿

将同步失败的消息存储到消息重试表中,通过 Job 执行补偿,便于监控。补偿时将消息重置为 SELECT 类型的 MQ 报文。业务回调接口接收到消息后会从数据库中获取最新的数据更新 ES 文档。

3.4 ES SDK 功能扩展

目前 ES 官方推荐使用的客户端是 RestHighLevelClient,我们在此基础上进行了二次封装开发,主要从扩展性和易用性方面考虑。

3.4.1、常用功能封装

  • 使用工厂模式,方便注册和获取不同 ES 集群对应的 RestHighLevelClient 实例,为业务端使用时对 ES 集群的扩展提供便利。
  • 对 RestHighLevelClient 的主要功能进行二次封装如:索引的存在判断、创建、更新、删除;文档的存在判断、获取、新增、更新、保存、删除、统计、查询。降低开发人员使用 RestHighLevelClient 的复杂度,提高开发效率。

3.4.2、ES 查询数据权限隔离

对于一些有数据隔离需求的业务场景,我们提供了一个 ES 数据隔离插件。在 ES SDK 中设计了一个搜索过滤器的接口,采用拦截器的方式对统计文档,搜索文档等方法的搜索条件参数进行拦截过滤。

/**
* 搜索过滤器
*/

public interface SearchSourceBuilderFilter {
String getFilterName();
void filter(SearchSourceBuilder searchSourceBuilder);
}

4 坑

4.1 Canal 相关

4.1.1 Canal Admin 部署时需注意的配置项

  • 如何支持 HA:'canal.instance.global.spring.xml' 设置为 'classpath:spring/default-instance.xml'

  • 设置合适的并行线程数:canal.instance.parser.parallelThreadSize,我们当前设置的是 16,如果该配置项被注释掉,那么可能会导致解析阻塞

  • 开启 tsdb 导致的各种问题:canal 默认开启 tsdb 功能,也就是会通过 h2 数据库缓存解析的表结构,但是实际情况下,如果上游变更了表结构,h2 数据库对应的缓存是不会更新的,这个时候一般会出现神奇的解析异常,异常的信息一般如下:‘Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table’,该异常还会导致一个可怕的后果:解析线程被阻塞,也就是 binlog 事件不会再接收和解析。目前认为比较可行的解决方案是:禁用 tsdb 功能,也就是 canal.instance.tsdb.enable 设置为 false。如果不禁用 tsdb 功能,一旦出现了该问题,必须要「先停止」Canal 服务,接着「删除」$CANAL_HOME/conf/ 目标数据库实例标识 /h2.mv.db 文件,然后「启动」Canal 服务。目前我们是设置为禁用的。

  • 设置合理的订阅级别:其配置项是‘canal.instance.filter.regex’;库表订阅的配置建议配置到表级别,如果定义到库级别一方面会消费一些无效的消息,给下游的 MQ 等带来不必要的压力。还有可能订阅到一些日志表等这类有着大字段数据的消息,消息过大在 JSON 化的时候可能导致内存溢出异常。针对这个问题我们进行大字段过滤和告警的改造。

4.1.2 binlog 文件不存在,导致同步异常

如果发现 Canal Client 长时间获取不到 binlog 消息,可以去 Canal Admin 后台去看一下 Instance 管理中的日志。大概率会出现 “could not find first log file name in binary log index file”,这个是因为 zk 集群中缓存了 binlog 信息导致拉取的数据不对,包括定义了 binlog position 但是启动服务后不对也是同样的原因。
解决:
  • 单机部署的删除 canal/conf/$instance 目录中的 meta.dat 文件

  • 集群模式需要进入 zk 删除 /otter/canal/destinations/xxxxx/1001/cursor,然后重启 canal

4.2 ES updateByQuery 问题

ES 的 Update By Query 对应的就是关系型数据库的 update set ... where... 语句;该命令在 ES 上支持得不是很成熟,可能会导致的问题有:批量更新时非事务模式执行(允许部分成功部分失败)、大批量操作会超时、频繁更新会报错(版本冲突)、脚本执行太频繁时又会触发断路器等。我们的解决办法也比较简单,直接在生产环境放弃使用 updateByQuery 方法,配置成使用先查询出符合条件的数据,然后分发到 MQ 中单条分别更新的模式。

5 规划

  • 问题时及时报警,特别在业务连续性监控上,如系统内特定组件工作异常导致数据同步流中断,是后续需重点优化的方向

  • 有些对实时性要求较高的业务依赖该系统进行数据同步,随着业务量越来越大,该方案当前当前采用的 MQ 组件在性能和高可用性都有所欠缺,后续打算采用性能更好,可用性机制更完善的 MQ 组件

  • 由于采用小步快跑迭代,设计更多考虑线上运行顺畅性,而忽略新业务接入便利性,目前一个新的业务服务对接数据同步系统,需要维护人员做不少配置文件,数据库等相关的修改,并做人工确认,随着接入需求越来越频繁,亟需一个管理后台,提升接入的效率和自动化度

作者简介:魔都国企技术专家兼架构,多家大厂后台研发和架构经验,负责复杂度极高业务系统的模块化、服务化、平台化研发工作。具有丰富带团队经验,深厚人才识别和培养的积累。

参考:
  • 编程严选网


往期推荐



百度输入法在候选词区域植入广告,网友:真nb!

Linus言辞激烈,怒怼谷歌内核贡献者:垃圾代码!

逆天神机 —— 17寸的64核AMD EPYC工作站




这里有最新开源资讯、软件更新、技术干货等内容

点这里 ↓↓↓ 记得 关注✔ 标星⭐ 哦



微信扫码关注该文公众号作者

来源:OSC开源社区

相关新闻

6级员工明目张胆抢4级员工的credit ?!多家大厂L6现身说法!思维导图 | 大厂们,都是如何做产品的?完整的数据指标体系,大厂是怎么搭建的?失业后的大厂高P,都去做什么了?做具身大模型缺数据?ATM 教你人类视频的正确用法大话红楼215:贾宝玉在大观园的生活有多淫荡?他为什么要做贾芸的爸爸?百亿美元只是开胃菜!科技大厂天量撒钱狂升数据中心:英伟达「铲子梦」还能做多久?大数据分析:特斯拉等电动车的事故率真的比燃油车多吗?所有大厂的尽头?鹅厂也不例外一丢一个不吱声!咖啡杯丢蓝桶?指甲头发丢黑桶?大多伦多各区的垃圾回收越来越抽象了挑战 Spark 和 Flink?大数据技术栈的突围和战争|盘点我急了!于适的咪咪咋这么大啊?!?!?!?!美联储依赖的数据,可信度有多高?我们日常生活产生的庞大数据去了哪?一文解析大数据背后的“上帝之眼”|投资笔记经营分析赋能业绩增长,这家大厂是怎么做到的?AI独角兽抢着上岸大厂了?Transfomer作者创办的Character.AI 正式被谷歌收编!咋了?大温这城市的无家可归者爆增七成大厂的堡垒机到底是啥?“骂遍”国外旅行的徐州大表哥,到底有多癫?网友笑哭:法国文旅,一年白干!软件工程走向“现代化工厂”?谈谈大数据平台软件的企业级部署和运维那些离开大厂的人,后悔了吗?被电影喂大的人,格局有多炸裂?俄乌战争,支持大鹅的人是咋想的?生物学家 vs. 医生,做科研的思维方式差别这么大?
logo
联系我们隐私协议©2024 bendi.news
Bendi新闻
Bendi.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Bendi.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。