阅读 147

FlinkSQL使用自定义UDTF函数行转列-IK分词器

FlinkSQL使用自定义UDTF函数行转列-IK分词器

一、背景说明

本文基于IK分词器,自定义一个UDTF(Table Functions),实现类似Hive的explode行转列的效果,以此来简明开发过程。

如下图Flink三层API接口中,Table API的接口位于最顶层也是最易用的一层,可以套用SQL语法进行代码编写,对于有SQL基础的能很快上手,但是不足之处在于灵活度有限,自有函数不能满足使用的时候,需要通过自定义函数实现,类似Hive的UDF/UDTF/UDAF自定义函数,在Flink也可以称之为Scalar Functions/Table Functions/Aggregate Functions。

二、效果预览

Kafka端建立生产者发送json片段:
在这里插入图片描述
IDEA侧消费数据处理后效果:
在这里插入图片描述
如上所示,形成类似Hive的exploed炸裂函数实现行转列的效果,当然也可以不用IK分词器,直接按空格进行split实现逻辑是一样的。

三、代码过程

由于Flink一般在流式环境使用,故这里数据源使用Kafka,并建立动态表的形式实现,以更好的贴近实际的业务环境。

  • 工具类:

package com.test.UDTF;import org.apache.flink.table.annotation.DataTypeHint;import org.apache.flink.table.annotation.FunctionHint;import org.apache.flink.table.functions.TableFunction;import org.apache.flink.types.Row;import org.wltea.analyzer.core.IKSegmenter;import org.wltea.analyzer.core.Lexeme;import java.io.IOException;import java.io.StringReader;import java.util.ArrayList;import java.util.List;/**
 * @author: Rango
 * @create: 2021-05-04 16:50
 * @description: 建立函数,继承TableFunction并建立eval方法
 **/@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))public class KeywordUDTF extends TableFunction<Row> {    //按官方文档说明,须按eval命名
    public void eval(String value){
        List<String> stringList = analyze(value);        for (String s : stringList) {
            Row row = new Row(1);
            row.setField(0,s);
            collect(row);
        }
    }   //自定义分词方式
    public List<String> analyze(String text){        //字符串转文件流
        StringReader sr = new StringReader(text);        //建立分词器对象
        IKSegmenter ik = new IKSegmenter(sr,true);        //ik分词后对象为Lexeme
        Lexeme lex = null;        //分词后转入列表
        List<String> keywordList = new ArrayList<>();        while(true){            try {                if ((lex = ik.next())!=null){
                    keywordList.add(lex.getLexemeText());
                }else{                    break;
                }
            } catch(IOException e) {
                e.printStackTrace();
            }
        }return keywordList;
    }
}
  • 实现类

package com.test.UDTF;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;/**
 * @author: Rango
 * @create: 2021-05-04 17:11
 * @description:
 **/public class KeywordStatsApp {    public static void main(String[] args) throws Exception {        //建立环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);        //注册函数
        tableEnv.createTemporaryFunction("ik_analyze", KeywordUDTF.class);        //建立动态表
        tableEnv.executeSql("CREATE TABLE wordtable (" +                "word STRING" +                ") WITH ('connector' = 'kafka'," +                "'topic' = 'keywordtest'," +                "'properties.bootstrap.servers' = 'hadoop102:9092'," +                "'properties.group.id' = 'keyword_stats_app'," +                "'format' = 'json')");        //未切分效果
        Table wordTable = tableEnv.sqlQuery("select word from wordtable");        //利用自定义函数对文本进行分切,切分后计为1,方便后续统计使用
        Table wordTable1 = tableEnv.sqlQuery("select splitword,1 ct from wordtable," +                "LATERAL TABLE(ik_analyze(word)) as T(splitword)");

        tableEnv.toAppendStream(wordTable, Row.class).print("原格式>>>");
        tableEnv.toAppendStream(wordTable1, Row.class).print("使用UDTF函数效果>>>");

        env.execute();
    }
}
  • 补充下依赖

<properties>
        <java.version>1.8</java.version>
        <flink.version>1.12.0</flink.version>
        <scala.version>2.12</scala.version>
    </properties><dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.janeluo</groupId>
            <artifactId>ikanalyzer</artifactId>
            <version>2012_u6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

学习交流,有任何问题还请随时评论指出交流。

来源https://www.cnblogs.com/rango-lhl/p/14744611.html

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