从mysql2ch到synch,一次重构与升级

项目地址 https://github.com/long2ice/synch mysql2ch mysql2ch 从二月份开始到现在已经接近四个月,项目起初是源于公司的需求,同步 MySQL 的数据到 ClickHouse 进行分析与统计。当时调研了一圈,发现并没有符合需求的项目,只找到一个勉强可用的,于是在该项目之上进行开发与完善。 synch mysql2ch 项目改名为 synch,其主要原因在于 mysql2ch 支持了 postgres 的同步,再取名为 mysql2ch 就有些不合适了,遂改名为 synch,意思是同步到 ClickHouse,其项目目标是同步其它类型的数据库到 ClickHouse 中。 当前架构 设计与实现 最开始的版本非常简陋,只是实现了监听 mysql binlog 然后直接插入到 ClickHouse。在后续的开发与迭代中,逐渐实现了以下功能,以及引入了中间件。 命令行 命令行对于一个独立程序是有必要的。命令行程序可以接受自定义的参数,通过在程序指定不同的参数,可以使用其不同的功能,对于那些需要每次启动都可能会不同的参数,将其放在命令行参数中是有必要的。 配置文件 丰富的配置项可以将程序的自定义配置暴露出来,以满足个性化的运行需求,增强程序的自定义性。 模块分离 该项目分为三个模块,除了源数据库与目标数据库,也就是 ClickHouse 之外,包含生产端、消息队列、消费端。模块的分离是为了解耦,各个模块专注于自身的功能实现以及可靠性。以及借助于消息队列,实现分布式消费。 消息队列 消息队列的引入是为了增加程序的健壮性以及增加生产与消费的性能。通过引入消息队列,将 binlog 先缓冲到消息队列中而不是直接插入到 ClickHouse,避免了当程序崩溃时内存数据的丢失。并且借助于消息队列的 ack 机制,可以保证消息的可靠消费;以及借助消息队列的高吞吐量,将消费者与生产者分离,生产者与消费者之间互不影响,异步工作。 另外一点考虑到 binlog 的有序性,消息队列的选型也是需要考虑的,目前选择了 kafka 和 redis。redis5.0 提供了原生的消息队列并且具有所有消息队列该有的功能,并且消息是有序的,其轻量的特性在并发量不那么大的时候是一个很好的选择;而选择 kafka 是因为其强大的吞吐量,并且单个 topic 的单个分区也是有序的,可以很好的对应需要同步的数据库与数据库表。 redis redis 的引入其一是为了缓存 MySQL 的 binlog,因为 MySQL 的 binlog 是不断在变化的,当程序崩溃或重启时,需要记录当前的 binlog,然后下次启动时需要从上次的位置继续监听;其二是作为消息队列,相比 kafka 而且因为本身既作为缓存组件也作为消息队列,组件依赖较少。 ...

June 30, 2020

Mysql2ch,一个同步MySQL数据到ClickHouse的项目

介绍 mysql2ch 是一个用于同步 MySQL 到 ClickHouse 的工具,支持全量同步与增量同步。 特性 支持全量同步与增量同步。 支持 DDL 与 DML,当前支持 DDL 字段新增与删除与重命名,支持所有的 DML。 支持 redis 和 kafka 作为消息队列。 依赖 kafka,缓冲 MySQL binlog 的消息队列,当使用 kafka 作为消息队列时需要。 redis,缓存 MySQL binlog position 与 file。 安装 pip install mysql2ch 使用 配置文件 [core] # 当前支持kafka和redis作为消息队列 broker_type = kafka mysql_server_id = 1 # redis stream最大长度,多出的消息会按照FIFO删除 queue_max_len = 200000 init_binlog_file = binlog.000024 init_binlog_pos = 252563 # 跳过删除的表,多个以逗号分开 skip_delete_tables = # 跳过更新的表,多个以逗号分开 skip_update_tables = # 跳过的DML,多个以逗号分开 skip_dmls = # 每多少条消息同步一次,生产环境推荐20000 insert_num = 1 # 每多少秒同步一次,生成环境推荐60秒 insert_interval = 1 [sentry] # sentry environment environment = development # sentry dsn dsn = https://xxxxxxxx@sentry.test.com/1 [redis] host = 127.0.0.1 port = 6379 password = db = 0 prefix = mysql2ch # 启用哨兵模式 sentinel = false # 哨兵地址 sentinel_hosts = 127.0.0.1:5000,127.0.0.1:5001,127.0.0.1:5002 sentinel_master = master [mysql] host = 127.0.0.1 port = 3306 user = root password = 123456 # 需要同步的数据库 [mysql.test] # 需要同步的表 tables = test # 指定kafka分区 kafka_partition = 0 [clickhouse] host = 127.0.0.1 port = 9000 user = default password = # need when broker_type=kafka [kafka] # kafka servers,multiple separated with comma servers = 127.0.0.1:9092 topic = mysql2ch 全量同步 你可能需要在开始增量同步之前进行一次全量导入,或者使用--renew重新全量导入,该操作会删除目标表并重新同步。 ...

May 5, 2020