当前位置: 首页 > news >正文

2023-01-18 Flink 1.16 时间水印(Watermark)和窗口周期的关系及计算方法

forBoundedOutOfOrderness 和 TumblingEventTimeWindows

forBoundedOutOfOrderness(M):M 为允许的最大乱序时间(毫秒)

TumblingEventTimeWindows(N):N 为滚动窗口的长度(毫秒)

第一条数据的事件时间记为 TS1

第一个窗口期公式:

窗口开始时间:

win_start = ((TS1-M)/N) * N (其中 "/" 为整除,即向下取整;时间单位为毫秒)

窗口结束时间:

win_end = win_start+N

数据过期:

凡是事件时间 < win_start 的数据都是过期(迟到)数据

第一个窗口汇总计算触发:

与数据之间的接收的间隔时间无关,与总时长也无关。

只与接收到的数据的时间TS2有关。

当 TS2 >= win_end+M 时,水印越过窗口结束时间,窗口被触发,事件时间满足 >= win_start && < win_end 的数据会被交给 apply 处理。

TS2>=win_end+M 是唯一条件。


import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * Demo job showing how a bounded-out-of-orderness watermark (3 s) interacts with
 * tumbling event-time windows (5 s) in Flink.
 *
 * <p>A hand-written source emits a fixed sequence of {@code "key,eventTimeMillis"}
 * records with pauses in between. Every step (timestamp extraction, window firing,
 * late records) is printed and also appended to a shared in-memory log that
 * {@link #main(String[])} dumps once the job has finished.
 *
 * <p>The log is written from the source, the window operator and the main thread,
 * so all appends go through {@link #appendLog(String)}, which synchronises on the
 * buffer.
 */
public class WaterMarkTest {

    /** Creates the demo; all state lives in the static fields below. */
    public WaterMarkTest() {
    }

    /** Shared run log; append only via {@link #appendLog(String)}. */
    static StringBuilder sb = new StringBuilder();
    /** Wall-clock time (ms) at which {@link #run()} started. */
    static volatile long sts = 0L;
    /** Wall-clock time (ms) at which the most recent window finished firing. */
    static volatile long ets = 0L;
    /** Pause (ms) used between the first few records emitted by the source. */
    static long sleeps = 500;

    /** Number of leading pauses that use {@link #sleeps}; later pauses are 1000 ms. */
    static final int SHORT_PAUSE_COUNT = 6;

    /**
     * Event timestamps (epoch millis) emitted by the source, in emission order.
     * Entries marked "late" ended up in the late-data side output in the sample
     * run recorded at the end of {@link #run()}.
     */
    static final long[] EVENT_TIMES = {
        1553503188000L,
        1553503186000L,
        1553503183000L, // late
        1553503180000L, // late
        1553503185000L,
        1553503188000L,
        1553503189000L,
        1553503188000L,
        1553503189000L,
        1553503190000L,
        1553503191000L,
        1553503186000L,
        1553503187000L,
        1553503185000L,
        1553503184000L, // late
        1553503183000L, // late
        1553503190000L,
        1553503192000L,
        1553503193000L,
        1553503194000L,
        1553503195000L,
        1553503196000L,
        1553503197000L,
        1553503198000L,
        1553503199000L,
        1553503200000L,
        1553503201000L,
        1553503202000L,
    };

    /**
     * Appends one line (plus a trailing newline) to the shared log.
     *
     * @param line text to append; may itself contain embedded newlines
     */
    static void appendLog(String line) {
        synchronized (sb) {
            sb.append(line).append("\n");
        }
    }

    /**
     * Builds and executes the demo pipeline in a local Flink environment.
     *
     * <p>Pipeline: source -> timestamp/watermark assignment -> map to (key, ts)
     * -> keyBy key -> 5 s tumbling event-time window -> window function. Only the
     * late-data side output is printed; the main window output is collected but
     * has no sink attached.
     *
     * @throws Exception if the Flink job fails
     */
    public void run() throws Exception {
        sts = System.currentTimeMillis();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // Anonymous subclass so the element type is retained for Flink's type extraction.
        OutputTag<Tuple2<String, String>> lateOutputTag =
                new OutputTag<Tuple2<String, String>>("late-data-lx") {
                    private static final long serialVersionUID = 154621L;
                };

        // assignTimestampsAndWatermarks: the given WatermarkStrategy supplies a
        // TimestampAssigner (extractTimestamp is called for each event to obtain
        // its event time) and a WatermarkGenerator (onEvent is called for each event).
        DataStream<String> dataStream = env.addSource(new SourceFunction<String>() {
            private static final long serialVersionUID = 1134546L;

            /** Cleared by {@link #cancel()} to stop emitting. */
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                final int last = EVENT_TIMES.length - 1;
                for (int i = 0; i <= last && running; i++) {
                    ctx.collect("hello," + EVENT_TIMES[i]);
                    if (i < last) {
                        // Original author's note: watermarks are computed every
                        // 200 ms, so do not pause for less than that.
                        Thread.sleep(i < SHORT_PAUSE_COUNT ? sleeps : 1000L);
                    }
                }
                System.out.println("1 ============================================================");
                WaterMarkTest.appendLog("time use 1=" + (ets - sts) + ", ets=" + ets);
            }

            @Override
            public void cancel() {
                running = false;
            }
        }, "source1")
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    // Attach the event time carried in the record to each element.
                    .withTimestampAssigner(
                        new SerializableTimestampAssigner<String>() {
                            private static final long serialVersionUID = 134231L;
                            /** Largest event time seen so far (logged as "recordTimestamp"). */
                            private long maxEventTime = 0L;
                            /** Wall-clock time of the first extracted record; 0 until then. */
                            private long firstSeenAt = 0L;

                            /**
                             * Parses the event time from the second comma-separated field.
                             *
                             * @param element record of the form {@code "key,eventTimeMillis"}
                             * @param previousTimestamp timestamp previously attached to the element (unused)
                             * @return the parsed event time in epoch millis
                             */
                            @Override
                            public long extractTimestamp(String element, long previousTimestamp) {
                                String[] fields = element.split(",");
                                long eventTime = Long.parseLong(fields[1]);
                                long now = System.currentTimeMillis();
                                long elapsed = firstSeenAt > 0 ? now - firstSeenAt : 0;
                                String msg = now + "[" + elapsed + "]: Key-> " + fields[0]
                                        + ",EventTime:" + eventTime + ", recordTimestamp=" + maxEventTime;
                                System.out.println(msg);
                                WaterMarkTest.appendLog(msg);
                                if (firstSeenAt == 0) {
                                    firstSeenAt = now;
                                }
                                maxEventTime = Math.max(eventTime, maxEventTime);
                                return eventTime;
                            }
                        }
                    )
        );

        dataStream.map(new MapFunction<String, Tuple2<String, String>>() {
            private static final long serialVersionUID = 12342L;

            /** Splits {@code "key,eventTimeMillis"} into a (key, eventTimeMillis) pair. */
            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                String[] fields = s.split(",");
                return new Tuple2<String, String>(fields[0], fields[1]);
            }
        }).keyBy(f -> f.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5))) // tumbling window of n seconds
        // Original author's note: with a value greater than 0 the same window
        // period produces results repeatedly, because a late record re-fires the
        // window it belongs to.
        .allowedLateness(Time.seconds(0))
        .sideOutputLateData(lateOutputTag)
        .apply(new WindowFunction<Tuple2<String, String>, String, String, TimeWindow>() {
            private static final long serialVersionUID = 1112151L;
            /** Wall-clock time of the previous firing; seeded from the job start time. */
            private long lastFiredAt = 0L;

            /**
             * Emits every record of the fired window in ascending event-time order
             * and logs the window bounds together with its first and last record.
             */
            @Override
            public void apply(String key, TimeWindow window,
                    Iterable<Tuple2<String, String>> input, Collector<String> out) throws Exception {
                long now = System.currentTimeMillis();
                if (lastFiredAt == 0) {
                    lastFiredAt = sts;
                }
                String header = now + "[" + (lastFiredAt > 0 ? (now - lastFiredAt) : "-") + "]"
                        + " 当前窗口开始时间[" + window.getStart() + ",结束时间" + window.getEnd() + ")";
                System.out.println(header);
                lastFiredAt = now;

                List<Tuple2<String, String>> events = new ArrayList<>();
                input.forEach(events::add);
                // Compare numerically; a string comparison is only correct while
                // all timestamps have the same number of digits.
                events.sort((a, b) -> Long.compare(Long.parseLong(a.f1), Long.parseLong(b.f1)));

                // The shared log gets the header plus the first and last record only;
                // stdout and the collector get every record.
                StringBuilder summary = new StringBuilder(header);
                final int lastIndex = events.size() - 1;
                for (int i = 0; i <= lastIndex; i++) {
                    String eventTime = events.get(i).f1;
                    if (i == 0 || i == lastIndex) {
                        summary.append("\n> ").append(eventTime);
                    }
                    out.collect(" - " + eventTime);
                    System.out.println("> " + eventTime);
                }
                WaterMarkTest.appendLog(summary.toString());
                ets = System.currentTimeMillis();
            }
        })
        .getSideOutput(lateOutputTag).map(new MapFunction<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 341902L;

            /** Formats and logs a record that arrived after its window had expired. */
            @Override
            public String map(Tuple2<String, String> value) throws Exception {
                String msg = "[Expire Data]> " + value.f0 + "->" + value.f1;
                WaterMarkTest.appendLog(msg);
                return msg;
            }
        }).print();

        env.execute("Flink WaterMark Test1");

        /*
         * Sample output of one run (dump of the shared log), kept verbatim:
         *
2 ============================================================
1674029100314[0]: Key-> hello,EventTime:1553503188000, recordTimestamp=0
1674029100832[518]: Key-> hello,EventTime:1553503186000, recordTimestamp=1553503188000
1674029101341[1027]: Key-> hello,EventTime:1553503183000, recordTimestamp=1553503188000
[Expire Data]> hello->1553503183000
1674029101850[1536]: Key-> hello,EventTime:1553503180000, recordTimestamp=1553503188000
[Expire Data]> hello->1553503180000
1674029102362[2048]: Key-> hello,EventTime:1553503185000, recordTimestamp=1553503188000
1674029102872[2558]: Key-> hello,EventTime:1553503188000, recordTimestamp=1553503188000
1674029103384[3070]: Key-> hello,EventTime:1553503189000, recordTimestamp=1553503188000
1674029104392[4078]: Key-> hello,EventTime:1553503188000, recordTimestamp=1553503189000
1674029105400[5086]: Key-> hello,EventTime:1553503189000, recordTimestamp=1553503189000
1674029106404[6090]: Key-> hello,EventTime:1553503190000, recordTimestamp=1553503189000
1674029107408[7094]: Key-> hello,EventTime:1553503191000, recordTimestamp=1553503190000
1674029108420[8106]: Key-> hello,EventTime:1553503186000, recordTimestamp=1553503191000
1674029109426[9112]: Key-> hello,EventTime:1553503187000, recordTimestamp=1553503191000
1674029110433[10119]: Key-> hello,EventTime:1553503185000, recordTimestamp=1553503191000
1674029111438[11124]: Key-> hello,EventTime:1553503184000, recordTimestamp=1553503191000
[Expire Data]> hello->1553503184000
1674029112444[12130]: Key-> hello,EventTime:1553503183000, recordTimestamp=1553503191000
[Expire Data]> hello->1553503183000
1674029113450[13136]: Key-> hello,EventTime:1553503190000, recordTimestamp=1553503191000
1674029114464[14150]: Key-> hello,EventTime:1553503192000, recordTimestamp=1553503191000
1674029115467[15153]: Key-> hello,EventTime:1553503193000, recordTimestamp=1553503192000
1674029115625[19265] 当前窗口开始时间[1553503185000,结束时间1553503190000)
> 1553503185000
> 1553503189000
1674029116473[16159]: Key-> hello,EventTime:1553503194000, recordTimestamp=1553503193000
1674029117488[17174]: Key-> hello,EventTime:1553503195000, recordTimestamp=1553503194000
1674029118489[18175]: Key-> hello,EventTime:1553503196000, recordTimestamp=1553503195000
1674029119489[19175]: Key-> hello,EventTime:1553503197000, recordTimestamp=1553503196000
1674029120496[20182]: Key-> hello,EventTime:1553503198000, recordTimestamp=1553503197000
1674029120716[5091] 当前窗口开始时间[1553503190000,结束时间1553503195000)
> 1553503190000
> 1553503194000
1674029121508[21194]: Key-> hello,EventTime:1553503199000, recordTimestamp=1553503198000
1674029122516[22202]: Key-> hello,EventTime:1553503200000, recordTimestamp=1553503199000
1674029123517[23203]: Key-> hello,EventTime:1553503201000, recordTimestamp=1553503200000
1674029124533[24219]: Key-> hello,EventTime:1553503202000, recordTimestamp=1553503201000
time use 1=24357, ets=1674029120717
1674029124553[3837] 当前窗口开始时间[1553503195000,结束时间1553503200000)
> 1553503195000
> 1553503199000
1674029124554[1] 当前窗口开始时间[1553503200000,结束时间1553503205000)
> 1553503200000
> 1553503202000
time use2=28256, sts=1674029096360

         */
    }

    /**
     * Runs the demo job and then prints the accumulated log.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        WaterMarkTest test = new WaterMarkTest();
        test.run();

        System.out.println("2 ============================================================");
        appendLog("time use2=" + (System.currentTimeMillis() - sts) + ", sts=" + sts);
        System.out.println(sb.toString());
    }
}

参考:

https://blog.csdn.net/Vector97/article/details/110150925

https://blog.csdn.net/RonieWhite/article/details/114386907

相关文章:

  • 重庆做网站设计/怎样才能上百度
  • 如何给网站做防御/优化搜索曝光次数的方法
  • 大气的门户网站/友情链接是啥意思
  • 微信公众号自定义菜单wordpress/怎么推广产品最有效
  • 拼多多关键词推广/seo入口
  • 免费推广网站短视频/酒店线上推广方案有哪些
  • 静态代理和JDK动态代理以及CGLIB动态代理
  • 【信息系统项目管理师】论文写作心得整理篇
  • 85.机器翻译与数据集
  • 【p2p】初识Safire 基于 libnice 的 World wide parment system
  • (小甲鱼python)函数笔记合集四 函数(IV)总结 函数中参数的作用域 局部作用域 全局作用域 global语句 嵌套函数 nonlocal语句等详解
  • pytorch【Conv2d参数介绍】
  • 初学者C语言练习题-入门
  • 跨境电商必知:这7种实用的保留策略,可以有效减少客户流失
  • 激光雷达对植被冠层结构和SIF同时探测展望
  • 代码随想录算法训练营第七天 | 454.四数相加II ,383. 赎金信 ,15. 三数之和,18. 四数之和
  • Go map 实现原理
  • 致 Tapdata 开源贡献者:聊聊 2022 年的进展和新一年的共建计划