网站首页 > 技术文章 正文
Flink-Doris-Connector 1.4.0 允许用户一步将包含数千个表的整个数据库(MySQL或Oracle )摄取到Apache Doris(一种实时分析数据库)中。
Connector 内置 Flink CDC,可以直接将上游源的表 schema 和数据同步到 Apache Doris,这意味着用户不再需要在 Doris 中编写 DataStream 程序或预先创建映射表。
当 Flink 作业启动时,Connector 会自动检查源数据库和 Apache Doris 之间的数据等效性。如果数据源包含 Doris 中不存在的表,Connector 会自动在 Doris 中创建相同的表,并利用 Flink 的侧输出来方便一次摄取多个表;如果源中发生架构更改,它将自动获取 DDL 语句并在 Doris 中进行相同的架构更改。
快速开始
对于MySQL:
下载 JAR 文件:https://github.com/apache/doris-flink-connector/releases/tag/1.4.0
行家:
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.15</artifactId>
<!--artifactId>flink-doris-connector-1.16</artifactId-->
<!--artifactId>flink-doris-connector-1.17</artifactId-->
<version>1.4.0</version>
</dependency>
对于甲骨文:
下载 JAR 文件:Flink 1.15、Flink 1.16、Flink 1.17
如何使用它
例如,要将整个 MySQL 数据库引入mysql_dbDoris(MySQL 表名以tbl或开头test),只需执行以下命令(无需提前在 Doris 中创建表):
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.4.0.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl|test.*" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label1 \
--table-conf replication_num=1
摄取Oracle数据库:请参考示例代码。
表现如何
当涉及到同步整个数据库(包含数百甚至数千个表,活动或不活动)时,大多数用户希望在几秒钟内完成。因此我们测试了连接器,看看它是否符合要求:
- 1000 个 MySQL 表,每个表有 100 个字段。所有表都是活动的(这意味着它们不断更新,每次数据写入涉及一百多行)
- Flink作业检查点:10s
经过压力测试,系统表现出较高的稳定性,主要指标如下:
根据早期采用者的反馈,该Connector在生产环境中的万表数据库同步中也提供了高性能和系统稳定性。这证明Apache Doris和Flink CDC的结合能够高效可靠地进行大规模数据同步。
它如何使数据工程师受益
工程师不再需要担心表创建或表模式维护,从而节省了数天繁琐且容易出错的工作。之前在Flink CDC中,需要为每个表创建一个Flink作业,并在源端建立日志解析链路,但现在通过全库摄取,源数据库的资源消耗大大减少。也是增量更新和全量更新的统一解决方案。
其他特性
1.连接维度表和事实表
常见的做法是将维度表放在Doris中,通过Flink的实时流进行Join查询。Flink-Doris-Connector 1.4.0基于Flink 的 Async I/O实现了异步 Lookup Join,因此 Flink 实时流不会因为查询而阻塞。此外,连接器还允许您将多个查询合并为一个大查询,并将其立即发送给 Doris 进行处理。这提高了此类连接查询的效率和吞吐量。
2.节俭 SDK
我们在 Connector 中引入了 Thrift-Service SDK,用户不再需要使用 Thrift 插件或在编译时配置 Thrift 环境。这使得编译过程变得更加简单。
3. 按需流加载
数据同步过程中,当没有新的数据摄入时,不会发出Stream Load请求。这样可以避免不必要的集群资源消耗。
4. 后端节点轮询
对于数据摄取,Doris 调用前端节点获取后端节点列表,并随机选择一个发起摄取请求。该后端节点将是协调器。Flink-Doris-Connector 1.4.0 允许用户启用轮询机制,即在每个 Flink 检查点都有不同的后端节点作为 Coordinator,以避免单个后端节点长期承受过大的压力。
5. 支持更多数据类型
除了常见的数据类型外,Flink-Doris-Connector 1.4.0 还支持 Doris 中的 DecimalV3/DateV2/DateTimev2/Array/JSON。
用法示例
从Apache Doris读:
您可以通过DataStream或FlinkSQL(有界流)从Doris读取数据。支持谓词下推。
CREATE TABLE flink_doris_source (
name STRING,
age INT,
score DECIMAL(5,2)
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = 'password',
'doris.filter.query' = 'age=18'
);
SELECT * FROM flink_doris_source;
连接维度表和事实表:
CREATE TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);
create table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'lookup.jdbc.async' = 'true',
'table.identifier' = 'dim.dim_city',
'username' = 'root',
'password' = ''
);
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city
写给Apache Doris:
CREATE TABLE doris_sink (
name STRING,
age INT,
score DECIMAL(5,2)
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label',
//json write in
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true'
);
猜你喜欢
- 2024-11-14 开源的数据同步中间件DBSyncer 开源数据同步工具
- 2024-11-14 【赵强老师】利用数据库触发器实现数据的同步
- 2024-11-14 MYSQL主从同步详细教程 mysql数据库同步到另一个数据库
- 2024-11-14 MySQL数据库主从同步的3种一致性方案实现,及优劣比较
- 2024-11-14 高并发场景下,如何实现数据库主从同步?
你 发表评论:
欢迎- 612℃几个Oracle空值处理函数 oracle处理null值的函数
- 603℃Oracle分析函数之Lag和Lead()使用
- 592℃0497-如何将Kerberos的CDH6.1从Oracle JDK 1.8迁移至OpenJDK 1.8
- 589℃Oracle数据库的单、多行函数 oracle执行多个sql语句
- 583℃Oracle 12c PDB迁移(一) oracle迁移到oceanbase
- 576℃【数据统计分析】详解Oracle分组函数之CUBE
- 566℃最佳实践 | 提效 47 倍,制造业生产 Oracle 迁移替换
- 558℃Oracle有哪些常见的函数? oracle中常用的函数
- 最近发表
-
- PageHelper - 最方便的 MyBatis 分页插件
- 面试二:pagehelper是怎么实现分页的,
- MyBatis如何实现分页查询?(mybatis-plus分页查询)
- SpringBoot 各种分页查询方式详解(全网最全)
- 如何在Linux上运行exe文件,怎么用linux运行windows软件
- 快速了解hive(快速了解美国50个州)
- Python 中的 pyodbc 库(pydbclib)
- Linux搭建Weblogic集群(linux weblogic部署项目步骤)
- 「DM专栏」DMDSC共享集群之部署(一)——共享存储配置
- 故障分析 | MySQL 派生表优化(mysql pipe)
- 标签列表
-
- 前端设计模式 (75)
- 前端性能优化 (51)
- 前端模板 (66)
- 前端跨域 (52)
- 前端缓存 (63)
- 前端aes加密 (58)
- 前端脚手架 (56)
- 前端md5加密 (54)
- 前端路由 (61)
- 前端数组 (73)
- 前端js面试题 (50)
- 前端定时器 (59)
- 前端获取当前时间 (50)
- Oracle RAC (76)
- oracle恢复 (77)
- oracle 删除表 (52)
- oracle 用户名 (80)
- oracle 工具 (55)
- oracle 内存 (55)
- oracle 导出表 (62)
- oracle约束 (54)
- oracle 中文 (51)
- oracle链接 (54)
- oracle的函数 (58)
- 前端调试 (52)
本文暂时没有评论,来添加一个吧(●'◡'●)