大厂咋做多系统数据同步方案的?
作者:JavaEdge在OSCHINA
链接:https://my.oschina.net/u/3494859/blog/10931835
1 背景
随信息时代爆炸,大数据量场景下慢慢凸显短板,如:需对大量数据全文检索,对大量数据组合查询,分库分表后的数据聚合查询
自然想到如何使用其他更适合处理该类问题的数据组件(ES)
2 方案选型
2.1 同步双写
优点
设计简单易懂
实时性高
缺点
硬编码,有需要写入 MySQL 的地方都要添加写 ES 的代码,导致业务强耦合
存在双写可能失败导致数据丢失的风险,如:
ES 不可用
应用系统和 ES 之间网络故障
应用系统重启,导致系统来不及写入 ES
对性能有较大影响,因为每次业务操作都要加个 ES 操作,若对数据有强一致性要求,还需事务处理
2.2 异步双写
优点
解决性能问题,MQ 的性能基本比 mysql 高出一个数量级
不易出现数据丢失问题,主要基于 MQ 消息的消费保障机制,比如 ES 宕机或者写入失败,还能重新消费 MQ 消息
通过异步的方式做到了系统解耦,多源写入之间相互隔离,便于扩展更多的数据源写入
缺点
数据同步实时性,由于 MQ 消费网络链路增加,导致用户写入的数据不一定马上看到,有延时
虽在系统逻辑做到解耦,但存在业务逻辑里依然需增加 MQ 代码耦合
复杂度增加:多个 MQ 中间件维护
硬编码问题,接入新的数据源需要实现新的消费逻辑
2.3 监听 binlog
优点
无代码侵入,原有系统无需任何变化,无感知
性能高,业务代码完全无需新增任何多余逻辑
耦合度极低,完全无需关注原系统业务逻辑
缺点
存在一定技术复杂度
数据同步实时性可能有问题
3 整体方案设计
3.1 概述
设计核心理念
核心模块
canal:监听数据源的数据变动
消息分发服务:对接 canal 客户端,拉取变化的数据,将消息解析为 JSON,按固定规则分发到 MQ,MQ 可根据业务配置指定到不同集群,实现横向扩展。由于变更数据可能批量,这里会将消息拆分为单条发送到 MQ 中,并且通过配置可以过滤掉一些业务上不需要的大字段,减少 mq 消息体
消息消费服务:从配置表中加载 MQ 队列,消费 MQ 中的消息,通过队列、回调接口、ES 索引三者映射,将消息 POST 给业务回调接口,接收到业务回调接口返回的操作指令和 ES 文档后,写入对应的 ES 索引。写入失败时插入补偿表,等待补偿。这里 ES 索引可以根据业务配置指定到不同的集群,实现横向扩展
任务调度系统:定时调用消息消费服务中的消息补偿等定时任务接口
业务回调服务:接收消息消费服务 POST 过来的消息,根据消息中的指令和数据,结合数据库中的数据或下游服务接口返回的数据组装 ES 文档中所需要的数据,设置相应的操作指令返回给消息消费服务去写入 ES
业务 ES 查询服务:通过 ES SDK 查询 ES 索引中的数据,通过接口返回给业务调用方
3.2 数据订阅消息分发服务
3.2.1 基于 Canal 的数据变更监听机制
MySQL master 将数据变更写入二进制日志(binary log,其中记录叫二进制日志事件 binary log events,可通过 show binlog events 查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
Canal server 和 client 端高可用依赖 zk,启动 canal server 和 client 时都会从 zk 读信息
Canal server 和 client 启动时都会去抢占 zk 对应的 running 节点,保证只有一个 server 和 client 在运行,而 server 和 client 高可用切换也是基于监听 running 节点
Canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动)
创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态
一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤 1 的操作,重新选出一个 canal server 启动 instance
Canal client 每次进行 connect 时,会首先向 zookeeper 询问当前是谁启动了 canal instance,然后和其建立链接,一旦链接不可用,会重新尝试 connect
3.2.2 基于 Canal Client 分摊设计提升系统处理效率
elasticjob-lite 分片原理
3.2.3 资源隔离
MQ 集群路由
MQ 消息路由规则
queue.canal.trade_center.order_search.0 绑定 key.canal.dev-instance.trade_order.order_item.0
queue.canal.trade_center.order_search.0 绑定 key.canal.dev-instance.trade_order.order_extend.0
...
3.3 数据订阅消息消费服务
为实现消息的消费与业务系统解耦,独立出 " 数据订阅消费服务”。消费从” 数据订阅消息分发服务 “中投递的数据变更 MQ 消息,根据业务配置回调指定的业务回调接口。业务回调接口负责接收数据变更消息,组装需要执行的 ES 文档信息,返回给消费服务进行 ES 数据操作。
3.3.1 执行指令
从 binlog 订阅的消息有 3 类操作:INSERT,UPDATE,DELETE,这里新增一个 SELECT 指令,作用是业务回调接口在收到该指令后,从数据库中重新获取最新的数据组装成需要执行的 ES 文档信息,返回给消费服务进行 ES 数据操作。
主要应用在全量同步,部分同步,文档刷新,消息补偿等场景。
3.3.2 增量同步
MQ 队列动态加载
新的业务功能上线时,会配置对应的队列绑定相关的路由键,订阅到业务场景需要的数据变更的消息。为避免每次有新业务接入需要重新更新消费服务代码,重新发布服务,需实现能定时加载配置表数据,实现动态添加 MQ 队列侦听的功能。
使用 SimpleMessageListenerContainer 容器设置消费队列的动态监听。为每个 MQ 集群创建一个 SimpleMessageListenerContainer 实例,并动态注册到 Spring 容器。
业务队列绑定规则
一个业务通常对应一个 ES 索引,一或多个 MQ 队列(队列绑定路由键的规则见:MQ 消息分片规则):
MQ 消息顺序消费
一个 queue,有多个 consumer 去消费, 因为无法保证先读到消息的 consumer 一定先完成操作,所以可能导致顺序错乱。因为不同消息都发到了一个 queue,然后多个消费者又消费同一个 queue 的消息。为此,可创建多个 queue,每个消费者只消费一个 queue, 生产者按规则把消息放入同一 queue(见:3.4.4.2 MQ 消息分片规则),这样同一个消息就只会被同一个消费者顺序消费。
服务通常集群部署,天然每个 queue 就会有多个 consumer。为解决这问题引入 elasticjob-lite 对 MQ 分片,如有 2 个服务实例,5 个队列,可让实例 1 消费队列 1、2、3,让实例 2 消费队列 4、5。当其中有一个实例 1 挂掉时会自动将队列 1、2、3 的消费转移到实例 2 上,当实例 1 重启启动后队列 1、2、3 的消费会重新转移到实例 1。
RabbitMQ 消费顺序错乱原因通常是队列消费是单机多线程消费或消费者是集群部署,由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。如消费者 A 执行增加,消费者 B 执行修改,消费者 C 执行删除,但消费者 C 执行比消费者 B 快,消费者 B 又比消费者 A 快,导致消费 binlog 执行到 ES 时顺序错乱,本该增加、修改、删除,变成删除、修改、增加。
对此,可给 RabbitMQ 创建多个 queue,每个消费者单线程固定消费一个 queue 的消息,生产者发送消息的时候,同一个单号的消息发送到同一个 queue 中,由于同一个 queue 的消息有序,那同一单号的消息就只会被一个消费者顺序消费,从而保证消息顺序性:
但如何保证集群模式下,一个队列只在一台机器上进行单线程消费,若这台机器宕机如何进行故障转移。对此,引入 elasticjob-lite 对 MQ 分片,如有 2 个服务实例,5 个队列,我们可以让实例 1 消费队列 1、2、3,让实例 2 消费队列 4、5。当其中有一个实例 1 挂掉时会自动将队列 1、2、3 的消费转移到实例 2 上,当实例 1 重启启动后队列 1、2、3 的消费会重新转移到实例 1。
对消息顺序消费敏感的业务场景,通过队列分片提升整体并发度。对消息顺序消费不敏感业务场景也可配置成某队列集群消费或单机并发消费。针对不同的业务场景合理选择不同的配置方案,提升整体性能。
3.3.3 全量同步
通过 Canal 获取的变更消息只能满足增量订阅数据的业务场景,然而我们通常我们还需要进行一次全量的历史数据同步后增量数据的订阅才会有意义。对于业务数据表的 id 是自增模式时,可以通过给定一个最小 id 值,最大 id 值,然后进行切片,如 100 个一片,生成 MQ 报文,发送到 MQ 中。消费 MQ 消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
3.3.4 部分同步
有的时候我们需要修复指定的数据,或业务表的 id 是非自增模式的,需要进行全量同步。可以通过部分同步的接口,指定一组需要同步的 id 列表,生成分片 MQ 报文,发送到 MQ 中。消费服务接收到同步 MQ 消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
3.3.5 刷新文档
当我们 ES 索引中有大批量的数据异常,需要重新刷新 ES 索引数据时,可以通过生成一个全量同步的任务,分页获取指定 ES 索引的文档 ID 列表,模拟生成部分同步消息报文,发送到 MQ 中。消费 MQ 消息后对消息进行组装,生成模拟增量数据变更的消息报文,走原有的增量消息回调的方式同步数据。
3.3.6 消息补偿
将同步失败的消息存储到消息重试表中,通过 Job 执行补偿,便于监控。补偿时将消息重置为 SELECT 类型的 MQ 报文。业务回调接口接收到消息后会从数据库中获取最新的数据更新 ES 文档。
3.4 ES SDK 功能扩展
目前 ES 官方推荐使用的客户端是 RestHighLevelClient,我们在此基础上进行了二次封装开发,主要从扩展性和易用性方面考虑。
3.4.1、常用功能封装
使用工厂模式,方便注册和获取不同 ES 集群对应的 RestHighLevelClient 实例,为业务端使用时对 ES 集群的扩展提供便利。 对 RestHighLevelClient 的主要功能进行二次封装如:索引的存在判断、创建、更新、删除;文档的存在判断、获取、新增、更新、保存、删除、统计、查询。降低开发人员使用 RestHighLevelClient 的复杂度,提高开发效率。
3.4.2、ES 查询数据权限隔离
对于一些有数据隔离需求的业务场景,我们提供了一个 ES 数据隔离插件。在 ES SDK 中设计了一个搜索过滤器的接口,采用拦截器的方式对统计文档,搜索文档等方法的搜索条件参数进行拦截过滤。
/**
* 搜索过滤器
*/
public interface SearchSourceBuilderFilter {
String getFilterName();
void filter(SearchSourceBuilder searchSourceBuilder);
}
4 坑
4.1 Canal 相关
4.1.1 Canal Admin 部署时需注意的配置项
如何支持 HA:'canal.instance.global.spring.xml' 设置为 'classpath:spring/default-instance.xml'
设置合适的并行线程数:canal.instance.parser.parallelThreadSize,我们当前设置的是 16,如果该配置项被注释掉,那么可能会导致解析阻塞
开启 tsdb 导致的各种问题:canal 默认开启 tsdb 功能,也就是会通过 h2 数据库缓存解析的表结构,但是实际情况下,如果上游变更了表结构,h2 数据库对应的缓存是不会更新的,这个时候一般会出现神奇的解析异常,异常的信息一般如下:‘Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table’,该异常还会导致一个可怕的后果:解析线程被阻塞,也就是 binlog 事件不会再接收和解析。目前认为比较可行的解决方案是:禁用 tsdb 功能,也就是 canal.instance.tsdb.enable 设置为 false。如果不禁用 tsdb 功能,一旦出现了该问题,必须要「先停止」Canal 服务,接着「删除」$CANAL_HOME/conf/ 目标数据库实例标识 /h2.mv.db 文件,然后「启动」Canal 服务。目前我们是设置为禁用的。
设置合理的订阅级别:其配置项是‘canal.instance.filter.regex’;库表订阅的配置建议配置到表级别,如果定义到库级别一方面会消费一些无效的消息,给下游的 MQ 等带来不必要的压力。还有可能订阅到一些日志表等这类有着大字段数据的消息,消息过大在 JSON 化的时候可能导致内存溢出异常。针对这个问题我们进行大字段过滤和告警的改造。
4.1.2 binlog 文件不存在,导致同步异常
单机部署的删除 canal/conf/$instance 目录中的 meta.dat 文件
集群模式需要进入 zk 删除 /otter/canal/destinations/xxxxx/1001/cursor,然后重启 canal
4.2 ES updateByQuery 问题
5 规划
问题时及时报警,特别在业务连续性监控上,如系统内特定组件工作异常导致数据同步流中断,是后续需重点优化的方向
有些对实时性要求较高的业务依赖该系统进行数据同步,随着业务量越来越大,该方案当前当前采用的 MQ 组件在性能和高可用性都有所欠缺,后续打算采用性能更好,可用性机制更完善的 MQ 组件
由于采用小步快跑迭代,设计更多考虑线上运行顺畅性,而忽略新业务接入便利性,目前一个新的业务服务对接数据同步系统,需要维护人员做不少配置文件,数据库等相关的修改,并做人工确认,随着接入需求越来越频繁,亟需一个管理后台,提升接入的效率和自动化度
作者简介:魔都国企技术专家兼架构,多家大厂后台研发和架构经验,负责复杂度极高业务系统的模块化、服务化、平台化研发工作。具有丰富带团队经验,深厚人才识别和培养的积累。
编程严选网
往期推荐
这里有最新开源资讯、软件更新、技术干货等内容
点这里 ↓↓↓ 记得 关注✔ 标星⭐ 哦
微信扫码关注该文公众号作者