Streaming 102 The world beyond batch

概述

欢迎回来!如果你错过了第一部分,The world beyond batch: Streaming 101,那么我强烈建议你先阅读第一部分。它为我们在这一部分所讨论的内容铺垫了所需的基础,所以我会假定你已经熟悉了相关术语和概念。读者需要注意。

并且需要提醒的是,这一部分中包含了许多动画,如果你尝试打印这一章,那你可能丢失了最精彩的部分。读者需要注意。

好了,说完了免责声明,现在可以开始这部分的学习了。简单地说,之前我们主要聚焦于三个主要部分:术语,当我使用了“流”这样的重载术语时,准确定义了我要表达的意思;批处理与流处理,比较了这两种系统理论上的能力,并且提出流式系统要超越批处理系统所需的两样东西:正确性和推测时间的工具;数据处理模式,浏览了批处理和流处理系统在处理有界数据和无界数据时的基本方法。

在这篇文章中,我想要更进一步地探讨数据处理模式,抓住更多的细节,并且结合它们的实际使用案例进行分析。这篇文章的脉络主要依照如下两个部分展开:

在学完这两部分内容之后,我们就掌握了处理无序数据所需的核心法则和概念;事实上,推测时间的公式正是帮助流处理超越批处理的有效工具。

为了使你更直观地感受到它们在做什么,我会应用Dataflow SDK里面的一些代码(即,Google Cloud Dataflow的API),并且使用动画来提供可视化的理解角度。之所以我使用Dataflow SDK而不是大家所更熟悉的一些工具,比如Spark或者Storm,是因为目前没有其他系统可以涵盖所有我想表达的东西。好消息是,一些项目已经开始朝这个方向转移了。更好的消息是,我们(谷歌)向Apache公司提交了一份关于创建Apache Dataflow 孵化项目的建议(与Artisans,Cloudera,Talend和其他一些公司合作),旨在建立一个基于Dataflow无序数据处理模型的良好社区氛围和生态环境。这会让2016年变得非常有趣。不好意思,我跑题了。

这篇文章缺少的是我上次承诺的比较部分,对不起。我低估了我打算放在这篇文章中的内容量,也低估了完成它们需要花费的时间。并且在这个节骨眼上,我不希望由于这部分内容而导致文章的推迟发布。但是在Strata + Hadoop World Singapore 2015上我做的报告The evolution of massive-scale data processing中,提供了许多我想展示的对比资料,希望这能做一些补偿;幻灯片非常漂亮,你可以在网上阅读。可以肯定的是,其中的内容和这篇文章内容不完全一样,有一些独特的东西。

好了,让我们进入流处理的学习吧!

概括和路线图

在Streaming 101汇总,我首先阐述了一些术语。首先我区分了有界和无界数据。有界数据源大小有限,并且经常与“批”数据联系在一起。无界数据源的大小无限,并且经常与“流”数据联系在一起。我呼吁尽可能地避免使用“批”和“流”来表征数据源,因为这样的名词使用经常会导致一些误导或者局限。

接下来,我定义了批处理引擎和流处理引擎的区别:在人们的传统思想里,都是认为批处理引擎是那些只用来处理有界数据的引擎,而流处理引擎是那些只用来处理无界数据的引擎。但是我的所建议的是,仅仅用批和流来表述执行引擎。

在介绍完术语之后,我又引入了关于处理无界数据的两个重要概念。首先指出了事件时间和处理时间的区别。这为Streaming 101的文章展开提供了一个重要基础:如果你既关心正确性又需要基于事件真正发生的时间来处理它,那么你需要使用数据真正开始的事件时间,而不是数据被观测到时的处理时间,来分析数据。

接着我简单地介绍了窗口化的有关概念(即,将数据集按照时间边界分割),窗口化在处理无界数据源的无休止性时经常使用。关于窗口化的简单例子有固定窗口和滑动窗口,复杂一些的例子有会话(窗口由数据的特征所决定,例如,捕捉每个用户的活动,并且以不活动的间隙来分隔),它们都有广泛的应用。

除了这两个概念之外,现在我们还要进一步补充三个概念:

最近我发现,如果想要更轻松地理解这些概念之间的关系,那么我们需要带着四个重要问题来复习旧知识探索新知识,这四个问题在每一个无界数据处理案例中都十分重要:

