Flink规则引擎
随着业务发展,对风控能力的要求会越来越高,比如丰富的事件类型处理、不同的统计方式计算、动态的规则配置支持等。本篇文章我们就来讨论如何利用Flink构建一个规则引擎,去解决这些问题,来支持风控平台的建设。
风控业务
类型
事先风控:提前辨识异常,避免风险事件的发生。
事中风控:实时识别异常,减少风险事件的影响。
事后风控:总结分析异常,防止类似事件再次产生。
方法
基于规则
统计规则:例如5分钟以内访问次数大于100次
序列规则:例如用户点击、加入购物车、删除事件序列
基于算法
本篇文章只考虑利用统计规则做事中的风控:根据规则将实时数据源中异常事件筛选出来
文章所有代码见:RuleEngineJob
设计
Flink作业
利用参数划分作业处理规则的范围,一个作业只处理相同作业ID
job.id
的规则。由于Flink序列化效率的差别, 所以一个作业只处理相同Schema的数据,这样的就能统一采用
Row
数据类型进行高效序列化。这里我们引入Avro Schema定义字段类型:
{
"type": "record",
"name": "default",
"fields": [
{
"name": "rule_id",
"type": "int"
}
]
}
同时如果原始数据源量级比较大,我们可以先统一合并读取拆分出关心的数据,减少公共层压力, 所以这里也顺便提供了纯ETL的功能,通过作业参数
agg.enable = false
设置。
规则
我们提供下列配置项
过滤条件: 根据计算表达式过滤满足条件的数据
清洗条件
正则解析
JSONPath解析
字段表达式
分组条件:根据关注对象进行分组计算
窗口条件
窗口类型:滚动/滑动/累积
窗口触发:单次/批次
窗口大小及偏移
聚合条件
聚合过滤:计算表达式
聚合方法:SUM/COUNT/COUNT DISTINCT等
阈值条件:利用聚合指标的计算表达式判断
以及规则元数据
规则ID:主键
规则状态:控制规则生效状态
作业ID: 匹配相应Flink作业
例子:将数据进行过滤、解析、字段映射、窗口聚合计算、阈值判断的整体流程
{
"rule_id": 100,
"rule_state": "ACTIVE",
"job_id": "test",
"filter": {
"expr": "string.contains(raw, 'stdout')",
"params": [
"raw"
]
},
"flat_map": {
"pattern": "(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (.*?) (.*)",
"normal_fields": [
{
"name": "time",
"type": "STRING",
"default": null
},
{
"name": "key_word",
"type": "STRING",
"default": null
},
{
"name": "",
"type": "JSON",
"json_paths": [
{
"name": "key1",
"expr": "$.key1",
"type": "INT",
"default": 0
},
{
"name": "key2",
"expr": "$.key2",
"type": "STRING",
"default": null
}
],
"default": null
}
],
"mapping_fields": [
{
"name": "is_odd",
"type": "BOOLEAN",
"expr": "key1 % 2 == 1",
"default": null
}
]
},
"keys": [
"is_odd"
],
"window": {
"type": "TUMBLE",
"trigger": "SINGLE",
"offset": 0,
"size": 86400000,
"step": 0
},
"aggregates": [
{
"name": "val_cnt",
"inputs": [
"key2"
],
"method": "COUNT_DISTINCT"
}
],
"threshold": {
"expr": "val_cnt > 1",
"params": [
"val_cnt"
]
}
}
实现
Connector
Source:通过自定义
RowDeserializationSchema
根据传入的Avro Schema进行数据规范化Sink:根据
TopicSelector
(Kafka)根据关键字拆分到不同Topic
ETL算子
规则流:根据规则状态更新广播状态
数据流:遍历所有广播状态中规则进行处理
引入高性能、轻量级Aviator表达式引擎提升表达能力
如果下游需要聚合,根据
rule_id
+ 按照keys
字段取值进行分组。这样下游就相当于固定逻辑处理,减少代码复杂度。
注1:因为我们这里可以得到规则相应的聚合条件,所以可以在ETL算子中做预聚合减少下游数据量,提升吞吐量。
注2: 遍历规则会导致数据重复, 当规则过多时可能会产生性能问题。这里可以先合并相同过滤+窗口+聚合条件的规则, 减少处理压力。
AGG算子
Flink原生窗口算子不支持动态变更,所以我们需要设计重新窗口算子。
通过阅读Flink源码可知窗口实现过程:
通过
WindowAssigner
确定消息所在的窗口(可能属于多个窗口)将数据根据
AggregateFunction
聚合到对应窗口的状态中根据
Trigger
确定是否应该触发窗口结果的计算,如果使用 InternalWindowFunction 对窗口进行处理注册
EventTimeTimer
定时器,进行窗口触发计算及结束时清理窗口状态如果数据延迟到达,提交到
SideOutput
中
所以只要我们先从广播状态中根据当前分组拿到相应规则,就可以模拟窗口算子的逻辑,实现窗口的动态配置~
这里需要注意一些问题:
窗口触发:因为需要处理数据和规则的双流输入,而Flink的Watermark是取得双流中最小的Watermark,所以这里我们需要定义规则流的Watermark为周期性触发的
Long.MAX_VALUE
,这样才不会影响数据流正常窗口触发计算。状态清理:但规则移除后,肯定希望清理相应规则下的所有累积状态,要不然之后肯定会OOM。这里我们在移除广播流规则时,可以根据规则ID拿到规则相关的所有聚合状态进行删除。删除时也要注意并发问题,可以采用先复制相应键进行避免。
聚合计算:因为DataStream API中窗口是利用数据复制实现的,长时间周期,短步长的窗口类型会导致严重的性能问题。这里可以借鉴SQL API中Window slice实现。
注: 这里采用
AbstractStreamOperator
实现而不是标准的KeyedBroadcastProcessFunction
实现是因为需要得到窗口触发时对应的窗口对象,需要利用底层状态的Namespace
进行判断。
测试
作为一个通用型的平台作业,所有需要编写相应的单元测试、算子测试、作业测试保证代码质量,具体操作可见Flink测试。
这里我们将source 和 sink 设置成可插拔的,可以在不改动代码的条件下实现作业测试。
业界实现
社区提出动态CEP的提案, 希望提供动态CEP规则的支持,但暂时还没实现。
不过业界有对应实现:
我们提供的思路也可以去实现相应的动态CEP,不过感觉CEP的配置过于复杂,需要更好地设计前端平台,降低使用门槛是关键。
总结
本篇文章我们详细讨论了如何利用Flink构建规则引擎支持风控平台的建设,得到了一些构建复杂数据处理应用的经验,也更加深入理解了Flink处理数据的原理,向知其所以然迈向了坚实的一步~