17. [Flink Source Code] How a Flink Job Starts

17.1. Background

I recently wrote a Spark job that, at startup, had to load default settings from a YAML file bundled inside the jar while also accepting and parsing custom parameters passed on the command line. To adapt and unify all of this, I defined a pile of classes just to receive parameters; managing them became messy and confusing, and they were awkward to use. Looking for a way to consolidate all those parameters and make them convenient to access, I decided to study how Flink manages its own. I opened the flink-cdc project source and dug around, found that it references a great deal of Flink internals, so I opened the Flink source itself and dug further. There is a huge amount of code and the call relationships are intricate; one wrong turn and the analysis drifts off course, which is par for the course when reading source code. Out of that detour came this article. The original goal of parameter management will be picked up later; below, the focus is on analyzing, from the source, how a user-written Flink program is started. This is also my first source-code analysis article.

17.2. Preparation

First, open Flink's open-source project and pull a copy of the code to your local machine (the analysis below is based on version 1.15):

Flink source repository
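Getting the code locally is the usual routine; for example (assuming the official GitHub repository and Flink's release-x.y branch naming):

$ git clone https://github.com/apache/flink.git
$ cd flink
$ git checkout release-1.15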

Assume we already have a jar that can be launched via flink run or flink run-application.

17.3. Launch

Normally, the script invocation that launches a Flink job can be reduced to the snippet below. If you deploy in application mode the mechanics differ, since the client has to talk to YARN through the YARN API; to keep the discussion simple, only client mode is analyzed here.

$ ./bin/flink run \
      --detached \
      ./examples/streaming/StateMachineExample.jar

17.4. Starting from the script

With the launch command in hand, let's start from Flink's launcher script and see what it does. First, the contents of bin/flink:

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# Add Client-specific JVM options
FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

Go straight to the last line: everything above it just loads configuration files and assembles settings, while the final exec is clearly the part that does the real work.

The class being executed is org.apache.flink.client.cli.CliFrontend. This is the key breakthrough point, so let's open the Flink source and find this class.

17.5. The source, starting from main()

As you can see, this class lives in the flink-clients module and contains a main method, so we can be confident this is the entry class of the Flink client program.

(Figure: the CliFrontend entry point)

The main method does four core things:

  1. Locate the configuration directory: find the directory containing the core configuration files, i.e. ${FLINK_HOME}/conf.

  2. Load and parse the configuration: the contents of flink-conf.yaml are loaded, parsed, and placed into a Configuration object.

  3. Parse the command-line arguments: read the command-line parameters and parse them.

  4. Execute: invoke the user code's main method via reflection.
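For orientation, here is a condensed sketch of CliFrontend#main, paraphrased from the 1.15 source with logging and error handling trimmed (see the real class for the exact code); the numbered comments map to the four steps above:

// org.apache.flink.client.cli.CliFrontend#main (condensed sketch, not verbatim)
public static void main(final String[] args) throws Exception {
    // 1. find the configuration directory (${FLINK_HOME}/conf)
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load flink-conf.yaml into a Configuration object
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines used to parse the CLI arguments
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);
    final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

    // 4. install security, then parse the arguments and run the requested action
    SecurityUtils.install(new SecurityConfiguration(cli.configuration));
    final int retCode =
            SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    System.exit(retCode);
}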

Step 4 is the focus of what follows.

17.6. Security configuration

Before the job is actually executed via SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args)), the security configuration for the environment must first be installed:

SecurityUtils.install(new SecurityConfiguration(cli.configuration))

  1. Install the security modules

  2. Install the security context

(Figure: security installation)

Let's step into SecurityUtils and see what actually happens, taking the installation of the security modules as the example. The core method in SecurityUtils is install(SecurityConfiguration config):

/**
 * Installs a process-wide security configuration.
 *
 * <p>Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
 */
public static void install(SecurityConfiguration config) throws Exception {
    // Install the security modules first before installing the security context
    installModules(config);
    installContext(config);
}

static void installModules(SecurityConfiguration config) throws Exception {

    // install the security module factories
    List<SecurityModule> modules = new ArrayList<>();
    for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
        SecurityModuleFactory moduleFactory = null;
        try {
            moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.error("Unable to instantiate security module factory {}", moduleFactoryClass);
            throw new IllegalArgumentException("Unable to find module factory class", ne);
        }
        SecurityModule module = moduleFactory.createModule(config);
        // can be null if a SecurityModule is not supported in the current environment
        if (module != null) {
            module.install();
            modules.add(module);
        }
    }
    installedModules = modules;
}

At a glance, the installModules(SecurityConfiguration config) method contains a call to SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass). From my admittedly limited experience, this is Java's SPI mechanism, so there must be a ServiceLoader call inside.
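As a quick refresher, here is a minimal, self-contained SPI example (the class names are hypothetical, not from Flink). ServiceLoader reads a registration file from the classpath and instantiates every implementation listed in it:

package com.example;

import java.util.ServiceLoader;

// hypothetical service interface; implementations are registered in
// META-INF/services/com.example.GreetingService, one class name per line
interface GreetingService {
    String greet();
}

public class SpiDemo {
    public static void main(String[] args) {
        // instantiate every registered implementation via its no-arg constructor
        ServiceLoader<GreetingService> loader = ServiceLoader.load(GreetingService.class);
        for (GreetingService service : loader) {
            System.out.println(service.greet());
        }
    }
}

Flink adds a twist on top of plain SPI: as findFactoryInternal below shows, the loaded factories are filtered by canonical class name, and exactly one match is required.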

Stepping into SecurityFactoryServiceLoader, the call chain is:

/** Find a suitable {@link SecurityModuleFactory} based on canonical name. */
public static SecurityModuleFactory findModuleFactory(String securityModuleFactoryClass)
        throws NoMatchSecurityFactoryException {
    return findFactoryInternal(
            securityModuleFactoryClass,
            SecurityModuleFactory.class,
            SecurityModuleFactory.class.getClassLoader());
}



private static <T> T findFactoryInternal(
        String factoryClassCanonicalName, Class<T> factoryClass, ClassLoader classLoader)
        throws NoMatchSecurityFactoryException {

    Preconditions.checkNotNull(factoryClassCanonicalName);

    ServiceLoader<T> serviceLoader;
    if (classLoader != null) {
        serviceLoader = ServiceLoader.load(factoryClass, classLoader);
    } else {
        serviceLoader = ServiceLoader.load(factoryClass);
    }

    List<T> matchingFactories = new ArrayList<>();
    Iterator<T> classFactoryIterator = serviceLoader.iterator();
    classFactoryIterator.forEachRemaining(
            classFactory -> {
                if (factoryClassCanonicalName.matches(
                        classFactory.getClass().getCanonicalName())) {
                    matchingFactories.add(classFactory);
                }
            });

    if (matchingFactories.size() != 1) {
        throw new NoMatchSecurityFactoryException(
                "zero or more than one security factory found",
                factoryClassCanonicalName,
                matchingFactories);
    }
    return matchingFactories.get(0);
}

Since the SPI mechanism is involved, there must be a top-level interface through which the implementations are loaded. So what is that top-level interface?

Let's go back to org.apache.flink.runtime.security.SecurityUtils and look at the call chain again: the class name of this top-level interface actually comes from SecurityConfiguration.

(Figure: where the factory class names come from)

So next we only need to find out how SecurityConfiguration loads it. Return to where install is called:

SecurityUtils.install(new SecurityConfiguration(cli.configuration))

SecurityConfiguration is instantiated with the configuration object. Step into SecurityConfiguration, and the constructor clearly shows how the module factory classes are passed in:

public SecurityConfiguration(Configuration flinkConf) {
    this(
            flinkConf,
            flinkConf.get(SECURITY_CONTEXT_FACTORY_CLASSES),
            flinkConf.get(SECURITY_MODULE_FACTORY_CLASSES));
}

So the factory classes are supplied through optional parameters; if none are specified, defaults apply. These are defined in org.apache.flink.configuration.SecurityOptions:

public static final ConfigOption<List<String>> SECURITY_CONTEXT_FACTORY_CLASSES =
        key("security.context.factory.classes")
                .stringType()
                .asList()
                .defaultValues(
                        "org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory",
                        "org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory")
                .withDescription(
                        "List of factories that should be used to instantiate a security context. "
                                + "If multiple are configured, Flink will use the first compatible "
                                + "factory. You should have a NoOpSecurityContextFactory in this list "
                                + "as a fallback.");
public static final ConfigOption<List<String>> SECURITY_MODULE_FACTORY_CLASSES =
        key("security.module.factory.classes")
                .stringType()
                .asList()
                .defaultValues(
                        "org.apache.flink.runtime.security.modules.HadoopModuleFactory",
                        "org.apache.flink.runtime.security.modules.JaasModuleFactory",
                        "org.apache.flink.runtime.security.modules.ZookeeperModuleFactory")
                .withDescription(
                        "List of factories that should be used to instantiate security "
                                + "modules. All listed modules will be installed. Keep in mind that the "
                                + "configured security context might rely on some modules being present.");

The wiring of the security context is configured in the same way.
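In practice this means both lists can be overridden in flink-conf.yaml. An illustrative override (as I understand Flink's list-typed options, values are semicolon-separated; this example simply drops the ZooKeeper module from the defaults):

# flink-conf.yaml (illustrative; the defaults are listed in SecurityOptions above)
security.module.factory.classes: org.apache.flink.runtime.security.modules.HadoopModuleFactory;org.apache.flink.runtime.security.modules.JaasModuleFactory
security.context.factory.classes: org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory;org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory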

17.7. RUN

With all the preparation done, the next step is to actually execute the user's code:

SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args))

Step into parseAndRun (org.apache.flink.client.cli.CliFrontend#parseAndRun):

/**
 * Parses the command line arguments and starts the requested action.
 *
 * @param args command line arguments of the client.
 * @return The return code of the program
 */
public int parseAndRun(String[] args) {

    // check for action
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }

    // get action
    String action = args[0];

    // remove action from parameters
    final String[] params = Arrays.copyOfRange(args, 1, args.length);

    try {
        // do action
        switch (action) {
            case ACTION_RUN:
                run(params);
                return 0;
            case ACTION_RUN_APPLICATION:
                runApplication(params);
                return 0;
            case ACTION_LIST:
                list(params);
                return 0;
            case ACTION_INFO:
                info(params);
                return 0;
            case ACTION_CANCEL:
                cancel(params);
                return 0;
            case ACTION_STOP:
                stop(params);
                return 0;
            case ACTION_SAVEPOINT:
                savepoint(params);
                return 0;
            case "-h":
            case "--help":
                CliFrontendParser.printHelp(customCommandLines);
                return 0;
            case "-v":
            case "--version":
                ......
                return 1;
        }
    } catch (CliArgsException ce) {
        return handleArgException(ce);
    }
    ......
}

At this point the plot becomes familiar. Seeing branches such as ACTION_RUN and ACTION_RUN_APPLICATION, one naturally thinks of the action argument passed when launching from the command line. Following the command from the beginning of this article, let's step into the ACTION_RUN branch and take a closer look.

/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    // evaluate help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));

    final ProgramOptions programOptions = ProgramOptions.create(commandLine);

    final List<URL> jobJars = getJobJarAndDependencies(programOptions);

    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        executeProgram(effectiveConfiguration, program);
    }
}

Look straight at the last line: executeProgram(effectiveConfiguration, program) is where execution is triggered. Judging from the method names, the code above it is once again preparation: loading configuration and validating parameters. Following executeProgram, let's find the action it actually performs and peel back its layers one by one:

//org.apache.flink.client.cli.CliFrontend#executeProgram
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
        throws ProgramInvocationException {
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), configuration, program, false, false);
}

//org.apache.flink.client.ClientUtils#executeProgram
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout)
        throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);

        LOG.info(
                "Starting program (detached: {})",
                !configuration.getBoolean(DeploymentOptions.ATTACHED));

        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        try {
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}

Seeing program.invokeInteractiveModeForExecution(), let's step in and see what it does:

/**
 * This method assumes that the context environment is prepared, or the execution will be a
 * local execution by default.
 */
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
    FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
    try {
        callMainMethod(mainClass, args);
    } finally {
        FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
    }
}

There it is: callMainMethod(mainClass, args). It's obvious now; let's go in, I can hardly wait!

private static void callMainMethod(Class<?> entryClass, String[] args)
        throws ProgramInvocationException {
    Method mainMethod;
    ......
    try {
        mainMethod = entryClass.getMethod("main", String[].class);
    } catch (NoSuchMethodException e) {
        ......
    }

    try {
        mainMethod.invoke(null, (Object) args);
    } catch (IllegalArgumentException e) {
        ......
    }
}

The source performs quite a few validations on the entry class and the main method's modifiers: is it public, is it static, and so on. Once every check passes, invoke can be called.
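The elided validations amount to something like the following paraphrased sketch (not the verbatim Flink code; the messages are made up):

// paraphrased sketch of the elided checks in callMainMethod
if (!Modifier.isPublic(entryClass.getModifiers())) {
    throw new ProgramInvocationException(
            "The class " + entryClass.getName() + " must be public.");
}
if (!Modifier.isStatic(mainMethod.getModifiers())) {
    throw new ProgramInvocationException(
            "The main method of class " + entryClass.getName() + " must be static.");
}
if (!Modifier.isPublic(mainMethod.getModifiers())) {
    throw new ProgramInvocationException(
            "The main method of class " + entryClass.getName() + " must be public.");
}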

At this point, the main method of the user code has been executed.

17.8. StreamExecutionEnvironment

At this point, user code typically obtains a stream execution environment with StreamExecutionEnvironment.getExecutionEnvironment():

StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
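For context, a minimal user program looks something like the sketch below (my own example, not from the Flink source); it is env.execute() at the end that submits the assembled job:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinimalJob {
    public static void main(String[] args) throws Exception {
        // under "flink run" this returns the context environment installed by
        // setAsContext; run standalone in an IDE, it falls back to a local one
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
                .map(i -> i * 2)
                .print();

        env.execute("minimal-job");
    }
}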

So what exactly happens when StreamExecutionEnvironment.getExecutionEnvironment() runs?

Let's step into StreamExecutionEnvironment (org.apache.flink.streaming.api.environment.StreamExecutionEnvironment) and find the call chain:

//org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getExecutionEnvironment()
public static StreamExecutionEnvironment getExecutionEnvironment() {
    return getExecutionEnvironment(new Configuration());
}

//org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#getExecutionEnvironment(org.apache.flink.configuration.Configuration)
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(factory -> factory.createExecutionEnvironment(configuration))
            .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}

As you can see, the factory class StreamExecutionEnvironmentFactory (org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory) is used here to construct the StreamExecutionEnvironment we need. The steps are as follows (a sketch of the resolution helper comes right after the list):

1. First, obtain the factory.

2. Call the factory's createExecutionEnvironment method.

3. If no factory is found, call createLocalEnvironment to create a local execution environment.
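The resolution helper itself is tiny. Paraphrased from memory (not verbatim), its semantics are: prefer a factory installed for the current thread, fall back to the statically installed one, and return empty if neither exists:

// paraphrased sketch of Utils.resolveFactory's semantics
public static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocalFactory, T staticFactory) {
    final T threadLocalValue = threadLocalFactory.get();
    final T factory = threadLocalValue == null ? staticFactory : threadLocalValue;
    return Optional.ofNullable(factory);
}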

Next, all we have to do is find where the factory is "born".

17.9. StreamExecutionEnvironmentFactory

Step into StreamExecutionEnvironmentFactory (org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory) and bring up its implementations with Control+H (on macOS; the Windows shortcut may differ). There are three implementations, all lambda expressions:

(Figure: implementations of StreamExecutionEnvironmentFactory)

Following big-data intuition, let's pick the second one. That lands us in a class called StreamContextEnvironment (org.apache.flink.client.program.StreamContextEnvironment), inside the method org.apache.flink.client.program.StreamContextEnvironment#setAsContext. Remember it? This method should look familiar!

What, it doesn't? No worries; after this much code your head is bound to be spinning. Let's keep going, slowly.

In fact, this method was called earlier, inside the executeProgram method above:

(Figure: setAsContext called inside executeProgram)

First, let's see what this method does:

public static void setAsContext(
        final PipelineExecutorServiceLoader executorServiceLoader,
        final Configuration configuration,
        final ClassLoader userCodeClassLoader,
        final boolean enforceSingleJobExecution,
        final boolean suppressSysout) {
    StreamExecutionEnvironmentFactory factory =
            conf -> {
                final List<String> errors = new ArrayList<>();
                final boolean allowConfigurations =
                        configuration.getBoolean(
                                DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS);
                if (!allowConfigurations && !conf.toMap().isEmpty()) {
                    conf.toMap()
                            .forEach(
                                    (k, v) ->
                                            errors.add(
                                                    ConfigurationNotAllowedMessage
                                                            .ofConfigurationKeyAndValue(k, v)));
                }
                Configuration mergedConfiguration = new Configuration();
                mergedConfiguration.addAll(configuration);
                mergedConfiguration.addAll(conf);
                return new StreamContextEnvironment(
                        executorServiceLoader,
                        mergedConfiguration,
                        userCodeClassLoader,
                        enforceSingleJobExecution,
                        suppressSysout,
                        allowConfigurations,
                        errors);
            };
    initializeContextEnvironment(factory);
}

Just as you thought, it instantiates a StreamExecutionEnvironmentFactory. But did you notice? What the lambda returns is a StreamContextEnvironment object, not the factory we were looking for! What's going on?

Don't panic~

(Figure: the StreamContextEnvironment class declaration)

See? StreamContextEnvironment extends StreamExecutionEnvironment. In other words, StreamContextEnvironment is exactly the StreamExecutionEnvironment we have been searching for.
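Abbreviated, the declaration makes the relationship explicit:

// org.apache.flink.client.program.StreamContextEnvironment (declaration only, body elided)
public class StreamContextEnvironment extends StreamExecutionEnvironment {
    ......
}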

But the careful reader asks again: in the snippet above, the left-hand side is the factory type while the right-hand side is a lambda expression, and what the lambda returns is not a factory at all but a concrete StreamContextEnvironment. How does that work?

Ha, this is where it gets interesting.

Let's go back to that factory class and take another look, eyes wide open this time:

/** Factory class for stream execution environments. */
@PublicEvolving
@FunctionalInterface
public interface StreamExecutionEnvironmentFactory {

    /**
     * Creates a StreamExecutionEnvironment from this factory.
     *
     * @return A StreamExecutionEnvironment.
     */
    StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration);
}

This factory interface carries the @FunctionalInterface annotation, the well-known functional-interface marker. Because the interface has exactly one abstract method to implement, a lambda expression can implement it directly.

The lambda conf -> { ... } therefore implements the createExecutionEnvironment method, and the StreamContextEnvironment it returns is a subclass of StreamExecutionEnvironment, so the types are fully compatible. This is deferred binding at work.
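A stripped-down illustration of the same trick (my own sketch, relying only on the interface shown above): any lambda matching the single abstract method is a factory, and it may return any subclass of StreamExecutionEnvironment:

// the lambda *is* the factory; its body does not run yet
StreamExecutionEnvironmentFactory factory =
        conf -> StreamExecutionEnvironment.createLocalEnvironment(conf);

// only here is the body evaluated and the concrete subclass constructed
StreamExecutionEnvironment env =
        factory.createExecutionEnvironment(new Configuration());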

This raises a new topic: why is the factory written this way? What are the benefits of deferred binding? How does it differ from manually constructing a factory and creating the instance at the point of use?

No answer here; this is a reminder to myself that whenever I run into this question, I should go look up the answer again to deepen the impression.

With that, we have obtained the StreamExecutionEnvironment we need. It contains the execute method, which triggers the execution of the job assembled from the whole of the user's code.

17.10. Further questions

  • The analysis above arrived at StreamContextEnvironment, but there are also LocalStreamEnvironment, RemoteStreamEnvironment, TestStreamEnvironment, and others. When are those activated? How does the program detect which environment it is currently in?

  • Which design patterns are at play here? (Asking DeepSeek suggests factory plus strategy: StreamContextEnvironment uses the factory pattern, while loading and starting the different environments uses the strategy pattern.)

  • What is StreamContextEnvironment for? We already have StreamExecutionEnvironment, so why design it at all?

These questions are left here; the finer points of Flink will take time to explore and learn.
