Mysql2ch,一个同步MySQL数据到ClickHouse的项目

介绍 mysql2ch 是一个用于同步 MySQL 到 ClickHouse 的工具,支持全量同步与增量同步。 特性 支持全量同步与增量同步。 支持 DDL 与 DML,当前支持 DDL 字段新增与删除与重命名,支持所有的 DML。 支持 redis 和 kafka 作为消息队列。 依赖 kafka,缓冲 MySQL binlog 的消息队列,当使用 kafka 作为消息队列时需要。 redis,缓存 MySQL binlog position 与 file。 安装 pip install mysql2ch 使用 配置文件 [core] # 当前支持kafka和redis作为消息队列 broker_type = kafka mysql_server_id = 1 # redis stream最大长度,多出的消息会按照FIFO删除 queue_max_len = 200000 init_binlog_file = binlog.000024 init_binlog_pos = 252563 # 跳过删除的表,多个以逗号分开 skip_delete_tables = # 跳过更新的表,多个以逗号分开 skip_update_tables = # 跳过的DML,多个以逗号分开 skip_dmls = # 每多少条消息同步一次,生产环境推荐20000 insert_num = 1 # 每多少秒同步一次,生成环境推荐60秒 insert_interval = 1 [sentry] # sentry environment environment = development # sentry dsn dsn = https://xxxxxxxx@sentry.test.com/1 [redis] host = 127.0.0.1 port = 6379 password = db = 0 prefix = mysql2ch # 启用哨兵模式 sentinel = false # 哨兵地址 sentinel_hosts = 127.0.0.1:5000,127.0.0.1:5001,127.0.0.1:5002 sentinel_master = master [mysql] host = 127.0.0.1 port = 3306 user = root password = 123456 # 需要同步的数据库 [mysql.test] # 需要同步的表 tables = test # 指定kafka分区 kafka_partition = 0 [clickhouse] host = 127.0.0.1 port = 9000 user = default password = # need when broker_type=kafka [kafka] # kafka servers,multiple separated with comma servers = 127.0.0.1:9092 topic = mysql2ch 全量同步 你可能需要在开始增量同步之前进行一次全量导入,或者使用--renew重新全量导入,该操作会删除目标表并重新同步。 ...

May 5, 2020

新面貌,新博客

某一天深夜里,窗外马路上零星车辆驶过,突然心血来潮记起了自己年久失修的博客,打算重构一番。 之前是使用的hexo框架搭建的,感觉还不错。不过,又想尝试一些新的东西,搜索了一番,准备用hugo来重构博客。 看了一下文档,用法跟hexo很是相似,只不过hexo是用node写的,hugo是用go写的。 说起来,总想好好经营一下自己的博客,或记录生活与心情,或沉淀知识与技术。只不过总是难以持续,怪生活琐事太多, 怪这样那样,归根结底是因为懒罢了。回想曾经自己也是喜欢文字的人,不知道什么时候就再也没有那份心思了。哎~~~ 罢了,重新构建起自己的一个天地,只愿勤勉吧。

November 28, 2019

flask-admin之自定义ModelView使不同行显示不同样式

前言 如果有使用过django-suit—一个 django admin 的美化框架,这个框架的话,其 admin 设置中可以使用 suit_row_attributes 这个方法,在后台表单中设置表单列表中不同的行有不同的属性。这个方法在有些情况下很有用,比如说订单列表可以根据不同的订单状态在表单中呈现不同的样式,比如支付成功的可以设置为 bootstrap 的 table-success,支付失败的可以设置为 table-danger,这样在查看后台表单的时候可以更加一目了然。代码类似于下面这样: def suit_row_attributes(self, obj->Order, request): css = { 2: 'table-success', }.get(obj.status) if css: return {'class': css} 这样在订单状态为 2 的一行的时候,该行会显示为绿色。 关于 flask-admin 由于最近在使用 flask 这个框架,对应的 admin 管理使用的是flask-admin,同样需要像 django-suit 这样设置 row attributes。 关于 django 和 flask,flask 是一个微服务框架,本身只提供了基本的请求响应处理等功能,并且可以很快的搭建一个 rest api 服务,但是如果需要其他的,比如 ORM、权限管理、后台管理等扩展功能,就需要一个一个集成第三方扩展;而 django 提供的是一站式全方位解决方案,包含 admin 管理、ORM、权限控制、模板引擎等,具体该使用哪个框架,需要视业务场景而定。 回到 flask-admin,这个扩展的功能还是比较强大的,包含了 model view 显示,action 等功能,并且可以很方便的自定义。然而关于上面提到的需求,官方是没有自带的解决方案的,所以就只能重写 list.html 这个模板了。 解决方案 官方文档里有写: To override any of the built-in templates, simply copy them from the Flask-Admin source into your project’s templates/admin/ directory. As long as the filenames stay the same, the templates in your project directory should automatically take precedence over the built-in ones. ...

