69. [Spark]SparkSQL JDBC并发连接读取

69.1. 前言

SparkSQL提供了一套统一的API去通过JDBC读取Data Sources,即

.option("url", jdbcUrl)
.option("driver", xxxx.class)
.option("key", "value")




69.2. 配置说明


partitionColumn, lowerBound, upperBound

These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric, date, or timestamp column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.


The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.

如上是官网的详细说明,意思就是说,当配置了partitionColumn, lowerBound, upperBound这三个其中任意一个的时候,其他两个都必须要配置,并且如果想要生效吗,还必须配置numPartitions这个参数,这些参数描述了当多个worker并行地读取表的时候如何将这个表的数据进行分区(partition),并且这些参数有一些要求:

  • partitionColumn必须是数值类型的、日期或者时间戳之类的列

  • lowerBoundupperBound描述了数据分区的上下边界,而不是为了过滤数据(表的所有数据都会被返回,这些参数仅仅用在读取的时候生效)

  • numPartitions最大的分区数量,这个参数将会被应用在并行地读取和写入表的时候。这个参数也定义了并发连接JDBC的连接数(如果在写入数据的时候分区大于该参数,将会在写入之前调用coalesce参数进行分区的削减)

69.3. 示例:

.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as tmp")
.option("numPartitions", 5)


  • 下限是0,上限是20000

  • 用来进行分区的列是id,该列必须在源表中存在

  • 分区的数量是5,也就是spark在读取数据源的时候会并发的启用5个JDBC连接

当开始执行的时候,通过Spark WebUI上的stages中看到5个task同时执行。