Flink规则引擎

随着业务发展,对风控能力的要求会越来越高,比如丰富的事件类型处理、不同的统计方式计算、动态的规则配置支持等。本篇文章我们就来讨论如何利用Flink构建一个规则引擎,去解决这些问题,来支持风控平台的建设。

风控业务

  • 类型

    • 事先风控:提前辨识异常,避免风险事件的发生。

    • 事中风控:实时识别异常,减少风险事件的影响。

    • 事后风控:总结分析异常,防止类似事件再次产生。

  • 方法

    • 基于规则

      • 统计规则:例如5分钟以内访问次数大于100次

      • 序列规则:例如用户点击、加入购物车、删除事件序列

    • 基于算法

  • 本篇文章只考虑利用统计规则做事中的风控:根据规则将实时数据源中异常事件筛选出来

文章所有代码见:RuleEngineJob

设计

  • 利用广播流广播规则到各个算子上,然后数据遍历相应规则进行处理输出。原始思路可见Flink官方博客

Flink作业

  • 利用参数划分作业处理规则的范围,一个作业只处理相同作业IDjob.id的规则。

  • 由于Flink序列化效率的差别, 所以一个作业只处理相同Schema的数据,这样的就能统一采用Row数据类型进行高效序列化。

{
    "type": "record",
    "name": "default",
    "fields": [
        {
            "name": "rule_id",
            "type": "int"
        }
    ]
}
  • 同时如果原始数据源量级比较大,我们可以先统一合并读取拆分出关心的数据,减少公共层压力, 所以这里也顺便提供了纯ETL的功能,通过作业参数agg.enable = false设置。

规则

  • 我们提供下列配置项

    • 过滤条件: 根据计算表达式过滤满足条件的数据

    • 清洗条件

      • 正则解析

      • JSONPath解析

      • 字段表达式

    • 分组条件:根据关注对象进行分组计算

    • 窗口条件

      • 窗口类型:滚动/滑动/累积

      • 窗口触发:单次/批次

      • 窗口大小及偏移

    • 聚合条件

      • 聚合过滤:计算表达式

      • 聚合方法:SUM/COUNT/COUNT DISTINCT等

    • 阈值条件:利用聚合指标的计算表达式判断

    • 以及规则元数据

      • 规则ID:主键

      • 规则状态:控制规则生效状态

      • 作业ID: 匹配相应Flink作业

  • 例子:将数据进行过滤、解析、字段映射、窗口聚合计算、阈值判断的整体流程

{
    "rule_id": 100,
    "rule_state": "ACTIVE",
    "job_id": "test",
    "filter": {
        "expr": "string.contains(raw, 'stdout')",
        "params": [
            "raw"
        ]
    },
    "flat_map": {
        "pattern": "(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (.*?) (.*)",
        "normal_fields": [
            {
                "name": "time",
                "type": "STRING",
                "default": null
            },
            {
                "name": "key_word",
                "type": "STRING",
                "default": null
            },
            {
                "name": "",
                "type": "JSON",
                "json_paths": [
                    {
                        "name": "key1",
                        "expr": "$.key1",
                        "type": "INT",
                        "default": 0
                    },
                    {
                        "name": "key2",
                        "expr": "$.key2",
                        "type": "STRING",
                        "default": null
                    }
                ],
                "default": null
            }
        ],
        "mapping_fields": [
            {
                "name": "is_odd",
                "type": "BOOLEAN",
                "expr": "key1 % 2 == 1",
                "default": null
            }
        ]
    },
    "keys": [
        "is_odd"
    ],
    "window": {
        "type": "TUMBLE",
        "trigger": "SINGLE",
        "offset": 0,
        "size": 86400000,
        "step": 0
    },
    "aggregates": [
        {
            "name": "val_cnt",
            "inputs": [
                "key2"
            ],
            "method": "COUNT_DISTINCT"
        }
    ],
    "threshold": {
        "expr": "val_cnt > 1",
        "params": [
            "val_cnt"
        ]
    }
}

实现

Connector

  • Source:通过自定义RowDeserializationSchema根据传入的Avro Schema进行数据规范化

  • Sink:根据TopicSelector(Kafka)根据关键字拆分到不同Topic

ETL算子

  • 规则流:根据规则状态更新广播状态

  • 数据流:遍历所有广播状态中规则进行处理

    • 引入高性能、轻量级Aviator表达式引擎提升表达能力

    • 如果下游需要聚合,根据rule_id + 按照keys字段取值进行分组。这样下游就相当于固定逻辑处理,减少代码复杂度。

注1:因为我们这里可以得到规则相应的聚合条件,所以可以在ETL算子中做预聚合减少下游数据量,提升吞吐量。

注2: 遍历规则会导致数据重复, 当规则过多时可能会产生性能问题。这里可以先合并相同过滤+窗口+聚合条件的规则, 减少处理压力。

AGG算子

  • Flink原生窗口算子不支持动态变更,所以我们需要设计重新窗口算子。

  • 通过阅读Flink源码可知窗口实现过程:

    • 通过WindowAssigner确定消息所在的窗口(可能属于多个窗口)

    • 将数据根据AggregateFunction聚合到对应窗口的状态中

    • 根据Trigger确定是否应该触发窗口结果的计算,如果使用 InternalWindowFunction 对窗口进行处理

    • 注册EventTimeTimer定时器,进行窗口触发计算及结束时清理窗口状态

    • 如果数据延迟到达,提交到SideOutput

  • 所以只要我们先从广播状态中根据当前分组拿到相应规则,就可以模拟窗口算子的逻辑,实现窗口的动态配置~

  • 这里需要注意一些问题:

    1. 窗口触发:因为需要处理数据和规则的双流输入,而Flink的Watermark是取得双流中最小的Watermark,所以这里我们需要定义规则流的Watermark为周期性触发的Long.MAX_VALUE,这样才不会影响数据流正常窗口触发计算。

    2. 状态清理:但规则移除后,肯定希望清理相应规则下的所有累积状态,要不然之后肯定会OOM。这里我们在移除广播流规则时,可以根据规则ID拿到规则相关的所有聚合状态进行删除。删除时也要注意并发问题,可以采用先复制相应键进行避免。

    3. 聚合计算:因为DataStream API中窗口是利用数据复制实现的,长时间周期,短步长的窗口类型会导致严重的性能问题。这里可以借鉴SQL API中Window slice实现。

注: 这里采用AbstractStreamOperator实现而不是标准的KeyedBroadcastProcessFunction实现是因为需要得到窗口触发时对应的窗口对象,需要利用底层状态的Namespace进行判断。

测试

  • 作为一个通用型的平台作业,所有需要编写相应的单元测试、算子测试、作业测试保证代码质量,具体操作可见Flink测试

    • 这里我们将source 和 sink 设置成可插拔的,可以在不改动代码的条件下实现作业测试。

业界实现

  • 社区提出动态CEP的提案, 希望提供动态CEP规则的支持,但暂时还没实现。

  • 不过业界有对应实现:

  • 我们提供的思路也可以去实现相应的动态CEP,不过感觉CEP的配置过于复杂,需要更好地设计前端平台,降低使用门槛是关键。

总结

本篇文章我们详细讨论了如何利用Flink构建规则引擎支持风控平台的建设,得到了一些构建复杂数据处理应用的经验,也更加深入理解了Flink处理数据的原理,向知其所以然迈向了坚实的一步~


Flink规则引擎
https://syntomic.cn/archives/flink-rule-engine
作者
syntomic
发布于
2023年07月15日
更新于
2024年08月27日
许可协议