Code-Cookbook

博客

  • Blogs
    • 1. [Springboot x spark]java.util.concurrent.ExecutionException: Boxed Error
    • 2. [Apollo]Apollo Config Center
    • 3. [Confluent]Confluent快速上手
    • 4. [Flink]CommdLine+Springboot+flink无法指定配置文件启动
    • 5. [Flink]Flink on k8s任务的提交和实操
    • 6. [Flink]Flink-connector-http
    • 7. [Flink]FlinkKafkaProducer启用压缩
    • 8. [Flink]FlinkSQL时间处理函数
    • 9. [Flink]Flink Sources
    • 10. [Flink]Flink中自定义watermark生成器
    • 11. [Flink]Flink的并行度与TaskSlot
    • 12. [Flink]ProcessFunction无法使用,抛出InvalidProgramException
    • 13. [Flink]使用状态算子将stream聚合输出
    • 14. [Flink]关于Flink的Checkpoint的一次问题排查
    • 15. [Flink]多源实时行为报告的设计思路
    • 16. [Flink]如何更通用地将Kafka(或其他)数据落地Hive?
    • 17. [Flink]监控Flink Metrics
    • 18. [Flink]自定义序列化消费Kafka数据
    • 19. [Flink源码]Flink任务是如何启动的
    • 20. [Flink源码]YarnApplication模式的任务启动
    • 21. [Flink源码]流式工厂模式与配置的延迟绑定
      • 21.1. 前言
      • 21.2. Flink中的运行环境工厂
      • 21.3. 细说延迟绑定
      • 21.4. 避免工厂子类的冗余
      • 21.5. 我的实际应用
    • 22. [Git]Git问题
    • 23. [Git]误在Master分支开发并commit无法push
    • 24. [Hadoop]Hadoop distcp
    • 25. [Hadoop]一些Hadoop问题
    • 26. [Hive]Hive分区表批量删除分区
    • 27. [Hive]Hive的Analyze函数,Statistics in Hive
    • 28. [Hive]修改存储格式为Parquet的Hive表的字段类型
    • 29. [Hive]在指定位置添加字段
    • 30. [Hive]外部表修改为内部表
    • 31. [Hive]更新Metastore中的LastAccessTime
    • 32. [Hive]本地连接需要Kerberos认证的Hive
    • 33. [Java]如何根据需要动态生成Java的class
    • 34. [Java]元注解
    • 35. 注解解析
    • 36. [Java]集合
    • 37. 简单(常用)数据结构
    • 38. [Java]Java8 Stream API
    • 39. [Java]SPI和责任链模式
    • 40. [Java]Socket
    • 41. [Java]使用Java在服务端和客户端之间传送文件
    • 42. [Java]三种策略模式应用于服务的启动
    • 43. [Java]多线程
    • 44. [Java]生产者消费者模型问题
    • 45. [Java]让项目顺利读取resources目录下的文件
    • 46. [Java]设计模式
    • 47. Continuing…
    • 48. [Java]设计模式六大原则
    • 49. [Java]面向对象知识点梳理
    • 50. [Java]OOP防脱发指南
    • 51. [Kerberos]Message stream modified (41)错误
    • 52. [Kudu]关于Kudu Upsert列的问题
    • 53. [Kudu]关于Kudu列的顺序的修改
    • 54. [Logback]特定业务日志重定向
    • 55. [MongoDB]MongoDB基本查询
    • 56. [Paimon]Flink读写Nginx代理的OSS上的Paimon表
    • 57. [Pyspark]PySpark
    • 58. [PySpark]PySpark On Yarn
    • 59. [Spark]CDP上安装其他版本SPARK(SPARK3)
    • 60. [SQL]Druid SQL解析器
    • 61. [SQL]IN/NOT IN/EXISTS/NOT EXISTS的替代写法
    • 62. [SQL]IN OR NOT IN , IS A PROBLEM
    • 63. [SQL]SQLLineage解析SQL血缘
    • 64. [SQL]业务数据库中的create_time和update_time分析时的问题
    • 65. [SQL]为什么LEFT JOIN后总数却与右表的总数一样了?
    • 66. [SQL]求用户任意天连续登录(每天为第多少天连续登录)
    • 67. [SQL]计算指定日期的年-周(为某年的第多少周)
    • 68. [Scala]函数中闭包(Closure)和柯里化(Currying)
    • 69. [Shell]EOF
    • 70. [Shell] Zip命令
    • 71. [Shell]Shell脚本日期递增(起止日期内递增)
    • 72. [Shell]将字符串转换为数字进行大小比较
    • 73. [Shell]打印本机IP
    • 74. [SparkStreaming]消费kafka写入Hive失败的问题Lease timeout of 0 seconds expired
    • 75. [Spark]SparkSQL 列转行的一种方法
    • 76. [Spark]SparkSQL JDBC并发连接读取
    • 77. [Spark]Spark提交任务RSA premaster secret error
    • 78. [Spark]Springboot整合Spark, 本地、集群部署
    • 79. [Spark]如何使用Java创建一个Row
    • 80. [Spark]将Spark DataFrame中的数值取出
    • 81. [Springboot]okHttp错误:Exception in thread “OkHttp Dispatcher” java.lang.IllegalStateException: closed
    • 82. [Vim]Vim查找和替换命令
    • 83. [debezium]在启动任务时传入SQL语句生成Snapshot
    • 84. [debezium]热修改Debezium MySQL Connector配置
    • 85. [Paimon]Paimon表的读写
  • 文章
  • 项目

