Code-Cookbook

博客

  • Blogs
    • 1. [Springboot x spark]java.util.concurrent.ExecutionException: Boxed Error
    • 2. [Apollo]Apollo Config Center
    • 3. [Confluent]Confluent快速上手
    • 4. [Flink]CommdLine+Springboot+flink无法指定配置文件启动
    • 5. [Flink]Flink on k8s任务的提交和实操
    • 6. [Flink]Flink-connector-http
    • 7. [Flink]FlinkKafkaProducer启用压缩
    • 8. [Flink]FlinkSQL时间处理函数
    • 9. [Flink]Flink Sources
    • 10. [Flink]Flink中自定义watermark生成器
    • 11. [Flink]Flink的并行度与TaskSlot
    • 12. [Flink]ProcessFunction无法使用,抛出InvalidProgramException
    • 13. [Flink]使用状态算子将stream聚合输出
    • 14. [Flink]关于Flink的Checkpoint的一次问题排查
    • 15. [Flink]多源实时行为报告的设计思路
    • 16. [Flink]如何更通用地将Kafka(或其他)数据落地Hive?
      • 16.1. 简述
      • 16.2. 开整
        • 16.2.1. SOURCE
        • 16.2.2. TRANSFORM
        • 16.2.3. SINK
      • 16.3. 附录
        • 16.3.1. 获取流表执行环境
        • 16.3.2. Hive建表
        • 16.3.3. Hive插入数据
    • 17. [Flink]监控Flink Metrics
    • 18. [Flink]自定义序列化消费Kafka数据
    • 19. [Flink源码]Flink任务是如何启动的
    • 20. [Flink源码]YarnApplication模式的任务启动
    • 21. [Flink源码]流式工厂模式与配置的延迟绑定
    • 22. [Git]Git问题
    • 23. [Git]误在Master分支开发并commit无法push
    • 24. [Hadoop]Hadoop distcp
    • 25. [Hadoop]一些Hadoop问题
    • 26. [Hive]Hive分区表批量删除分区
    • 27. [Hive]Hive的Analyze函数,Statistics in Hive
    • 28. [Hive]修改存储格式为Parquet的Hive表的字段类型
    • 29. [Hive]在指定位置添加字段
    • 30. [Hive]外部表修改为内部表
    • 31. [Hive]更新Metastore中的LastAccessTime
    • 32. [Hive]本地连接需要Kerberos认证的Hive
    • 33. [Java]如何根据需要动态生成Java的class
    • 34. [Java]元注解
    • 35. 注解解析
    • 36. [Java]集合
    • 37. 简单(常用)数据结构
    • 38. [Java]Java8 Stream API
    • 39. [Java]SPI和责任链模式
    • 40. [Java]Socket
    • 41. [Java]使用Java在服务端和客户端之间传送文件
    • 42. [Java]三种策略模式应用于服务的启动
    • 43. [Java]多线程
    • 44. [Java]生产者消费者模型问题
    • 45. [Java]让项目顺利读取resources目录下的文件
    • 46. [Java]设计模式
    • 47. Continuing…
    • 48. [Java]设计模式六大原则
    • 49. [Java]面向对象知识点梳理
    • 50. [Java]OOP防脱发指南
    • 51. [Kerberos]Message stream modified (41)错误
    • 52. [Kudu]关于Kudu Upsert列的问题
    • 53. [Kudu]关于Kudu列的顺序的修改
    • 54. [Logback]特定业务日志重定向
    • 55. [MongoDB]MongoDB基本查询
    • 56. [Paimon]Flink读写Nginx代理的OSS上的Paimon表
    • 57. [Pyspark]PySpark
    • 58. [PySpark]PySpark On Yarn
    • 59. [Spark]CDP上安装其他版本SPARK(SPARK3)
    • 60. [SQL]Druid SQL解析器
    • 61. [SQL]IN/NOT IN/EXISTS/NOT EXISTS的替代写法
    • 62. [SQL]IN OR NOT IN , IS A PROBLEM
    • 63. [SQL]SQLLineage解析SQL血缘
    • 64. [SQL]业务数据库中的create_time和update_time分析时的问题
    • 65. [SQL]为什么LEFT JOIN后总数却与右表的总数一样了?
    • 66. [SQL]求用户任意天连续登录(每天为第多少天连续登录)
    • 67. [SQL]计算指定日期的年-周(为某年的第多少周)
    • 68. [Scala]函数中闭包(Closure)和柯里化(Currying)
    • 69. [Shell]EOF
    • 70. [Shell] Zip命令
    • 71. [Shell]Shell脚本日期递增(起止日期内递增)
    • 72. [Shell]将字符串转换为数字进行大小比较
    • 73. [Shell]打印本机IP
    • 74. [SparkStreaming]消费kafka写入Hive失败的问题Lease timeout of 0 seconds expired
    • 75. [Spark]SparkSQL 列转行的一种方法
    • 76. [Spark]SparkSQL JDBC并发连接读取
    • 77. [Spark]Spark提交任务RSA premaster secret error
    • 78. [Spark]Springboot整合Spark, 本地、集群部署
    • 79. [Spark]如何使用Java创建一个Row
    • 80. [Spark]将Spark DataFrame中的数值取出
    • 81. [Springboot]okHttp错误:Exception in thread “OkHttp Dispatcher” java.lang.IllegalStateException: closed
    • 82. [Vim]Vim查找和替换命令
    • 83. [debezium]在启动任务时传入SQL语句生成Snapshot
    • 84. [debezium]热修改Debezium MySQL Connector配置
    • 85. [Paimon]Paimon表的读写
  • 文章
  • 项目

