Code-Cookbook

博客

  • Blogs
    • 1. [Springboot x spark]java.util.concurrent.ExecutionException: Boxed Error
    • 2. [Apollo]Apollo Config Center
    • 3. [Confluent]Confluent快速上手
    • 4. [Flink]CommdLine+Springboot+flink无法指定配置文件启动
    • 5. [Flink]Flink-connector-http
    • 6. [Flink]FlinkKafkaProducer启用压缩
    • 7. [Flink]FlinkSQL时间处理函数
    • 8. [Flink]Flink Sources
    • 9. [Flink]Flink中自定义watermark生成器
    • 10. [Flink]Flink的并行度与TaskSlot
      • 10.1. 背景
        • 10.1.1. Flink架构
        • 10.1.2. 任务和算子链
      • 10.2. TaskSlot
      • 10.3. 并行度和TaskSlot的关系
      • 10.4. 举个例子
      • 10.5. 参考文章
    • 11. [Flink]ProcessFunction无法使用,抛出InvalidProgramException
    • 12. [Flink]使用状态算子将stream聚合输出
    • 13. [Flink]关于Flink的Checkpoint的一次问题排查
    • 14. [Flink]如何更通用地将Kafka(或其他)数据落地Hive?
    • 15. [Flink]监控Flink Metrics
    • 16. [Flink]自定义序列化消费Kafka数据
    • 17. [Flink源码]Flink任务是如何启动的
    • 18. [Flink源码]YarnApplication模式的任务启动
    • 19. [Flink源码]流式工厂模式与配置的延迟绑定
    • 20. [Git]Git问题
    • 21. [Git]误在Master分支开发并commit无法push
    • 22. [Hadoop]Hadoop distcp
    • 23. [Hadoop]一些Hadoop问题
    • 24. [Hive]Hive分区表批量删除分区
    • 25. [Hive]Hive的Analyze函数,Statistics in Hive
    • 26. [Hive]修改存储格式为Parquet的Hive表的字段类型
    • 27. [Hive]在指定位置添加字段
    • 28. [Hive]外部表修改为内部表
    • 29. [Hive]更新Metastore中的LastAccessTime
    • 30. [Hive]本地连接需要Kerberos认证的Hive
    • 31. [Java]如何根据需要动态生成Java的class
    • 32. [Java]元注解
    • 33. 注解解析
    • 34. [Java]集合
    • 35. 简单(常用)数据结构
    • 36. [Java]Java8 Stream API
    • 37. [Java]SPI和责任链模式
    • 38. [Java]Socket
    • 39. [Java]使用Java在服务端和客户端之间传送文件
    • 40. [Java]三种策略模式应用于服务的启动
    • 41. [Java]多线程
    • 42. [Java]生产者消费者模型问题
    • 43. [Java]让项目顺利读取resources目录下的文件
    • 44. [Java]设计模式
    • 45. Continuing…
    • 46. [Java]设计模式六大原则
    • 47. [Java]面向对象知识点梳理
    • 48. [Java]OOP防脱发指南
    • 49. [Kerberos]Message stream modified (41)错误
    • 50. [Kudu]关于Kudu Upsert列的问题
    • 51. [Kudu]关于Kudu列的顺序的修改
    • 52. [MongoDB]MongoDB基本查询
    • 53. [Pyspark]PySpark
    • 54. [PySpark]PySpark On Yarn
    • 55. [Spark]CDP上安装其他版本SPARK(SPARK3)
    • 56. [SQL]Druid SQL解析器
    • 57. [SQL]IN/NOT IN/EXISTS/NOT EXISTS的替代写法
    • 58. [SQL]IN OR NOT IN , IS A PROBLEM
    • 59. [SQL]SQLLineage解析SQL血缘
    • 60. [SQL]业务数据库中的create_time和update_time分析时的问题
    • 61. [SQL]为什么LEFT JOIN后总数却与右表的总数一样了?
    • 62. [SQL]求用户任意天连续登录(每天为第多少天连续登录)
    • 63. [SQL]计算指定日期的年-周(为某年的第多少周)
    • 64. [Scala]函数中闭包(Closure)和柯里化(Currying)
    • 65. [Shell]EOF
    • 66. [Shell] Zip命令
    • 67. [Shell]Shell脚本日期递增(起止日期内递增)
    • 68. [Shell]将字符串转换为数字进行大小比较
    • 69. [Shell]打印本机IP
    • 70. [SparkStreaming]消费kafka写入Hive失败的问题Lease timeout of 0 seconds expired
    • 71. [Spark]SparkSQL 列转行的一种方法
    • 72. [Spark]SparkSQL JDBC并发连接读取
    • 73. [Spark]Spark提交任务RSA premaster secret error
    • 74. [Spark]Springboot整合Spark, 本地、集群部署
    • 75. [Spark]如何使用Java创建一个Row
    • 76. [Spark]将Spark DataFrame中的数值取出
    • 77. [Springboot]okHttp错误:Exception in thread “OkHttp Dispatcher” java.lang.IllegalStateException: closed
    • 78. [Vim]Vim查找和替换命令
    • 79. [debezium]在启动任务时传入SQL语句生成Snapshot
    • 80. [debezium]热修改Debezium MySQL Connector配置

Random ramblings

  • Random ramblings

大数据

  • Bigdata
  • Bigdata Tools

大数据辅助工具

  • Auxiliary tools

SQL相关

  • SQL
Code-Cookbook
  • Blogs
  • 10. [Flink]Flink的并行度与TaskSlot
  • 查看页面源码

