Flink Process Function
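Process functions (ProcessFunction) expose the features of stateful stream processing.
A process function works with the most basic building blocks of a data stream: events, state, and time.
Contents
- 1. The basic process function: ProcessFunction
- 1.1 What process functions do and how to use them
- 1.2 Anatomy of ProcessFunction
- 2. Types of process functions
- 2.1 Keyed process function: KeyedProcessFunction
- 2.2 Window process function: ProcessWindowFunction
- 3. TopN examples
- 3.1 ProcessAllWindowFunctionTopN
- 3.2 KeyedProcessWindowFunctionTopN
1. The basic process function: ProcessFunction
1.1 What process functions do and how to use them
A process function provides a timer service (TimerService). Through its context we can access the current event, its timestamp, and the watermark, and we can even register timers.
It extends AbstractRichFunction, so it can also access state and runtime information.
ProcessFunction is something like an upgraded FlatMapFunction: it can implement everything that Map, Filter, and FlatMap can do. With timestamps, state, and timers on top of that, it can also do things those operators cannot.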
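To see why a collector-based callback covers Map, Filter, and FlatMap, here is a minimal plain-Java sketch. It is a model only, not Flink's API: ProcessFn, process(), and the three helper methods are hypothetical names, and a plain List stands in for Flink's Collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model, not Flink's API: ProcessFn and process() are hypothetical.
class CollectorSketch {

    // One element in, any number of results out through the "collector" list.
    interface ProcessFn<I, O> {
        void processElement(I value, List<O> out);
    }

    // Runs the callback on every input element and gathers everything it emits.
    static <I, O> List<O> process(List<I> input, ProcessFn<I, O> fn) {
        List<O> out = new ArrayList<>();
        for (I value : input) {
            fn.processElement(value, out);
        }
        return out;
    }

    // Map: exactly one result per element.
    static List<Integer> lengths(List<String> urls) {
        return process(urls, (url, out) -> out.add(url.length()));
    }

    // Filter: zero or one result per element.
    static List<String> onlyProducts(List<String> urls) {
        return process(urls, (url, out) -> {
            if (url.startsWith("./prod")) {
                out.add(url);
            }
        });
    }

    // FlatMap: any number of results per element.
    static List<String> splitQuery(List<String> urls) {
        return process(urls, (url, out) -> out.addAll(Arrays.asList(url.split("\\?"))));
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList("./home", "./cart", "./prod?id=1");
        System.out.println(lengths(urls));
        System.out.println(onlyProducts(urls));
        System.out.println(splitQuery(urls));
    }
}
```

The only difference between the three helpers is how many times the callback writes to the collector, which is the freedom processElement() has in a real ProcessFunction.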
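1.2 Anatomy of ProcessFunction
The abstract class ProcessFunction extends AbstractRichFunction.
It defines two methods of its own: the abstract method processElement(), which must be implemented, and the non-abstract method onTimer().
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    ...
}
processElement(): processes one element. Its parameters are the input value, the context ctx, and the collector out.
- value: the current input element, that is, the record being processed; its type matches the element type of the stream.
- ctx: an instance of Context, the abstract inner class defined in ProcessFunction:
public abstract class Context {
    // Timestamp of the element being processed
    public abstract Long timestamp();
    // Timer service
    public abstract TimerService timerService();
    // Side output
    public abstract <X> void output(OutputTag<X> outputTag, X value);
}
onTimer(): the timer callback, invoked when a previously registered timer fires. Timers can only be registered on a keyed stream.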
2. Types of process functions
- ProcessFunction
- KeyedProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
2.1 Keyed process function: KeyedProcessFunction
KeyedProcessFunction can be seen as an extension of ProcessFunction: we simply call .process() on the KeyedStream returned by keyBy().
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    ...
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    public abstract class Context {...}
    ...
}
Its definition is almost identical to that of ProcessFunction; the visible differences are the extra key type parameter K and the context method getCurrentKey().
The following examples show timers in use.
1. Processing-time timer example
import java.sql.Timestamp;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// ClickSource and Event are this article's own classes (not shown here).
public class ProcessTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        environment.addSource(new ClickSource())
                .keyBy(data -> data.user)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        long curTs = ctx.timerService().currentProcessingTime();
                        out.collect(ctx.getCurrentKey() + " arrived at: " + new Timestamp(curTs));
                        // Register a processing-time timer 10 seconds from now
                        ctx.timerService().registerProcessingTimeTimer(curTs + 10 * 1000L);
                    }
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect(ctx.getCurrentKey() + " timer fired at: " + new Timestamp(timestamp));
                    }
                })
                .print();
        environment.execute();
    }
}
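2. Event-time timer example
import java.sql.Timestamp;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// ClickSource and Event are this article's own classes (not shown here).
public class EventTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                )
                .keyBy(data -> data.url)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        long currTs = ctx.timerService().currentWatermark();
                        // Assumed completion, mirroring the processing-time example: emit the
                        // record and register an event-time timer 10 s after its timestamp.
                        out.collect(ctx.getCurrentKey() + " arrived, timestamp: " + new Timestamp(ctx.timestamp())
                                + ", watermark: " + currTs);
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
                    }
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect(ctx.getCurrentKey() + " timer fired at: " + new Timestamp(timestamp));
                    }
                })
                .print();
        environment.execute();
    }
}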
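Event-time timers are driven by the watermark rather than the wall clock. The plain-Java model below illustrates two behaviors that, to my understanding, match Flink's timer service: registering the same timestamp twice for the same key leaves a single timer, and a timer fires once the watermark reaches or passes its timestamp. TimerSketch and its methods are hypothetical names for this sketch, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model of per-key event-time timers; not Flink's implementation.
class TimerSketch {
    // timestamp -> keys that have a timer at that timestamp, sorted so that
    // timers fire in timestamp order; the set removes duplicate registrations.
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    void registerEventTimeTimer(String key, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
    }

    // Advances the watermark (it never moves backwards) and returns the
    // timers that fire as a result, formatted as "key@timestamp".
    List<String> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            long ts = timers.firstKey();
            for (String key : timers.pollFirstEntry().getValue()) {
                fired.add(key + "@" + ts);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerSketch service = new TimerSketch();
        service.registerEventTimeTimer("./home", 10_000L);
        service.registerEventTimeTimer("./home", 10_000L); // duplicate, ignored
        service.registerEventTimeTimer("./cart", 5_000L);
        System.out.println(service.advanceWatermark(4_999L));
        System.out.println(service.advanceWatermark(5_000L));
        System.out.println(service.advanceWatermark(20_000L));
    }
}
```

In the sketch, nothing fires while the watermark is behind every registered timestamp, which is why an event-time job that stops receiving data also stops firing timers.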
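2.2 Window process function: ProcessWindowFunction
Besides KeyedProcessFunction, the other widely used family of process functions is the window-based pair ProcessWindowFunction and ProcessAllWindowFunction.
stream.keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .process(new MyProcessWindowFunction())
A ProcessWindowFunction is what we pass to process() on a WindowedStream.
It is both a process function and a full-window function: it is called once per key and window with all of the window's elements, plus a context.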
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
        extends AbstractRichFunction {
    ...
    public abstract void process(
            KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    public void clear(Context context) throws Exception {}
    public abstract class Context implements java.io.Serializable {...}
}
Context is the inner context class:
public abstract class Context implements java.io.Serializable {
    public abstract W window();
    public abstract long currentProcessingTime();
    public abstract long currentWatermark();
    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
    public abstract <X> void output(OutputTag<X> outputTag, X value);
}
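The window returned by context.window() has fixed bounds that follow from the element's timestamp. As a rough sketch of that arithmetic for epoch-aligned windows, the helpers below compute the 10-second tumbling window used in the snippet at the start of this section, and the windows an element falls into when a 10-second window slides every 5 seconds. The class and method names are hypothetical, and the code assumes non-negative timestamps and no window offset.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of window bounds; assumes ts >= 0 and no window offset.
class WindowBoundsSketch {

    // Start of the tumbling window that contains ts.
    // The window covers [start, start + size), so the end is exclusive.
    static long tumblingStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Starts of all sliding windows that contain ts, latest window first.
    // With size = 2 * slide, every element belongs to two windows.
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 12_345L; // an event timestamp in milliseconds
        System.out.println(tumblingStart(ts, 10_000L));
        System.out.println(slidingStarts(ts, 10_000L, 5_000L));
    }
}
```

Because each element lands in size / slide sliding windows, a 10-second window with a 5-second slide produces one result every 5 seconds, each covering the last 10 seconds.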
3. TopN examples
- ProcessAllWindowFunctionTopN
- KeyedProcessWindowFunctionTopN
3.1 ProcessAllWindowFunctionTopN
Approach 1
import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// ClickSource and Event are this article's own classes (not shown here).
public class ProcessAllWindowTopN {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));
        dataStream.map(new MapFunction<Event, String>() {
                    @Override
                    public String map(Event value) throws Exception {
                        return value.url;
                    }
                })
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        // Iterate over the window's data and keep the view count of each URL in a HashMap
                        HashMap<String, Long> urlCountMap = new HashMap<>();
                        for (String url : elements) {
                            urlCountMap.merge(url, 1L, Long::sum);
                        }
                        // Copy the counts into an ArrayList and sort it by count, descending
                        ArrayList<Tuple2<String, Long>> mapList = new ArrayList<>();
                        for (Map.Entry<String, Long> entry : urlCountMap.entrySet()) {
                            mapList.add(Tuple2.of(entry.getKey(), entry.getValue()));
                        }
                        // Long.compare avoids the truncation and overflow of subtracting int values
                        mapList.sort((o1, o2) -> Long.compare(o2.f1, o1.f1));
                        // Build the output from the top two entries (fewer if the window saw fewer URLs)
                        StringBuilder result = new StringBuilder();
                        result.append("========================================\n");
                        for (int i = 0; i < Math.min(2, mapList.size()); i++) {
                            Tuple2<String, Long> temp = mapList.get(i);
                            String info = "Views No." + (i + 1) +
                                    " url:" + temp.f0 +
                                    " views:" + temp.f1 +
                                    " window end: " + new Timestamp(context.window().getEnd()) + "\n";
                            result.append(info);
                        }
                        result.append("========================================\n");
                        out.collect(result.toString());
                    }
                })
                .print();
        environment.execute();
    }
}
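This approach does not distinguish between URLs: all click data is collected and counted in one place, so we open the window directly with windowAll() instead of calling keyBy() first. The price is that windowAll() sends every record through a single, non-parallel task.
A HashMap holds the view count of each URL; it is then converted to an ArrayList, sorted, and the top two entries are taken.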
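The count, sort, and take steps can be tried outside Flink. The sketch below is a plain-Java version under a few assumptions of my own: TopNSketch and topN() are hypothetical names, ties are broken alphabetically by URL to keep the result deterministic, and results are returned as "url=count" strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the count / sort / take-top-n steps; not Flink code.
class TopNSketch {

    static List<String> topN(List<String> urls, int n) {
        // 1. Count the views of each URL.
        Map<String, Long> counts = new HashMap<>();
        for (String url : urls) {
            counts.merge(url, 1L, Long::sum);
        }
        // 2. Sort by count, descending; ties are broken by URL (an assumption).
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort((a, b) -> {
            int byCount = Long.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        // 3. Take at most n entries, so a short list never causes an out-of-bounds read.
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(n, sorted.size()); i++) {
            result.add(sorted.get(i).getKey() + "=" + sorted.get(i).getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("./home", "./fav", "./home", "./cart", "./fav", "./home");
        System.out.println(topN(window, 2));
    }
}
```

Comparing counts with Long.compare avoids the truncation and overflow that subtracting int values can cause, and capping the loop at the list size avoids an IndexOutOfBoundsException when a window holds fewer than n distinct URLs.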
3.2 KeyedProcessWindowFunctionTopN
TopN: Approach 2
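import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// ClickSource, Event, and UrlCountView are this article's own classes (not shown here).
public class TopNKeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        // Step 1: count the views of each URL per sliding window
        SingleOutputStreamOperator<UrlCountView> urlCountStream = dataStream.keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
        // Step 2: group the per-URL counts by window end time and rank them
        urlCountStream.keyBy(data -> data.end)
                .process(new TopN(2))
                .print();
        environment.execute();
    }

    // Incrementally counts the events of one URL in one window
    public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }
        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }
        @Override
        public Long merge(Long a, Long b) {
            // Merging two partial counts is a sum (the original returned null here)
            return a + b;
        }
    }

    // Wraps the final count together with the URL and the window bounds
    public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlCountView, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Long> elements, Collector<UrlCountView> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect(new UrlCountView(s, elements.iterator().next(), start, end));
        }
    }

    public static class TopN extends KeyedProcessFunction<Long, UrlCountView, String> {
        private Integer count;
        private ListState<UrlCountView> urlViewCountListState;

        public TopN(Integer count) {
            this.count = count;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            urlViewCountListState = getRuntimeContext().getListState(
                    new ListStateDescriptor<UrlCountView>("url-view-count-list",
                            Types.POJO(UrlCountView.class)));
        }

        @Override
        public void processElement(UrlCountView value, Context ctx, Collector<String> out) throws Exception {
            urlViewCountListState.add(value);
            // The key is the window end time; fire 1 ms after it, once all counts for this window are in
            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Copy the data out of the list state into an ArrayList so it can be sorted
            ArrayList<UrlCountView> urlViewCountArrayList = new ArrayList<>();
            for (UrlCountView urlViewCount : urlViewCountListState.get()) {
                urlViewCountArrayList.add(urlViewCount);
            }
            // Clear the state to release resources
            urlViewCountListState.clear();
            // Sort by count, descending; Long.compare avoids int truncation and overflow
            urlViewCountArrayList.sort((o1, o2) -> Long.compare(o2.count, o1.count));
            // Build the output from the top entries (fewer if the window saw fewer URLs)
            StringBuilder result = new StringBuilder();
            result.append("========================================\n");
            result.append("Window end: " + new Timestamp(timestamp - 1) + "\n");
            for (int i = 0; i < Math.min(this.count, urlViewCountArrayList.size()); i++) {
                UrlCountView urlViewCount = urlViewCountArrayList.get(i);
                String info = "No." + (i + 1) + " "
                        + "url:" + urlViewCount.url + " "
                        + "views:" + urlViewCount.count + "\n";
                result.append(info);
            }
            result.append("========================================\n");
            out.collect(result.toString());
        }
    }
}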
========================================
Window end: 2022-12-22 19:17:30.0
No.1 url:./fav views:2
No.2 url:./home views:2
========================================
========================================
Window end: 2022-12-22 19:17:35.0
No.1 url:./home views:4
No.2 url:./fav views:3
========================================
========================================
Window end: 2022-12-22 19:17:40.0
No.1 url:./home views:5
No.2 url:./fav views:2
========================================
Here list state (ListState) is used to buffer the per-URL view counts of each window until the timer fires.
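The window step of approach 2 counts clicks incrementally instead of buffering every event. As a rough plain-Java model of that accumulator pattern, the sketch below mirrors the four method names of an aggregate function; MiniAggregate, CountAggSketch, and fold() are hypothetical names and not part of Flink. To my understanding, Flink only calls merge() for merging window assigners such as session windows, but a count accumulator's merge is cheap to implement correctly as a sum.

```java
// Plain-Java model of an incremental count aggregate; not Flink's interface.
class CountAggSketch implements MiniAggregate<String, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(String value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b; // two partial counts combine by addition
    }

    // Folds events into one accumulator, one record at a time, as a window would.
    static Long fold(MiniAggregate<String, Long, Long> agg, String... events) {
        Long acc = agg.createAccumulator();
        for (String event : events) {
            acc = agg.add(event, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        CountAggSketch agg = new CountAggSketch();
        Long left = fold(agg, "./home", "./home");
        Long right = fold(agg, "./home");
        System.out.println(agg.getResult(agg.merge(left, right)));
    }
}

// Hypothetical interface that only mirrors the four method names.
interface MiniAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}
```

Only a single Long per URL and window is kept while the window is open, which is why the downstream list state has to hold just one small record per URL.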