Random ramblings

  • Random ramblings

大数据

  • Bigdata
  • Bigdata Tools

大数据辅助工具

  • Auxiliary tools

SQL相关

  • SQL
Code-Cookbook
  • Blogs
  • 16. [Flink]如何更通用地将Kafka(或其他)数据落地Hive?
  • 查看页面源码

16. [Flink]如何更通用地将Kafka(或其他)数据落地Hive?

项目上经常遇到一些将Kafka数据落地Hive的需求,Spark可以通过SparkStreaming解决,但当下更流行的方式应该属Flink了,为了更好地学习Flink,特地归类总结了一些将Kafka数据或者其他数据源的数据落入Hive的代码,提炼一些主要的思路,并将一些核心代码整理在这里,方便后面自己回忆,在进行重复开发的时候可以开箱即用(Ctrl-C, Ctrl-V)。

16.1. 简述

大多数的数据源一般都是类似于Kafka这样的及时消息队列,下游可以是消息队列或者是离线存储系统(比如Hive、HDFS),也或许是MySQL之类的数据库。

有很简单的FlinkSQL可以使用,在这里不做讨论,以下讨论较为灵活的低阶API(DataStream API),并且以Flink消费Kafka数据落入Hive为例,基于Flink1.12.1版本。

大致流程都是:

  • 1、SOURCE,消费Kafka的数据

  • 2、TRANSFORM,转换,实现自己的业务逻辑

  • 3、SINK,将数据落到下游存储系统

废话少说,直接放码。

16.2. 开整

16.2.1. SOURCE

代码实现使用了Flink去消费Kafka,得到一个DataStream<Object>

Properties properties = new Properties();
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(
                "a_topic_to_consume",
                new SimpleStringShemma(),//一个合适的deserializer
                properties
        );
SingleOutputStreamOperator sourceStream = env.addSource(flinkKafkaConsumer);

16.2.2. TRANSFORM

为了方便落入Hive表中,我们将前面得到的DataStream<Object>处理成DataStream<Row>,经过以下处理得到了一个名为processedStream的数据流,流中的数据类型为Row。但是该Row上没有被指定合适的类型,Flink并不知道怎么去处理这些类型,如果不加处理,程序会抛出异常提示我们去手动分配。

SingleOutputStreamOperator<Row> processedStream = sourceStream.flatMap(
    new MbbVehFlatMapFunction(),
    TypeUtil.getTypeInformation(columnMap, columnList)
)

columnMap存储的是一个字段名和字段类型的映射关系,columnList存储的是字段名称,并且有序。

TypeUtil.getTypeInformation()内容如下:

/**
     * 将row中的字段指定合适的类型
     *
     * @param columnMap  存储有字段名及对应的字段类型的map
     * @param columnList 存储字段名的list
     * @return
     */
public static TypeInformation<Row> getTypeInformation(Map<String, String> columnMap, List<String> columnList) {
    List<String> fieldNames = new ArrayList<>();
    List<TypeInformation<?>> types = new ArrayList<>();
    for (String column : columnList) {
        String type = columnMap.get(column);
        fieldNames.add(column);
        types.add(TypeInformation.of(typeCheck(type)));
    }
    return new RowTypeInfo(types.toArray(new TypeInformation<?>[0]), fieldNames.toArray(new String[0]));
}

以上方法的作用是,将一个Row中没个字段都指定合适的类型,并且该类型被应用在前面得到的processedStream上。

16.2.3. SINK

在SINK部分,我们使用FlinkSQL去处理,首先将上面的到的Row流创建通过TableAPI创建一个临时表,再使用SQL将该临时表的数据INSERT到Hive表中。

String tempViewName = x + "_stream_view";
streamTableEnv.createTemporaryView(tempViewName, processedStream);

//使用hive方言
streamTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
//执行Hive建表语句
//some code here
streamTableEnv.executeSql("INSERT INTO TABLE xxx SELECT * FROM " + tempViewName); //也可能是动态分区插入数据,可以使用附录部分的util

​

16.3. 附录

16.3.1. 获取流表执行环境

public class FlinkSinkHiveProperties implements Serializable {
    private String catalog;

    private String database;

    private String confDir;

    private String tableName;

    private ArrayList<String> partition;