我们在这篇文章中会更深入地讨论这些问题。

Streaming 101 终极版

我们先回到Streaming 101中学到的一些概念,在这次的学习中,我们会使用更多的例子来帮助理解,得到更多关于底层细节的知识。

What:转换

在传统批处理中应用的转换,回答了这个问题:“我们要计算什么样的结果?”尽管很多人已经对传统流式计算非常熟悉了,但是我们还是要从这里开始,因为这是我们进一步学习其他概念的基础。

在这部分内容中,我们将要先看一个简单的例子:在一个由10个值组成的简单数据集的基础上计算键控整数和。如果你想要更直观地理解它,你可以将它理解为:一队人各自在玩一个手机游戏,最后把它们的分数上传到一起,你需要计算分数的总和。你可以把它看作和计费与使用情况统计案例具有相同的思想。

对于每一个例子,我都会使用一小段Dataflow Java SDK的伪代码来帮助大家更清楚地理解数据管道的定义。某些情况下,我给出的伪代码将弯曲规则来让例子更明晰(例如具体IO源的使用),或者简化名字(Java最新的触发器的名字真的是冗长;我会用简化的名字来让表达更加清晰)。忽略掉这些次要的东西后(其中的大多数我都在Postscript中详细列举了),剩下基本就是真实的Dataflow SDK代码了。之后我也会提供一些真实的代码给那些对类似例子感兴趣并且想要自己编译并运行的读者。

如果你对于Spark和Flink至少达到了熟悉程度的话,你应该很容易就能理解Dataflow的代码在做什么。出于给你一个快速上手指南的考虑,这里有两个Dataflow中的原函数:

Figure 1 转换的类型

如果你感到困惑或者是想要一些参考的话,你可以看看 Dataflow Java SDK docs。

我们从一个被称为输入PCollection<KV<String,Integer>>开始(这是一个由关于 String 和 Integer 的 KV 对组成的 PCollectrion,其中的 String 是一些例如队伍名称的东西,Integer 是相应队伍中每个个体的分数)。在一个真实的数据管道中,我们需要通过 PCollection 来从 IO 数据源中读取原始数据输入(例如日志数据),然后通过解析日志为 KV 对,将其转换到 PCollection<KV<String,Integer» 中。为了清楚起见,在第一部分例子中,我会给出每一步的伪代码,但是在随后的例子中,我将会简化 IO 和解析部分。

因此,对于一个简单从 IO 源读取数据,然后解析出队伍名/分数对,然后计算每一个队伍总分的数据管道,我们会有如下操作:

PCollection<String> raw = IO.read(...); 
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn()); PCollection<KV<String, Integer>> scores = input.apply(Sum.integersPerKey());

对于所有后续的例子,在我们看了一些需要分析的表述数据管道的代码片之后,我们会看一些通过具体数据集执行该流水线的动画渲染。特别地,我们将看到在单个Key的10个数据输入的情况下执行数据管道的过程。

每个管道都通过两个维度来绘制输入和输出:事件时间(在X轴上)和处理时间(在Y轴上)。这样,由被数据管道所观测到的时间推移自底向上,正如那条粗的上升白线所示。每一个圆圈代表一个输入,圆圈中的数字代表了该条数据的独特信息。它们开始时灰色的,随着管道观测到它们而改变颜色。

当数据管道观测到数值时,它会将它们累积到自己的状态中,最终将结果汇总为输出。状态和输出由矩形表示,总值接近顶部,矩形所覆盖的区域代表事件时间和处理时间累积到结果中的部分。对于List 1 中的数据管道,其在经典批处理引擎上执行时看起来像这样。

Figure 2 传统批处理方法。

由于这是一个批处理数据管道,它只会一直累积状态直到它看见了所有的输入为止(由顶部的灰绿色标志),在图中的例子中,最后它得到了唯一的输出结果51。在这个例子中,因为我们并未进行任何特殊的窗口化转换,所以是通过事件时间来求和的;于是,状态和输出的矩阵覆盖了整个X轴。如果我们想要处理无界数据源的话,那么传统批处理系统并不能很好地胜任;我们没法等到输入的结束,因为数据源根本就不会停止。所以,我们需要引入窗口化的概念,这个概念已经在 Streaming 101 里面介绍过了。接下来,在第二个问题“在哪儿计算基于事件时间的结果”的背景下,我们会重新复习一下窗口化。

