专业编程教程与实战项目分享平台

网站首页 > 技术文章 正文

用DataX实现两个MySQL实例间的数据同步

ins518 2025-07-17 19:14:14 技术文章 2 ℃ 0 评论

DataX

DataX使用Java实现。

如果可以实现数据库实例之间准实时的#数据同步#,可以解决很多问题。例如数据可靠性和高并发的问题。Oracle Golden Gate是Oracle提供的一个商业解决方案,而开源的也有很多,这里介绍的#datax#就是一种。下面是DataX的设计原理图





环境

两个数据库实例,都是MySQL,一个位于生产环境,一个位于测试环境。想将指定数据库的指定表数据,从生产环境同步到测试环境对应表中。

步骤

  1. 安装JDK 1.8
sudo apt update
sudo apt install openjdk-8-jdk -y
......
gauss@power-edge-r730:~$ java -version
openjdk version "1.8.0_452"
OpenJDK Runtime Environment (build 1.8.0_452-8u452-ga~us1-0ubuntu1~24.04-b09)
OpenJDK 64-Bit Server VM (build 25.452-b09, mixed mode)
gauss@power-edge-r730:~$ javac -version
javac 1.8.0_452
  1. 下载DataX
$ wget https://github.com/alibaba/DataX/releases/download/datax_0.0.1/datax.tar.gz
# 可从国内下载 : https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz
$ tar -zxvf datax.tar.gz
$ mv datax ~/soft/
$ cd ~/soft/datax/
$ python3 bin/datax.py --help

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
  1. 编写同步配置文件
$ cat job/mysql_jygt_2_enger.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1,
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "name",
                        "password": "passwd",
                        "column": ["*"],
                        "connection": [
                            {
                                "table": ["t_user"],
                                "jdbcUrl": ["jdbc:mysql://192.168.1.222:3306/db_jygt?useSSL=false&serverTimezone=UTC"]
                            }
                        ],
                        "splitPk": "id",
                        "where": "f_modify_time> '${last_sync_time}'"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "name",
                        "password": "passwd",
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.111:3306/db_enger?useSSL=false&serverTimezone=UTC",
                                "table": ["t_user"]
                            }
                        ],
                        "preSql": [
                                "CREATE TABLE  IF NOT EXISTS `t_user` (
                                `f_id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '主键:唯一标识特定表的一个记录',
                                `f_uid` VARCHAR(64) NOT NULL COMMENT '用户ID,业务语义上,与f_user_source_id一起唯一标记一个用户' COLLATE 'utf8mb4_bin',
                                `f_user_source_id` INT(11) NOT NULL COMMENT '用户来源id,对应t_user_source表的相关记录主键',
                                `f_auth_code` VARCHAR(256) NOT NULL COMMENT '用户身份识别码,一般用来验证用户的身份。例如密码,手机验证码、邮件验证码、或者来自第三方平台的授权吗' COLLATE 'utf8mb4_bin',
                                `f_name` VARCHAR(64) NULL DEFAULT NULL COMMENT '用户名' COLLATE 'utf8mb4_bin',
                                `f_motto` VARCHAR(128) NULL DEFAULT NULL COMMENT '用户座右铭' COLLATE 'utf8mb4_bin',
                                `f_avarta_url` VARCHAR(50) NULL DEFAULT NULL COMMENT '用户的头像url' COLLATE 'utf8mb4_bin',
                                `f_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
                                `f_modify_time` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                                PRIMARY KEY (`f_id`) USING BTREE,
                                UNIQUE INDEX `unq_source_and_uid` (`f_uid`, `f_user_source_id`) USING BTREE,
                                INDEX `idx_source` (`f_user_source_id`) USING BTREE
                                )"
                        ]
                    }
                }
            }
        ]
    }
}
  1. 执行
gauss@power-edge-r730:~/soft/datax$ python3 bin/datax.py job/mysql_jygt_2_enger.json

.......
 All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.177s | Percentage 100.00%
2025-05-27 16:31:22.350 [job-0] INFO  JobContainer -
任务启动时刻                    : 2025-05-28 00:31:11
任务结束时刻                    : 2025-05-28 00:31:22
任务总计耗时                    :                 11s
任务平均流量                    :               73B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   9
读写失败总数                    :                   0
  1. 验证




  1. 更多

可以编制一个脚本来管理已经同步的数据最后更新时间,再次运行就只对其后更改的数据进行同步。并配置定时任务,令其自省,实现准实时同步数据。

gauss@power-edge-r730:~/soft/datax$ cat script/mysql_jygt_2_enger.sh
#!/bin/bash

# DataX路径
DATAX_PATH="/home/gauss/soft/datax"

# 配置文件路径
CONFIG_FILE="$DATAX_PATH/job/mysql_jygt_2_enger.json"

# 时间戳文件路径
TIME_FILE="$DATAX_PATH/job/imysql_jygt_2_enger_last_sync_time.txt"

# 获取上次同步时间
LAST_TIME=$(cat $TIME_FILE)

# 当前时间
CURRENT_TIME=$(date +"%Y-%m-%d %H:%M:%S")

# 替换配置文件中的时间变量
sed -i "s/'\${last_sync_time}'/'$LAST_TIME'/" $CONFIG_FILE

# 执行DataX同步
python3 $DATAX_PATH/bin/datax.py $CONFIG_FILE

# 更新时间戳文件
echo $CURRENT_TIME > $TIME_FILE

# 恢复配置文件
sed -i "s/'$LAST_TIME'/'\${last_sync_time}'/" $CONFIG_FILE
crontab -e
*/5 * * * * /opt/datax/incremental_sync.sh >> /opt/datax/sync.log 2>&1

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表