专业编程教程与实战项目分享平台

网站首页 > 技术文章 正文

用Spark实现高并发数据同步 spark并行

ins518 2024-11-01 13:18:22 技术文章 12 ℃ 0 评论

关于拖库,那是黑客最喜欢的话题了,拿到数据之后就能进行各种骚操作了,不过各位骚年,咱们今天这里的拖库非彼拖库,也不是“脱裤”,这里的拖库指的是将数据快速从业务数据库(通常是OLTP类型的数据库,比如Oracle、Mysql、Postgresql等)同步到OLAP的存储中(比如HDFS/Greenplum等),供OLAP引擎进行ETL工作,也就是我们尝尝听到的ETL里的数据抽取。

通常情况下我们大数据平台中有个【数据集成】的功能模块,通过这个模块能将各种不同源的数据快速集成到数据开发平台的贴源层(ODS)数据,包括实时的、或者是离线的。对于离线数据我们会通过定时调度任务将数据定期的同步到数仓中,有些是全量的方式,而有些数据量比较大的情况下则是通过增量的方式同步数据,在数仓内部进行数据Merge从而获取到全量的数据。

这种数据同步任务我们一般可以选择阿里开源的DataX和或者开源的Flume等工具,这种工具要么就是要通过一系列的配置文件定义输入输出,要么就是通过代码的方式进行数据同步,而今天我要介绍的是通过Spark进行数据同步。大家都知道Spark是一个分布式的计算框架,那么对于任务的并发和调度就是它核心的能力,因此借助于Spark我们就能实现并发快速的实现数据同步。Spark的架构图大致如下:

上图简单的绘制了我们的Spark通过集群调度实现数据同步的大致过程,当然我们也可以选择单机模式进行同步,单机模式下我们的代码会和Driver在一个JVM中运行,通过增大并发的线程数也一样能实现并发数据同步。通过Spark进行数据同步远远比DataX的配置简单很多。PySpark的代码如下:

from pyspark.sql import SparkSession

# 定义并发的任务数量
spark_builder = SparkSession.builder.master("local[4]").appName("Spark Data Sync")
# 需要制定你的mysql驱动地址
spark_builder.jars = ['./mysql-connector-java-8.0.21.jar']
spark = spark_builder.getOrCreate()

df = spark.read.format('jdbc').options(
    url='jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_db_name}',
    dbtable='{mysql_table_name}',
    user='{mysql_user}',
    password='{mysql_password}',
    upper_bound='1',
    lower_bound='10000',
    # 定义分区数量(一共需要多少个task,结合上面的并发任务数量就能推断出共需要几轮的任务)
    numPartitions=10,
    # 分区字段,这个字段需要和upper_bound以及lower_bound一起使用
    partitionColumn='id',
    driver='com.mysql.jdbc.Driver'
).load()

df.write.format('parquet').mode('overwrite').save('{local_dir_or_hdfs_dir}')

如此将数据从Mysql同步到hdfs或者本地文件,这样有木有吊打DataX?说说你的想法吧。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表