SQL Is All Your Need: Flink Dynamic SQL

实时监控是Flink一个重要且复杂的应用场景,所以一般不会只采用SQL去实现。但本篇文章我们将从一个简单的问题出发,挑战只使用SQL来满足逐渐复杂的需求,希望最终可以再次拓宽SQL的使用边界。

事先准备

  • 我们有日志流如下:

CREATE TABLE IF NOT EXISTS `dwd_log` (
    `time` STRING COMMENT '事件时间',
    `key_word` STRING COMMENT '日志标识',
    `key1` INT COMMENT '指标',
    `key2` STRING COMMENT '维度'
) WITH (
    ...
);
  • 数据示例如下:

{"time":"2023-04-09 15:40:05","key_word":"stdout","key1":5,"key2":"val1"}
  • 我们将根据一些规则从这个日志流中筛选出关注的事件。

阶段一:需求的开始

规则1:日志关键字为stdout每天不同维度key2下获得指标key1次数大于1的事件

  • 这个规则可以用简单的SQL实现如下:

SELECT
    1 AS `rule_id`,
    DATE_FORMAT(`time`, 'yyyy-MM-dd 00:00:00') AS `window_start`,
    `key2` AS `key`,
    MAX(`time`) AS `alert_time`,
    MAP['key1', CAST(COUNT(`key1`) AS DOUBLE)] AS `alert_metrics`
FROM
    `dwd_log`
WHERE
    `key_word` = 'stdout'
GROUP BY
    `key2`,
    DATE_FORMAT(`time`, 'yyyy-MM-dd 00:00:00')
HAVING
    COUNT(`key1`) > 1;
  • 所以对这种单个简单的统计规则,用Flink SQL实现会非常方便,但当规则越来越多时呢?

阶段二:需求的增长

规则2:不同日志关键字stdout每小时去重维度key2数量大于2的事件

  • 当多个类似的需求出现时,最简单的方法是再起个作业计算,但我们这里想在一个作业里实现,可以直接将不同规则UNION起来:

SELECT
    1 AS `rule_id`,
    DATE_FORMAT(`time`, 'yyyy-MM-dd 00:00:00') AS `window_start`,
    `key2` AS `key`,
    MAX(`time`) AS `alert_time`,
    MAP['key1_cnt', CAST(COUNT(`key1`) AS DOUBLE)] AS `alert_metrics`
FROM
    `dwd_log`
WHERE
    `key_word` = 'stdout'
GROUP BY
    `key2`,
    DATE_FORMAT(`time`, 'yyyy-MM-dd 00:00:00')
HAVING
    COUNT(`key1`) > 1
UNION ALL
SELECT
    2 AS `rule_id`,
    DATE_FORMAT(`time`, 'yyyy-MM-dd HH:00:00') AS `window_start`,
    `key_word` AS `key`,
    MAX(`time`) AS `alert_time`,
    MAP['key2_cnt', CAST(COUNT(DISTINCT `key2`) AS DOUBLE)] AS `alert_metrics`
FROM
    `dwd_log`
GROUP BY
    `key_word`,
    DATE_FORMAT(`time`, 'yyyy-MM-dd HH:00:00')
HAVING
    COUNT(DISTINCT `key2`) > 2;
  • 虽然这样实现逻辑简单,但涉及重复消费,计算消耗大,且将规则硬编码在代码中,不够灵活。我们需要在达到临界点时之前找到更好的解决方法。

阶段三:需求的爆发

  • 我们看到当类似的需求增长后,将面临以下问题:

    • 重复消费导致的计算资源浪费

    • 合并后代码过长导致无法维护

    • 硬编码导致规则没有灵活调整

  • 所以这时需要重构SQL代码,找到一种不用改变SQL又能支持规则CRUD的方法

    • 仔细思考一下现有的规则1和规则2,发现它们都是由相同的部分组成

      • 过滤规则:对应WHERE条件

      • 分组规则:对应监控对象, 规则1中的key2字段以及规则2中的key_word字段

      • 窗口规则:对应监控周期,规则1中的以及规则2的小时

      • 聚合规则:对应聚合计算,规则1中的COUNT方法以及规则2中的COUNT DISTINCT方法

      • 阈值规则:对应HAVING条件

  • 这时我们可以抽象出一个规则模型,将规则放入另外一张(Paimon)表中维护

CREATE TABLE IF NOT EXISTS `dim_rule` (
    `rule_id` INT COMMENT '规则id',
    `job_id` STRING COMMENT '处理规则作业id',
    `filter` STRING COMMENT '过滤规则',
    `key` STRING COMMENT '分组规则',
    `window` STRING COMMENT '窗口规则',
    `aggregate` ROW<`name` STRING, `input` STRING, `method` STRING> COMMENT '聚合规则',
    `threshold` STRING COMMENT '阈值规则',
    PRIMARY KEY(`rule_id`) NOT ENFORCED
);
  • 规则示例为:

