85. [Paimon]Paimon表的读写

85.1. 前言

本文着重记录和探讨在Flink/Spark读写paimon表时遇到的问题和一些写法概要,不讨论基础概念和对其他复杂问题进行研究。下文讨论的场景中,flink版本为1.18.1, Paimon的版本为0.8.2

85.2. Flink与Paimon的集成

在我接触的环境中,flink主要用于写入paimon表,或者将paimon表作为流存储来实时消费

85.2.1. 读取/写入Paimon表

85.2.1.1. 通过catalog方式

如官网所说的,可以通过创建catalog的形式进行读取

CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs:///path/to/warehouse'
);

USE CATALOG my_catalog;

如果使用oss存储,那么warehouse对应的就是oss:///path/to/warehouse

接下来,就是直接写一些查询SQL对Paimon表进行读取和上层分析了

85.2.1.2. 通过Connector方式

还可以通过使用通用Connector形式进行读取Paimon表,

CREATE TEMPORARY TABLE source_paimon_table
(
    id    BIGINT COMMENT '试驾ID',
    status STRING COMMENT '状态',
    event_time       BIGINT COMMENT '事件时间',
    create_time      BIGINT COMMENT '创建时间',
    update_time      BIGINT COMMENT '更新时间',
    dt               STRING COMMENT '分区日期',
    ts AS TO_TIMESTAMP_LTZ(event_time, 3),
    WATERMARK FOR ts AS ts - INTERVAL '60' SECOND,
    PRIMARY KEY (test_drive_id,status,dt) NOT ENFORCED
)
COMMENT '事件数据'
PARTITIONED BY (dt)
WITH (
       'connector' = 'paimon',
       'warehouse' = 'oss://endpoint/paimon/',
       'path' = 'oss://endpoint/paimon/your_database.db/your_table_name',
       'table.local-time-zone' = 'Asia/Shanghai',
       'scan.watermark.idle-timeout'='5min'
     )

同样,接下来就是编写常用SQL对Paimon表进行查询分析

85.2.1.3. NOTE

  • 通过以上两种方式,都支持对paimon表进行实时消费(读取),也支持实时写入

  • 值得注意的是,在面的例子中,dt为分区字段,表的主键字段是id和status。在临时表创建声明中,需要同时指定id、status和dt为主键,即PRIMARY KEY (test_drive_id,status,dt) NOT ENFORCED

  • 如果想要创建物理表,则在建表中不需要声明TEMPORARY, 如果想要仅仅对表进行连接而不想创建物理表,则需要声明TEMPORARY,这样就仅在会话中生效,会话关闭连接自动消失

85.3. Spark与Paimon的集成

在我所接触的环境中,Spark主要用于对paimon表进行读取分析

85.3.1. 时间旅行

如果需要对一个paimon表进行时间旅行查询,官网介绍,有以下几种方式:

-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;

-- read the snapshot from specified timestamp 
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';

-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;

-- read tag 'my-tag'
SELECT * FROM t VERSION AS OF 'my-tag';

-- read the snapshot from specified watermark. will match the first snapshot after the watermark
SELECT * FROM t VERSION AS OF 'watermark-1678883047356';

如果没有试错经验,上面的例子其实会产生误解,就是不知道VERSION AS OF 1该写在哪里,可能会理解为,写在where过滤条件后,其实不是。

以下是一个具体例子:

SELECT vin,
       id,
       info,
       calc_mode,
       date_format(from_unixtime((create_time)/ 1000), 'yyyy-MM-dd HH:mm:ss') AS create_time,
       date_format(from_unixtime((update_time)/ 1000), 'yyyy-MM-dd HH:mm:ss') AS update_time,
       dt
  FROM paimon_catalog.paimon_database.paimon_table
  TIMESTAMP AS OF '2025-09-22 14:16:48'
  -- VERSION AS OF 16000
WHERE dt='2025-09-20'
AND test_drive_id=1969284160278945792
-- and vin='LSVN0000000000000001'
;

时间旅行声明需要紧随表名之后,其他地方与普通SQL没有差别