Random ramblings

  • Random ramblings

大数据

  • Bigdata
  • Bigdata Tools

大数据辅助工具

  • Auxiliary tools

SQL相关

  • SQL
Code-Cookbook
  • Blogs
  • 21. [Flink源码]流式工厂模式与配置的延迟绑定
  • 查看页面源码

21. [Flink源码]流式工厂模式与配置的延迟绑定

21.1. 前言

在flink的源码中大量使用了设计模式,工厂模式当然不例外,下文从flink源码一处有意思的写法的代码块入手,分析设计模式中工厂模式与函数式方法结合的妙用,以回答我在之前的一篇文章([Flink源码]Flink任务是如何启动的)中提出的问题

为什么要有流式工厂,以及有什么好处

21.2. Flink中的运行环境工厂

在flink中有有一个StreamExecutionEnvironmentFactory工厂类,负责生产StreamExecutionEnvironment,代码是这样的:

/** Factory class for stream execution environments. */
@PublicEvolving
@FunctionalInterface
public interface StreamExecutionEnvironmentFactory {

    /**
     * Creates a StreamExecutionEnvironment from this factory.
     *
     * @return A StreamExecutionEnvironment.
     */
    StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration);
}

它只有一个方法,并且该类使用了@FunctionalInterface注解标记,所以该接口实际上是一个函数式接口

它的调用发生在org.apache.flink.streaming.api.environment.StreamExecutionEnvironment这个类中,

public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                .map(factory -> factory.createExecutionEnvironment(configuration))
                .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
    }

