APP推广合作
联系“鸟哥笔记小乔”
代码级解答-流式数据的处理问题
2021-05-31 16:30:11

前言

在普通的数据处理场景中,处理数据很简单啊,因为数据都好好的放在库里,直接select出来就好了。


但是流式数据是一条一条过来的,期间还会因为网络延迟,有些数据还会迟到。这种“数据没排好队”的情况,叫做“乱序”。这可让我们非常麻烦!


我们咋解决呢?来,今天让“中国好胖子”同学给你来一个代码级的解答!

乱序

大家知道,所有数据理论上都应该有时间戳,在流式数据中,时间戳更重要。可以说时间戳就是流式数据区别于离线数据的重要标志。


在Flink中,我们大多使用EventTime作为时间戳。当我们用这个时间来参与计算的时候,由于EventTime是真实世界的时间,那么百分之100可能会发生乱序数据。


那么何为乱序数据呢,前面说过了,乱序数据就是迟到的数据。1分钟前产生的数据,1分钟之后才进入到系统中,这就延迟了。


所以那么乱序数据就是在正常的时间数据流中夹杂着一些非顺序的一些数据。


乱序是怎么产生的呢?因素太多了,例如某台机器的网络抖动,或者网卡和系统的延迟,都会导致这台机器上报的数据延迟到达。


那么flink在处理的时候,就可能收到了系统在好几秒之前产生的数据。这个一点非常讨厌,会直接导致实时Join失败。


Flink必须得解决这个问题啊,否则怎么保证迟到的数据都能用上呢,对吧?watermark就是用来解决这个问题的。


Watermark,就是水位线,用来测量乱序数据的进度的。


Flink用watermark来确定这条迟到的数据如何触发计算或者其他操作。嘿嘿,所以Watermark也是一种特殊的数据!

Watermark

单纯的从概念上不好理解,我们先假设一个场景,这样更容易理解这个事情。你最好有一些流失数据的基础,否则不太容易理解这些原理。


假设,我们有一个5s的窗口,并且我们可以容忍的延迟时间为2s,就是说5秒一计算,允许数据迟到2秒。


那么也就是说,从0开始,在7s的时候会触发一次计算。我画个图解释一下为什么会7s触发计算,或者永不触发。


为了排除其他影响因素,我们假设是单task,单分区的场景。其中的33 是第一条数据, 2 是他携带的时间戳,在右侧有一个5秒的窗口:



那么我们的watermark的计算公式就是 watermark = time - latertime 。那么这个时候我们可以得到这个watermark是0,那么他属于0-5s的窗口,那么我们就放到窗口里面去。



这个时候又来了一条数据,就会变成下面这样对吧,为什么会变成两个窗口呢?


因为99这条数据并不属于0-5秒这个窗口里面,因为flink窗口的大小是包左不包右的,这点很关键。


这样你就能明白,为什么33和99应该各自进到单独的窗口。所以,数据是根据EventTime来决定应该进哪个桶或者说窗口的。


现在你能理解为啥EventTime这么重要了吧?



假如,这个时候来了一条乱序数据,23号(时间戳3S),这条数据迟到了,那么我们的watermark怎么更新呢?

现在,请你停下来思考一下,新来的23号数据应该进那个窗口?



案揭晓:我们可以看到我画的图:其中,23数据携带的时间戳是3,watermark也是3,应该归到[0,5)的窗口。


你是不是会奇怪,这迟到的数据序号比前面的99号序号要大啊,怎么在后面呢,并且计算出来的watermark是3?这不是违背了我们的公式计算规则么?


按照前面的公式,watermark = time - latertime,那么23号的watermark应该是3-2=1,应该排到99号的前面去啊。


其实不是的,watermark首先是时间尺度,然后才是衡量标准。所以watermark 不能倒着走啊,因为他是负责测量数据的时间进度的。


所以他的watermark 并不会按照公式计算,而是采用的上一个数字的watermark,也就是3。


为了让你看的更清楚,我们多插几条数据看看。