{
    "rule_id": 1,
    "job_id": "test",
    "filter": "key_word=='stdout'", -- 实际为EXPR$0=='stdout'
    "key": "key2", -- 实际为EXPR$2
    "window": "DAY",
    "aggregate": {
        "name": "key1_cnt",
        "input": "key1", -- 实际为EXPR$1
        "method": "COUNT",
    },
    "threshold_rule": "key1_cnt>1"
}
  • 有了规则的抽象之后,我们就可以拿日志数据去关联规则,然后根据规则去做具体的处理,当然这里就需要引入几个UDF来增强SQL的表达能力(这里我们不去讨论UDF具体实现)

    • dynamic_key(UDF): 根据分组规则取日志中的相应值

    • dynamic_window(UDF):根据窗口规则划分日志到相应窗口,类似之前的DATE_FORMAT函数

    • dynamic_filter(UDF):根据过滤规则或阈值规则的表达式,判断表达式是否满足条件

    • dynamic_agg(UDAF):根据聚合规则中计算聚合值,最终输出指标名:值的映射

  • 重构后的代码如下:

SELECT
    `rule_id`,
    `window_start`,
    `key`,
    MAX(`time`) AS `alert_time`,
    -- 根据聚合规则进行计算
    dynamic_agg(`data`, `aggregate`) AS `alert_metrics`
FROM
    (
        SELECT
            `rule_id`,
            dynamic_window(`time`, `window`) AS `window_start`,
            dynamic_key(`data`, `key`) AS `key`,
            `aggregate`,
            `threshold`,
            `time`,
            -- 其中`data`为ROW(`key_word`, `key1`, `key2`),由于规则中是不确定的,需要可以访问到日志中每一个字段
            `data`
        FROM
            `dwd_log` AS `log`
        LEFT JOIN
            -- Paimon规则维表查询
            `dim_rule` FOR SYSTEM_TIME AS OF `log`.`proc_time` AS `rule`
        -- 只关联作业相关规则
        ON `rule`.`job_id` = 'test'
        WHERE
            -- 根据规则过滤日志
            dynamic_filter(`data`, `filter`) AND `rule_id` IS NOT NULL
    ) `log_with_rule`
GROUP BY
    -- 根据规则分组
    `key`, `rule_id`, `window_start`, `threshold`, `aggregate`
HAVING
    -- 根据阈值规则判断聚合结果是否超过阈值
    dynamic_filter(dynamic_agg(`data`, `aggregate`), `threshold`);

注:在实现过程中发现无法重命名ROW类型中字段,所以在上述规则示例中实际需要指定默认名,即EXPR$0, EXPR$1, EXPR$2。大家也可以思考一下为什么不用MAP类型?

  • 经过这样的重构之后,SQL代码就固定了,所有操作只是针对规则的CRUD。但还会有什么其他问题吗?

阶段四:需求的挑战

  • 上述方法采用维表关联,每来一条数据都需要对维表进行全表扫描,当日志和规则量级增加后,会带来相应的性能问题。虽然可以采用异步、缓存等SQL HINTS缓解,但其实不是根本的解决方案。

  • 幸好Paimon提供了增量读的功能,我们可以将规则表全部读取到Flink中,并可以捕获规则表中的数据变更,这样就和数据源解耦了,不会带来额外的性能压力。这只需改变一行代码就可以实现:

SELECT
    ...
FROM
    (
        SELECT
            ...
        FROM
            `dwd_log` AS `log`
        LEFT JOIN
            -- 修改为常规的左连接
            `dim_rule` AS `rule`
        ON `rule`.`job_id` = 'test'
        ...
    ) `log_with_rule`
...
  • 看起来去利用SQL进行复杂监控的目标达到了,但事实真的是这样吗?

真相

  • 其实我们还会碰见一系列棘手的问题:

    • 报警控制:SQL的实现周期内报警会反复触发报警,如需控制报警次数,只能下游再进行处理

    • 窗口类型:SQL的实现只能在指定周期内实时累积,不能定时轮询(比如每天触发一次)

    • 状态控制:SQL的实现当规则修改时没法细粒度地控制之前累积状态,作业在长期运行后会不堪重负等

  • 但实际工作中,我们大部分碰到的问题其实就是类似阶段一那样确定的规则,都可以利用SQL方便地进行解决。而从长远来看,上层的数据应用会变得更加简单,对于最终用户,所有的数据都可以使用SQL方式进行分析,这就是我理解的SQL Is All Your Need愿景。

总结

  • 在本篇文章中看到了SQL解决复杂问题潜力,但也明白SQL解决问题的局限性,但这并不影响我们的愿景, 而且在随着AI的发展,可以憧憬仅用自然语言处理数据的时代,Keep up with the times ~

本文所有SQL代码可参考:SQL IS ALL Your Need: Flink Dynamic SQL


SQL Is All Your Need: Flink Dynamic SQL
https://syntomic.cn/archives/sql-is-all-your-need-flink-dynamic-sql
作者
syntomic
发布于
2023年06月15日
更新于
2024年08月27日
许可协议