
Flink Process Function

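Process functions (ProcessFunction) are Flink's low-level API for stateful stream processing.

A process function works directly with the most basic building blocks of a data stream: events, state, and time.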

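Contents

      • 1. Basic process function: ProcessFunction
        • 1.1 What process functions do and how to use them
        • 1.2 Anatomy of ProcessFunction
      • 2. Types of process functions
        • 2.1 Keyed process function: KeyedProcessFunction
        • 2.2 Window process function: ProcessWindowFunction
      • 3. Top-N examples
        • 3.1 ProcessAllWindowFunctionTopN
        • 3.2 KeyedProcessWindowFunctionTopN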

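1. Basic process function: ProcessFunction

1.1 What process functions do and how to use them

A process function exposes a "timer service" (TimerService) through its context. Inside a process function we can access the current event, its timestamp, and the current watermark, and we can also register timers.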


ProcessFunction extends AbstractRichFunction, so it can also access state and runtime information through the runtime context.


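ProcessFunction is something like an upgraded FlatMapFunction: it can do everything Map, Filter, and FlatMap can, and on top of that it can read timestamps and watermarks, register timers, and emit to side outputs.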
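
As a rough illustration of why one process-style callback covers Map, Filter, and FlatMap, the sketch below models the idea in plain Java, without Flink. The names ProcessStyleSketch, ElementProcessor, and run are hypothetical and not part of the Flink API. The only point is that a callback receiving an element and a collector may emit exactly one, at most one, or any number of results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model (not the Flink API) of a processElement-style callback.
final class ProcessStyleSketch {

    // Hypothetical stand-in for processElement(value, ctx, out), without the context.
    interface ElementProcessor<I, O> {
        void processElement(I value, Consumer<O> out);
    }

    // "map": exactly one output per input.
    static final ElementProcessor<String, String> MAP_UPPER =
            (value, out) -> out.accept(value.toUpperCase());

    // "filter": zero or one output per input.
    static final ElementProcessor<String, String> FILTER_HOME = (value, out) -> {
        if (value.contains("home")) {
            out.accept(value);
        }
    };

    // "flatMap": any number of outputs per input.
    static final ElementProcessor<String, String> FLATMAP_SEGMENTS = (value, out) -> {
        for (String segment : value.split("/")) {
            out.accept(segment);
        }
    };

    // Feeds every input element through the processor and gathers what it emits.
    static <I, O> List<O> run(List<I> input, ElementProcessor<I, O> processor) {
        List<O> results = new ArrayList<>();
        for (I value : input) {
            processor.processElement(value, results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList("shop/cart", "home");
        System.out.println(run(paths, MAP_UPPER));        // [SHOP/CART, HOME]
        System.out.println(run(paths, FILTER_HOME));      // [home]
        System.out.println(run(paths, FLATMAP_SEGMENTS)); // [shop, cart, home]
    }
}
```

A real ProcessFunction additionally receives a Context, which is what gives it access to timestamps, timers, and side outputs.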

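1.2 Anatomy of ProcessFunction

The abstract class ProcessFunction extends AbstractRichFunction.

It defines two methods of its own: processElement(), an abstract method that must be implemented, and onTimer(), a non-abstract method with an empty default implementation.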

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
 ...
	public abstract void processElement(I value, Context ctx, Collector<O> out)throws Exception;
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
...
}

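processElement(): processes one element. Its parameters are the input value, the context ctx, and the collector out, which emits results downstream.

  • value: the current input element, that is, the record being processed; its type matches the stream's element type.
  • ctx: an instance of Context, an abstract inner class defined in ProcessFunction.
public abstract class Context {
 // Timestamp of the element being processed
 public abstract Long timestamp();
 // Timer service
 public abstract TimerService timerService();
 // Emit a record to a side output
 public abstract <X> void output(OutputTag<X> outputTag, X value);
}

onTimer(): the callback that runs when a previously registered timer fires. Its timestamp parameter is the time the timer was set for. Timers can only be registered on a keyed stream, so timer code always runs after keyBy.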

2. Types of process functions

  1. ProcessFunction
  2. KeyedProcessFunction
  3. ProcessWindowFunction
  4. ProcessAllWindowFunction
  5. CoProcessFunction
  6. ProcessJoinFunction
  7. BroadcastProcessFunction
  8. KeyedBroadcastProcessFunction

2.1 Keyed process function: KeyedProcessFunction

KeyedProcessFunction can be seen as an extension of ProcessFunction for keyed streams. To use it, call .process() directly on the KeyedStream returned by keyBy.


public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction 
{
...
	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
	public abstract class Context {...}
...
}

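The definition is almost identical to that of ProcessFunction. The differences are the extra type parameter K for the key and the getCurrentKey() method on the context.

Concrete timer examples follow.

1. Processing-time timer example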

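import java.sql.Timestamp;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// ClickSource and Event are not shown in this article; Event is assumed to be a POJO
// with public fields user, url, and timestamp.
public class ProcessTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        environment.addSource(new ClickSource())
                .keyBy(data -> data.user)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        long curTs = ctx.timerService().currentProcessingTime();
                        out.collect(ctx.getCurrentKey() + " arrived at: " + new Timestamp(curTs));
                        // Register a processing-time timer 10 seconds from now
                        ctx.timerService().registerProcessingTimeTimer(curTs + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        // timestamp is the time the timer was registered for
                        out.collect(ctx.getCurrentKey() + " timer fired at: " + new Timestamp(timestamp));
                    }
                })
                .print();

        environment.execute();
    }
}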


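2. Event-time timer example

import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EventTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                )
                .keyBy(data -> data.url)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        // ctx.timestamp() is the event time assigned to the current element
                        long currTs = ctx.timestamp();
                        out.collect(ctx.getCurrentKey() + " arrived, timestamp: " + currTs
                                + ", watermark: " + ctx.timerService().currentWatermark());
                        // Register an event-time timer 10 seconds after the element's timestamp
                        ctx.timerService().registerEventTimeTimer(currTs + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        // Fires once the watermark has reached the registered timestamp
                        out.collect(ctx.getCurrentKey() + " timer fired at: " + timestamp
                                + ", watermark: " + ctx.timerService().currentWatermark());
                    }
                })
                .print();

        environment.execute();
    }
}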
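
To make the firing rule for event-time timers concrete, here is a small plain-Java model of a timer queue for a single key. It is a sketch, not Flink code: EventTimeTimerSketch and its methods are hypothetical names. It assumes the two behaviors Flink documents for timers: registering the same timestamp twice for one key keeps a single timer, and an event-time timer fires once the watermark is greater than or equal to its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of the event-time timers of ONE key (not the Flink API).
final class EventTimeTimerSketch {

    // A sorted set: duplicates for the same timestamp collapse into one timer.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    // Advances the watermark (it never moves backwards) and returns the
    // timestamps of all timers that fire, in ascending order.
    List<Long> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch sketch = new EventTimeTimerSketch();
        sketch.registerEventTimeTimer(11_000L);
        sketch.registerEventTimeTimer(11_000L); // same timestamp: still one timer
        sketch.registerEventTimeTimer(12_000L);

        System.out.println(sketch.advanceWatermark(10_999L)); // []
        System.out.println(sketch.advanceWatermark(11_000L)); // [11000]
        System.out.println(sketch.advanceWatermark(20_000L)); // [12000]
    }
}
```

This is also why an event-time timer never fires on its own when the source stops producing data: without new elements the watermark does not advance.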

2.2 Window process function: ProcessWindowFunction

Besides KeyedProcessFunction, the other widely used family of process functions is the window-based one: ProcessWindowFunction and ProcessAllWindowFunction.

stream.keyBy( t -> t.f0 )
 .window( TumblingEventTimeWindows.of(Time.seconds(10)) )
 .process(new MyProcessWindowFunction())
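
How a timestamp maps to a window can be sketched in plain Java. This is a simplified model that assumes a zero window offset and non-negative timestamps in milliseconds; WindowAssignSketch and its methods are hypothetical names, not Flink classes. It covers the 10-second tumbling window from the snippet above and the sliding windows (size 10 s, slide 5 s) used in the Top-N examples.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of time-window assignment (offset 0, timestamps >= 0).
final class WindowAssignSketch {

    // Start of the tumbling window [start, start + size) that contains the timestamp.
    static long tumblingStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Starts of all sliding windows [start, start + size) that contain the
    // timestamp, newest window first. With size = 2 * slide there are two.
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(12_345L, 10_000L));         // 10000
        System.out.println(slidingStarts(12_345L, 10_000L, 5_000L)); // [10000, 5000]
    }
}
```

An element with timestamp 12345 therefore belongs to the tumbling window [10000, 20000), and to the two sliding windows [10000, 20000) and [5000, 15000).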

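A ProcessWindowFunction is passed to .process() on a WindowedStream.

ProcessWindowFunction is both a process function and a full-window function: it is called once per window with all of the window's elements, together with a context.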

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
 extends AbstractRichFunction {
...
public abstract void process(
 KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
public void clear(Context context) throws Exception {}
public abstract class Context implements java.io.Serializable {...}
}

Context is the inner context class:

public abstract class Context implements java.io.Serializable {
 public abstract W window();
 public abstract long currentProcessingTime();
 public abstract long currentWatermark();
 public abstract KeyedStateStore windowState();
 public abstract KeyedStateStore globalState();
 public abstract <X> void output(OutputTag<X> outputTag, X value);
}

3. Top-N examples

  • ProcessAllWindowFunctionTopN
  • KeyedProcessWindowFunctionTopN

3.1 ProcessAllWindowFunctionTopN

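Approach 1

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ProcessAllWindowTopN {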
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));

        dataStream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event value) throws Exception {
                return value.url;
            }
        })
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        HashMap<String, Long> urlCountMap = new HashMap<>();
                        // Iterate over the window's elements and accumulate the view count per url
                        for (String url : elements) {
                            urlCountMap.merge(url, 1L, Long::sum);
                        }

                        // Copy the counts into an ArrayList so they can be sorted
                        ArrayList<Tuple2<String, Long>> mapList = new ArrayList<>();
                        for (String key : urlCountMap.keySet()) {
                            mapList.add(Tuple2.of(key, urlCountMap.get(key)));
                        }
                        // Sort by view count, descending; Long.compare avoids the overflow risk of int subtraction
                        mapList.sort(new Comparator<Tuple2<String, Long>>() {
                            @Override
                            public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                                return Long.compare(o2.f1, o1.f1);
                            }
                        });

                        // Take the top two (or fewer, if the window holds fewer distinct urls) and build the output
                        StringBuilder result = new StringBuilder();

                        result.append("========================================\n");
                        for (int i = 0; i < Math.min(2, mapList.size()); i++) {
                            Tuple2<String, Long> temp = mapList.get(i);
                            String info = "No." + (i + 1) +
                                    " url: " + temp.f0 +
                                    " views: " + temp.f1 +
                                    " window end: " + new Timestamp(context.window().getEnd()) + "\n";
                            result.append(info);
                        }
                        result.append("========================================\n");
                        out.collect(result.toString());

                    }
                })
                .print();

        environment.execute();
    }
}

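This approach does not partition by url: all click data is collected together and counted in one place, so the window is opened directly with windowAll() and no keyBy() is needed. The price is that a windowAll() operator always runs with a parallelism of 1.

A HashMap holds the view count per url; it is then converted to an ArrayList, sorted, and the top two entries are taken.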
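
The count, sort, and truncate steps can be isolated into a helper that is easy to test on its own. The following is a minimal plain-Java sketch; TopNSketch and topN are hypothetical names. Two choices go beyond the description above and are assumptions of this sketch: ties are broken by url so the output is deterministic, and the result is capped at the number of distinct urls so that a sparse window cannot cause an IndexOutOfBoundsException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the count -> sort -> take-first-n logic.
final class TopNSketch {

    static List<Map.Entry<String, Long>> topN(Iterable<String> urls, int n) {
        // Count occurrences per url.
        Map<String, Long> counts = new HashMap<>();
        for (String url : urls) {
            counts.merge(url, 1L, Long::sum);
        }

        // Sort by count descending, then by url ascending to break ties.
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));

        // Never read past the end of the list.
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    public static void main(String[] args) {
        List<String> clicks = Arrays.asList(
                "./home", "./fav", "./home", "./cart", "./fav", "./home");
        System.out.println(topN(clicks, 2)); // [./home=3, ./fav=2]
    }
}
```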

3.2 KeyedProcessWindowFunctionTopN

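Top-N: approach 2

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// UrlCountView is not shown in this article; it is assumed to be a POJO with public
// fields url (String), count, start, and end (all Long).
public class TopNKeyedProcessFunction {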
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        SingleOutputStreamOperator<UrlCountView> urlCountStream = dataStream.keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        urlCountStream.keyBy(data -> data.end)
                .process(new TopN(2))
                .print();


        environment.execute();
    }
    public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long>{

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            // Not called for the sliding windows used here, but returning the sum is safer than null
            return a + b;
        }
    }

    public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlCountView, String, TimeWindow>{

        @Override
        public void process(String s, Context context, Iterable<Long> elements, Collector<UrlCountView> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect(new UrlCountView(s, elements.iterator().next(), start, end));
        }
    }

    public static class TopN extends KeyedProcessFunction<Long, UrlCountView, String>{
        private Integer count;

        private ListState<UrlCountView> urlViewCountListState;

        public TopN(Integer count){
            this.count = count;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            urlViewCountListState = getRuntimeContext().getListState(
                    new ListStateDescriptor<UrlCountView>("url-view-count-list",
                            Types.POJO(UrlCountView.class)));
        }

        @Override
        public void processElement(UrlCountView value, Context ctx, Collector<String> out) throws Exception {
            urlViewCountListState.add(value);

            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Copy the buffered counts out of list state into an ArrayList so they can be sorted
            ArrayList<UrlCountView> urlViewCountArrayList = new ArrayList<>();
            for (UrlCountView urlViewCount : urlViewCountListState.get()) {
                urlViewCountArrayList.add(urlViewCount);
            }
            // Clear the state to release resources
            urlViewCountListState.clear();

            // Sort by view count, descending; Long.compare avoids the overflow risk of int subtraction
            urlViewCountArrayList.sort(new Comparator<UrlCountView>() {
                @Override
                public int compare(UrlCountView o1, UrlCountView o2) {
                    return Long.compare(o2.count, o1.count);
                }
            });

            // Take the top `count` entries (or fewer, if the window holds fewer urls) and build the output
            StringBuilder result = new StringBuilder();
            result.append("========================================\n");
            // The timer was registered at window end + 1, so subtract 1 to recover the window end
            result.append("Window end time: " + new Timestamp(timestamp - 1) + "\n");
            for (int i = 0; i < Math.min(this.count, urlViewCountArrayList.size()); i++) {
                UrlCountView urlViewCount = urlViewCountArrayList.get(i);
                String info = "No." + (i + 1) + " "
                        + "url: " + urlViewCount.url + " "
                        + "views: " + urlViewCount.count + "\n";
                result.append(info);
            }
            result.append("========================================\n");
            out.collect(result.toString());

        }
    }
}

========================================
Window end time: 2022-12-22 19:17:30.0
No.1 url: ./fav views: 2
No.2 url: ./home views: 2
========================================

========================================
Window end time: 2022-12-22 19:17:35.0
No.1 url: ./home views: 4
No.2 url: ./fav views: 3
========================================

========================================
Window end time: 2022-12-22 19:17:40.0
No.1 url: ./home views: 5
No.2 url: ./fav views: 2
========================================

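Here list state buffers the per-url view counts of each window. Because the stream is keyed by window end, every window gets its own list, and the event-time timer registered at window end + 1 ms fires only after the watermark has passed the window end, by which time all counts for that window have arrived.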
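
The interplay of per-window state and the window end + 1 timer can be modeled in plain Java as follows. This is a sketch under stated assumptions, not Flink code: WindowEndBufferSketch, UrlCount, and the method names are hypothetical, a TreeMap stands in for keyed ListState (one list per window end), and advanceWatermark plays the role of the runtime firing onTimer for every key whose timer is due.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "buffer per window end, rank when the timer fires".
final class WindowEndBufferSketch {

    static final class UrlCount {
        final String url;
        final long count;
        final long windowEnd;

        UrlCount(String url, long count, long windowEnd) {
            this.url = url;
            this.count = count;
            this.windowEnd = windowEnd;
        }
    }

    // Stand-in for keyed ListState: window end -> buffered counts.
    private final TreeMap<Long, List<UrlCount>> bufferedByWindowEnd = new TreeMap<>();

    // Mirrors processElement: append to the list of the element's window.
    void processElement(UrlCount value) {
        bufferedByWindowEnd.computeIfAbsent(value.windowEnd, end -> new ArrayList<>()).add(value);
    }

    // Mirrors onTimer: every window whose timer (windowEnd + 1) is <= watermark
    // is ranked, emitted as one line, and its state is removed.
    List<String> advanceWatermark(long watermark, int n) {
        List<String> output = new ArrayList<>();
        while (!bufferedByWindowEnd.isEmpty() && bufferedByWindowEnd.firstKey() + 1 <= watermark) {
            Map.Entry<Long, List<UrlCount>> entry = bufferedByWindowEnd.pollFirstEntry();
            List<UrlCount> counts = entry.getValue();
            counts.sort((a, b) -> Long.compare(b.count, a.count));
            StringBuilder line = new StringBuilder("end=" + entry.getKey());
            for (int i = 0; i < Math.min(n, counts.size()); i++) {
                line.append(' ').append(counts.get(i).url).append(':').append(counts.get(i).count);
            }
            output.add(line.toString());
        }
        return output;
    }

    public static void main(String[] args) {
        WindowEndBufferSketch sketch = new WindowEndBufferSketch();
        sketch.processElement(new UrlCount("./fav", 3L, 35_000L));
        sketch.processElement(new UrlCount("./home", 4L, 35_000L));
        sketch.processElement(new UrlCount("./cart", 1L, 35_000L));
        sketch.processElement(new UrlCount("./home", 5L, 40_000L));

        System.out.println(sketch.advanceWatermark(35_000L, 2)); // []
        System.out.println(sketch.advanceWatermark(35_001L, 2)); // [end=35000 ./home:4 ./fav:3]
        System.out.println(sketch.advanceWatermark(50_000L, 2)); // [end=40000 ./home:5]
    }
}
```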