Where:窗口化

正如之前我们所讨论的那样,窗口化是通过时间边界来对数据源进行分割的方法。常见的窗口化策略包括固定窗口,滑动窗口,以及会话窗口。

Figure 3 窗口化策略的一些例子

为了更好地了解窗口化是什么样子,让我们以一个使用固定两分钟窗口的整数求和数据管道为例说明。在使用Dataflow SDK 的时候,我们要做的只是增加一个 Window.into 转换(图中蓝色高亮部分):

PCollection<KV<String, Integer>> scores = input  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))  .apply(Sum.integersPerKey());

Dataflow提供了一个在批处理和流处理下统一的模型,因为批实质上只是流的一个子集。因此,让我们先在批处理引擎上执行数据管道;它的直观流程看起来十分直接,可以与流式引擎有一个直观的对比。

Figure 4 批处理引擎下的窗口化求和

正如之前那样,输入在被处理为输出之前一直都在累积到状态中。在这种情况下,我们得到了四个而不是一个输出:对于每一个事件时间窗口得到一个对应的输出。

现在,我们已经复习了 Streaming 101 中介绍的两个主要概念:事件时间和处理时间域,以及窗口化。如果我们想要让自己更进一步,那么就需要引入这章开头所提到的概念了:水印、触发器、累积。那么我们开始Streaming 102 吧!

Streaming 102

我们刚刚已经看了一个窗口化的批处理数据管道执行过程。但理想情况下,我们应该得到延迟更低的结果,并且我们还希望能够本地处理无界数据源。使用流式引擎是我们朝着正确方向迈出的第一步,虽然批处理引擎能够知道何时某个窗口的输入已完成(即,一旦有界数据源的所有数据都已被处理),但是我们仍缺少实际的办法来判断一个无界数据源的完备性。让我们开始学习水印吧。

When:水印

水印是解答“基于处理时间的结果该何时被处理”的第一部分。水印是在事件时间域中关于输入完备性的一个时间概念。不同的是,它们是系统衡量相对于事件流中处理记录的事件时间的进展与完整性关系的方式(有界或无界都可以,尽管在无界情况下更明显)。

回顾一下Streaming 101的图表,我们进行了一些简单的修改,将实际运用中事件时间和处理时间之间的偏差表述为关于时间不断变化的函数。

Figure 5 事件时间进程,偏差,以及水印

那条被我标注为实际情况的弯曲红线实质上就是水印;它以处理时间进程的方式捕捉到了事件时间完备性的进程。实际上,你可以把水印看作一个函数,F(P) -> E,它将处理时间上的一点映射到了事件时间上的一点。(更准确地说,函数的自变量是当水印被观测到时,对应的那一点上游数据在数据管道中的状态:输入源,缓存数据,正在被处理的数据等等;当然,如果我们将它看作一个从处理时间到事件时间的映射的话更容易理解。)事件时间上的那一点E,被系统当作“事件时间小于E的所有输入都已经被观测到”的标志点。换句话说,不会再有事件时间小于E的数据出现了。根据水印的类型,完美型或启发式,这个推断可能分别是严格有效的或合理猜测的:

水印是一个迷人又复杂的主题,其内容比我在文中讨论的要多得多,所以如果想要更深入地了解它就需要等到之后的新文章发表了。现在来说,为了更加好地感受到水印的优点以及缺点,让我们来看两个例子,这两个例子都是在使用流式引擎来处理窗口化的数据管道时,使用水印来决定何时处理数据输出的。

Figure 6 在具有完美水印(左)和启发式水印(右)的流式引擎下进行窗口化求和

在这两种情况中,我们都是在水印发出窗口结束信号后对窗口进行处理的。它们之间的主要区别是,启发式引擎没能成功地把数据9计入结果中,这就大幅改变了水印的形状。这些例子展示了水印的两个缺点(以及一些关于完备性的概念):

在Streaming 101中,我强调过完备性的有关概念不足以完全应对无序无界数据的处理。水印的太快或太慢这两个缺点,是这些争议的基本由来。我们确实很难简单地通过只关注完备性的系统来得到既有低延迟又有正确性的数据。解决这些缺点正是触发器起作用的地方。

When:触发器的伟大之处在于触发器是伟大的东西!