所有数据在watermark上,都是顺序排列的,6号数据的watermark,按照公式,应该是4-2=2,但是很遗憾,前面已经有3了,所以只能排在3后面。谁让你迟到了呢,对吧?


当然,这个时候他们只是在排队,还没有触发窗口的计算操作。


那么窗口计算什么时候触发呢?很简单,当watermark大于等于窗口触发时间。


第一个窗口触发计算操作的时间也就是大于等于5秒的时候。


提问:第二个窗口出发计算操作的时间是什么时候呢?



答案是10秒。


那么我们讲到这里,应该大部分人都能够了解到了watermark的运行机制,以及窗口什么时候计算。


现在还是一条线的情况。现实情况比这个要复杂的多的多。


那么我们接下来就来考虑一下我们的多并行度下,我们的watermark如何传递?

传递

我们应该知道 在一个 task中有很多的subtask,那么这些subtask都有自己的watermark。


所有的数据时间上都应该同步啊,要不然怎么多并行度计算啊?就全乱套了。


所以这个时候就会涉及到 watermark的传递,因为下游也是依赖这些watermark的。



如上图所示,我们可以看到Watermark在顺序的向下游流动,左侧的向右箭头,就是这个意思。


那么我们这个时候发现有一个Partition WM 这个其实就是各个分区的 SubTask的Watermark。


我这个时候发现,每个subtask的watermark都是不一样的,并且task会存储这些watermark,记录下来各个分区的watermark,并且把最小的watermark广播出去。


当前需要记录的是2、4、3、6号watermark,其中2是最小的,好,我们记下来。


这个时候当传递过来的4号watermark更新了,把原来的2给顶走了。那么现在是4、4、3、6,最小的是3了。这个时候我们就将最小的3作为watermark传递出去。


当7传递过来的时候,我们就会发现,传递过去的依然是最小的3,所以不动。


这样,我们就能解决多并行度下,watermark的传递问题。其实就是挑一个最小的watermark放出去。


你现在对watermark的机制应该是比较了解的吧?

咋用?

现在开始上代码!

1、常见用法

WatermarkStrategy
       .>forBoundedOutOfOrderness(Duration.ofSeconds(20))
       .withTimestampAssigner((event, timestamp) ->timestamp);

怎么样?简单吗?哈哈哈!所以不要以为Flink很难!越高级的语言,其实越简单。只是理解起来比较费劲而已。

2、WatermarkGenerator

/**
* {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
*
*

注意:  WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks}
* 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。
*/
@Public
public interface WatermarkGenerator {

   /**
    * 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark。
    */
   void onEvent(T event, long eventTimestamp, WatermarkOutput output);

   /**
    * 周期性的调用,也许会生成新的 watermark,也许不会。
    *
    *

调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。
    */
   void onPeriodicEmit(WatermarkOutput output);
}

3、watermark 分区数据倾斜解决方案

 在数据源直接使用时如果因为数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着watermarkStrategy也不会获得任何数据去生成watermark,在这种情况下可以通过设置有一个空闲时间,当超过这个时间则将这个分片或分区标记为空闲状态。


WatermarkStrategy

 .>forBoundedOutOfOrderness(Duration.ofSeconds(20))

.withIdleness(Duration.ofMinutes(1));//当时间超过1分钟则设置为空闲状态

结语

Flink的很多设计都非常精巧,watermark就是其中之一。我们研究这些实现原理并不是想做源码级的开发,而是欣赏这种精妙的思想,真是为之叹息。


如果你觉得有启发,欢迎留言,一起交流。

-END-

大数据架构师
分享到朋友圈
收藏
收藏
评分

综合评分:

