SQL Is All Your Need: Flink SQL UDF

随着数据处理的逻辑变得越来越复杂,编写的SQL也会变得越加复杂,有时甚至会感觉SQL力不从心。这个时候就需要扩展SQL的表达能力,而UDF(用户自定义函数)就是这样一种扩张开发的机制,拓展系统的内置函数,实现自定义逻辑。本编文章我们就从具体场景出发,使用各种Flink UDF去优化或解决相关问题。

事前准备

  • UDF大致有以下几种

    • 数值函数(UDF): 将标量值转换成一个新标量值,如:DATE_FORMAT

    • 表值函数(UDTF): 将标量值转换成新的行数据, 如:EXPLODE

    • 聚合函数(UDAF):将多行数据里的标量值转换为成一个新标量值, 如:SUM

  • 不同计算引擎有相应的实现方法,本编文章我们只考虑Flink自定义UDF

    • 继承相应类:注意输出输入类型推导

    • 实现相应方法

    • 注册使用

注:文中不会给出详细代码,只会对一些重要的地方给予说明,详细实现可到代码库中查看。

ETL

文章SQL Is All Your Need: Flink SQL中我们提出了一个问题,如何简化ETL中重复的SQL代码,现在我们就用UDTF来实现这一目的。

INSERT INTO `dwd_log`
SELECT
    REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 1) AS `time`,
    REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 2) AS `key_word`,

    JSON_VALUE(REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 3), '$.key1' RETURNING INT) AS `key1`,
    JSON_VALUE(REGEXP_EXTRACT(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 3), '$.key2') AS `key2`
FROM
    `ods_log`
  • 原因:上述SQL看起来冗长最重要的原因是需要先将每个字段用正则提取,然后再对字段进行相同JSON解析操作,导致了很多重复代码。

  • 解决:可以在Flink SQL中编写UDTF,与常规的标量函数只能返回一个值不同,它可以返回任意多行,且每一行可以包含多列。通过LATERAL算子将外表(算子左侧)的每一行跟表值函数返回的所有行(算子右侧)进行笛卡尔积,这样就一次性提取多个字段,那我们的代码就能简化很多:

CREATE TEMPORARY FUNCTION IF NOT EXISTS q_regex_extract AS 'cn.syntomic.qflink.sql.udf.table.QRegexExtract' LANGUAGE JAVA;

SELECT
    `time`,
    `key_word`,
    `json`
FROM
    `ods_log`, LATERAL TABLE(q_regex_extract(`log`, '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.*?) (.*)', 'time', 'key_word', 'json'))
  • 同样地,我们利用hive module使用hive函数一次性进行JSON解析

LOAD MODULE hive WITH ('hive-version' = '3.1.3');

SELECT
    `time`,
    `key_word`,
    -- json_tuple return string type
    CAST(`key1` AS INT) AS `key1`,
    `key2`
FROM
    (
        -- above sql
        ...
    ) a, LATERAL TABLE(json_tuple(`json`, 'key1', 'key2')) AS b(`key1`, `key2`);
  • 这样我们就利用UDTF,减少了SQL中重复代码,形成统一的清洗逻辑,同时避免Flink重复计算的问题,对比之前的实现就看起来逻辑更加清晰,也更加简洁~

注: 我们再自定义函数中q_regex_extract没有别名就可以直接相应字段, 这是因为我们自定义了类型推导, 而在函数json_tuple中就必须显示别名相应字段:

public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    return TypeInference.newBuilder()
                .outputTypeStrategy(
                        callContext -> {
                            List<Field> fields = new ArrayList<>(argsLen - 2);

                            for (int i = 2; i < argsLen; i++) {
                                // use literal parameter as field name
                                String fieldName =
                                        callContext
                                                .getArgumentValue(i, String.class)
                                                .orElse("f" + (i - 2));
                                fields.add(i - 2, DataTypes.FIELD(fieldName, DataTypes.STRING()));
                            }

                            return Optional.of(DataTypes.ROW(fields.toArray(new Field[0])));
                        })
                .build();
}

Analysis

文章SQL Is All Your Need: Flink SQL中我们计算了每分钟滑动窗口的UV。现在我们把问题变复杂一点:每分钟计算一次当天截至到目前的累积UV

  • Flink SQL在1.13之后提供了累积窗口的计算方式:

SELECT
    DATE_FORMAT(`window_start`, 'yyyy-MM-dd HH:mm:ss') AS `window_start`,
    DATE_FORMAT(`window_end`, 'yyyy-MM-dd HH:mm:ss') AS `window_end`,
    `key_word` AS `dim`,
    COUNT(DISTINCT `key2`) AS `metric`
FROM
    TABLE(CUMULATE(TABLE `dwd_log`, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES, INTERVAL '1' DAYS))