触发器是问题“什么时候处理基于处理时间的数据结果”答案的第二部分。触发器声明了何时应该依据处理时间来处理窗口输出(尽管触发器本身是根据其他时间域中发生的事情来做出的决定,例如事件时间域中的水印处理)。一个窗口的每一个特定输出都被称为窗口的一个窗格。

用于触发的信号示例包括:

除了这些基于具体信号的简单触发器外,还有一些可以实现复杂逻辑的综合触发器。例如:

为了更详细地了解触发器的概念(并且给我们一些建设基础),让我们继续前进,把图6中的隐式默认触发器添加到List 2中:

PCollection<KV<String, Integer>> scores = input   .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))                .triggering(AtWatermark())).apply(Sum.integersPerKey());

了解了这些概念,并且对于构造器能提供什么有了一个基本了解后,我们就可以着重于解决水印太慢或者太快的问题了。在这两种情况下,我们本质上想要的是提供一些规则的、整理过的更新给窗口,不管在水印超过窗口末端之前还是之后都可以(除了更新之外,我们还会在通过窗口末端时收到更新)。所以我们也需要一些重复触发器。现在问题变成了:我们怎么才能进行重复?

在太慢的情况下(即,提供更快的投机结果),我们可能需要假定对于给定窗口而言,其需要接收的数据量是稳定的(根据窗口在早期的定义),因为我们知道,我们对该窗口观察到的数据输入不完整。因此,当处理时间提前时(例如每分钟一次)周期性地触发可能是较为明智的,因为触发器发射的数量并不会取决于窗口实际观察到的数据量。

在太快的情况下(即,对启发式水印导致的迟到数据提供更新结果),让我们假定我们的启发式水印是相对准确的(通常是一个较为安全的假设)。在这种情况下,我们并不会经常看见迟到数据,但是当我们看到的时候,如果我们能迅速解决它们会更好。在观测到元素数为1的时候进行触发,我们就能很快地更新我们的结果,但考虑到迟到数据的不常见,这不太可能压倒该系统。

请注意,这些仅仅是例子:我们可以根据需求来自由选择不同的触发器(或者选择不使用触发器)。

最后,我们需要协调这些触发器的时间安排:提早的,准时的,迟到的。我们可以用一个Sequence触发器和一个特殊的Orfinally触发器,这会安装一些会在自己发射时终止父触发器的子触发器。

PCollection<KV<String, Integer>> scores = input   .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))                .triggering(Sequence(Repeat(AtPeriod(Duration.standardMinutes(1)))                    .OrFinally(AtWatermark()),epeat(AtCount(1))))                                       .apply(Sum.integersPerKey());

这代码看起来显得很啰嗦。由于重复早期、及时、迟到发射信号很常见,所以我们在Dataflow中提供了一个API接口以使触发器的使用更简洁。

PCollection<KV<String, Integer>> scores = input   .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))                .triggering(AtWatermark()                    .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))                    .withLateFirings(AtCount(1))))
.apply(Sum.integersPerKey());

在流式引擎上执行List 4或5的代码(使用完美和启发式水印,就像之前那样)那么产生的结果像下面那样:

Figure 7 用具有提前和迟到发射信号的流式引擎进行窗口化求和

和图6相比,图7的情况有了两个明显的提升:

这些新触发器有效的使完美水印和启发式水印的输出图像正常化了。图6中两种触发器的输出图像并不大一样,但是在图7中它们看起来非常相似。

它们之间仍然存在的最大不同是窗口存活时间。在完美水印的情况下,我们知道我们收到窗口结束信号后才不会再有新的数据过来,所以我们在那个时候才会关掉窗口。在启发式水印的情况下,我们只能通过让窗口等待一段时间来避免错失迟到数据。但是目前为止,我们也不知道怎么设置等待时间最合适。这就导致了延迟的出现。

When:允许的延迟(即垃圾回收)

在我们进入下一个问题之前,我想要再讨论一个在长期无序流式处理系统中比较重要的问题:垃圾回收。在图7的启发式水印中,每一个窗口的持久性状态都会在其整个生命周期中持续存在。想要妥善处理好迟到数据,那这个东西是必须的。尽管保存窗口的持久性状态直到窗口关闭是比较好的,但是实际上,在处理无界数据源的时候,我们经常没法做到对一个窗口无限期的保存数据(包括元数据);因为这样过于占用磁盘空间。

