Flink 重点概念

  • A+
所属分类:Flink

Levels of Abstraction

“流”并不是一个新的概念,视频流、音频流很多场景都用到了这个概念,这里主要理解“流式计算”和“批次计算”的区别,流式没有边界的,实时性更强,但相应对于failover等机制,流式计算更难控制,在诸多流式计算框架中Flink目前很好的已经解决了这些问题。最简单的理解“流式计算”可以认为它是由一组数据流和运算符组成的DAG(有向无环图)或拓扑图。

Flink提供了不同的抽象层来满足使用者开发流式/批次计算。

Flink 重点概念

  • Stageful Stream Processing:最低抽象级别的API一般不会用到,它实际是Flink操作的原语,我们开发过程中更多使用上层的封装

  • DataStream/DataSet API:这是Flink的核心API,其中DataStream API既可以处理流式计算也可以处理批次计算,DataSet API用来处理批次计算。

  • Table API:Table API和SQL API是两种基于核心API的扩展,都支持对流式计算和批次计算的任务处理,其中Table API主要是Scala或Java代码来使用,SQL API通过标准的SQL语言来查询。

  • SQL:SQL语义的查询实际是Flink对查询API的扩展,SQL语言一直被业界认为在查询系统中最合适的一种方式,很多查询系统(包括Apache Flink、Apache Kylin、Apache Drill、Apache Hive、Apache Kylin、Apache Phoenix、Apache Samza以及我们自己开发的DWS)都是基于Apache Calcite构建的。(Flink中的SQL API部分主要是由Alibaba Blink团队主导开发的)

[补充]关于Table API & SQL

Table API 是一种关系型API,类 SQL 的API,用户可以像操作表一样地操作数据,非常的直观和方便。用户只需要说需要什么东西,系统就会自动地帮你决定如何最高效地计算它,而不需要像 DataStream 一样写一大堆 Function,优化还得纯靠手工调优。另外,SQL 作为一个“人所皆知”的语言,如果一个引擎提供 SQL,它将很容易被人们接受。这已经是业界很常见的现象了。值得学习的是,Flink 的 Table API 与 SQL API 的实现,有 80% 的代码是共用的。所以当我们讨论 Table API 时,常常是指 Table & SQL API。

Table & SQL API 还有另一个职责,就是流处理和批处理统一的API层。Flink 在runtime层是统一的,因为Flink将批任务看做流的一种特例来执行,这也是 Flink 向外鼓吹的一点。然而在编程模型上,Flink 却为批和流提供了两套API (DataSet 和 DataStream)。为什么 runtime 统一,而编程模型不统一呢? 在我看来,这是本末倒置的事情。用户才不管你 runtime 层是否统一,用户更关心的是写一套代码。这也是为什么现在 Apache Beam 能这么火的原因。所以 Table & SQL API 就扛起了统一API的大旗,批上的查询会随着输入数据的结束而结束并生成有限结果集,流上的查询会一直运行并生成结果流。Table & SQL API 做到了批与流上的查询具有同样的语法,因此不用改代码就能同时在批和流上跑。

Flink 重点概念

内容参考自:http://wuchong.me/blog/2017/03/30/flink-internals-table-and-sql-api/,该博客作者也是Flink的committer。

Programs and Dataflows

Flink程序的基本构建块是streams和transformations(注意,DataSet在内部也是一个stream)。一个stream可以看成一个中间结果,而一个transformations是以一个或多个stream作为输入的某种operation,该operation利用这些stream进行计算从而产生一个或多个result stream。

Flink 重点概念

在运行时,Flink上运行的程序会被映射成streaming dataflows,它包含了streams和transformations operators。每一个dataflow以一个或多个sources开始以一个或多个sinks结束。dataflow类似于任意的有向无环图(DAG),当然特定形式的环可以通过iteration构建。在大部分情况下,程序中的transformations跟dataflow中的operator是一一对应的关系。但有时候,一个transformation可能对应多个operator。

Parallel Dataflows

