SQL Is All Your Need: Flink SQL
大数据开发简单地说就是从一个存储系统经过计算引擎的加工到另外一个存储系统的过程,如果把存储系统抽象为一张表,利用SQL进行处理,那么其实就和传统的数据库查询没有本质的区别。本篇文章利用Paimon和Flink SQL实现数据开发相关示例,迈向SQL Is All Your Need的第一步。
事先准备
数据收集不在我们这篇文章的讨论范围之内,这里我们假设已经有一张不断插入的包含原始日志数据的paimon表。
-- 使用paimon catalog记录元数据信息
CREATE CATALOG my_catalog WITH (
'type'='paimon',
'warehouse'='file:/tmp/paimon'
);
USE CATALOG my_catalog;
CREATE TABLE IF NOT EXISTS ods_log (
log STRING
) WITH (
-- 只考虑INSERT的情况
'write-mode' = 'append-only'
);
其中每条数据形如:
2023-04-09 15:40:05 stdout {"key1":5,"key2":"val1"}
2023-04-09 15:41:05 stdout {"key1":4,"key2":"val2"}
2023-04-09 15:42:05 stdout {"key1":1,"key2":"val2"}
2023-04-09 15:43:05 stdout {"key1":5,"key2":"val3"}
2023-04-09 15:44:05 stdout {"key1":6,"key2":"val4"}
ETL
原始日志数据一般不能直接使用,需要先进行结构化清洗,利用Flink实时处理能力,我们可以构造持续的数据管道进行加工:
CREATE TABLE IF NOT EXISTS `dwd_log` (
`time` STRING,
`key_word` STRING,
`key1` INT,
`key2` STRING,
-- 定义事件时间和watermark以缓解数据乱序的对下游计算的影响
`rowtime` AS TO_TIMESTAMP_LTZ(UNIX_TIMESTAMP(`time`), 0),
WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '10' SECOND
) WITH (
'write-mode' = 'append-only'
);
INSERT INTO `dwd_log`
SELECT
REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 1) AS `time`,
REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 2) AS `key_word`,
JSON_VALUE(REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 3), '$.key1' RETURNING INT) AS `key1`,
JSON_VALUE(REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 3), '$.key2') AS `key2`
FROM
`ods_log`;
作为一个有洁癖的程序员,一个正则表达式和json函数重复使用显然是不可接受的,需要思考一下如何简化我们的SQL?
Analysis
有了结构化的数据,我们就可以基于此计算指标并做一些分析,比如计算每一分钟的指标汇总值。这里我们利用Flink SQL的Windowing TVFs实现:
CREATE TABLE IF NOT EXISTS dws_metric (
`window_start` STRING,
`window_end` STRING,
`dim` STRING,
`metric` BIGINT,
PRIMARY KEY (`window_start`, `window_end`, `dim`) NOT ENFORCED
);
INSERT INTO dws_metric
SELECT
DATE_FORMAT(`window_start`, 'yyyy-MM-dd HH:mm:ss') AS `window_start`,
DATE_FORMAT(`window_end`, 'yyyy-MM-dd HH:mm:ss') AS `window_end`,
`key_word` AS `dim`,
COUNT(DISTINCT `key2`) AS `metric`
FROM
-- Windowing TVFs
TABLE(TUMBLE(TABLE `dwd_log`, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES))
GROUP BY `key_word`, `window_start`, `window_end`;
实际生产过程中,可能遇到性能问题,Flink也提供了相应性能调优的方法,比如
MiniBatch
,Local-Global
,Split Distinct
等。
Batch
之前我们都是默认使用的实时数据流,但有时实时流可能出现问题,这时一般采用离线修正的方法。因为Paimon + Flink组成了流批一体的存储计算,避免了传统Lambda架构在不同存储和计算引擎之间代码切换的麻烦,这里只需要添加一行代码切换成批执行即可:
SET 'execution.runtime-mode' = 'batch';
...
这样我们才是真正的实现了流批一体:使用同一套API、同一套开发范式来实现大数据的流计算和批计算,进而保证处理过程与结果的一致性。同时paimon的更新能力,给了数据处理更多的可能性~
注意:批计算时会将所有窗口触发,而流计算时只会触发watermark到达的窗口。
Event Driven
实时场景中的另外一类重要的应用就是事件驱动型应用,典型的就是监控风控场景。这里我们以监控每天的累积指标为例:
-- 设置状态过期时间
SET 'table.exec.state.ttl' = '2 d';
CREATE TABLE IF NOT EXISTS `dws_alert` (
`window_start` STRING,
`key` STRING,
`time` STRING,
`metric` BIGINT,
PRIMARY KEY (`window_start`, `key`) NOT ENFORCED
);
INSERT INTO `dws_alert`
SELECT
DATE_FORMAT(`time`, 'yyyyMMdd') AS `window_start`,
`key_word` AS `key`,
MAX(`time`) AS `time`,
SUM(`key1`) AS `metric`
FROM
`dwd_log`
GROUP BY
DATE_FORMAT(`time`, 'yyyyMMdd'), `key_word`
HAVING SUM(`key1`) > 10;
虽然利用SQL开发逻辑简单,但监控规则和阈值等不可以变动,如果采用不同任务执行不同规则,那就会极大的浪费计算资源,这时就需要思考如何支持动态规则?
CEP
Flink SQL也提供了复杂事件处理的能力,我们可以利用 MATCH_RECOGNIZE 子句实现更复杂的应用。这里以求单一日志字段取值不断下降的时期为例:
SELECT *
FROM dwd_log
MATCH_RECOGNIZE (
PARTITION BY key_word
ORDER BY rowtime
MEASURES
START_ROW.rowtime AS start_ts,
LAST(VAL_DOWN.rowtime) AS bottom_ts,
LAST(VAL_UP.rowtime) AS end_ts
ONE ROW PER MATCH
AFTER MATCH SKIP TO LAST VAL_UP
PATTERN (START_ROW VAL_DOWN+ VAL_UP)
DEFINE
VAL_DOWN AS
(LAST(VAL_DOWN.key1, 1) IS NULL AND VAL_DOWN.key1 < START_ROW.key1) OR
VAL_DOWN.key1 < LAST(VAL_DOWN.key1, 1),
VAL_UP AS
VAL_UP.key1 > LAST(VAL_DOWN.key1, 1)
) MR;
虽然需要重新学习模式匹配相关语法,这会导致SQL越来越复杂,但这也说明随着SQL标准的不断发展,SQL的表达能力也在不断地完善。
Funny Fact
虽然 SQL Is All Your Need 只是一个口号,也不会有人只用SQL去完成一些复杂灵活的任务,但如果只是单纯地从理论上来讲,这也是正确的~
Summary
本篇文章介绍了如何用统一的技术栈实现大数据开发,虽然只是一些简单的示例,但随着大数据的发展,技术正在逐渐走向融合,相信SQL Is All Your Need的未来~
本文所有代码可参考:SQL IS ALL Your Need: Flink SQL