网站首页 > 技术文章 正文
需求
将一台mysql数据库的数据实时同步到另外一台国产数据库上
技术选型
使用阿里开源的中间件Canal,该中间件支持配置要监控的数据库,要监控的表,支持订阅实时变化数据,支持数据同步到Kafka、消息队列,目前支持mysql和oracle数据库版本
- 服务端:负责解析MySQL的binlog日志,传递增量数据给客户端或者消息中间件
- 客户端:负责解析服务端传过来的数据,然后定制自己的业务处理。
部署步骤
- 官网下载安装包 canal.deployer-1.1.5.tar.gz https://github.com/alibaba/canal/releases
- 配置文件 canal.deployer-1.1.5\conf\example\instance.properties 配置好需要监控的书库和表信息,其他细节可以自行搜索一下或者查看官网文档
- 开启被监控数据库mysql的binlog日志,在配置文件my.ini[mysqld]节点下添加
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
修改后需要重启mysql服务,不然可能不生效,重启后是否开启了,需要检查一下,通过命令show variables like '%log_bin%'; log_bin为on表示开启成功
- 通过canal.deployer-1.1.5\bin目录中启动脚本startup.bat启动canal服务端
- 在代码中监控canal实例,可以获取数据的实时变化
- 主要源码
//获取canalServer连接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname,
Integer.parseInt(port)),example,"", "");
int batchSize = 1000;
try {
//连接canalServer
connector.connect();
//订阅Desctinstion
connector.subscribe();
connector.rollback();
while (true) {
//尝试从master那边拉去数据batchSize条记录,轮询拉取数据
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("线程休眠异常,此异常忽略", e);
}
} else {
try {
dataHandle(message.getEntries()) ;
} catch (JMSException e) {
return ;
} catch (Exception e) {
return ;
}
}
//确认消费。如果发送mq失败,不确认,下次重新启动服务,canal中间件会从未确认记录开始推送
connector.ack(batchId);
}
} catch (CanalClientException e) {
} catch (Exception e) {
}finally {
connector.disconnect();
}
private void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException, JMSException {
for (CanalEntry.Entry entry : entrys) {
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
CanalEntry.EventType eventType = rowChange.getEventType();
//entry里面有详细的信息,打印出来看看,可以找到更新的数据变化
}
}
}
}
}
猜你喜欢
- 2024-11-01 支持断点续传的数据库同步服务 断点续传需要服务器支持吗
- 2024-11-01 Oracle OGG 单向DDL同步 oracle ogg双向同步
- 2024-11-01 数据同步组件(Canal)在珍爱网的应用与实践
- 2024-11-01 MySQL 到 Hazelcast Cloud 实时数据同步实操分享
- 2024-11-01 用Spark实现高并发数据同步 spark并行
- 2024-11-01 PostgreSQL中使用FDW+MV玩转数据同步
- 2024-11-01 mysql:Otter跨机房数据同步(单向)
- 2024-11-01 「数据库」 如何解决异地机房的数据同步问题?
- 2024-11-01 阿里数据同步工具Otter和Canal简介
- 2024-11-01 DataX-Web - 可视化分布式数据同步工具
你 发表评论:
欢迎- 632℃几个Oracle空值处理函数 oracle处理null值的函数
- 625℃Oracle分析函数之Lag和Lead()使用
- 614℃0497-如何将Kerberos的CDH6.1从Oracle JDK 1.8迁移至OpenJDK 1.8
- 608℃Oracle数据库的单、多行函数 oracle执行多个sql语句
- 606℃Oracle 12c PDB迁移(一) oracle迁移到oceanbase
- 598℃【数据统计分析】详解Oracle分组函数之CUBE
- 588℃最佳实践 | 提效 47 倍,制造业生产 Oracle 迁移替换
- 572℃Oracle有哪些常见的函数? oracle中常用的函数
- 最近发表
-
- oracle 19cOCM认证有哪些内容(oracle认证ocm月薪)
- Oracle新出AI课程认证,转型要持续学习
- oracle 表的查询join顺序,可能会影响查询效率
- Oracle DatabaseAmazon Web Services正式可用,Oracle数据库上云更容易了
- Oracle 19.28 RU 升级最佳实践指南
- 汉得信息:发布EBS系统安装启用JWS的高效解决方案
- 如何主导设计一个亿级高并发系统架构-数据存储架构(三)
- Java 后端开发必看!工厂设计模式轻松拿捏
- ORA-00600 「25027」 「x」报错(抱错孩子电视剧 爸爸是武术 另一个爸爸是画家)
- 新项目终于用上了jdk24(jdk新建项目)
- 标签列表
-
- 前端设计模式 (75)
- 前端性能优化 (51)
- 前端模板 (66)
- 前端跨域 (52)
- 前端缓存 (63)
- 前端aes加密 (58)
- 前端脚手架 (56)
- 前端md5加密 (54)
- 前端路由 (61)
- 前端数组 (73)
- 前端js面试题 (50)
- 前端定时器 (59)
- 前端获取当前时间 (50)
- Oracle RAC (76)
- oracle恢复 (77)
- oracle 删除表 (52)
- oracle 用户名 (80)
- oracle 工具 (55)
- oracle 内存 (55)
- oracle 导出表 (62)
- oracle约束 (54)
- oracle 中文 (51)
- oracle链接 (54)
- oracle的函数 (58)
- 前端调试 (52)
本文暂时没有评论,来添加一个吧(●'◡'●)