我的评分
Xinstall 15天会员特权
Xinstall是专业的数据分析服务商,帮企业追踪渠道安装来源、裂变拉新统计、广告流量指导等,广泛应用于广告效果统计、APP地推与CPS/CPA归属统计等方面。
20羽毛
立即兑换
一书一课30天会员体验卡
领30天VIP会员,110+门职场大课,250+本精读好书免费学!助你提升职场力!
20羽毛
立即兑换
顺丰同城急送全国通用20元优惠券
顺丰同城急送是顺丰推出的平均1小时送全城的即时快送服务,专业安全,准时送达!
30羽毛
立即兑换
大数据架构师
大数据架构师
发表文章272
历任多家公司大数据总监、大数据架构师,专注于数字化转型领域。
确认要消耗 0羽毛购买
代码级解答-流式数据的处理问题吗?
考虑一下
很遗憾,羽毛不足
我知道了

我们致力于提供一个高质量内容的交流平台。为落实国家互联网信息办公室“依法管网、依法办网、依法上网”的要求,为完善跟帖评论自律管理,为了保护用户创造的内容、维护开放、真实、专业的平台氛围,我们团队将依据本公约中的条款对注册用户和发布在本平台的内容进行管理。平台鼓励用户创作、发布优质内容,同时也将采取必要措施管理违法、侵权或有其他不良影响的网络信息。


一、根据《网络信息内容生态治理规定》《中华人民共和国未成年人保护法》等法律法规,对以下违法、不良信息或存在危害的行为进行处理。
1. 违反法律法规的信息,主要表现为:
    1)反对宪法所确定的基本原则;
    2)危害国家安全,泄露国家秘密,颠覆国家政权,破坏国家统一,损害国家荣誉和利益;
    3)侮辱、滥用英烈形象,歪曲、丑化、亵渎、否定英雄烈士事迹和精神,以侮辱、诽谤或者其他方式侵害英雄烈士的姓名、肖像、名誉、荣誉;
    4)宣扬恐怖主义、极端主义或者煽动实施恐怖活动、极端主义活动;
    5)煽动民族仇恨、民族歧视,破坏民族团结;
    6)破坏国家宗教政策,宣扬邪教和封建迷信;
    7)散布谣言,扰乱社会秩序,破坏社会稳定;
    8)宣扬淫秽、色情、赌博、暴力、凶杀、恐怖或者教唆犯罪;
    9)煽动非法集会、结社、游行、示威、聚众扰乱社会秩序;
    10)侮辱或者诽谤他人,侵害他人名誉、隐私和其他合法权益;
    11)通过网络以文字、图片、音视频等形式,对未成年人实施侮辱、诽谤、威胁或者恶意损害未成年人形象进行网络欺凌的;
    12)危害未成年人身心健康的;
    13)含有法律、行政法规禁止的其他内容;


2. 不友善:不尊重用户及其所贡献内容的信息或行为。主要表现为:
    1)轻蔑:贬低、轻视他人及其劳动成果;
    2)诽谤:捏造、散布虚假事实,损害他人名誉;
    3)嘲讽:以比喻、夸张、侮辱性的手法对他人或其行为进行揭露或描述,以此来激怒他人;
    4)挑衅:以不友好的方式激怒他人,意图使对方对自己的言论作出回应,蓄意制造事端;
    5)羞辱:贬低他人的能力、行为、生理或身份特征,让对方难堪;
    6)谩骂:以不文明的语言对他人进行负面评价;
    7)歧视:煽动人群歧视、地域歧视等,针对他人的民族、种族、宗教、性取向、性别、年龄、地域、生理特征等身份或者归类的攻击;
    8)威胁:许诺以不良的后果来迫使他人服从自己的意志;


3. 发布垃圾广告信息:以推广曝光为目的,发布影响用户体验、扰乱本网站秩序的内容,或进行相关行为。主要表现为:
    1)多次发布包含售卖产品、提供服务、宣传推广内容的垃圾广告。包括但不限于以下几种形式:
    2)单个帐号多次发布包含垃圾广告的内容;
    3)多个广告帐号互相配合发布、传播包含垃圾广告的内容;
    4)多次发布包含欺骗性外链的内容,如未注明的淘宝客链接、跳转网站等,诱骗用户点击链接
    5)发布大量包含推广链接、产品、品牌等内容获取搜索引擎中的不正当曝光;
    6)购买或出售帐号之间虚假地互动,发布干扰网站秩序的推广内容及相关交易。
    7)发布包含欺骗性的恶意营销内容,如通过伪造经历、冒充他人等方式进行恶意营销;
    8)使用特殊符号、图片等方式规避垃圾广告内容审核的广告内容。