这样的结果就是,在实际使用中,无序数据处理系统需要一些方法来设定窗口处理数据的生命时长。一个简洁的方法是设定一个允许的迟到时间范围,即估计出一个可能的迟到数据所需处理时间(相对于水印),然后根据这个时间来设定一个底线,任何超过这个底线才到达的数据都会被丢弃。一旦你划定了一条数据可能会迟到的时间,那你就能够以此界定窗口的持久性状态需要保留多久了:保留到超过迟到底线为止。除此之外,你也可以让系统在观察到任何超过迟到底线的数据后立即删除它,这意味着系统不会为一个无用的数据浪费时间了。

由于允许延迟和水印之间的相互作用比较微妙,所以我们需要来看一个例子。让我们使用List5/图7的启发式水印管道,并且为其设定一个一分钟的迟到底线(注意,迟到底线的设定一定要经过深思熟虑,因为这样有利于改善输出图形;对实际运用场景而言,使用一个更大的迟到底线会更好)。

PCollection<KV<String, Integer>> scores = input   .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))                .triggering(AtWatermark()                    .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))                    .withLateFirings(AtCount(1)))                .withAllowedLateness(Duration.standardMinutes(1)))   
.apply(Sum.integersPerKey());

数据管道的执行流程会与下图8类似,我将允许延迟的影响高亮表示出来了:

Figure 8 使用包含迟到信息、提前信息和允许延迟的流处理引擎进行窗口化求和

最后总结与迟到底线有关的两个要点:

现在,让我们进入最后一个问题的讨论吧。

How:累积

当我们使用触发器来随时间推移对单个窗口划分多个窗格时,我们就来到了最后一个问题:“如何改进结果”。在随后我们将看到的例子中,每一个连续的窗格都是建立在与它相邻的前一个窗格基础之上的。对于累积来说,有三种不同的模式:

为了更清晰地理解这三种模式的不同,我们来看下面的表格。考虑图7中的第二个窗口(事件时间范围是[12:02,12:04))。表格中展示了三种模式下,每个窗格的样子:

  Discarding Accumulating Accumulating & Retracting
       
Pane 1: [7] 7 7 7
Pane 2: [3, 4] 7 14 14, -7
Pane 3: [8] 8 22 22, -14
Last Value Observed 8 22 22
Total Sum 22 51 22

为了了解撤销模式的工作方式,我们需要对List 5做一些改动:

PCollection<KV<String, Integer>> scores = input   .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))                .triggering(AtWatermark()                    .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))                    .withLateFirings(AtCount(1)))                
.discardingFiredPanes())   
.apply(Sum.integersPerKey());

再次运行一个使用启发式窗口的流式引擎能够产生如下结果:

Figure 9 使用丢弃模式对提前/迟到数据进行处理的流式引擎

虽然输出的总体形状与图7的累积模式类似,但是要注意的是,该模式下任何窗格都没有重叠。

如果你我们要了解撤销模式的工作方式,那么同样也需要做一些修改(注意,撤销模式仍在Google Cloud Dataflow的研发中,因此现在使用的API不一定是以后正式的API):

PCollection<KV<String, Integer>> scores = input   .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))                .triggering(AtWatermark()                    .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))                    .withLateFirings(AtCount(1)))                
.accumulatingAndRetractingFiredPanes())   
.apply(Sum.integersPerKey());

接着我们在流式引擎中运行它,得到的输出图像会像下面这样:

Figure 10 使用累积与丢弃模式对提前/迟到数据进行处理的流式引擎

由于每一个窗口的窗格都是重叠的,因此要清楚地认识到撤回有些棘手。撤回用红色表示,与重叠的蓝色窗格相结合产生略带紫色的颜色。我还将给定窗格中的两个输出稍微水平移动(并且用逗号分隔),以使他们更易区分。

使用图中的最后一帧来对比,9,7(仅仅使用启发式),10一个一个地提供了很好的视觉对比:

Figure 11 丢弃模式(左)、累积模式(中)、累积与撤销模式(右)进行对比

正如你所想象的,以这种顺序排列的模式,每一个都消耗更多的存储与计算资源。最后,累积模式的选择为正确性、等待时间和成本的权衡提供了一个新的选择。

