网站首页 > 技术文章 正文
关于拖库,那是黑客最喜欢的话题了,拿到数据之后就能进行各种骚操作了,不过各位骚年,咱们今天这里的拖库非彼拖库,也不是“脱裤”,这里的拖库指的是将数据快速从业务数据库(通常是OLTP类型的数据库,比如Oracle、Mysql、Postgresql等)同步到OLAP的存储中(比如HDFS/Greenplum等),供OLAP引擎进行ETL工作,也就是我们尝尝听到的ETL里的数据抽取。
通常情况下我们大数据平台中有个【数据集成】的功能模块,通过这个模块能将各种不同源的数据快速集成到数据开发平台的贴源层(ODS)数据,包括实时的、或者是离线的。对于离线数据我们会通过定时调度任务将数据定期的同步到数仓中,有些是全量的方式,而有些数据量比较大的情况下则是通过增量的方式同步数据,在数仓内部进行数据Merge从而获取到全量的数据。
这种数据同步任务我们一般可以选择阿里开源的DataX和或者开源的Flume等工具,这种工具要么就是要通过一系列的配置文件定义输入输出,要么就是通过代码的方式进行数据同步,而今天我要介绍的是通过Spark进行数据同步。大家都知道Spark是一个分布式的计算框架,那么对于任务的并发和调度就是它核心的能力,因此借助于Spark我们就能实现并发快速的实现数据同步。Spark的架构图大致如下:
上图简单的绘制了我们的Spark通过集群调度实现数据同步的大致过程,当然我们也可以选择单机模式进行同步,单机模式下我们的代码会和Driver在一个JVM中运行,通过增大并发的线程数也一样能实现并发数据同步。通过Spark进行数据同步远远比DataX的配置简单很多。PySpark的代码如下:
from pyspark.sql import SparkSession
# 定义并发的任务数量
spark_builder = SparkSession.builder.master("local[4]").appName("Spark Data Sync")
# 需要制定你的mysql驱动地址
spark_builder.jars = ['./mysql-connector-java-8.0.21.jar']
spark = spark_builder.getOrCreate()
df = spark.read.format('jdbc').options(
url='jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_db_name}',
dbtable='{mysql_table_name}',
user='{mysql_user}',
password='{mysql_password}',
upper_bound='1',
lower_bound='10000',
# 定义分区数量(一共需要多少个task,结合上面的并发任务数量就能推断出共需要几轮的任务)
numPartitions=10,
# 分区字段,这个字段需要和upper_bound以及lower_bound一起使用
partitionColumn='id',
driver='com.mysql.jdbc.Driver'
).load()
df.write.format('parquet').mode('overwrite').save('{local_dir_or_hdfs_dir}')
如此将数据从Mysql同步到hdfs或者本地文件,这样有木有吊打DataX?说说你的想法吧。
猜你喜欢
- 2025-08-03 OpenAI 和 Oracle 承诺为 Stargate Network 扩展 4.5 GW 数据中心
- 2025-08-03 异构跨库数据同步还在用Datax?来看看这几个开源的同步方案
- 2025-08-03 黄远邦:应对7月1日闰秒对Oracle数据库影响
- 2025-08-03 开源:一款开源的数据同步中间件DBSyncer
- 2025-08-03 计算机存储之数据一致性
- 2025-08-03 OGG同步10PB数据到Kafka不停库?全量+增量...
- 2025-08-03 【推荐】一款开源免费、功能强大的数据同步工具,支持多种数据源
- 2024-11-01 Canal数据同步实战 数据同步工具有哪些
- 2024-11-01 支持断点续传的数据库同步服务 断点续传需要服务器支持吗
- 2024-11-01 Oracle OGG 单向DDL同步 oracle ogg双向同步
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- 前端设计模式 (75)
- 前端性能优化 (51)
- 前端模板 (66)
- 前端跨域 (52)
- 前端缓存 (63)
- 前端aes加密 (58)
- 前端脚手架 (56)
- 前端md5加密 (54)
- 前端路由 (61)
- 前端数组 (73)
- 前端js面试题 (50)
- 前端定时器 (59)
- Oracle RAC (76)
- oracle恢复 (77)
- oracle 删除表 (52)
- oracle 用户名 (80)
- oracle 工具 (55)
- oracle 内存 (55)
- oracle 导出表 (62)
- oracle约束 (54)
- oracle 中文 (51)
- oracle链接 (54)
- oracle的函数 (58)
- oracle面试 (55)
- 前端调试 (52)
本文暂时没有评论,来添加一个吧(●'◡'●)