网站首页 > 技术文章 正文
需求背景
需求一:前段时间,公司进行一个十年老系统重构,其中涉及到很多数据库、表的迁移,基本思路就是大表通过定时任务分片同步,小表通过rest接口的方式同步数据,所以其中开发了大量只使用一次的代码;
需求二:在做数据异构时,比如将Mysql数据库表同步到ES或者ADB时,其中需要进行一定的数据清洗和过滤,所以增量数据通过订阅binglog放到MQ中进行监听处理,历史全量数据通过定时任务或者将历史数据灌到一个新的数据库,也通过订阅binglog放到MQ中进行相同的处理,如果涉及到ES索引重建,那么需要重复操作以上流程;
需求三:大表的数据归档,比如财务的清结算数据,业务方只关心最近几个月的数据,类似业务表数据量越来越大,很可能产生慢查询、锁表等一系列问题,所以需要将历史数据有规划的进行数据归档。但是以上看似很简单的操作如果不通过程序实现,那么就需要依赖DBA实施;
以上列举了工作中经常会遇到的场景,当然类似的需求还有很多很多。通过以上分析,我们不难发现目前的做法开发成本、资源成本较高。那么我们需要通过什么手段来解决以上问题呢?最近调研了一些资料,感觉DataX基本上可以非常简单完成以上功能。
官网地址:https://github.com/alibaba/DataX
设计理念
DataX是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
架构设计
大家在用过 IDEA 的时候都知道,IDEA 有很多非常棒的插件,用户可根据自身编程需求,下载相关的插件,DataX 也是使用这种可插拔的设计,采用了 Framework + Plugin 的架构设计,如下图所示:
有了插件,DataX 可支持任意数据源到数据源,只要实现了 Reader/Writer Plugin,官方已经实现了主流的数据源插件,比如 MySQL、Oracle、SQLServer 等,当然我们也可以开发一个 DataX 插件。
Reader:数据采集模块,负责采集数据源的数据,将数据发给Framework。
Wiriter: 数据写入模块,负责不断向Framwork取数据,并将数据写入到目的端。
Framework:用于连接read和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等你核心技术问题。
运行原理
DataX 核心主要由 Job、Task Group、Task、Schedule、Channel 等概念组成:
Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。
Task:由Job切分而来,是DataX作业的最小单元,每个Task负责一部分数据的同步工作。
Schedule:将Task组成TaskGroup,单个TaskGroup的并发数量为5。
TaskGroup:负责启动Task。
Channel:DataX 会单独启动一条线程运行运行一个 Task,而 Task 会持有一个 Channel,用作 Reader 与 Writer 的数据传输媒介,DataX 的数据流向都是按照 Reader—>Channel—>Writer 的方向流转。Channel 作为传输通道,即能充当缓冲层,同时还能对数据传输进行限流操作。
调度流程
DataX 将用户的 job.json 同步作业配置解析成一个 Job,DataX 通过 JobContainer 完成全局切分、调度、前置语句和后置语句等工作,整体调度流程用如下图表示:
操作实战
1.下载地址: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
2.解压
3.运行自检脚本 bin/datax.py job/job.json
出现以上命令框代表环境可用。
4.通过命令行测试mysql数据库表之间同步,表c_s同步到c_s1,首先编写同步脚本,vim /job/mysql-mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"c_name"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/mybd"
],
"table": [
"c_s"
]
}
],
"password": "root",
"username": "root"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"cname"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/mybd",
"table": [
"c_s1"
]
}
],
"password": "root",
"username": "root",
"writeMode": "replace"
}
}
}
],
"setting": {
"speed": {
"channel": "2"
}
}
}
}
然后执行脚本,bin/datax.py job/mysql-mysql.json
数据表中的3条数据同步完成。
5.通过命令行测试mysql数据同步ES,表c_s同步到索引c_s上,首先编写同步脚本,vim /job/mysql-es.json
{
"job": {
"setting": {
"speed": {
"channel": 1,
"record": -1,
"byte": -1
}
},
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"c_name"
],
"splitPk": "id",
"connection": [{
"table": [
"c_s"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/mybd"
]
}]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://es-cn-xxxxx.elasticsearch.aliyuncs.com:9200",
"index": "c_s",
"type": "default",
"accessId": "elastic",
"accessKey": "xxxxxx",
"settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
"discovery": false,
"batchSize": 5000,
"splitter": ",",
"column": [
{"name": "id", "type": "id"},
{ "name": "cname","type": "text" }
]
}
}
}]
}
}
然后执行脚本,bin/datax.py job/mysql-es.json
Mysql数据同步ES完成,索引如下图:
以上是Datax实现的基本数据源同步插件功能,如果我们需要将Mysql的数据同步到MQ中,需要去开发插件,后期继续开发出统一的MQ Writer插件,这样就能实现复杂的业务处理数据异构场景啦。
操作界面
以上是通过脚本实现数据异构的同步,但是这样是不够方便灵活的,阿里开源的时候并没有提供可视化界面,好在已经有大佬开源了一套datax-web:https://github.com/WeiYe-Jing/datax-web 前端界面,我们可以通过界面灵活的进行脚本配置和执行,界面如下:
不断分享开发过程用到的技术和面试经常被问到的问题,如果您也对IT技术比较感兴趣可以「关注」我
猜你喜欢
- 2024-11-06 Java开发必备,超详细的JDK安装教程(图文详解)
- 2024-11-06 Oracle Linux 7.9发布:基于Linux 5.4 LTS与UEK 6企业内核构建
- 2024-11-06 windows操作系统的包管理工具-巧克力Choco,安装JDK不再登录
- 2024-11-06 手把手教你搭建java环境 搭建javaweb开发环境的步骤
- 2024-11-06 linux下推荐的开发环境的安装和配置
- 2024-11-06 Java基础|开发环境搭建 java开发环境的搭建步骤
- 2024-11-06 Oracle 19c给我的启示:RESTful API
- 2024-11-06 jvm总结(二)方法区、常量池 jvm内存模型常量池
- 2024-11-06 JDK下载安装 jdk下载安装步骤流程图简述
- 2024-11-06 大厂必知必会——G1收集器详解及调优
你 发表评论:
欢迎- 617℃几个Oracle空值处理函数 oracle处理null值的函数
- 610℃Oracle分析函数之Lag和Lead()使用
- 599℃0497-如何将Kerberos的CDH6.1从Oracle JDK 1.8迁移至OpenJDK 1.8
- 595℃Oracle数据库的单、多行函数 oracle执行多个sql语句
- 591℃Oracle 12c PDB迁移(一) oracle迁移到oceanbase
- 582℃【数据统计分析】详解Oracle分组函数之CUBE
- 572℃最佳实践 | 提效 47 倍,制造业生产 Oracle 迁移替换
- 560℃Oracle有哪些常见的函数? oracle中常用的函数
- 最近发表
-
- PageHelper - 最方便的 MyBatis 分页插件
- 面试二:pagehelper是怎么实现分页的,
- MyBatis如何实现分页查询?(mybatis-plus分页查询)
- SpringBoot 各种分页查询方式详解(全网最全)
- 如何在Linux上运行exe文件,怎么用linux运行windows软件
- 快速了解hive(快速了解美国50个州)
- Python 中的 pyodbc 库(pydbclib)
- Linux搭建Weblogic集群(linux weblogic部署项目步骤)
- 「DM专栏」DMDSC共享集群之部署(一)——共享存储配置
- 故障分析 | MySQL 派生表优化(mysql pipe)
- 标签列表
-
- 前端设计模式 (75)
- 前端性能优化 (51)
- 前端模板 (66)
- 前端跨域 (52)
- 前端缓存 (63)
- 前端aes加密 (58)
- 前端脚手架 (56)
- 前端md5加密 (54)
- 前端路由 (61)
- 前端数组 (73)
- 前端js面试题 (50)
- 前端定时器 (59)
- 前端获取当前时间 (50)
- Oracle RAC (76)
- oracle恢复 (77)
- oracle 删除表 (52)
- oracle 用户名 (80)
- oracle 工具 (55)
- oracle 内存 (55)
- oracle 导出表 (62)
- oracle约束 (54)
- oracle 中文 (51)
- oracle链接 (54)
- oracle的函数 (58)
- 前端调试 (52)
本文暂时没有评论,来添加一个吧(●'◡'●)