专业编程教程与实战项目分享平台

网站首页 > 技术文章 正文

Spark jdbc 的并发的问题 spark jdbc server

ins518 2024-11-01 13:22:46 技术文章 11 ℃ 0 评论

大家使用spark比较熟的,都了解到spark可以通过jdbc这个API 可以访问 oracle的数据。

而且也可以在


def jdbc(
    url: String,
    table: String,
    predicates: Array[String],//这里就是传入进去的sql,可以是多个sql,这样就能并发向oracle查询数据了
    connectionProperties: Properties): DataFrame = {
  assertNoSpecifiedSchema("jdbc")
  // connectionProperties should override settings in extraOptions.
  val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
  val options = new JDBCOptions(url, table, params)
  val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
    JDBCPartition(part, i) : Partition
  }
  val relation = JDBCRelation(parts, options)(sparkSession)
  sparkSession.baseRelationToDataFrame(relation)
}

只是,这样有问题:

问题一:并发的个数不能控制,如果

predicates的长度是9

exectors的个数是3

cpu的core是3,那么运行这个任务的时候,就会启动 9个task,那问题来了,这么高的并发向oracle发起查询,对oracle的压力大。

可以通过 coalesce 来避免。

其次,这9个task运行的时间差 可能有的task 运行的时间点早,有的晚。

这样触发的是导数据任务,根据update_time进行导入,那么导入的数据在区间A中的一条恰好更新了,更新后应该进入 区间B 中,而这个时候,区间B的任务已经跑完了,因为运行的早。

那么这条记录就会丢失,这算是严重的问题了。

一:需要设置oracle的 isolationLevel,改成 REPEATABLE_READ
二:针对 9句sql,前面8句 可以并发操作,最后一条sql ,等前面8个任务执行完成后,才执行。

这样就可以了。因为 虽然数据会发生变化,变化的时候 updated_time肯定是系统最新的时间,那么这个时间肯定会落入 最后一句sql中,而且 因为 最后一个任务是最后执行,所以就没有问题了。(这里有个注意点:最后一句的sql区间 必须要确定 更新的时间点 一定要落在区间中才行)

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表