Code-Cookbook

博客

  • Blogs
    • 1. [Springboot x spark]java.util.concurrent.ExecutionException: Boxed Error
    • 2. [Apollo]Apollo Config Center
    • 3. [Confluent]Confluent快速上手
    • 4. [Flink]CommdLine+Springboot+flink无法指定配置文件启动
    • 5. [Flink]Flink on k8s任务的提交和实操
    • 6. [Flink]Flink-connector-http
    • 7. [Flink]FlinkKafkaProducer启用压缩
    • 8. [Flink]FlinkSQL时间处理函数
    • 9. [Flink]Flink Sources
    • 10. [Flink]Flink中自定义watermark生成器
    • 11. [Flink]Flink的并行度与TaskSlot
    • 12. [Flink]ProcessFunction无法使用,抛出InvalidProgramException
    • 13. [Flink]使用状态算子将stream聚合输出
      • 13.1. 场景需求
      • 13.2. 模拟实现
        • 13.2.1. 定义流幻境以及数据源造数
        • 13.2.2. 划分批次
        • 13.2.3. 结果处理及输出
    • 14. [Flink]关于Flink的Checkpoint的一次问题排查
    • 15. [Flink]多源实时行为报告的设计思路
    • 16. [Flink]如何更通用地将Kafka(或其他)数据落地Hive?
    • 17. [Flink]监控Flink Metrics
    • 18. [Flink]自定义序列化消费Kafka数据
    • 19. [Flink源码]Flink任务是如何启动的
    • 20. [Flink源码]YarnApplication模式的任务启动
    • 21. [Flink源码]流式工厂模式与配置的延迟绑定
    • 22. [Git]Git问题
    • 23. [Git]误在Master分支开发并commit无法push
    • 24. [Hadoop]Hadoop distcp
    • 25. [Hadoop]一些Hadoop问题
    • 26. [Hive]Hive分区表批量删除分区
    • 27. [Hive]Hive的Analyze函数,Statistics in Hive
    • 28. [Hive]修改存储格式为Parquet的Hive表的字段类型
    • 29. [Hive]在指定位置添加字段
    • 30. [Hive]外部表修改为内部表
    • 31. [Hive]更新Metastore中的LastAccessTime
    • 32. [Hive]本地连接需要Kerberos认证的Hive
    • 33. [Java]如何根据需要动态生成Java的class
    • 34. [Java]元注解
    • 35. 注解解析
    • 36. [Java]集合
    • 37. 简单(常用)数据结构
    • 38. [Java]Java8 Stream API
    • 39. [Java]SPI和责任链模式
    • 40. [Java]Socket
    • 41. [Java]使用Java在服务端和客户端之间传送文件
    • 42. [Java]三种策略模式应用于服务的启动
    • 43. [Java]多线程
    • 44. [Java]生产者消费者模型问题
    • 45. [Java]让项目顺利读取resources目录下的文件
    • 46. [Java]设计模式
    • 47. Continuing…
    • 48. [Java]设计模式六大原则
    • 49. [Java]面向对象知识点梳理
    • 50. [Java]OOP防脱发指南
    • 51. [Kerberos]Message stream modified (41)错误
    • 52. [Kudu]关于Kudu Upsert列的问题
    • 53. [Kudu]关于Kudu列的顺序的修改
    • 54. [Logback]特定业务日志重定向
    • 55. [MongoDB]MongoDB基本查询
    • 56. [Paimon]Flink读写Nginx代理的OSS上的Paimon表
    • 57. [Pyspark]PySpark
    • 58. [PySpark]PySpark On Yarn
    • 59. [Spark]CDP上安装其他版本SPARK(SPARK3)
    • 60. [SQL]Druid SQL解析器
    • 61. [SQL]IN/NOT IN/EXISTS/NOT EXISTS的替代写法
    • 62. [SQL]IN OR NOT IN , IS A PROBLEM
    • 63. [SQL]SQLLineage解析SQL血缘
    • 64. [SQL]业务数据库中的create_time和update_time分析时的问题
    • 65. [SQL]为什么LEFT JOIN后总数却与右表的总数一样了?
    • 66. [SQL]求用户任意天连续登录(每天为第多少天连续登录)
    • 67. [SQL]计算指定日期的年-周(为某年的第多少周)
    • 68. [Scala]函数中闭包(Closure)和柯里化(Currying)
    • 69. [Shell]EOF
    • 70. [Shell] Zip命令
    • 71. [Shell]Shell脚本日期递增(起止日期内递增)
    • 72. [Shell]将字符串转换为数字进行大小比较
    • 73. [Shell]打印本机IP
    • 74. [SparkStreaming]消费kafka写入Hive失败的问题Lease timeout of 0 seconds expired
    • 75. [Spark]SparkSQL 列转行的一种方法
    • 76. [Spark]SparkSQL JDBC并发连接读取
    • 77. [Spark]Spark提交任务RSA premaster secret error
    • 78. [Spark]Springboot整合Spark, 本地、集群部署
    • 79. [Spark]如何使用Java创建一个Row
    • 80. [Spark]将Spark DataFrame中的数值取出
    • 81. [Springboot]okHttp错误:Exception in thread “OkHttp Dispatcher” java.lang.IllegalStateException: closed
    • 82. [Vim]Vim查找和替换命令
    • 83. [debezium]在启动任务时传入SQL语句生成Snapshot
    • 84. [debezium]热修改Debezium MySQL Connector配置
    • 85. [Paimon]Paimon表的读写
  • 文章
  • 项目