    /**
     * 分区处理规则 如:SUBSTRING(culumn_xxx, 1, 3)
     */
    private String partitionRule;

    //字段名列表
    private ArrayList<String> columnList;
}

/**
 * 获取流表执行环境
 *
 * @param env                     流环境
 * @param flinkSinkHiveProperties flink sink hive的配置
 * @return 流表执行环境
 */
public StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironment env, FlinkSinkHiveProperties flinkSinkHiveProperties) {
    EnvironmentSettings streamSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env, streamSetting);
    HiveCatalog hive = new HiveCatalog(
            flinkSinkHiveProperties.getCatalog(),
            flinkSinkHiveProperties.getDatabase(),
            flinkSinkHiveProperties.getConfDir()
    );
    streamTableEnv.registerCatalog(flinkSinkHiveProperties.getCatalog(), hive);
    streamTableEnv.useCatalog(flinkSinkHiveProperties.getCatalog());
    streamTableEnv.useDatabase(flinkSinkHiveProperties.getDatabase());
    return streamTableEnv;
}

16.3.2. Hive建表

/**
     * conn
     * 
     * param  tableEnv 表执行环境
     * @param table      注册table
     * @param columnList 字段列表
     * @param partition  分区字段名列表
     */
public void createTable(TableEnvironment tableEnv, String table, String columnList, ArrayList<String> partition) {
    String createSql = "\nCREATE EXTERNAL TABLE IF NOT EXISTS " + table + "(\n";
    createSql += columnList + ",etl_time TIMESTAMP)\n";
    //partition
    createSql += (partition == null || partition.size() == 0) ? "" : " PARTITIONED BY (" + partition.stream()
        .map(x -> "`" + x + "` STRING")
        .reduce((x, y) -> x + ", " + y)
        .orElse("") + ")\n";
    createSql += " STORED AS parquet \n" +
        " TBLPROPERTIES (\n" +
        "  'sink.rolling-policy.rollover-interval'='" + env.getProperty("flink.sink.hive.rolling-policy.rollover-interval") + "',\n" +
        "  'sink.partition-commit.trigger'='" + env.getProperty("flink.sink.hive.partition-commit.trigger") + "',\n" +
        "  'sink.partition-commit.delay'='" + env.getProperty("flink.sink.hive.partition-commit.delay") + "',\n" +
        "  'sink.partition-commit.policy.kind'='" + env.getProperty("flink.sink.hive.partition-commit.policy.kind") + "'\n" +
        ")";
    LOG.info("HiveUtil -> Create table sql is -> {}", createSql);
    tableEnv.executeSql(createSql);

以上实际的效果如:

CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
 `vin` STRING, 
 `factoryplatemodel` STRING, 
 `accbmodelname` STRING, 
 `accbtypecode` STRING, 
 `invoicekind` STRING, 
 `dealercode` STRING, 
 `dealername` STRING, 
 `salesorgname` STRING, 
 `invoicetypr` STRING, 
 `biztype` STRING, 
 `discountno` STRING, 
 `isvalid` STRING, 
 `invoicestatus` STRING, 
 `invoicedate` STRING, 
 `totaltaxamount` STRING, 
 `isdelivery` STRING, 
 `deliverydate` STRING, 
 `salestype` STRING,
 etl_time TIMESTAMP
)
 PARTITIONED BY (`dt` STRING)
 STORED AS parquet 
 TBLPROPERTIES (
  'sink.rolling-policy.rollover-interval'='1 min',
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0s',
  'sink.partition-commit.policy.kind'='metastore,success-file'
)

16.3.3. Hive插入数据

/**
     * 生成动态插入分区的SQL语句
     *
     * @param table         表名
     * @param tempViewName  临时表名
     * @param columnList    字段列,以逗号分割,如: a,b,c
     * @param partition     分区字段
     * @param partitionRule 分区字段处理规则
     * @return insert语句
     */
public String insert(String table, String tempViewName, String columnList, ArrayList<String> partition, String partitionRule) {
    String insertSQL = "INSERT INTO TABLE " + table + " ";
    //PARTITION
    insertSQL += (partition == null || partition.size() == 0) ? "" : " PARTITION(" + partition.stream()
        .map(x -> "`" + x + "`")
        .reduce((x, y) -> x + ", " + y)
        .orElse("") + ")\n";
    //SELECT
    insertSQL += " SELECT " + columnList + ", NOW() AS etl_time, " + partitionRule;
    //PARTITION VALUE
    insertSQL += "\n FROM " + tempViewName;
    LOG.info("HiveUtil -> INSERT SQL IS -> {}", insertSQL);

    return insertSQL;
}

以上的实际效果如:

INSERT INTO TABLE test_table PARTITION(dt)
SELECT column_a,column_b,...,NOW() AS etl_time, SUBSTRING(column_b, 1,3)
FROM xxx_stream_view;
上一页 下一页

© 版权所有 2020-2026, roohom。

利用 Sphinx 构建,使用的 主题 由 Read the Docs 开发.