加入收藏 | 设为首页 | 会员中心 | 我要投稿 聊城站长网 (https://www.0635zz.com/)- 智能语音交互、行业智能、AI应用、云计算、5G!
当前位置: 首页 > 站长学院 > MySql教程 > 正文

解读canal同步MySQL增量数据到 ES

发布时间:2023-09-15 15:21:08 所属栏目:MySql教程 来源:
导读:canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

这篇文章,我们手把手向同学们展示使用 canal 将 MySQL 增量数据同步到 ES 。

1 集群模式

server 对应
canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
 
这篇文章,我们手把手向同学们展示使用 canal 将 MySQL 增量数据同步到 ES 。
 
1 集群模式
 
server 对应一个 canal 运行实例 ,对应一个 JVM 。
 
server 中包含 1..n 个 instance , 我们可以将 instance 理解为配置任务。
 
instance 包含如下模块 :
 
eventParser数据源接入,模拟 slave 协议和 master 进行交互,协议解析
 
eventSinkParser 和 Store 链接器,进行数据过滤,加工,分发的工作
 
eventStore数据存储
 
metaManager增量订阅 & 消费信息管理器
 
真实场景中,canal 高可用依赖 zookeeper ,笔者将客户端模式可以简单划分为:TCP 模式 和 MQ 模式 。
 
实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可。
 
顺序消费:
 
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
 
图片
 
2 MySQL配置
 
1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf  中配置如下
 
复制
 
[mysqld]
 
log-bin=mysql-bin # 开启 binlog
 
binlog-format=ROW # 选择 ROW 模式
 
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
 
注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。
 
2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 。
 
复制
 
CREATE USER canal IDENTIFIED BY 'canal';  
 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
 
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
 
FLUSH PRIVILEGES;
 
3、创建数据库商品表 t_product 。
 
复制
 
CREATE TABLE `t_product` (
 
 `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
 
 `name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
 
 `price` DECIMAL ( 10, 2 ) NOT NULL,
 
 `status` TINYINT ( 4 ) NOT NULL,
 
 `create_time` datetime NOT NULL,
 
 `update_time` datetime NOT NULL,
 
   PRIMARY KEY ( `id` )
 
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin

3 Elasticsearch配置
 
使用 Kibana 创建商品索引 。
 
复制
 
PUT /t_product
 
{
 
    "settings": {
 
        "number_of_shards": 2,
 
        "number_of_replicas": 1
 
    },
 
    "mappings": {
 
            "properties": {
 
               "id": {
 
                    "type":"keyword"
 
                },
 
                "name": {
 
                    "type":"text"
 
                },
 
                "price": {
 
                    "type":"double"
 
                },
 
                "status": {
 
                    "type":"integer"
 
                },
 
                "createTime": {
 
                    "type": "date",
 
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
 
                },
 
                "updateTime": {
 
                    "type": "date",
 
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
 
                }
 
        }
 
    }
 
}

执行完成,
 
4 RocketMQ 配置
 
创建主题:product-syn-topic ,canal 会将 Binlog 的变化数据发送到该主题。
 
5 canal 配置
 
我们选取 canal 版本 1.1.6 ,进入 conf 目录。
 
1、配置 canal.properties
 
复制
 
#集群模式 zk地址
 
canal.zkServers = localhost:2181
 
#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
 
canal.serverMode = rocketMQ
 
#instance 列表
 
canal.destinations = product-syn
 
#conf root dir
 
canal.conf.dir = ../conf
 
#全局的spring配置方式的组件文件 生产环境,集群化部署
 
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
 
######  以下部分是默认值 展示出来
 
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
 
canal.mq.canalBatchSize = 50
 
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
 
canal.mq.canalGetTimeout = 100
 
# 是否为 flat json格式对象
 
canal.mq.flatMessage = true
 
2、instance 配置文件
 
在 conf 目录下创建实例目录 product-syn  , 在 product-syn 目录创建配置文件 :instance.properties。
 
复制
 
#  按需修改成自己的数据库信息
 
#################################################
 
...
 
canal.instance.master.address=192.168.1.20:3306
 
# username/password,数据库的用户名和密码
 
...
 
canal.instance.dbUsername = canal
 
canal.instance.dbPassword = canal
 
...
 
# table regex
 
canal.instance.filter.regex=mytest.t_product
 
# mq config
 
canal.mq.topic=product-syn-topic
 
# 针对库名或者表名发送动态topic
 
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
 
canal.mq.partitinotallow=0
 
# hash partition config
 
#canal.mq.partitinotallow=3
 
#库名.表名: 唯一主键,多个表之间用逗号分隔
 
#canal.mq.partitinotallow=mytest.person:id,mytest.role:id
 
#################################################

3、服务启动
 
启动两个 canal 服务,我们从 zookeeper gui 中查看服务运行情况 。
 
修改一条 t_product 表记录,可以从 RocketMQ 控制台中观测到新的消息。
 
6 消费者
 
1、产品索引操作服务
 
2、消费监听器
 
消费者逻辑重点有两点:
 
顺序消费监听器
 
将消息数据转换成  JSON 字符串,从 data 节点中获取表最新数据(批量操作可能是多条)。然后根据操作类型 UPDATE、 INSERT、DELETE 执行产品索引操作服务的方法。
 
7 写到最后
 
canal 是一个非常有趣的开源项目,很多公司使用 canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。
 
推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Disruptor、 流程模型抽象等。
 
 

(编辑:聊城站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章