网站首页 > 技术文章 正文
导读:在日常开发中常有这么一个场景,采集如日志等数据后以JSON形式存储到Kafka中,再由Flink从Kafka中获取数据并进行处理。但是有时候JSON比较复杂(多层嵌套),在FlinkSQL中解析起来比较麻烦,下面将讨论Flink SQL(1.10版本) 如何解析复杂JSON。
官网Demo
//JSON Format
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#table-formats
首先查看官网给出的一个例子,大致的解决思路为使用 format.json-schema,自定义一个format schema。
//官网例子
CREATE TABLE MyUserTable (
...
) WITH (
'format.type' = 'json', -- required: specify the format type
'format.fail-on-missing-field' = 'true' -- optional: flag whether to fail if a field is missing or not, false by default
'format.fields.0.name' = 'lon', -- optional: define the schema explicitly using type information.
'format.fields.0.data-type' = 'FLOAT', -- This overrides default behavior that uses table's schema as format schema.
'format.fields.1.name' = 'rideTime',
'format.fields.1.data-type' = 'TIMESTAMP(3)',
'format.json-schema' = -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP.
'{ -- This also overrides the default behavior.
"type": "object",
"properties": {
"lon": {
"type": "number"
},
"rideTime": {
"type": "string",
"format": "date-time"
}
}
}'
)
分析:Flink 在解析 JSON 的时候,对于复杂的 JSON 可以通过自定义format schema来支持。如果table schema 和 format schema相同,则可以自动派生 json 的 schema(但这种往往不适用于解析复杂JSON )。
实战例子
了解了官网的例子之后,我们手动试验一下。
1、从Kafka中获取复杂JSON用于测试,JSON 如下:
{"code":0,"data":{"request":{"name":"test","id":"ce1beb37-ed3e-4365-8e44-c3bd1d249cfc"}},"message":"SUCCESS","retryCount":1,"success":true}
2、编写format.json-schema
通过参考官网Demo,发现第一层的 retryCount 可直接就映射到字段上,而 data 是多层嵌套,所以定义data 的类型为object ,而properties则是其json的内层数据。我们的例子中为多层嵌套,那么简化对应的 json-schema 如下:
'format.json-schema' = '{
"type": "object",
"properties": {
"retryCount": {type: "string"},
"data":{type: "object",
"properties" : {
"request":{type: "object",
"properties" : {
"id" : {type:"string"}
}
}
}
}
}
}'
3、定义table schema
从上面的 json schame 和 Flink SQL 的映射关系可以看出,data对应的table 字段的类型是ROW,所以 table schema 应是如下:
CREATE TABLE sourceTable (
retryCount VARCHAR,
data ROW(request ROW(id string))
)
WITH (
......
}
4、使用的时候,直接用 "."的方式即可
Table table = bsTableEnv.sqlQuery("SELECT data.request.id AS ID,retryCount FROM sourceTable");
5、Kafka SourceTable完整例子
CREATE TABLE sourceTable (
retryCount VARCHAR,
data ROW(request ROW(id string))
)
WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'XXXX',
'connector.properties.zookeeper.connect' = 'XXXX:2181',
'connector.properties.bootstrap.servers' = 'XXXX:9092',
'connector.properties.group.id' = 'XXXXX',
'format.json-schema' = '{
"type": "object",
"properties": {
"retryCount": {type: "string"},
"data":{type: "object",
"properties" : {
"request":{type: "object",
"properties" : {
"id" : {type:"string"}
}
}
}
}
}
}',
'format.type' = 'json');
最后
以上就是在FlinkSQL1.10中处理复杂JSON的一种方式,通过定义format.json schema实现。而在查看Flink中文邮件列表中也发现了其他的一些不错思路,如下:
通过在上游时将就转义成一个String放到JSON的一个field中,这样Flink解析出来就是一个String,
然后编写UDTF进行处理,感兴趣的朋友也可以尝试一下。
感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。
猜你喜欢
- 2025-10-19 Python常用标准库(pickle序列化和JSON序列化)
- 2025-10-19 Linux json-c使用_linux解析json数据
- 2025-10-19 源码推荐(03.04):微信支付的测试,Json数据解析
- 2025-10-19 打开JSON文件的六种方法,总有一种适合你
- 2025-10-19 springmvc项目中接收Android提交json数据
- 2025-10-19 一篇文章让你详细了解何为JSON_json到底是什么
- 2025-10-19 超级好用的轻量级JSON处理命令jq_json使用教程
- 2025-10-19 .NET性能系列文章二:Newtonsoft.Json vs System.Text.Json
- 2025-10-19 推荐几个开发必备的JSON工具_推荐几个开发必备的json工具
- 2025-10-19 零基础入门AI智能体:详细了解什么是变量类型、JSON结构、Markdown格式
你 发表评论:
欢迎- 最近发表
-
- Python常用标准库(pickle序列化和JSON序列化)
- Linux json-c使用_linux解析json数据
- 源码推荐(03.04):微信支付的测试,Json数据解析
- 打开JSON文件的六种方法,总有一种适合你
- springmvc项目中接收Android提交json数据
- 一篇文章让你详细了解何为JSON_json到底是什么
- FlinkSQL处理复杂JSON的思路_flinksql解析json数组
- 超级好用的轻量级JSON处理命令jq_json使用教程
- .NET性能系列文章二:Newtonsoft.Json vs System.Text.Json
- 推荐几个开发必备的JSON工具_推荐几个开发必备的json工具
- 标签列表
-
- 前端设计模式 (75)
- 前端性能优化 (51)
- 前端模板 (66)
- 前端跨域 (52)
- 前端缓存 (63)
- 前端aes加密 (58)
- 前端脚手架 (56)
- 前端md5加密 (54)
- 前端路由 (61)
- 前端数组 (73)
- 前端js面试题 (50)
- 前端定时器 (59)
- Oracle RAC (76)
- oracle恢复 (77)
- oracle 删除表 (52)
- oracle 用户名 (80)
- oracle 工具 (55)
- oracle 内存 (55)
- oracle 导出表 (62)
- oracle约束 (54)
- oracle 中文 (51)
- oracle链接 (54)
- oracle的函数 (58)
- oracle面试 (55)
- 前端调试 (52)
本文暂时没有评论,来添加一个吧(●'◡'●)