Random ramblings

  • Random ramblings

大数据

  • Bigdata
  • Bigdata Tools

大数据辅助工具

  • Auxiliary tools

SQL相关

  • SQL
Code-Cookbook
  • Blogs
  • 13. [Flink]使用状态算子将stream聚合输出
  • 查看页面源码

13. [Flink]使用状态算子将stream聚合输出

13.1. 场景需求

现在有这样一种场景,两个业务系统之间通过接口传递数据,A平台计算好数据之后将数据落入Hive,使用微服务将数据通过接口Sink到B平台,考虑到B端微服务的耐受程度,A端在Sink数据的时候需要将数据以批次的形式发送,比如一次发送一千条。

该场景里应用了,Flink读取Hive,将表转换成流之后自定义HttpSink将数据Sink到B端(这是我的个人的拙略办法,暂时这么解决)。我目前的难点在于如何将Hive表读取到的数据划分批次再Sink到Http接口,下面记录了我的一个思考和实现(模拟实现)。

13.2. 模拟实现

13.2.1. 定义流幻境以及数据源造数

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<Tuple2<Long, String>> list = new ArrayList<>();
Long i = 0L;
while (true) {
    list.add(Tuple2.of(1L, i + "-x"));
    i++;
    if (i > 100) {
        break;
    }
}

以上使用for循环造了100个Tuple2,f0为Long类型,f1为字符串

13.2.2. 划分批次

核心在于如何去对每个流中的元素进行计数,如何让分布式的程序知道该元素为流中的第多少个元素,下面是用Flink提供的State算子ValueState去达到该目的。

package me.roohom.operator;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;

public class ValueFlatMapFunction extends RichFlatMapFunction<Tuple2<Long, String>, String> {

    private transient ValueState<Long> reduceOut;
    private ArrayList<String> strList;

    @Override
    public void open(Configuration parameters) throws Exception {
        strList = new ArrayList<>();
        super.open(parameters);
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>(
                        "reduceOutput", 
                        TypeInformation.of(new TypeHint<Long>() {
                        }), 
                        0L); 
        reduceOut = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, String> value, Collector<String> out) throws Exception {
        Long current = reduceOut.value();
        current += 1;
        strList.add(value.f1);
        reduceOut.update(current);
        if (current % 10 == 0) {
            out.collect(strList.toString());
            strList.clear();
        }
    }
}

以上自定义了一个RichFlatMapFunction,在open初始化方法中定义了一个ValueState算子为reduceOut,初始值为0。它的作用用来记录flatMap接收到的元素为流中的第多少个元素,每接收到一个元素就将该算子加1,我们还在open方法中定义了一个list,用来存放本批次的所有元素,当reduceOut接收到的元素值为批次大小的整数倍时,将list收集到的数据输出,并且清空list准备下一次的接收。在这里定义的批次大小为10。

13.2.3. 结果处理及输出

DataStreamSource<Tuple2<Long, String>> inputStream = env.fromCollection(list);
inputStream.keyBy(x -> x.f0)
        .flatMap(new ValueFlatMapFunction())
        .print();
env.execute();

以上将元素按照第0个元素分组,通过自定义flatmap方法处理聚合,输入数据共100个,批次大小为10,那么print时将会输出10组数据,每组10个,即一个长度为10的list。

flink-reduce-output

达到了我要的效果。

上一页 下一页

© 版权所有 2020-2026, roohom。

利用 Sphinx 构建,使用的 主题 由 Read the Docs 开发.