4. 色情低俗信息,主要表现为:
    1)包含自己或他人性经验的细节描述或露骨的感受描述;
    2)涉及色情段子、两性笑话的低俗内容;
    3)配图、头图中包含庸俗或挑逗性图片的内容;
    4)带有性暗示、性挑逗等易使人产生性联想;
    5)展现血腥、惊悚、残忍等致人身心不适;
    6)炒作绯闻、丑闻、劣迹等;
    7)宣扬低俗、庸俗、媚俗内容。


5. 不实信息,主要表现为:
    1)可能存在事实性错误或者造谣等内容;
    2)存在事实夸大、伪造虚假经历等误导他人的内容;
    3)伪造身份、冒充他人,通过头像、用户名等个人信息暗示自己具有特定身份,或与特定机构或个人存在关联。


6. 传播封建迷信,主要表现为:
    1)找人算命、测字、占卜、解梦、化解厄运、使用迷信方式治病;
    2)求推荐算命看相大师;
    3)针对具体风水等问题进行求助或咨询;
    4)问自己或他人的八字、六爻、星盘、手相、面相、五行缺失,包括通过占卜方法问婚姻、前程、运势,东西宠物丢了能不能找回、取名改名等;


7. 文章标题党,主要表现为:
    1)以各种夸张、猎奇、不合常理的表现手法等行为来诱导用户;
    2)内容与标题之间存在严重不实或者原意扭曲;
    3)使用夸张标题,内容与标题严重不符的。


8.「饭圈」乱象行为,主要表现为:
    1)诱导未成年人应援集资、高额消费、投票打榜
    2)粉丝互撕谩骂、拉踩引战、造谣攻击、人肉搜索、侵犯隐私
    3)鼓动「饭圈」粉丝攀比炫富、奢靡享乐等行为
    4)以号召粉丝、雇用网络水军、「养号」形式刷量控评等行为
    5)通过「蹭热点」、制造话题等形式干扰舆论,影响传播秩序


9. 其他危害行为或内容,主要表现为:
    1)可能引发未成年人模仿不安全行为和违反社会公德行为、诱导未成年人不良嗜好影响未成年人身心健康的;
    2)不当评述自然灾害、重大事故等灾难的;
    3)美化、粉饰侵略战争行为的;
    4)法律、行政法规禁止,或可能对网络生态造成不良影响的其他内容。


二、违规处罚
本网站通过主动发现和接受用户举报两种方式收集违规行为信息。所有有意的降低内容质量、伤害平台氛围及欺凌未成年人或危害未成年人身心健康的行为都是不能容忍的。
当一个用户发布违规内容时,本网站将依据相关用户违规情节严重程度,对帐号进行禁言 1 天、7 天、15 天直至永久禁言或封停账号的处罚。当涉及欺凌未成年人、危害未成年人身心健康、通过作弊手段注册、使用帐号,或者滥用多个帐号发布违规内容时,本网站将加重处罚。


三、申诉
随着平台管理经验的不断丰富,本网站出于维护本网站氛围和秩序的目的,将不断完善本公约。
如果本网站用户对本网站基于本公约规定做出的处理有异议,可以通过「建议反馈」功能向本网站进行反馈。
(规则的最终解释权归属本网站所有)

我知道了
恭喜你~答对了
+5羽毛
下一次认真读哦
成功推荐给其他人
+ 10羽毛
评论成功且进入审核!审核通过后,您将获得10羽毛的奖励。分享本文章给好友阅读最高再得15羽毛~
(羽毛可至 "羽毛精选" 兑换礼品)
好友微信扫一扫
复制链接