网站首页 > 技术文章 正文
关于拖库,那是黑客最喜欢的话题了,拿到数据之后就能进行各种骚操作了,不过各位骚年,咱们今天这里的拖库非彼拖库,也不是“脱裤”,这里的拖库指的是将数据快速从业务数据库(通常是OLTP类型的数据库,比如Oracle、Mysql、Postgresql等)同步到OLAP的存储中(比如HDFS/Greenplum等),供OLAP引擎进行ETL工作,也就是我们尝尝听到的ETL里的数据抽取。
通常情况下我们大数据平台中有个【数据集成】的功能模块,通过这个模块能将各种不同源的数据快速集成到数据开发平台的贴源层(ODS)数据,包括实时的、或者是离线的。对于离线数据我们会通过定时调度任务将数据定期的同步到数仓中,有些是全量的方式,而有些数据量比较大的情况下则是通过增量的方式同步数据,在数仓内部进行数据Merge从而获取到全量的数据。
这种数据同步任务我们一般可以选择阿里开源的DataX和或者开源的Flume等工具,这种工具要么就是要通过一系列的配置文件定义输入输出,要么就是通过代码的方式进行数据同步,而今天我要介绍的是通过Spark进行数据同步。大家都知道Spark是一个分布式的计算框架,那么对于任务的并发和调度就是它核心的能力,因此借助于Spark我们就能实现并发快速的实现数据同步。Spark的架构图大致如下:
上图简单的绘制了我们的Spark通过集群调度实现数据同步的大致过程,当然我们也可以选择单机模式进行同步,单机模式下我们的代码会和Driver在一个JVM中运行,通过增大并发的线程数也一样能实现并发数据同步。通过Spark进行数据同步远远比DataX的配置简单很多。PySpark的代码如下:
from pyspark.sql import SparkSession
# 定义并发的任务数量
spark_builder = SparkSession.builder.master("local[4]").appName("Spark Data Sync")
# 需要制定你的mysql驱动地址
spark_builder.jars = ['./mysql-connector-java-8.0.21.jar']
spark = spark_builder.getOrCreate()
df = spark.read.format('jdbc').options(
url='jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_db_name}',
dbtable='{mysql_table_name}',
user='{mysql_user}',
password='{mysql_password}',
upper_bound='1',
lower_bound='10000',
# 定义分区数量(一共需要多少个task,结合上面的并发任务数量就能推断出共需要几轮的任务)
numPartitions=10,
# 分区字段,这个字段需要和upper_bound以及lower_bound一起使用
partitionColumn='id',
driver='com.mysql.jdbc.Driver'
).load()
df.write.format('parquet').mode('overwrite').save('{local_dir_or_hdfs_dir}')
如此将数据从Mysql同步到hdfs或者本地文件,这样有木有吊打DataX?说说你的想法吧。
猜你喜欢
- 2024-11-01 Canal数据同步实战 数据同步工具有哪些
- 2024-11-01 支持断点续传的数据库同步服务 断点续传需要服务器支持吗
- 2024-11-01 Oracle OGG 单向DDL同步 oracle ogg双向同步
- 2024-11-01 数据同步组件(Canal)在珍爱网的应用与实践
- 2024-11-01 MySQL 到 Hazelcast Cloud 实时数据同步实操分享
- 2024-11-01 PostgreSQL中使用FDW+MV玩转数据同步
- 2024-11-01 mysql:Otter跨机房数据同步(单向)
- 2024-11-01 「数据库」 如何解决异地机房的数据同步问题?
- 2024-11-01 阿里数据同步工具Otter和Canal简介
- 2024-11-01 DataX-Web - 可视化分布式数据同步工具
你 发表评论:
欢迎- 633℃几个Oracle空值处理函数 oracle处理null值的函数
- 626℃Oracle分析函数之Lag和Lead()使用
- 614℃0497-如何将Kerberos的CDH6.1从Oracle JDK 1.8迁移至OpenJDK 1.8
- 608℃Oracle数据库的单、多行函数 oracle执行多个sql语句
- 606℃Oracle 12c PDB迁移(一) oracle迁移到oceanbase
- 598℃【数据统计分析】详解Oracle分组函数之CUBE
- 588℃最佳实践 | 提效 47 倍,制造业生产 Oracle 迁移替换
- 572℃Oracle有哪些常见的函数? oracle中常用的函数
- 最近发表
-
- oracle 19cOCM认证有哪些内容(oracle认证ocm月薪)
- Oracle新出AI课程认证,转型要持续学习
- oracle 表的查询join顺序,可能会影响查询效率
- Oracle DatabaseAmazon Web Services正式可用,Oracle数据库上云更容易了
- Oracle 19.28 RU 升级最佳实践指南
- 汉得信息:发布EBS系统安装启用JWS的高效解决方案
- 如何主导设计一个亿级高并发系统架构-数据存储架构(三)
- Java 后端开发必看!工厂设计模式轻松拿捏
- ORA-00600 「25027」 「x」报错(抱错孩子电视剧 爸爸是武术 另一个爸爸是画家)
- 新项目终于用上了jdk24(jdk新建项目)
- 标签列表
-
- 前端设计模式 (75)
- 前端性能优化 (51)
- 前端模板 (66)
- 前端跨域 (52)
- 前端缓存 (63)
- 前端aes加密 (58)
- 前端脚手架 (56)
- 前端md5加密 (54)
- 前端路由 (61)
- 前端数组 (73)
- 前端js面试题 (50)
- 前端定时器 (59)
- 前端获取当前时间 (50)
- Oracle RAC (76)
- oracle恢复 (77)
- oracle 删除表 (52)
- oracle 用户名 (80)
- oracle 工具 (55)
- oracle 内存 (55)
- oracle 导出表 (62)
- oracle约束 (54)
- oracle 中文 (51)
- oracle链接 (54)
- oracle的函数 (58)
- 前端调试 (52)
本文暂时没有评论,来添加一个吧(●'◡'●)