间奏曲

到目前为止,我们已经接触了所有四个问题:

但是,我们实际上仅仅了解了窗口化中的一种:基于事件时间的固定窗口窗口化。正如你在Streaming 101中所学的,我们有很多种窗口化的方法,其中有两种是:基于处理时间的固定窗口窗口化、基于事件时间的会话窗口化。

When/Where:处理时间窗口

基于两个理由,处理时间窗口化非常重要:

因此,我们需要对处理时间窗口化和事件时间窗口化的区别有一个深刻的理解与认识,特别是现在处理时间窗口化的应用越来越广泛了。

当我们使用基于事件时间的模型来工作时,就像这篇以窗口化概念为基础的文章所展示的,我们有两种方法来实现处理时间窗口化:

需要注意的是,这两种方法或多或少有些相似,尽管它们在多级管道的情况下略有不同:在触发器方法下,每一级都独立地划分了处理时间“窗口”,例如,窗口X中的数据可能在下一级的X-1或X+1窗口结束;在入口时间方法下,一旦数据被合并到窗口X中,那么由于水印(在Dataflow中)、微型批处理边界(在Spark流处理中)在不同层级之间进行同步,它将在持续存在于窗口直至数据管道结束。

正如我反复强调的,处理时间窗口的一大缺点是,当输入的观察顺序改变时,窗口的内容也会改变。为了更详细地了解这一点,我们需要看看下面这三种情况:

我们使用把两种不同的输入数据集提供给上面的每种情况(因此一共有六种情况)。这两个输入集是同一个事件(即相同的值,在相同的事件时间发生),但是观测顺序不同。第一个数据集顺序我们之前已经见过很多次了,用白色表示;第二个数据集的顺序将在处理时间轴上转换为下图12的样子,用紫色表示。你可以简单地认为紫色的样本是当吹东风而不是西风时真实发生的情况(即,复杂的分布式系统的底层数据集按照稍微不同的方式播放了内容)。

Figure 12 在处理时间的情况下改变观测顺序,保持数值、事件时间的一致

事件时间窗口化

为了建立基线,让我们先比较一下使用了启发式水印的事件时间固定窗口化在这两种数据顺序下的表现。我们将重用List 5/图7里面的提前/迟到代码来得到下面的结果。左边的是我们之前见过的;右边的是另一种顺序下的结果。这里要注意的是:尽管输出的总体形状不同,但是四个窗口的最终结果是一样的:14,22,3和12:

Figure 13 在两种不同序同数据的窗口下事件时间窗口化的情况

使用触发器的事件时间窗口化

现在让我们来比较一下上面所说的两种处理时间方法。首先,我们来看看触发器方法。用这种方法进行处理时间窗口化有三点需要注意:

相应的代码看起来就像List 9 那样;要注意的是全局窗口化是默认的,因此没有额外的窗口化策略重载:

PCollection<KV<String, Integer>> scores = input   
.apply(Window.triggering(Repeatedly(AtPeriod(Duration.standardMinutes(2))))          .discardingFiredPanes())   
.apply(Sum.integersPerKey());

当我们执行流式引擎来处理这两种不同顺序的输入时,结果看起来就像图14一样。有几点比较有趣的是:

Figure 14 使用触发器的处理时间“窗口化”在两种不同处理时间顺序下的输入

使用进入时间的处理时间窗口化

最后,让我们来看看通过把事件时间映射到入口时间来进行的处理时间窗口化。要注意的有以下四点:

所以实际代码看起来可能像这样:

PCollection<String> raw = IO.read().withIngressTimeAsTimestamp(); PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn()); PCollection<KV<String, Integer>> scores = input   .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))   .apply(Sum.integersPerKey());

在流式引擎上执行代码将会看起来像下图15那样。当数据到达的时候,事件时间会更新为进入时间(即,处理时间到达的时候),导致理想水印线上的右向水平移动。图中比较有趣的地方有:

Figure 15 针对两种不同处理时间顺序的输入,使用进入时间的处理时间窗口化

尽管看到可以有多种不同的办法来实现处理时间窗口化很有趣,但是最大的亮点是我从第一篇文章一开始就一直关注的内容:事件时间窗口化是顺序无关的,至少是有限的(实际上窗格可能不同,直到输入完成);而处理时间窗口化并非如此。如果你非常在乎事件的实际发生时间,那么你必须使用事件时间窗口化,否则你的计算结果将毫无意义。我现在要扔肥皂了。

