专业编程教程与实战项目分享平台

网站首页 > 技术文章 正文

多源异构数据离线同步方案实战 多源异构数据整合处理

ins518 2024-10-22 14:59:01 技术文章 11 ℃ 0 评论

多源数据同步是大数据从业人员经常会遇到的问题,也是做大数据分析的第一步,设计合理的同步方案,保证数据同步的完整性和准确性,是后续数据分析可靠性的必要前提。

本文基于Shell脚本和Datax,提供了一个可靠的离线数据同步方案。

一、数据同步链路简介

如下图所示为目前常见的数据同步链路(箭头的起始位置为源数据库,箭头指向位置为目标数据库):

对于该数据同步链路,通过Shell脚本结合Datax(设置reader和writer插件)即可完美解决。

二、整表同步

适用于源表数据量比较小的情况,同步时直接读取全表,写入到目标数据库即可

2.1 脚本设计

以读取MySQL,写入Hive表为例脚本设计如下:

#! /bin/bash
# 脚本路径
curPath=/home/bi/scripts
# 脚本设计数据库账户信息
source /home/bi/login.info
# 解析数据库账户密码
# 状态表数据库账户密码
pwd_mysql_status=`parsehost $host_mysql_status $user_mysql_status`
# 源表数据库账户密码
pwd_mysql_source=`parsehost $host_mysql_source $user_mysql_source`
# 源表信息
db=mall
tb=produce_type_info
# 目标表信息
targetDb=mall
targetTb=produce_type_info
# 状态记录表信息
recordDb=sync
recordTb=status
pro_name=${db}.${tb}
# 当前日期和截止日期(用于限定脚本执行的时间)
curdate=$(date +%s)
deadline=$(date -d '2021-07-02' +%s)
if [ $curdate -lt $deadline ]; then
   # 读取源数据写入到本地文件
   mysql -h$host_mysql_source -u$user_mysql_source -P3306 -p$pwd_mysql_source -D$db -N -e "SET character_set_results=utf8;SELECT *,DATE(submit_time) FROM $db.$tb" > $curPath/$db.$tb.txt
   # hive load data直接覆盖写入到目标表
   hive -e "LOAD DATA LOCAL INPATH '$curPath/$db.$tb.txt' OVERWRITE INTO TABLE $targetDb.$targetTb"
   if [ $? -eq 0 ]; then
      # 根据执行结果写入执行状态到状态表
      mysql -h$host_mysql_status -u$user_mysql_status -p$pwd_mysql_status -N -e "update $recordDb.$recordTb set update_time=now() where pro_name='${pro_name}' and pro_type='sync'"
   else
      echo "data load error.."
   fi
else
   echo "超出同步时间,同步停止.."
fi

该脚本逻辑简单,通过把脚本执行状态写入到提前设计好的状态表,更新数据更新状态,方便后续分析人员脚本开发时判断数据更新状态。

2.2 状态表设计

状态表旨在记录数据同步状态,所以通常会有同步数据的脚本信息,同步方式,源表名,目标表名等信息。

状态表设计如下:

