本文共 1817 字,大约阅读时间需要 6 分钟。
在 Flink SQL 中,动态表可以是仅追加型或更新型的表。版本表是一种特殊的更新表,它记录每个键的之前值。时态表可以是一个包含一个或多个版本快照的表,或者是一个跟踪所有变更记录的表(如数据库表的 changelog),也可以是物化所有变更后的表(如数据库表,只有最新快照)。
时态表可以划分为一系列带版本的快照集合。每个快照的版本表示记录的有效区间,有效时间范围可以由用户自定义。根据是否可以追溯自身的历史版本,时态表分为两种类型:
以订单流和产品表为例,订单表(order)从 Kafka 中实时获取订单流,产品 changelog 表(product_changelog)从数据库表 products 中获取。产品价格随时间动态变化。
在特定时间点查询产品 changelog 表的版本:
某些场景需要连接变化的维表。LatestRates 是一个物化的最新汇率表(如 HBase 表)。它总是表示最新汇率数据。
在 Flink 中,定义主键约束和事件时间属性的表即为版本表。Flink 支持通过去重查询定义版本视图。
以下是定义版本表的示例:
-- 定义版本表CREATE TABLE product_changelog ( product_id STRING, product_name STRING, product_price DECIMAL(10, 4), update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, PRIMARY KEY (product_id) NOT ENFORCED, WATERMARK FOR update_time AS update_time)WITH ( 'connector' = 'kafka', 'topic' = 'products', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'value.format' = 'debezium-json');
以下是定义版本视图的示例:
-- 定义版本视图CREATE VIEW versioned_rates ASSELECT currency, rate, currency_timeFROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY currency ORDER BY currency_time DESC) AS rowNum FROM RatesHistory) WHERE rowNum = 1;
通过上述方法,可以在 Flink 中高效管理和查询动态数据表,充分发挥其时态数据处理能力。
转载地址:http://gtefk.baihongyu.com/