专业编程教程与实战项目分享平台

网站首页 > 技术文章 正文

Canal数据同步实战 数据同步工具有哪些

ins518 2024-11-01 13:18:44 技术文章 15 ℃ 0 评论

需求

将一台mysql数据库的数据实时同步到另外一台国产数据库上

技术选型

使用阿里开源的中间件Canal,该中间件支持配置要监控的数据库,要监控的表,支持订阅实时变化数据,支持数据同步到Kafka、消息队列,目前支持mysql和oracle数据库版本

  • 服务端:负责解析MySQL的binlog日志,传递增量数据给客户端或者消息中间件
  • 客户端:负责解析服务端传过来的数据,然后定制自己的业务处理。

部署步骤

  • 官网下载安装包 canal.deployer-1.1.5.tar.gz https://github.com/alibaba/canal/releases
  • 配置文件 canal.deployer-1.1.5\conf\example\instance.properties 配置好需要监控的书库和表信息,其他细节可以自行搜索一下或者查看官网文档


  • 开启被监控数据库mysql的binlog日志,在配置文件my.ini[mysqld]节点下添加

log-bin=mysql-bin # 开启 binlog

binlog-format=ROW # 选择 ROW 模式

修改后需要重启mysql服务,不然可能不生效,重启后是否开启了,需要检查一下,通过命令show variables like '%log_bin%'; log_bin为on表示开启成功

  • 通过canal.deployer-1.1.5\bin目录中启动脚本startup.bat启动canal服务端
  • 在代码中监控canal实例,可以获取数据的实时变化



  • 主要源码

//获取canalServer连接

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname,

Integer.parseInt(port)),example,"", "");

int batchSize = 1000;

try {

//连接canalServer

connector.connect();

//订阅Desctinstion

connector.subscribe();

connector.rollback();

while (true) {

//尝试从master那边拉去数据batchSize条记录,轮询拉取数据

Message message = connector.getWithoutAck(batchSize);

long batchId = message.getId();

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

log.error("线程休眠异常,此异常忽略", e);

}

} else {

try {

dataHandle(message.getEntries()) ;

} catch (JMSException e) {

return ;

} catch (Exception e) {

return ;

}

}

//确认消费。如果发送mq失败,不确认,下次重新启动服务,canal中间件会从未确认记录开始推送

connector.ack(batchId);

}

} catch (CanalClientException e) {

} catch (Exception e) {

}finally {

connector.disconnect();

}

private void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException, JMSException {

for (CanalEntry.Entry entry : entrys) {

if (EntryType.ROWDATA == entry.getEntryType()) {

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

CanalEntry.EventType eventType = rowChange.getEventType();

//entry里面有详细的信息,打印出来看看,可以找到更新的数据变化

}

}

}

}

}

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表