March 18, 2019

Django之自定义Field实现admin直接上传图片到七牛云并存储链接

Django自带的admin模块的功能实际上已经足够强大了,但是我们不免有一些自定义化的需求,比如我们在后台存储图片时,为了提高系统的性能,我们会使用第三方cdn服务去存储图片等静态资源,然后将链接存入数据库中,但是将图片上传到cdn,然后复制链接存入数据库又是比较繁琐的一个过程,作为程序员,肯定要寻求更高效的解决方法,比如自定义Field。 实际上覆盖admin的字段在Django中有两种方式,一种是自定义Form,然后在admin.py对应的ModelAdmin中覆盖form属性,比如: class MyForm(forms.Form): class Meta: widgets = { 'field':CustomWidget() } # admin.py class MyAdmin(admin.ModelAdmin): form = MyForm 这样就将field字段覆盖为CustomWidget,在CustomWidget中,就可以自定义template和css等。这样的话在每个需要自定义的Field都需要这样做一次,如果使用频繁的话就不怎么推荐,推荐另一种更通用的方式,自定义models.py中的Field。 1、构建widgets 要实现上传七牛的功能,首先在项目同名的目录下建一个widgets.py,实现自己的QiniuWidgets类,另外说一点的是,该实现参考了django-ckeditor项目,可以在admin中集成富文本编辑器,还是很不错的,推荐使用。 class QiniuWidgets(forms.FileInput): template_name = 'admin/qiniu_input.html' def __init__(self, attrs=None, app=None, table=None, unique_list=None): """ :param attrs: :param app: app :param table: 数据模型 :param unique_list: 唯一标识列表,除了id """ super(QiniuWidgets, self).__init__(attrs) self.unique = unique_list if settings.DEBUG: env = 'dev' else: env = 'pro' self.filename_prefix = '{}/{}/{}/'.format(env, app, table) def format_value(self, value): return value def value_from_datadict(self, data, files, name): file = files.get(name) # type:InMemoryUploadedFile file_data = b''.join(chunk for chunk in file.chunks()) # 取出二进制数据 file_type = file.name.split('.')[-1] # 得到文件的后缀 unique_filename = '_'.join(list(map(lambda x: data.get(x), self.unique))) file_name = self.filename_prefix + '{}_{}.{}'.format(name, unique_filename, file_type) # 构造文件的唯一文件名 q = QiniuUtils(settings.QINIU_ACCESS_KEY, settings.QINIU_SECRET_KEY) # 七牛上传实例 q.delete(settings.QINIU_BUCKET_NAME, file_name) # 删除已经存在的 q.upload(settings.QINIU_BUCKET_NAME, file_name, file_data) # 上传新的 http = 'https://' if settings.QINIU_USE_SSL else 'http://' url = http + settings.QINIU_DOMAIN + '/' + file_name # 拼接最终的url return url def render(self, name, value, attrs=None, renderer=None): context = self.get_context(name, value, attrs) template = loader.get_template(self.template_name).render(context) return mark_safe(template) 现在QiniuWidgets介绍类中方法的含义: ...

February 13, 2019

关于idea连接docker无法下载docker-java.jar问题解决

最近使用idea连接docker是遇到一个问题,就是一直无法连接,提示是无法从maven的中央仓库下载docker-java.jar。但是我的settings.xml已经更改为阿里云镜像了,那是因为什么呢?网上百度了一番没有找到答案,我猜测idea的maven默认使用的maven仓库并不是我们自己配置的,应该是idea自己确定的。 于是寻找了一番,最终定位到这个文件: 打开查看一番,果不然奇然是定义好的jar下载的仓库地址,于是改成阿里云的地址,像这样 再次连接docker,速度嗖嗖的就上来了,成功解决该问题。

May 12, 2017