CREATE TABLE `status` (
  `pro_name` varchar(64) NOT NULL COMMENT '项目名称',
  `pro_type` varchar(32) NOT NULL COMMENT '项目类型,数据同步/kpi计算/活动支持等',
  `src_tb` varchar(64) DEFAULT NULL COMMENT '源表名',
  `tar_tb` varchar(64) DEFAULT NULL COMMENT '目标表名',
  `idx_column` varchar(64) DEFAULT NULL COMMENT '更新依赖源表字段名',
  `idx_value` varchar(64) DEFAULT NULL COMMENT '更新至的字段值',
  `max_time` datetime DEFAULT NULL COMMENT '更新至时间',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '状态更新时间',
  PRIMARY KEY (`pro_name`,`pro_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

字段解析:

  • pro_name: 项目名称(脚本名称)
  • pro_type: 项目类型,如sync(同步),kpi(指标计算),support(活动支持等),或者设计为源表-目标表的类型也可以(如oracle2hive表示从oracle同步数据至hive)
  • src_tb: 源表名称(源表db.tb)
  • tar_tb: 目标表名称(目标表db.tb)
  • idx_column: 同步数据时依赖的源表字段名称(一般为源表的自增索引/update_time索引等等)
  • idx_value: 同步数据时,同步至源表字段的值
  • max_time: 目标表更新至源表数据的最大时间
  • update_time: 脚本执行状态更新的时间

三、大表数据同步

由于源表数据量很大,直接整表同步显然不合理,对于大表的数据同步一般是先做初始化,再每次同步增量。此时我们要注意源表的索引字段(自增索引/更新时间索引),根据索引字段来灵活的初始化数据。

3.1 脚本设计

同样以MySQL->Hive为例,大表数据初始化要与要考虑到同步过程是否会对源数据库性能产生影响,所以一般会根据自增id,每次同步一定的数据(本例初始化时10w一批次),或者根据更新时间每次同步一定的时间区间来完成。以自增id为例,初始化脚本设计如下

#! /bin/bash
# 数据库信息获取
source /home/bi/login.info
pwd_mysql_status=`parsehost $host_mysql_status $user_mysql_status`
pwd_mysql_source=`parsehost $host_mysql_source $user_mysql_source`
# 每次获取10w数据,通过datax脚本写入到指定的HDFS路径(Hive表路径)
for ((i=0; i<=120500000; ))
do
  j=$i
  i=$((i+100000))
  echo "Start job from $j to $i, read source mysql data"
  /home/datax/bin/datax.py -p"-Dstart_id='${j}' -Dend_id='${i}' -Dhost_mysql_source='${host_mysql_source}' -Duser_mysql_source='${user_mysql_source}' -Dpwd_mysql_source='${pwd_mysql_source}'" /home/bi/projects_web/mall.orders.mysql2hive.fix.json
  if [ $? -eq 0 ]; then
     echo "Finished insert data to hive for job ${start_id} to ${end_id}, pass to the next one..."
     mysql -h${host_mysql_status} -u${user_mysql_status} -p${pwd_mysql_status} -N -e "update project_status.init_status set result=1,update_time=now() WHERE start_id=${start_id}"
  else
     echo "Error insert data to hive..."
     mysql -h${host_mysql_status} -u${user_mysql_status} -p${pwd_mysql_status} -N -e "update project_status.init_status set result=2,update_time=now() WHERE start_id=${start_id}"
  fi
done

脚本以10w为步长,循环调用Datax做数据同步(将每次同步的起始id作为参数传递给datax的json配置文件),直至同步到最大的id。project_status.init_status表为定义的保存每个初始化批次脚本运行状态的表,设计比较简单,这里就不多赘述了。

3.2 Datax json文件配置

本例是MySQL数据同步Hive,所以设置json为mysqlreader和hdfswriter。

{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "${user_mysql_source}",
                        "password": "${pwd_mysql_source}",
                        "connection": [
                            {
                                "querySql": [
                                    "SELECT * FROM mall.orders where id > ${lastValue} and id <= ${maxValue}"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://${host_mysql_source}:3306/mall"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
            "name": "hdfswriter",
            "parameter": {
                "defaultFS": "hdfs://192.168.xxx.xx:8020",
                "fileType": "text",
                "path": "/user/hive/warehouse/mall.db/orders/",
                "fileName": "part",
                "column": [
                    {
                        "name": "id",
                        "type": "bigint"
                    },
                    {
                        "name": "player_id",
                        "type": "bigint"
                    },
                    {
                        "name": "account",
                        "type": "string"
                    },
                    {
                        "name": "order_id",
                        "type": "string"
                    },
                    {
                        "name": "produce_id",
                        "type": "int"
                    },
                    {
                        "name": "order_status",
                        "type": "tinyint"
                    },
                    {
                        "name": "update_time",
                        "type": "timestamp"
                    }
                ],
                "writeMode": "append",
                "fieldDelimiter": "\t",
                "compress": "",
                "encoding": "UTF-8"
            }
        }
            }
        ]
    }
}

这样就通过Shell脚本+Datax的组合,完成了每次10w条数据的数据初始化。

初始化完成之后就是每日增量数据的同步,仍然采用类似初始化的方式,从状态表中获取读取到的id值,每次同步更新大于该id的值,然后把更新后最大的id值写入到状态表中方便下次增量更新使用。

四、最后

本文从实战出发,介绍了MySQL到Hive的最简单同步方式。由于篇幅关系,在此简单介绍下部分复杂情况处理方案。

  • MySQL(按月)分表:这种情况需要明确分表规则,在Shell脚本中保证分表后数据不丢失(如每月首日的按月分表,数据同步时每月首日要同步上个月和当月两张表的数据)。
  • 同步数据写入到Hive分区表:这种情况一般会先通过Datax将数据同步到一个tmp表(每次同步前清空),然后在Shell脚本中执行hive -e将临时表数据写入到Hive分区表内。

其他数据源之间的数据同步逻辑类似,只需按照源表类型和目标表类型调整Datax的reader和writer插件即可。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表