Creates an execution environment that represents the context in which the program is currently executed. If the program is invoked standalone, this method returns a local execution environment, as returned by {@link #createLocalEnvironment(Configuration)}.

这里是从threadLocalContextEnvironmentFactory和contextEnvironmentFactory解析出一个合适的工厂类,使用这个工厂创建一个StreamExecutionEnvironment对象,如果没有合适的工厂没能创建一个合适的StreamExecutionEnvironment对象,则调用createLocalEnvironment方法创建一个LocalStreamEnvironment

进入这个工厂的内部,看看实现类都有哪些,由于是函数式接口,那么实现类自然就是lambda方法,这里主要关注与StreamContextEnvironment,它也是生产环境代码主要执行的类

StreamContextEnvironment

进入方法内部,工厂是这样被调用的:

final StreamExecutionEnvironmentFactory factory =
        envInitConfig -> {
            final boolean programConfigEnabled =
                    clusterConfiguration.get(DeploymentOptions.PROGRAM_CONFIG_ENABLED);
            final List<String> programConfigWildcards =
                    clusterConfiguration.get(DeploymentOptions.PROGRAM_CONFIG_WILDCARDS);
            final Configuration mergedEnvConfig = new Configuration();
            mergedEnvConfig.addAll(clusterConfiguration);
            mergedEnvConfig.addAll(envInitConfig);
            return new StreamContextEnvironment(
                    executorServiceLoader,
                    clusterConfiguration,
                    mergedEnvConfig,
                    userCodeClassLoader,
                    enforceSingleJobExecution,
                    suppressSysout,
                    programConfigEnabled,
                    programConfigWildcards);
        };
initializeContextEnvironment(factory);

这里使用了lambda表达式envInitConfig -> {},此表达式即调用了工厂的createExecutionEnvironment(Configuration configuration)方法,但此时,实际上得到的其实只是一个工厂StreamExecutionEnvironmentFactory,而不是我们需要的StreamExecutionEnvironment

21.3. 细说延迟绑定

为什么说流式工厂做到了延迟绑定能,延迟绑定又是什么意思呢?我不这样写不也可以达到同样的目的吗?

问得好!

延迟绑定的意思是,代码的实际执行被推迟到代码的调用时,而不是在代码被定义时就确定。

可以看到,上面的工厂方法里需要的唯一一个参数是Configuration,当实际执行的时候根据上下文环境得到的Configuration来创建StreamExecutionEnvironment,而不需要在定义时就明确得到了Configuration,这里的Configuration允许在运行时动态更新。

如果不使用这样的方法,想要达到同样的目的的话,需要这样写(一个假想例子):

传统工厂模式

class ClassicFactory implements StreamExecutionEnvironmentFactory {
    private final Configuration baseConfig;
    private final ExecutorServiceLoader loader;

    // 必须在构造时固定所有依赖
    public ClassicFactory(Configuration config, ExecutorServiceLoader loader) {
        this.baseConfig = config;
        this.loader = loader;
    }

    @Override
    public StreamExecutionEnvironment create(Configuration runtimeConfig) {
        // 合并配置的逻辑仍然需要在这里实现
        Configuration merged = merge(baseConfig, runtimeConfig);
        return new StreamContextEnvironment(loader, merged);
    }
}

// 使用时:
Configuration initialConfig = loadInitialConfig();
ExecutorServiceLoader loader = getLoader();
StreamExecutionEnvironmentFactory factory = new ClassicFactory(initialConfig, loader);

// 稍后在业务代码中:
StreamExecutionEnvironment env = factory.create(userConfig);

Lambda工厂

// 直接捕获当前上下文中的 configuration 和 executorServiceLoader
StreamExecutionEnvironmentFactory factory = conf -> {
    Configuration merged = new Configuration();
    merged.addAll(configuration); // 直接访问外部变量
    merged.addAll(conf);
    return new StreamContextEnvironment(
        executorServiceLoader, // 直接访问外部依赖
        merged
    );
};

在普通工厂中,一些配置和以来运行时状态的变量(如ExecutorServiceLoader)需要预先显示地指定,由构造参数传递进入类内部进行存储,而lambda工厂则不需要这样,直接在调用时即可以补货上下文环境中的配置变量,这其实说的是lambda工厂的动态上下文环境捕获能力

21.4. 避免工厂子类的冗余

使用普通工厂,需要预先new好工厂的子类实现,必须有localEnv、remoteEnv、clusterEnv等,需要由工厂预先创建好,而使用lambda工厂则不需要这样,lambda工厂只是暂留了生成子类的逻辑,当需要时调用这个逻辑即可快速生成需要的子类,无需预先生成好

21.5. 我的实际应用

在我的业务应用中有这样一段逻辑,需要根据不同的配置,选择不同的执行器去执行包含业务逻辑的代码

public interface Executor {
    void execute(SparkSession sparkSession);
}
private static Configuration configuration = null;

public static void main(String[] args) throws Exception {

    initSystem(args);

    for (String id : configId.split(",")) {
        initConfig(Long.parseLong(id));
        switch (app) {
            case "ck":
                Executor checkExecutor = new CheckExecutor(configuration);
                checkExecutor.execute(spark);
                break;
            case "hi":
                Executor pipelineExecutor = new ProcessExecutor(configuration);
                pipelineExecutor.execute(spark);
                break;
            case "wi":
                Executor flattenExecutor = new FlattenExecutor(configuration);
                flattenExecutor.execute(spark);
                break;
            default:
                throw new RuntimeException("You must specify one app name to run this program.");
        }
    }
}


void initConfig(){
    configuration = xxx; //更新configuration的功能模块
}

根据类命名可以联想,在实际传入不同的启动参数时,会进入不同分支,选择不同的executor执行用户代码,但是实际上,由于都是Executor的子类,上面的代码其实可以优化为

public static void main(String[] args) throws Exception {

        initSystem(args);

        /* initialize executors */
        ArrayList<Executor> executors = Lists.newArrayList(
                new CheckExecutor(configuration),
                new ProcessExecutor(configuration),
                new FlattenExecutor(configuration)
        );

        for (String id : configId.split(",")) {
            initConfig(Long.parseLong(id));

            executors.forEach(executor -> {
                if (executor.accept(app)) executor.execute(spark);
            });
        }

        spark.stop();
        }
}

这样看起来清爽了很多,但是在实际运行过程中问题出现了:

由于Executor的子类在对象创建的时候,构造方法中会引用configuration进行一些配置参数的校验和赋值,而initConfig放在在for循环内部,也就是说configuration的更新要晚于对象的创建,所以运行时会报空指针或者其他参数校验错误的异常。

那么如何解决这个问题呢?

下面引入Lambda工厂,有请我们的ExecutorFactory

@FunctionalInterface
public interface ExecutorFactory {
    Executor createExecutor(Configuration configuration);
}

这里使用@FunctionalInterface注解进行标记,表明该类是一个函数式接口,并且只有一个抽象方法,接收一个配置参数,之前提到过,Lambda工厂的创建只是暂留了创建子类的逻辑,在使用时按需生成子类,下面是优化后的版本

public static void main(String[] args) throws Exception {

        initSystem(args);

        /* initialize executors, factory for lazy initialization */
        List<ExecutorFactory> executorFactories = Arrays.asList(
                CheckExecutor::new,
                ProcessExecutor::new,
                FlattenExecutor::new
        );

        for (String id : configId.split(",")) {
            initConfig(Long.parseLong(id));

            executorFactories.stream()
                    .map(factory -> factory.createExecutor(configuration))
                    .filter(executor -> executor.accept(app))
                    .forEach(executor -> executor.execute(spark));
        }

        spark.stop();
    }

以上代码中,预先构建了生成所有子类所需要的工厂,此时并没有生成子类对象,在for循环,也就是业务逻辑执行的代码块内部,随着configuration配置的动态更新完成,此时调用工厂的create方法创建子类对象,调用子类的accept方法,满足的子类则会被选择以执行业务逻辑。

上一页 下一页

© 版权所有 2020-2026, roohom。

利用 Sphinx 构建,使用的 主题 由 Read the Docs 开发.