百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

还在为数据同步而苦恼吗?手把手教你实现canal数据同步(三)

cac55 2024-10-07 06:38 21 浏览 0 评论

Canal实战之客户端适配器

canal 1.1.1版本之后, 增加客户端数据落地的适配及启动功能, 目前支持功能:

  • 客户端启动器
  • 同步管理REST接口
  • 日志适配器, 作为DEMO
  • 关系型数据库的数据同步(表对表同步), ETL功能
  • HBase的数据同步(表对表同步), ETL功能
  • ElasticSearch多表数据同步,ETL功能(新)
  • 后续支持redis、mongodb

适配器整体结构

client-adapter分为适配器启动器两部分, 适配器为多个fat jar, 每个适配器会将自己所需的依赖打成一个包, 以SPI的方式让启动器动态加载, 目前所有支持的适配器都放置在plugin目录下。

启动器为 SpringBoot 项目, 运行目录结构为:

详细结构如下:

- bin
    restart.sh
    startup.bat
    startup.sh
    stop.sh
- conf
bootstrap.yml
    application.yml
    - es
    biz_order.yml
    customer.yml
    mytest_user.yml
    - hbase
        mytest_person2.yml
    - rdb
    mytest_user.yml
- lib
...
- logs
- plugin
client-adapter.elasticsearch-1.1.4-jar-with-dependencies.jar
    client-adapter.hbase-1.1.4-jar-with-dependencies.jar
    client-adapter.logger-1.1.4-jar-with-dependencies.jar
    client-adapter.rdb-1.1.4-jar-with-dependencies.jar

适配器配置介绍

总配置文件 application.yml:

canal.conf:
  canalServerHost: 127.0.0.1:11111     # 对应单机模式下的canal server的ip:port
  zookeeperHosts: slave1:2181          # 对应集群模式下的zk地址, 如果配置了canalServerHost, 则以canalServerHost为准
  mqServers: slave1:6667 #or rocketmq  # kafka或rocketMQ地址, 与canalServerHost不能并存
  flatMessage: true                    # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  batchSize: 50                        # 每次获取数据的批大小, 单位为K
  syncBatchSize: 1000                  # 每次同步的批数量
  retries: 0                           # 重试次数, -1为无限重试
  timeout:                             # 同步超时时间, 单位毫秒
  mode: tcp # kafka rocketMQ           # canal client的模式: tcp直连 kafka rocketMQ
  srcDataSources:                      # 源数据库
    defaultDS:                         # 自定义名称
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true   # jdbc url 
      username: root                                            # jdbc 账号
      password: 121212                                          # jdbc 密码
  canalAdapters:                       # 适配器列表
  - instance: example                  # canal 实例名或者 MQ topic 名
    groups:                            # 分组列表
    - groupId: g1                      # 分组id, 如果是MQ模式将用到该值
      outerAdapters:                   # 分组内适配器列表
      - name: logger                   # 日志打印适配器
......

说明:

1. 一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中logger和hbase

2. 目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息

适配器启动

前提:启动canal server (单机模式)

上传canal.adapter-1.1.4.tar.gz到/opt目录下

1. 在/opt目录下创建canal-adapter目录:mkdir canal-adapter

2. 解压压缩包:tar -zxvf canal.adapter-1.1.4.tar.gz -C canal-adapter

3. 修改conf/application.yml,如下

4. 启动:bin/startup.sh

server:
  port: 8081
logging:
  level:
    com.alibaba.otter.canal.client.adapter.hbase: DEBUG
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null


canal.conf:
  canalServerHost: 127.0.0.1:11111
  batchSize: 500                            
  syncBatchSize: 1000                       
  retries: 0                               
  timeout:                                 
  mode: tcp 
  canalAdapters:                            
  - instance: example                       
    groups:                                 
    - groupId: g1                           
      outerAdapters:                        
      - name: logger

logger适配器:

最简单的处理, 将收到的变更事件通过日志打印的方式进行输出, 如配置所示, 只需要定义name: logger即可。

adapter管理REST接口

查询所有订阅同步的canal instance或MQ topic:

curl http://127.0.0.1:8081/destinations

数据同步开关:

curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT

针对 example 这个canal instance/MQ topic 进行开关操作. off代表关闭, instance/topic下的同步将阻塞或者断开连接不再接收数据, on代表开启

注: 如果在配置文件中配置了 zookeeperHosts 项, 则会使用分布式锁来控制HA中的数据同步开关, 如果是单机模式则使用本地锁来控制开关

数据同步开关状态:

curl http://127.0.0.1:8081/syncSwitch/example

查看指定 canal instance/MQ topic 的数据同步开关状态


相关推荐

MIRIX重塑AI记忆:超Gemini 410%,节省99.9%内存,APP同步上线