10. [Flink]Flink的并行度与TaskSlot

10.1. 背景

之前在提交一个flink任务的时候,配置修改默认的并行度为不同值时,发现在Flink的WebUI上除了可以看到算子的parallelism发生变化,也可以看到Task Managers的数量在发生变化,对这二者的关系不是太了解,一直想抽一个时间弄清楚他们之间的关系,于是今天这篇文章就来理解理解并整理一下。

了解这块的知识最好的工具当然是官网了,以Flink1.13版本为例,猛戳去了解,Flink Architecture

10.1.1. Flink架构

Flink architecture

10.1.1.1. JobManger

一个任务至少有一个JobManager,它负责生成任务的执行图、调度任务的执行、协调CHK的生成以及恢复失败的任务等

10.1.1.2. TaskManger

一个任务至少有一个TaskManager,它负责执行数据流的任务,缓存和交换数据流上的数据。

The smallest unit of resource scheduling in a TaskManager is a task slot. The number of task slots in a TaskManager indicates the number of concurrent processing tasks. Note that multiple operators may execute in a task slot.

一个TaskManager的最小的资源调度单位是任务槽。TaskManager上的任务槽的数量表明了该任务可以并发处理的任务数,多个算子可能在同一个任务槽中执行。

10.1.2. 任务和算子链

task_chains

Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency. The chaining behavior can be configured; see the chaining docs for details.

在Flink中每个task都由一个线程执行,Flink会将一些算子链起来,这样的操作有很多好处,比如一些算子如Source和Map算子之间可以链在一起,这样就减少了线程间的切换和缓冲的开销,并且降低了延迟的同时也提高了吞吐量。

上图是一个具有代表性的任务执行图,该任务由Source、Map、ke y B y()/window()/apply()和Sink算子组成,其中source、map和keyBy()算子都具有2个并行度,而sink算子具有1个并行度(比如说出于减少小文件的个数的考虑,这很正常),根据程序的优化,source和map算子被链在一起,而在数据流去往keyBy算子时可能会进行shuffle(跨网络传输,去往不同机器上的线程),所以map算子和keyBy就没有被链在一起。

10.2. TaskSlot

每个任务槽都是TaskManager上的一系列资源的子集。TaskManager的资源会均匀的分配到每个Slot上,它分配的是TaskManager的Managed memory,这样每个字任务就不会从其他任务上抢占managed memory。(这其中不包含CPU的隔离)

Flink允许任务槽的共享,也就是说来自不同任务(task)的子任务(substask)可以运行在同一个Slot上,当然前提是这些task和substask都是来自于同一个job

下面的图是任务和算子链中提到的任务图的解释:

tasks_slots

前面已经知道source、map、keyBy算子的并行度都为2,他们算在一起一共是2个task,每个task有2个substask,整个任务一个5个substask,由于source和map被链在一起,并且任务槽可以共享,所以source和map将运行在一个slot中,这样的source和map的链一共两组,所以上图中左右各一组,每组在一个slot中,keyBy算子占用两个不同的slot,由于sink算子的并行度为1,所以独占一个slot。

这样该任务总共占用5个slot。

如果我们单纯将该任务的并行度提高到6,仍然保持sink算子的并行度为1,这样整个任务将会有13个子任务(source和map共6个,keyBy的6个,sink的1个),同样,source和map链在一起,将会有6组,占用6个不同的slot,keyBy也同样,sink算子并行度为1,由于slot可以共享,所以sink算子将可以跟前面的算子任意一组同处于同一个slot,实际的slot划分是这样的:

slot_sharing

可以提高任务的并行度为6之后,该任务实际占用了6个slot。

优雅,太优雅了!

10.3. 并行度和TaskSlot的关系

TaskSlot是静态的,它表征了一个任务可以并发执行任务的能力,任务的最大并行度不会超过TaskSlot的数量。TaskSlot的数量可以通过flink-conf.yml文件中的taskmanager.numberOfTaskSlots: 8进行配置。并行度是动态的,是指TaskManager的实际并行执行能力,可以通过parallelism.default来指定。

如果你提交了一个任务的并行度超过了槽的数量,程序将不能正常执行,会报错提示任务不能划分足够的资源,这在本地运行时经常会发生。

10.4. 举个例子

一个任务的执行图如图,该任务一共有9个Task

job_graph

一共有25个subtask

subtask

很简单,任务在提交的时候使用的默认并行度为3,而MappedBradcastSource算子的代码级别的并行度被设置为1,所以Task数量很容易知道,task的数量即为:4(source)*1 + 4(sink)*1 + 1(MappedBradcastSource)*1 = 9,而substask的数量即为:4(source)*3 + 4(sink) * 3 + 1(MappedBradcastSource)*1=25

那么该任务一共是用了几个TaskSlot呢?答案是3个。

task_managers

为什么?

因为在flink-conf.yml中指定了每个taskmanager能提供的任务槽数为1:

taskmanager.numberOfTaskSlots: 1

所以该任务只需要3个Slot即可,由于每个taskmanager只能提供1个slot,所以一共需要3个taskmanager

清晰,太清晰了!

至此,对于flink的并行度和slot,我算有了一个大致的了解,知道了WebUI上这些框框和数字是咋来的了!

10.5. 参考文章

  1. Flink TaskSlot与并行度

  2. Flink 任务(Tasks)和任务槽(Task Slots)

完成时间:2023-03-26 23:04

上一页 下一页

© 版权所有 2020-2025, roohom。

利用 Sphinx 构建,使用的 主题 由 Read the Docs 开发.