网站首页 > 技术文章 正文
多源数据同步是大数据从业人员经常会遇到的问题,也是做大数据分析的第一步,设计合理的同步方案,保证数据同步的完整性和准确性,是后续数据分析可靠性的必要前提。
本文基于Shell脚本和Datax,提供了一个可靠的离线数据同步方案。
一、数据同步链路简介
如下图所示为目前常见的数据同步链路(箭头的起始位置为源数据库,箭头指向位置为目标数据库):
对于该数据同步链路,通过Shell脚本结合Datax(设置reader和writer插件)即可完美解决。
二、整表同步
适用于源表数据量比较小的情况,同步时直接读取全表,写入到目标数据库即可。
2.1 脚本设计
以读取MySQL,写入Hive表为例脚本设计如下:
#! /bin/bash
# 脚本路径
curPath=/home/bi/scripts
# 脚本设计数据库账户信息
source /home/bi/login.info
# 解析数据库账户密码
# 状态表数据库账户密码
pwd_mysql_status=`parsehost $host_mysql_status $user_mysql_status`
# 源表数据库账户密码
pwd_mysql_source=`parsehost $host_mysql_source $user_mysql_source`
# 源表信息
db=mall
tb=produce_type_info
# 目标表信息
targetDb=mall
targetTb=produce_type_info
# 状态记录表信息
recordDb=sync
recordTb=status
pro_name=${db}.${tb}
# 当前日期和截止日期(用于限定脚本执行的时间)
curdate=$(date +%s)
deadline=$(date -d '2021-07-02' +%s)
if [ $curdate -lt $deadline ]; then
# 读取源数据写入到本地文件
mysql -h$host_mysql_source -u$user_mysql_source -P3306 -p$pwd_mysql_source -D$db -N -e "SET character_set_results=utf8;SELECT *,DATE(submit_time) FROM $db.$tb" > $curPath/$db.$tb.txt
# hive load data直接覆盖写入到目标表
hive -e "LOAD DATA LOCAL INPATH '$curPath/$db.$tb.txt' OVERWRITE INTO TABLE $targetDb.$targetTb"
if [ $? -eq 0 ]; then
# 根据执行结果写入执行状态到状态表
mysql -h$host_mysql_status -u$user_mysql_status -p$pwd_mysql_status -N -e "update $recordDb.$recordTb set update_time=now() where pro_name='${pro_name}' and pro_type='sync'"
else
echo "data load error.."
fi
else
echo "超出同步时间,同步停止.."
fi
该脚本逻辑简单,通过把脚本执行状态写入到提前设计好的状态表,更新数据更新状态,方便后续分析人员脚本开发时判断数据更新状态。
2.2 状态表设计
状态表旨在记录数据同步状态,所以通常会有同步数据的脚本信息,同步方式,源表名,目标表名等信息。
状态表设计如下:
CREATE TABLE `status` (
`pro_name` varchar(64) NOT NULL COMMENT '项目名称',
`pro_type` varchar(32) NOT NULL COMMENT '项目类型,数据同步/kpi计算/活动支持等',
`src_tb` varchar(64) DEFAULT NULL COMMENT '源表名',
`tar_tb` varchar(64) DEFAULT NULL COMMENT '目标表名',
`idx_column` varchar(64) DEFAULT NULL COMMENT '更新依赖源表字段名',
`idx_value` varchar(64) DEFAULT NULL COMMENT '更新至的字段值',
`max_time` datetime DEFAULT NULL COMMENT '更新至时间',
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '状态更新时间',
PRIMARY KEY (`pro_name`,`pro_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
字段解析:
- pro_name: 项目名称(脚本名称)
- pro_type: 项目类型,如sync(同步),kpi(指标计算),support(活动支持等),或者设计为源表-目标表的类型也可以(如oracle2hive表示从oracle同步数据至hive)
- src_tb: 源表名称(源表db.tb)
- tar_tb: 目标表名称(目标表db.tb)
- idx_column: 同步数据时依赖的源表字段名称(一般为源表的自增索引/update_time索引等等)
- idx_value: 同步数据时,同步至源表字段的值
- max_time: 目标表更新至源表数据的最大时间
- update_time: 脚本执行状态更新的时间
三、大表数据同步
由于源表数据量很大,直接整表同步显然不合理,对于大表的数据同步一般是先做初始化,再每次同步增量。此时我们要注意源表的索引字段(自增索引/更新时间索引),根据索引字段来灵活的初始化数据。
3.1 脚本设计
同样以MySQL->Hive为例,大表数据初始化要与要考虑到同步过程是否会对源数据库性能产生影响,所以一般会根据自增id,每次同步一定的数据(本例初始化时10w一批次),或者根据更新时间每次同步一定的时间区间来完成。以自增id为例,初始化脚本设计如下
#! /bin/bash
# 数据库信息获取
source /home/bi/login.info
pwd_mysql_status=`parsehost $host_mysql_status $user_mysql_status`
pwd_mysql_source=`parsehost $host_mysql_source $user_mysql_source`
# 每次获取10w数据,通过datax脚本写入到指定的HDFS路径(Hive表路径)
for ((i=0; i<=120500000; ))
do
j=$i
i=$((i+100000))
echo "Start job from $j to $i, read source mysql data"
/home/datax/bin/datax.py -p"-Dstart_id='${j}' -Dend_id='${i}' -Dhost_mysql_source='${host_mysql_source}' -Duser_mysql_source='${user_mysql_source}' -Dpwd_mysql_source='${pwd_mysql_source}'" /home/bi/projects_web/mall.orders.mysql2hive.fix.json
if [ $? -eq 0 ]; then
echo "Finished insert data to hive for job ${start_id} to ${end_id}, pass to the next one..."
mysql -h${host_mysql_status} -u${user_mysql_status} -p${pwd_mysql_status} -N -e "update project_status.init_status set result=1,update_time=now() WHERE start_id=${start_id}"
else
echo "Error insert data to hive..."
mysql -h${host_mysql_status} -u${user_mysql_status} -p${pwd_mysql_status} -N -e "update project_status.init_status set result=2,update_time=now() WHERE start_id=${start_id}"
fi
done
脚本以10w为步长,循环调用Datax做数据同步(将每次同步的起始id作为参数传递给datax的json配置文件),直至同步到最大的id。project_status.init_status表为定义的保存每个初始化批次脚本运行状态的表,设计比较简单,这里就不多赘述了。
3.2 Datax json文件配置
本例是MySQL数据同步Hive,所以设置json为mysqlreader和hdfswriter。
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "${user_mysql_source}",
"password": "${pwd_mysql_source}",
"connection": [
{
"querySql": [
"SELECT * FROM mall.orders where id > ${lastValue} and id <= ${maxValue}"
],
"jdbcUrl": [
"jdbc:mysql://${host_mysql_source}:3306/mall"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://192.168.xxx.xx:8020",
"fileType": "text",
"path": "/user/hive/warehouse/mall.db/orders/",
"fileName": "part",
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "player_id",
"type": "bigint"
},
{
"name": "account",
"type": "string"
},
{
"name": "order_id",
"type": "string"
},
{
"name": "produce_id",
"type": "int"
},
{
"name": "order_status",
"type": "tinyint"
},
{
"name": "update_time",
"type": "timestamp"
}
],
"writeMode": "append",
"fieldDelimiter": "\t",
"compress": "",
"encoding": "UTF-8"
}
}
}
]
}
}
这样就通过Shell脚本+Datax的组合,完成了每次10w条数据的数据初始化。
初始化完成之后就是每日增量数据的同步,仍然采用类似初始化的方式,从状态表中获取读取到的id值,每次同步更新大于该id的值,然后把更新后最大的id值写入到状态表中方便下次增量更新使用。
四、最后
本文从实战出发,介绍了MySQL到Hive的最简单同步方式。由于篇幅关系,在此简单介绍下部分复杂情况处理方案。
- MySQL(按月)分表:这种情况需要明确分表规则,在Shell脚本中保证分表后数据不丢失(如每月首日的按月分表,数据同步时每月首日要同步上个月和当月两张表的数据)。
- 同步数据写入到Hive分区表:这种情况一般会先通过Datax将数据同步到一个tmp表(每次同步前清空),然后在Shell脚本中执行hive -e将临时表数据写入到Hive分区表内。
其他数据源之间的数据同步逻辑类似,只需按照源表类型和目标表类型调整Datax的reader和writer插件即可。
猜你喜欢
- 2024-10-22 一键实现 Oracle 数据整库同步至 Apache Doris
- 2024-10-22 Oracle OGG 单向DML同步 oracle的ogg同步
- 2024-10-22 Spring Boot整合DataX同步数据 springboot自动装配原理
- 2024-10-22 常见数据同步工具之实时同步 数据同步工具有哪些
- 2024-10-22 MySQL超时参数以及相关DataX数据同步案例分享
- 2024-10-22 《github精选系列》——数据库同步中间件(DBSyncer)
- 2024-10-22 如何将Oracle的blob同步到MySQL oracle blob转字符串
- 2024-10-22 ETL数据集成丨实现SQLServer数据库的高效实时数据同步
- 2024-10-22 SeaTunnel同步Oracle数据至ClickHouse
- 2024-10-22 搭建私有化DataWorks平台 - Part 2 数据同步
你 发表评论:
欢迎- 500℃几个Oracle空值处理函数 oracle处理null值的函数
- 494℃Oracle分析函数之Lag和Lead()使用
- 493℃Oracle数据库的单、多行函数 oracle执行多个sql语句
- 481℃0497-如何将Kerberos的CDH6.1从Oracle JDK 1.8迁移至OpenJDK 1.8
- 473℃Oracle 12c PDB迁移(一) oracle迁移到oceanbase
- 468℃【数据统计分析】详解Oracle分组函数之CUBE
- 453℃Oracle有哪些常见的函数? oracle中常用的函数
- 448℃最佳实践 | 提效 47 倍,制造业生产 Oracle 迁移替换
- 最近发表
-
- Spring Boot跨域难题终结者:3种方案,从此告别CORS噩梦!
- 京东大佬问我,SpringBoot为什么会出现跨域问题?如何解决?
- 在 Spring Boot3 中轻松解决接口跨域访问问题
- 最常见五种跨域解决方案(常见跨域及其解决方案)
- Java Web开发中优雅应对跨域问题(java跨域问题解决办法)
- Spring Boot解决跨域最全指南:从入门到放弃?不,到根治!
- Spring Boot跨域问题终极解决方案:3种方案彻底告别CORS错误
- Spring Cloud 轻松解决跨域,别再乱用了
- Github 太狠了,居然把 "master" 干掉了
- IntelliJ IDEA 调试 Java 8,实在太香了
- 标签列表
-
- 前端设计模式 (75)
- 前端性能优化 (51)
- 前端模板 (66)
- 前端跨域 (52)
- 前端缓存 (63)
- 前端react (48)
- 前端aes加密 (58)
- 前端脚手架 (56)
- 前端md5加密 (54)
- 前端富文本编辑器 (47)
- 前端路由 (55)
- 前端数组 (65)
- 前端定时器 (47)
- Oracle RAC (73)
- oracle恢复 (76)
- oracle 删除表 (48)
- oracle 用户名 (74)
- oracle 工具 (55)
- oracle 内存 (50)
- oracle 导出表 (57)
- oracle 中文 (51)
- oracle链接 (47)
- oracle的函数 (57)
- 前端调试 (52)
- 前端登录页面 (48)
本文暂时没有评论,来添加一个吧(●'◡'●)