MIRIX,一个由UCSD和NYU团队主导的新系统,正在重新定义AI的记忆格局。在过去的十年里,我们见证了大型语言模型席卷全球,从写作助手到代码生成器,无所不能。然而,即使最强大的模型依...

硬盘坏了怎么把数据弄出来对比10种硬盘数据恢复软件

机械硬盘或固态硬盘损坏导致数据丢失时,应立即停止对硬盘的读写操作,并根据损坏类型选择逻辑层恢复工具或专业物理恢复服务。紧急处置措施立即停止通电使用:发现硬盘异响、无法识别或数据异常时,需立即断开连接,...

蓝宝石B850A WIFI主板新玩法:内存小参调节体验

蓝宝石前段时间发布了一款性价比极高的主板:NITRO氮动B850AWIFI主板。这款主板的售价只要1349元,相比普遍1500元以上的B850主板,确实极具竞争力。虽然价格实惠,蓝宝石NITR...

内存卡损坏读不出怎么修复?这5个数据恢复工具汇总,3秒挽回!

在数字化生活的浪潮中,内存卡凭借小巧便携与大容量存储的特性,成为相机、手机、行车记录仪等设备存储数据的得力助手,承载着无数珍贵回忆与重要文件。然而,当内存卡突然损坏无法读取,无论是误删、格式化、病毒入...

内存卡修复不再难,2025年必学的6款软件工具

内存卡出现问题时,通常是因为文件系统损坏、物理损坏或病毒感染。通过专业的修复工具,我们可以尝试恢复数据并修复内存卡。内存卡修复利器:万兴恢复专家万兴恢复专家是一款功能强大的数据恢复软件,支持多种设备和...

有5款内存卡修复工具汇总,内存卡数据轻松找回!

在如今的数字时代,内存卡作为不可或缺的存储介质,广泛应用于相机、手机、行车记录仪等各类设备中,承载着我们珍贵的照片、视频以及重要文件。然而,数据丢失的风险却如影随形,误删、格式化、病毒入侵、硬件故障等...

揭秘:如何通过多种方式精准查询内存条型号及规避风险?

以下是内存条型号查询的常用方法及注意事项,综合了物理查看、软件检测、编码解析等多种方式:一、物理标签查看法1.拆机查看标签打开电脑主机/笔记本后盖找到内存条,观察标签上的型号标识。例如内存标签通常标...

内存卡数据恢复5个工具汇总推荐,轻松找回珍贵记忆!

在这个数字化时代,内存卡作为我们存储珍贵照片、重要文件的常用载体,广泛应用于手机、相机、平板电脑等设备。但数据丢失的意外却常常不期而至,误删除、格式化、病毒攻击,甚至内存卡的物理损坏,都可能让辛苦保存...

电脑内存智能监控清理,优化性能的实用软件

软件介绍Memorycleaner是一款内存清理软件。功能很强,效果很不错。Memorycleaner会在内存用量超出80%时,自动执行“裁剪进程工作集”“清理系统缓存”以及“用全部可能的方法清理...

TechPowerUp MemTest64:内存稳定性测试利器

TechPowerUpMemTest64:内存稳定性测试利器一、软件简介TechPowerUpMemTest64,由知名硬件信息工具GPU-Z的出品公司TechPowerUp发布,是一款专为64位...

微软推出AI恶意软件检测智能体Project Ire,精确度高达98%

IT之家8月6日消息,当地时间周二,微软宣布推出可自主分析恶意软件的AI检测系统原型——ProjectIre。该项目由微软研究院、Defender研究团队及Discovery&a...

农村老木匠常用的20种老工具,手艺人靠它养活一家人,你认识几种

生活中的手艺老匠人是非常受到尊敬和崇拜的,特别是在农村曾经的老匠人都是家里的“座上宾”。对于民间传统的手艺人,有一种说法就是传统的八大匠:木匠、泥匠、篾匠、铁匠、船匠、石匠、油匠和剃头匠。木匠的祖始爷...

恶意木马新变种伪装成聊天工具诱人点击

国家计算机病毒应急处理中心通过对互联网监测发现,近期出现一种恶意木马程序变种Trojan_FakeQQ.CTU。该变种通过伪装成即时聊天工具,诱使计算机用户点击运行。该变种运行后,将其自身复制到受感染...

学习网络安全 这些工具你知道吗?

工欲善其事必先利其器,在新入门网络安全的小伙伴而言。这些工具你必须要有所了解。本文我们简单说说这些网络安全工具吧!Web安全类web类工具主要是通过各种扫描工具,发现web站点存在的各种漏洞...

5分钟盗走你的隐私照片,这个全球性漏洞到底有多可怕?

这个时代,大家对电脑出现漏洞,可能已经习以为常。但如果机哥告诉大家,这个漏洞能够在5分钟内,破解并盗取你所有加密文件,而且还无法通过软件和补丁修复...这可就有点吓人啦。事情是酱婶的。来自荷兰埃因...

取消回复欢迎 发表评论: