专业编程教程与实战项目分享平台

网站首页 > 技术文章 正文

DataX数据异构、数据同步神器 数据异步是什么意思

ins518 2024-11-06 19:01:02 技术文章 12 ℃ 0 评论

需求背景

需求一:前段时间,公司进行一个十年老系统重构,其中涉及到很多数据库、表的迁移,基本思路就是大表通过定时任务分片同步,小表通过rest接口的方式同步数据,所以其中开发了大量只使用一次的代码;

需求二:在做数据异构时,比如将Mysql数据库表同步到ES或者ADB时,其中需要进行一定的数据清洗和过滤,所以增量数据通过订阅binglog放到MQ中进行监听处理,历史全量数据通过定时任务或者将历史数据灌到一个新的数据库,也通过订阅binglog放到MQ中进行相同的处理,如果涉及到ES索引重建,那么需要重复操作以上流程;

需求三:大表的数据归档,比如财务的清结算数据,业务方只关心最近几个月的数据,类似业务表数据量越来越大,很可能产生慢查询、锁表等一系列问题,所以需要将历史数据有规划的进行数据归档。但是以上看似很简单的操作如果不通过程序实现,那么就需要依赖DBA实施;

以上列举了工作中经常会遇到的场景,当然类似的需求还有很多很多。通过以上分析,我们不难发现目前的做法开发成本、资源成本较高。那么我们需要通过什么手段来解决以上问题呢?最近调研了一些资料,感觉DataX基本上可以非常简单完成以上功能。

官网地址:https://github.com/alibaba/DataX

设计理念

DataX是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

架构设计

大家在用过 IDEA 的时候都知道,IDEA 有很多非常棒的插件,用户可根据自身编程需求,下载相关的插件,DataX 也是使用这种可插拔的设计,采用了 Framework + Plugin 的架构设计,如下图所示:

有了插件,DataX 可支持任意数据源到数据源,只要实现了 Reader/Writer Plugin,官方已经实现了主流的数据源插件,比如 MySQL、Oracle、SQLServer 等,当然我们也可以开发一个 DataX 插件。

Reader:数据采集模块,负责采集数据源的数据,将数据发给Framework。

Wiriter: 数据写入模块,负责不断向Framwork取数据,并将数据写入到目的端。

Framework:用于连接read和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等你核心技术问题。

运行原理

DataX 核心主要由 Job、Task Group、Task、Schedule、Channel 等概念组成:

Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。

Task:由Job切分而来,是DataX作业的最小单元,每个Task负责一部分数据的同步工作。

Schedule:将Task组成TaskGroup,单个TaskGroup的并发数量为5。

TaskGroup:负责启动Task。

Channel:DataX 会单独启动一条线程运行运行一个 Task,而 Task 会持有一个 Channel,用作 Reader 与 Writer 的数据传输媒介,DataX 的数据流向都是按照 Reader—>Channel—>Writer 的方向流转。Channel 作为传输通道,即能充当缓冲层,同时还能对数据传输进行限流操作。

调度流程

DataX 将用户的 job.json 同步作业配置解析成一个 Job,DataX 通过 JobContainer 完成全局切分、调度、前置语句和后置语句等工作,整体调度流程用如下图表示:

操作实战

1.下载地址: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

2.解压

3.运行自检脚本 bin/datax.py job/job.json

出现以上命令框代表环境可用。

4.通过命令行测试mysql数据库表之间同步,表c_s同步到c_s1,首先编写同步脚本,vim /job/mysql-mysql.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "id",
                            "c_name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/mybd"
                                ], 
                                "table": [
                                    "c_s"
                                ]
                            }
                        ], 
                        "password": "root", 
                        "username": "root"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                            "id",
                            "cname"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/mybd",
                                "table": [
                                    "c_s1"
                                ]
                            }
                        ], 
                        "password": "root", 
                        "username": "root", 
                        "writeMode": "replace"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "2"
            }
        }
    }
}

然后执行脚本,bin/datax.py job/mysql-mysql.json

数据表中的3条数据同步完成。

5.通过命令行测试mysql数据同步ES,表c_s同步到索引c_s上,首先编写同步脚本,vim /job/mysql-es.json

{
  "job": {
      "setting": {
          "speed": {
              "channel": 1,
              "record": -1,
              "byte": -1
          }
      },
      "content": [{
          "reader": {
              "name": "mysqlreader",
              "parameter": {
                  "username": "root",
                  "password": "root",
                  "column": [
                      "id",
                      "c_name"
                  ],
                  "splitPk": "id",
                  "connection": [{
                      "table": [
                          "c_s"
                      ],
                      "jdbcUrl": [
               "jdbc:mysql://127.0.0.1:3306/mybd"
                      ]
                
                  }]
              }
          },
          "writer": {
              "name": "elasticsearchwriter",
              "parameter": {
                  "endpoint": "http://es-cn-xxxxx.elasticsearch.aliyuncs.com:9200",
                  "index": "c_s",
                  "type": "default",
                  "accessId": "elastic",
                  "accessKey": "xxxxxx",
                  "settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
                  "discovery": false,
                  "batchSize": 5000,    
                  "splitter": ",",
                  "column": [
                    {"name": "id", "type": "id"},
                    { "name": "cname","type": "text" }
                  ]
              }
          }
      }]
  }
}

然后执行脚本,bin/datax.py job/mysql-es.json

Mysql数据同步ES完成,索引如下图:

以上是Datax实现的基本数据源同步插件功能,如果我们需要将Mysql的数据同步到MQ中,需要去开发插件,后期继续开发出统一的MQ Writer插件,这样就能实现复杂的业务处理数据异构场景啦。

操作界面

以上是通过脚本实现数据异构的同步,但是这样是不够方便灵活的,阿里开源的时候并没有提供可视化界面,好在已经有大佬开源了一套datax-web:https://github.com/WeiYe-Jing/datax-web 前端界面,我们可以通过界面灵活的进行脚本配置和执行,界面如下:

不断分享开发过程用到的技术和面试经常被问到的问题,如果您也对IT技术比较感兴趣可以「关注」我

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表