Where:会话窗口

我们就快要看完所有的例子了。如果你看到了这里,那么你是一个很有耐心的读者。好消息是,你的耐心将会得到回报。我们现在将要看看我最喜欢的特征之一:动态的、数据驱动的窗口化,它叫会话。现在赶紧坐正好好听。

会话是一种特殊的窗口,它的长度是由不活跃的间隔所划分的活跃时段。它们非常有用,因为它们能提供特定用户在特定活跃时段的行为偏好。这就可以将会话中的活动进行关联,根据会话的长度来推断出参与度等级。

从窗口化的视角来看,会话在两方面特别有趣:

在一些使用场景中,可以提前在一个会话中用通用标识符来标记数据(例如,一个视频播放器中发射出与服务信息相关的高质量心跳ping;对于任何给定的查看请求,所有的ping都会提前标定一个会话ID)。在这种情况下,会话更容易构建,因为这基本上只是按key分组的一种形式。

但是,在更普遍的例子中(即会话并未被提前知晓),那么会话需要根据数据的时间位置单独构建。当我们处理无序数据的时候,这就会变得很棘手。

它们提供一般会话支持的关键点在于,根据定义,完整的会话窗口是一组较小的重叠窗口的组合,每个窗口包含单个记录,并且序列中的每个记录与下一个记录之间的不活跃间隔大于预定义的某一值。这样一来,即使我们在会话窗口中观测到的数据是无序的,我们也能简单地通过将各个数据重叠的窗口合并到一起来构建最终会话。

Figure 16 未合并的原始会话窗口,以及由此产生的合并会话窗口。

让我们来看一个例子,使用List 8的回退代码来处理提前、延迟数据并且改为使用会话窗口化:

PCollection<KV<String, Integer>> scores = input   .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))                .triggering(                  AtWatermark()                    .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))                    .withLateFirings(AtCount(1)))                
                                                         .accumulatingAndRetractingFiredPanes())  
.apply(Sum.integersPerKey());

在流式引擎中执行代码,你能够得到类似图17的结果:

Figure 17 处理提前和迟到数据的支持会话窗口与撤销的流式引擎

这里有很多事情要做,所以我会引导你看一看:

这是一个巨强大的东西。真正厉害的地方在于,使用这种将流处理的维度分解为独特、可组合片段的模型后,描述类似的东西是多么的容易。最后,你可以将更多的精力放在有趣的商业逻辑而不是数据整理形式的细节上了。

如果你不相信我,那么看看这篇描述了如何“手动地在Spark Streaming上构建会话”的博客(注意,这不是为了指责他们;Spark的伙计们做的足够好了,他们让那些被如何构建特殊的会话所困扰的人们得到了文档指导;其他系统团队根本没做这些)。这是相当复杂的,他们甚至没有做出正确的事件时间会话,或着提供投机或延迟发送处理,也没有支持撤回。

这是博客的结尾,我感觉很棒

好了!我已经介绍完例子了。你们的掌声在哪里?现在你们已经很好地了解了强大的流式处理方法基础,并且已经准备好踏入数据处理的世界做一番事业了。但是在你离开之前,我想要重申一些我们已经学过,但是你可能已经忘掉的东西。首先,我们接触的主体概念有:

接下来,我们提了四个问题来帮助我们详细理解(并且我保证不会让你们阅读更多关于这方面的东西了):

最后,为了了解这些流式处理模式带给我们的灵活性(因为到最后,我们真正在关注的是:如何平衡正确性、延迟性、成本等之间的关系),我们可以通过相同的数据集实现对输出的主要变化的简要回顾,只需要修改少量代码即可实现:

Classic batch Listing 1 / Figure 2 Fixed windows batch Listing 2 / Figure 4 Fixed windows streaming watermark Listing 2 / Figure 6
Early/late discarding Listing 7 / Figure 9 Early/late accumulating Listings 4 & 5 / Figure 7 Early/late retracting Listing 8 / Figure 10
Processing-time (triggers) Listing 9 / Figure 14 Processing-time (ingress time) Listing 10 / Figure 15 Sessions Listing 11 / Figure 17

感谢你的耐心和兴趣!咱们下次见!