GROUP BY `key_word`, `window_start`, `window_end`;
  • 但如果需要我们自己实现的时候,似乎比较难以实现?我们这里只考虑离线的情况,因为Flink(1.15)离线计算时暂不支持累计窗口去重计算。

  • 一种比较直接的想法是首先计算每天所有的时间窗口,然后与小于当前结束窗口的数据JOIN, 计算出相应时间窗口的基数。但这样计算会导致数据膨胀严重,当数据量大时会导致数据倾斜等问题。可以采取下列方法进行优化:

    • 累积窗口计算只需要当天分组下最早时间的记录,这样可以大大减少数据量。

    • 调整相应资源以及参数实现自适应执行。

  • 其实大数据场景中有很多基数估算的方法,这里我们可以引入HLL(HyperLogLog) —— 基数统计的概率算法,用另外一种方法实现

    • 对HLL感兴趣的同学可以参考文章神奇的HyperLogLog,以及线上演示Demo

    • HLL可以在1.2kb内存下估算高达1亿个元素,而只有2%的误差!

    • 我们可以使用HLL数据结构这样去计算当天每分钟的累积人数:

CREATE TEMPORARY FUNCTION IF NOT EXISTS hll_agg AS 'cn.syntomic.qflink.sql.udf.aggregate.HLLAggregate' LANGUAGE JAVA;
CREATE TEMPORARY FUNCTION IF NOT EXISTS hll_cardinality AS 'cn.syntomic.qflink.sql.udf.scalar.HLLCardinality' LANGUAGE JAVA;

SELECT
    DATE_FORMAT(`min_time`, 'yyyy-MM-dd 00:00:00') AS `window_start`,
    FROM_UNIXTIME(UNIX_TIMESTAMP(`min_time`) + 60) AS `window_end`,
    `key_word` AS `dim`,
    -- 按照每分钟时间排序,合并第一行到当前行每分钟的HLL结构,并估算其基数
    hll_cardinality(hll_agg(min_hll) OVER (PARTITION BY `key_word` ORDER BY `min_time`)) AS `metric`
FROM
    (
        -- 聚合每分钟数据形成HLL数据结构
        SELECT
            DATE_FORMAT(`time`, 'yyyy-MM-dd HH:mm:00') AS `min_time`,
            `key_word`,
            hll_agg(`key2`) AS min_hll
        FROM
            `dwd_log`
        GROUP BY
            DATE_FORMAT(`time`, 'yyyy-MM-dd HH:mm:00'),
            `key_word`
    ) a;
  • 在Flink中HLL不是预定义的数据结构, 所以累加器中需要将其视为RAW数据格式,利用KYRO序列化

public class HLLBuffer {

    @DataTypeHint(allowRawGlobally = HintFlag.TRUE)
    public HLL hll = new HLL(11, 5);
}
  • 这样我们就利用udf引入新的数据结构,扩展了SQL的表达能力~

Python UDF

有时我们需要结合Python生态去实现动态执行、模型预测等功能,这个时候就可以引入Python UDF解决。这里我们以经典的鸢尾花机器学习分类为例。

  • 原理

    • 进程模式:Python函数和Java算子之间采用Grpc服务通信

    • 线程模式:Python函数和Java算子运行在同一个进程,利用FFI通信

      • 这样可以减少进程间序列化的开销,提升性能

  • 实现:

    • 实现相应Python类,导入模型进行预测

class DemoUDF(ScalarFunction):

    def open(self, function_context: FunctionContext):
        self.model = joblib.load(path)

    def eval(self, value):
        return self.model.predict(np.array(value).reshape(1, -1)).tolist()[0]


demo_udf = udf(DemoUDF(), result_type=DataTypes.INT())
  • 注册Python UDF

SET 'python.files'='./qflink-python/src/main/python/q-pyflink/udf';
-- 保证python算子获得足够资源
SET 'pipeline.operator-chaining' = 'false';
SET 'python.execution-mode'='thread';

CREATE TEMPORARY FUNCTION IF NOT EXISTS demo_udf AS 'scalar.demo_udf.demo_udf' LANGUAGE PYTHON;

CREATE TABLE IF NOT EXISTS `source` (
    `sepal_length` FLOAT,
    `sepal_width` FLOAT,
    `petal_length` FLOAT,
    `petal_width` FLOAT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

SELECT
    demo_udf(ARRAY[`sepal_length`, `sepal_width`, `petal_length`, `petal_width`]) AS `classifier`
FROM
    `source`;

注: 需提前创建Python虚拟环境,安装pyflink依赖且激活;若Python算子是CPU密集型任务,则需调整TM CPU个数

Summary

大数据技术发展日新月异,但_SQL NEVER DIE_ !本文讨论了如何利用UDF来来提升SQL的表达能力,希望可以提升你对SQL Is ALL Your Need的信心。但技术没有银弹,SQL不可能解决所有的问题。不管是SQL还是底层API,其实都是解决问题的方式,需要根据具体问题采用合适的工具,甚至可以结合SQL的通用性以及底层API的灵活性来优雅地解决问题~

本文所有SQL代码可参考:SQL IS ALL Your Need: Flink SQL UDF


SQL Is All Your Need: Flink SQL UDF
https://syntomic.cn/archives/sql-is-all-your-need-flink-sql-udf
作者
syntomic
发布于
2023年05月11日
更新于
2024年08月27日
许可协议