程序在Flink内部的执行具有并行、分布式的特性。stream被分割成stream partition,operator被分割成operator subtask,这些operator subtasks在不同的线程、不同的物理机或不同的容器中彼此互不依赖得执行。

一个特定operator的subtask的个数被称之为其parallelism(并行度)。一个stream的并行度总是等同于其producing operator的并行度。一个程序中,不同的operator可能具有不同的并行度。

Flink 重点概念

Stream在operator之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式。

  • One-to-one : strem(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着map operator的subtask看到的元素的个数以及顺序跟source operator的subtask生产的元素的个数、顺序相同。

  • Redistributing : stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个operator subtask依据所选择的transformation发送数据到不同的目标subtask。例如,keyBy() (基于hash码重分区),broadcast()或者rebalance()(随机redistribution)。在一个redistribution的交换中,只有每一个发送、接收task对的顺序才会被维持(比如map()的subtask和keyBy/window的subtask)。

Tasks and Operator Chains

为了更高效的分布式执行,Flink将operator的subtask链接在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换、减少序列化/反序列化开销和基于缓存区的数据交换,在减少时延的同时提升吞吐量。链接的行为可以在编程API中进行指定。

以WordCount为例,看下面这幅图,Source的并行度为1,FlatMap、KeyAggregation、Sink并行度为2,最终以5个并行的线程来执行优化的过程。

Flink 重点概念

上图中将KeyAggregation和Sink两个operator进行了合并,因为这两个合并后并不会改变整体的拓扑结构。但是,并不是任意两个 operator 就能 chain 一起的。其条件还是很苛刻的:

  1. 上下游的并行度一致

  2. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)

  3. 上下游节点都在同一个 slot group 中

  4. 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)

  5. 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)

  6. 两个节点间数据分区方式是 forward(参考理解数据流的分区

  7. 用户没有禁用 chain

Operator chain的行为可以通过编程API中进行指定。可以通过在DataStream的operator后面(如someStream.map(..))调用startNewChain()来指示从该operator开始一个新的chain(与前面截断,不会被chain到前面)。或者调用disableChaining()来指示该operator不参与chaining(不会与前后的operator chain一起)。在底层,这两个方法都是通过调整operator的 chain 策略(HEAD、NEVER)来实现的。另外,也可以通过调用StreamExecutionEnvironment.disableOperatorChaining()来全局禁用chaining。

关于Operator Chain的实现原理可参考:http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/

Window

窗口是为了解决对没有边界的数据进行聚合操作问题。对于batch类型的计算,能很方变的进行count、sum等操作,但对于stream,这种计算方式就比较困难,采用window来对数据进行隔离,比如“统计5分钟的数量”或“对最近100个元素求和”。

窗口分类

窗口可以是时间驱动(即Time Window)的(比如,每30秒)也可以是数据驱动(即Count Window)的(比如,每100个元素)。

另外一种方式将窗口可以分为三种:Tumbing window,Sliding Windows(有重叠)和session windows(有空隙的活动)。

Flink 重点概念

结合具体实例来理解几种窗口含义:电商网站会记录每个用户每次购买的商品个数,我们要做的是统计不同窗口中用户购买商品的总数。

Flink 重点概念

上图中,raw data stream 代表用户的购买行为流,圈中的数字代表该用户本次购买的商品个数,事件是按时间分布的,所以可以看出事件之间是有time gap的。

Tumbing Window(滚动窗口)

按固定大小来切分数据流,即在时间和窗口大小上都是固定的,相邻窗口没有重叠,也就是说数据要么在A窗口中,要么在B窗口中。

Flink 重点概念

Tumbling Time Window

场景举例:每1分钟累加一次数据,将合并后的结果写入数据库。

// Stream of (userId, buyCnt)
val buyCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = buyCnts
// key stream by userId
  .keyBy(0) 
// tumbling time window of 1 minute length
  .timeWindow(Time.minutes(1))
// compute sum over buyCnt
  .sum(1)

Tumbling Count Window

场景举例:每当窗口中填满100个元素了,就会对窗口进行计算

// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = buyCnts
// key stream by sensorId
  .keyBy(0)
// tumbling count window of 100 elements size
  .countWindow(100)
// compute the buyCnt sum 
  .sum(1)

Sliding Window(滑动窗口)

窗口大小不固定,而是按照一定的时间来“滑动”或“错位”。

Flink 重点概念

Sliding Time Window

场景举例:对于流数据,我们每隔30秒计算一次最后一分钟数据之和

val slidingCnts: DataStream[(Int, Int)] = buyCnts
  .keyBy(0) 
// sliding time window of 1 minute length and 30 secs trigger interval
  .timeWindow(Time.minutes(1), Time.seconds(30))
  .sum(1)

Sliding Count Window

场景举例:计算每10个元素计算一次最近100个元素的总和

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
// sliding count window of 100 elements size and 10 elements trigger interval
  .countWindow(100, 10)
  .sum(1)

Session Window(会话窗口)

类似Web应用中的Session,超过一段时间没有行为,就创建一个新的窗口。场景举例:场景一:对事件先按ID分流,然后对于每个流按照不同的session窗口统计。

Flink 重点概念

场景二:统计每个用户在活跃起见共购买商品的数量,如果用户30秒没有任何购买行为,则该会话断开。

// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...
val sessionCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
// session window based on a 30 seconds session gap interval 
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
  .sum(1)

提示:实际应用中以前两种为主,重点理解。关于session window,详细说明参考:http://wuchong.me/blog/2016/06/06/flink-internals-session-window/

Time

Flink提供多样性的时间选择,方便在不同业务场景的流式计算中灵活选择。常用时间类型有三种:

Flink 重点概念

Processing Time(程序时间)

数据在Flink 的Operator中执行时的系统本地时间。(有些资料翻译为处理时间,即数据被处理的时长,这种理解应该不太正确)

Event Time(事件时间)

事件产生时间,比如服务器上报日志自带时间戳。

Ingestion Time(摄入时间)

数据读入Flink 时间,可以理解为数据进入Source Operator时刻的时间戳

提示:如果在代码中不配置Time类型,默认Processing Time类型。

如果需要修改,设置方式如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
//env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

Flink Architecture & Topology

基本架构

Flink也是主从模式,主要包括Master或JobManager、Worker或TaskManager,另外提交任务的是Clinet。当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。

  • Client 为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交 Job 后,Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回。

  • JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。

  • TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。

Flink 重点概念

任务提交

Job Client会将任务以JobGraph形式封装,然后提交给JobManager,JobManager会进行一系列的资源分配、任务调度等操作,将任务分配给TaskManager,后者负责具体的执行,同时会定时汇报任务的执行状态,待任务执行完成后,将结果返回给客户端。

Flink 重点概念

任务执行图

Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

  • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。

  • JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。

  • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。

  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

Flink 重点概念

这里对一些名词进行简单的解释。

  • StreamGraph:根据用户通过 Stream API 编写的代码生成的最初的图。

    • StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。

    • StreamEdge:表示连接两个StreamNode的边。

  • JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。

    • JobVertex:经过优化后符合条件的多个StreamNode可能会chain在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。

    • IntermediateDataSet:表示JobVertex的输出,即经过operator处理产生的数据集。producer是JobVertex,consumer是JobEdge。

    • JobEdge:代表了job graph中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。

  • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。

    • ExecutionJobVertex:和JobGraph中的JobVertex一一对应。每一个ExecutionJobVertex都有和并发度一样多的 ExecutionVertex。

    • ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输出是IntermediateResultPartition。

    • IntermediateResult:和JobGraph中的IntermediateDataSet一一对应。一个IntermediateResult包含多个IntermediateResultPartition,其个数等于该operator的并发度。

    • IntermediateResultPartition:表示ExecutionVertex的一个输出分区,producer是ExecutionVertex,consumer是若干个ExecutionEdge。

    • ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一个。

    • Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过 ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过 ExecutionAttemptID 来确定消息接受者。

  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

    • Task:Execution被调度后在分配的 TaskManager 中启动对应的 Task。Task 包裹了具有用户执行逻辑的 operator。

    • ResultPartition:代表由一个Task的生成的数据,和ExecutionGraph中的IntermediateResultPartition一一对应。

    • ResultSubpartition:是ResultPartition的一个子分区。每个ResultPartition包含多个ResultSubpartition,其数目要由下游消费 Task 数和 DistributionPattern 来决定。

    • InputGate:代表Task的输入封装,和JobGraph中JobEdge一一对应。每个InputGate消费了一个或多个的ResultPartition。

    • InputChannel:每个InputGate会包含一个以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一对应,也和ResultSubpartition一对一地相连,即一个InputChannel接收一个ResultSubpartition的输出。

那么 Flink 为什么要设计这4张图呢,其目的是什么呢?Spark 中也有多张图,数据依赖图以及物理执行的DAG。其目的都是一样的,就是解耦,每张图各司其职,每张图对应了 Job 不同的阶段,更方便做该阶段的事情。我们给出更完整的 Flink Graph 的层次图。

Flink 重点概念

首先我们看到,JobGraph 之上除了 StreamGraph 还有 OptimizedPlan。OptimizedPlan 是由 Batch API 转换而来的。StreamGraph 是由 Stream API 转换而来的。为什么 API 不直接转换成 JobGraph?因为,Batch 和 Stream 的图结构和优化方法有很大的区别,比如 Batch 有很多执行前的预分析用来优化图的执行,而这种优化并不普适于 Stream,所以通过 OptimizedPlan 来做 Batch 的优化会更方便和清晰,也不会影响 Stream。JobGraph 的责任就是统一 Batch 和 Stream 的图,用来描述清楚一个拓扑图的结构,并且做了 chaining 的优化,chaining 是普适于 Batch 和 Stream 的,所以在这一层做掉。ExecutionGraph 的责任是方便调度和各个 tasks 状态的监控和跟踪,所以 ExecutionGraph 是并行化的 JobGraph。而“物理执行图”就是最终分布式在各个机器上运行着的tasks了。所以可以看到,这种解耦方式极大地方便了我们在各个层所做的工作,各个层之间是相互隔离的。

参考资料:http://wuchong.me/blog/2016/05/03/flink-internals-overview/


Task Slots and Resources

每个Worker(TaskManager)都是一个独立的JVM进程,它通过Task Slot(任务槽)来控制能执行的线程数,且每个TaskManager至少有一个slot。有几个slot即平分该JVM内存。注意,这里指的是内存隔离,不会发生CPU隔离。

同一JVM种的任务共享TCP连接(通过多路复用)和心跳消息。它们也可能共享数据集和数据结构,从而减少每个任务的开销。

Flink 重点概念

默认,如果subtask是来自相同job,即使不是相同的task,Flink也允许subtask共享slot。结果是,一个slot可能包含该job的整个pipeline。允许slot共享有两个好处:

  • Flink集群确实需要许多task slots来让Job达到最高的并行度。不需要计算一个程序总共包含多少个task。

  • 更容易获得更好的资源利用。如果没有slot共享,非密集型的source/map()的subtask将阻塞跟密集型的window的subtask一样多的占用资源。而如果有slot共享,基本的并发度通过完整地利用共享的slot资源将获得2到6倍的提升,同时仍然保证每一个TaskManager会在任务繁重的subtask之间进行合理的slot共享。

Flink 重点概念

slot共享行为可以通过API来控制,以防止不合理的共享。这个机制称之为resource groups,它定义了subtask可能共享的slot是什么资源。

作为一个约定俗成的规则,task slot推荐的默认值是CPU的核数。基于超线程技术,每个slot占用两个或者更多的实际线程上下文。

参考文档

https://ci.apache.org/projects/flink/flink-docs-release-1.4/

http://wuchong.me/archives/


圈里圈外

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: