flink架构原理,flink documentation
在典型的应用场景中,当连接外部数据库进行读写操作时,将连接操作放在map()中显然不是一个好的选择。 —— 每当数据到达时,数据库可以用open()建立连接,用map()读写数据,用close()关闭连接。
publicclass MyFlatMap extends RichFlatMapFunction {@Overridepublic void open(Configurationconfiguration) { //做一些初始化工作//例如与MySQL建立连接}@Overridepublic void flatMap(IN in, Collector9.3.4 物理分区运算符(Physical Partitioning) 常用物理分区策略包括随机、循环、重新缩放和广播。
随机分区(随机播放) 最简单的重新分区方法是直接“随机播放”。通过调用DataStream的.shuffle()方法,数据随机分布在下游算子的并行任务中。
随机分区遵循均匀分布,因此流中的数据会随机中断并均匀分布到下游任务分区,如图5-9所示。它是完全随机的,因此对于相同的输入数据,每次运行都不会得到相同的结果。
即使在随机分区之后,您得到的也是一个数据流。
您可以运行一个简单的测试。读取数据后直接打印到控制台,设置输出并行度为2,一路shuffle。运行多次,看看结果是否相同。
publicclass ShuffleExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource 流=env.fromElements(1, 2, 3, 4).setParallelism(1); Stream.shuffle( ).print().setParallelism(2); env.execute(); }} 分区轮询(Round Robin) 轮询也是一种常见的重新分区方法。简单来说,就是“发牌”,数据按顺序分发,如图5-10所示。您可以通过调用DataStream的.rebalance()方法来实现轮询重新分区。重新平衡使用循环负载平衡算法。这使得输入流数据可以均匀地分配给下游并行任务。
重新缩放分区(Rescale) 重新缩放分区与轮询分区非常相似。当调用rescale()方法时,底层实际上使用了轮询算法进行轮询,但只是将数据轮询发送到一些下游并行任务,如图5-11所示。换句话说,如果有多个“庄家”,则重新平衡意味着每个庄家都会向所有人发牌,而重新缩放意味着庄家只会向其组中的所有玩家发牌。人们轮流发牌。
从根本实现的角度来看,再平衡和重缩放的根本区别在于任务之间的连接机制不同。重新平衡在所有上游任务(数据发送者)和所有下游任务(数据接收者)之间建立通信通道。这是笛卡尔积关系。另一方面,重新缩放仅影响每个任务及其相应的下游任务。他们之间建立了沟通渠道。节省大量资源。
广播不应该被称为“重新分区”,因为广播后数据可能会保存在不同的分区中并被重复处理。通过调用DataStream的broadcast()方法,您可以将输入数据复制并发送给下游算子的所有并行任务。
全局分区(Global) 全局分区也是一种特殊的分区方法。这种方法相当极端,所有输入流数据都通过调用.global() 方法发送到下游算子的第一个并行子任务。这与强制下游任务的并行度为1 相同,因此使用此操作时应非常小心,它可能会对您的程序造成重大损害。
自定义分区(Custom) 如果Flink提供的所有分区策略都不能满足您的需求,您可以使用partitionCustom()方法自定义分区策略。
“9.4 输出运算符(Sink)”
Flink 作为一个数据处理框架,最终将计算操作的结果写入外部存储,并为外部应用程序提供支持。
”
9.4.1 连接到外部系统Flink 的DataStream API 提供了addSink 方法用于向外部写入数据。与addSource类似,addSink方法支持“Sink”运算符,主要用于连接外部系统并发送数据进行写入。 Flink 程序中的所有外部输出操作通常都是使用Sink 算子完成的。
与Source 运算符非常相似,除了一些Flink 预先实现的接收器之外,接收器运算符通常是通过调用DataStream 的.addSink() 方法创建的。
Stream.addSink(newSinkFunction(…)); addSource参数必须实现SourceFunction接口,同样addSink方法也必须传递一个实现SinkFunction接口的参数。通过这个接口,你只需要重写一个方法invoke(),就可以将指定的值写入外部系统。当每条数据记录到达时都会调用此方法。
当然,大多数情况下你不需要自己实现SinkFuntion。到目前为止我们使用的打印方法实际上是接收器,这意味着它们将数据流写入标准控制台打印输出。 Flink官方在其框架中提供了一些Sink连接器。列出了Flink目前支持的第三方系统连接器,如图5-13所示。
Kafka等流系统发现Flink提供了完美的对接。源/接收器可以两端连接,并且可以读写,而Elasticsearch、FileSystem 和JDBC 等数据存储系统只有用于输出写入的接收器连接器。
除了官方的Flink 之外,Apache Bahir 作为一个为Spark 和Flink 提供扩展支持的项目,还实现了其他几个第三方系统与Flink 之间的连接器,如图所示。
此外,用户必须定制接收器连接器。
9.4.2 输出到文件Flink 提供了专门针对流文件系统的连接器。 StreamingFileSink 提供了用于批处理和流处理的集成接收器,允许您将分区文件写入Flink 支持的文件系统。
StreamingFileSink 支持行和批量编码格式。这两个不同的方法都有自己的构建器,调用方法也非常简单,可以直接调用StreamingFileSink的静态方法。
行编码:StreamingFileSink.forRowFormat(basePath, rowEncoder)。批量编码:StreamingFileSink.forBulkFormat(basePath,BulkWriterFactory)。以行编码为例,让我们将测试数据直接写入文件。
publicclass SinkFile { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream input=env.fromElements('hello world', 'hello flink'); }最终StreamingFileSink 接收器=StreamingFileSink.forRowFormat(new Path('./output'), new SimpleStringEncoder('UTF-8')).withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) .withInactivityInterval (TimeUnit.MINUTES.toMillis(5)) .withMaxPartSize(1024 * 1024 * 1024) .build()).build(); input.addSink(sink); env.execute(); }} 这里,简单的File sinks 指定通过.withRollingPolicy() 方法的“滚动策略”。由于内容不断写入文件,因此我们需要提供何时打开新文件和归档以前内容的标准。也就是说,上面的代码设置分区文件在三种情况下滚动:
包含至少15 分钟的数据最近5 分钟内没有收到新数据文件大小已达到1 GB 9.4.3 输出到Kafka 从Kafka 数据源添加Kafka Connector 依赖由于我们测试了读取数据,与Connector 相关的依赖都有介绍,这里就不再介绍了。
创建启动Kafka集群并输出到Kafka的示例代码,可以将用户行为数据直接保存为clicks.csv文件,读取后直接写入Kafka,无需任何转换,添加主题“clicks.csv” 《致卡夫卡》。
publicclass SinkKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); Properties property=new Properties();properties.put('bootstrap.servers', 'hadoop102:9092'); DataStreamSource stream=env.readTextFile('input/clicks.csv'); stream.addSink(new FlinkKafkaProducer( 'clicks', new SimpleStringSchema(),properties)); env.execute(); }} 执行该代码在Linux 主机上启动消费者并检查是否收到数据。 bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic clicks9.4.4 输出到Redis 虽然Flink 没有直接提供官方的Redis 连接器,但是Bahir 这个项目仍然有合格的配角,并且提供了Flink-Redis连接工具。不过更新有点慢,当前连接器版本为1.0,最新支持的Scala版本为2.11。本次测试不包含Scala相关版本变更,因此不影响使用。实际的项目应用程序应该使用匹配的组件版本运行。
具体测试步骤如下。
导入的Redis连接器依赖于org.apache.bahir flink-connector-redis_2.11 1.0 启动Redis集群为了测试方便,我们只在单个节点上启动Redis。
编写示例代码输出到Redis连接器提供了RedisSink,它继承自抽象类RichSinkFunction。这是一个SinkFunction ,用于将数据写入Redis。您可以将事件数据直接输出到Redis。
public class SinkRedis { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); FlinkJedisPoolConfig conf=new FlinkJedisPoolConfig.Builder().setHost('hadoop200'). build (); env.addSource(new ClickSource()) .addSink(new RedisSink(conf, new MyRedisMapper())); env.execute(); }}这里的RedisSink构造方法传递了两个参数:是需要的。
JFlinkJedisConfigBase:Jedis连接配置RedisMapper:Redis映射类接口。了解如何将数据转换为可写入Redis 的类型。下一步是定义Redis 映射类并实现RedisMapper 接口。
publicstaticclass MyRedisMapper 实现RedisMapper { @Override public String getKeyFromData(Event e) { return e.user; } @Override public String getValueFromData(Event e) { return e.url; } @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand) ) . HSET, 'clicks'); }} 这里保存到Redis时调用的命令是HSET,所以保存为哈希表(hash),表名是'clicks' 我是这么理解的。保存的数据为:每次数据到达时,都会转换以URL 作为其值的用户Key。
当您运行代码时,Redis 会检查是否已收到数据。 9.4.5 输出到Elasticsearch Flink 专门为ElasticSearch 提供了官方的sink 连接器。
ElasticSearch数据写入测试流程如下。
添加Elasticsearch 连接器依赖项org.apache.flink flink-connector-elasticsearch6_${scala.binary.version} ${flink.version} 启动Elasticsearch 集群创建示例代码以输出到Elasticsearch publicclass SinkToEs { public static void main (String [] args) 抛出异常{ StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); ArrayList httpHosts=new ArrayList(); httpHosts.add(new HttpHost('hadoop102', 9200, 'http') ); ElasticsearchSink. Builder esBuilder=new ElasticsearchSink.Builder( httpHosts, new ElasticsearchSinkFunction() { @Override public void process(Eventevent, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { HashMap data=new HashMap(); data.put(event.user, event) .url ); IndexRequestindexRequest=request .indexRequest() .index('clicks') .type('type') .source(data); requestIndexer.add(indexRequest); } } ); DataStream 流=env.fromElements ( new Event ( 'Mary', './home', 1000L)); stream.addSink(esBuilder.build()); env.execute(); }} 与RedisSink 类似,连接器也实现了对Elasticsearch SinkFunction——ElasticsearchSink 的写入。不同的是这个类的构造方法是私有的。要创建实际的SinkFunction,您必须使用ElasticsearchSink 的Builder 内部静态类并调用其build() 方法。
Builder的构造方法有两个参数。
httpHosts:连接到Elasticsearch集群主机的列表elasticsearchSinkFunction:这不叫SinkFunction,而是一个用于编写具体处理逻辑并准备向Elasticsearch发送请求的数据的函数。某些操作需要您重写elasticsearchSinkFunction处理方法。你可以将想要发送的数据放入HashMap中,将其包装在IndexRequest中,然后向外部发送HTTP请求。
运行代码并联系Elasticsearch 检查是否收到数据。 9.4.6 输出到MySQL(JDBC) 下面是MySQL 写入数据的测试步骤。
添加依赖org.apache.flink flink-connector-jdbc_${scala.binary.version} ${flink.version} mysql mysql-connector-java 5.1.47 启动MySQL并在测试库下创建表Clicksmysql createtable clicks( - uservarchar ( 20) notnull, - urlvarchar(100) notnull); 编写示例代码输出到MySQL publicclass SinkToMySQL {public static void main(String[] args) throws Exception { StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism( 1 ) ; env .fromElements( new Event('Mary', './home', 1000L), new Event('Bob', './cart', 2000L) ) .addSink( JdbcSink.sink('INSERT INTO clicks) ( user, url) VALUES ( , )',(statement, r) - {statement.setString(1, r.user);statement.setString(2, r.url); },JdbcExecutionOptions.builder() .withBatchSize( 1000 ).WithbatchIntervalms(200).Withmaxretries(5).build(),New JDBCCONNECTIONS.JDBCCONNECTIONSBUILDER().Withrl('jdbc:MySql: //LocalHost:3306/Test')//不带CJ使用mysql 5.7.WithDrivers('COM.MY SQL. cj.jdbc.Driver') .withUsername('username') .withPassword('password') .build() ) ); env.execute();} } 运行代码并使用客户端连接MySQL并检查是否写入成功输入数据。 9.4.7 自定义Sink 输出如果您想将数据存储在自己的存储设备上,但Flink 没有提供可以直接使用的连接器,则只能自定义输出的Sink。与Source 类似,Flink 提供了通用的SinkFunction 接口和对应的RichSinkDunction 抽象类。实现后,您可以通过简单地调用DataStream 的.addSink() 方法来自定义对外部存储的写入。
Stream.addSink(new MySinkFunction()); 实现SinkFunction时,需要重写的关键方法是invoke(),在这里您可以实现向流发送数据的逻辑。
这种方法比较常见,适用于任何外部存储系统,但与自定义接收器实现状态一致性并不容易,因此通常仅在没有其他选择时使用。自定义场景并不常见,Flink官方在实际项目中使用的外部连接器基本上已经实现并不断扩展。
10. Flink Time 和Window 在流数据处理应用中,一个非常重要且常见的操作是窗口计算。所谓“窗口”就是一般定义的一段时间,或者说“时间窗口”,处理这个范围内的数据就是所谓的窗口计算。因此,窗口和时间往往是密不可分的。接下来我们详细了解一下Flink 的时间语义和窗口应用。
10.1 时间语义10.1.1 Flink 的时间语义如图所示,事件发生后,产生的数据被收集起来,首先进入分布式消息队列,然后被Flink 系统的源算子读取并消费。然后传递下游的变换算子(窗口算子),最后利用窗口算子进行计算处理。
显然,这里有两个非常重要的时间点:一是数据产生的时间,称为“事件时间”。另一种是数据实际被处理的时间,称为“处理时间”(processing time)。我们以什么时间作为窗口操作的标准,就是我们所说的“时间概念”。分布式系统中的网络传输延迟和时钟漂移导致处理时间相对于事件发生的时间滞后。
处理时间”
处理时间的概念很简单,指的是机器执行处理操作的系统时间。最简单的时间语义涉及时间。
”
事件时间事件时间是指每个事件在相应设备上发生并因此生成数据的时间。时间一旦生成数据就自然确定,因此可以将其作为属性嵌入到数据中。这实际上是该数据记录的“时间戳”。使用事件时间语义,时间的测量不依赖于机器的系统时间,而是依赖于数据本身。由于分布式系统中网络传输延迟的不确定性,实际应用中必须面对的数据流常常表现不正确。在这种情况下,不能简单地使用数据的时间戳作为时钟,必须使用另一个标志。
表示事件时间进展,在Flink中把它叫作事件时间的“水位线”(Watermarks)。10.1.2 哪种时间语义更重要从《星球大战》说起为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例子:电影《星球大战》。如图所示,我们会发现,看电影其实就是处理影片中数据的过程,所以影片的上映时间就相当于“处理时间”;而影片的数据就是所描述的故事,它所发生的背景时间就相当于“事件时间”。两种时间语义都有各自的用途,适用于不同的场景。 数据处理系统中的时间语义在计算机系统中,考虑数据处理的“时代变化”是没什么意义的,我们更关心的,显然是数据本身产生的时间。 所以在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。 在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从1.12版本开始,Flink已经将事件时间作为了默认的时间语义。 10.2 水位线(Watermark)10.2.1 事件时间和窗口在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。 这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的结果都是正确的。而一般实时流处理的场景中,事件时间可以基本与处理时间保持同步,只是略微有一点延迟,同时保证了窗口计算的正确性。 10.2.2 什么是水位线在Flink中,用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。 具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。 有序流中的水位线在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;也就是说,它们处理的过程会保持原先的顺序不变,遵守先来后到的原则。这样的话我们从每个数据中提取时间戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推进。 实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳,如图6-6所示。所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。 乱序流中的水位线在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是所谓的“乱序数据”。 这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产生时间而言的。如图6-7所示,一个7秒时产生的数据,生成时间自然要比9秒的数据早;但是经过数据缓存和传输之后,处理任务可能先收到了9秒的数据,之后7秒的数据才姗姗来迟。这时如果我们希望插入水位线,来指示当前的事件时间进展,又该怎么做呢? 解决思路也很简单:我们还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。不过现在的情况是数据乱序,所以插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线,如图6-8所示。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。 如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线,如下图所示。 但是这样做会带来一个非常大的问题:我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可以等上一段时间,比如2秒;也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳,如下图所示。这样的话,9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,这时迟到数据也都已收集齐,0~9秒的窗口就可以正确计算结果了。 水位线的特性现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。 总结一下水位线的特性: 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据水位线主要的内容是一个时间戳,用来表示当前事件时间的进展水位线是基于数据的时间戳生成的水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进水位线可以通过设置延迟,来保证正确处理乱序数据一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t, 这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ ≤ t的数据水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。10.2.3 如何生成水位线水位线是用来保证窗口处理结果的正确性的,如果不能正确处理所有乱序数据,可以尝试调大延迟的时间。 生成水位线的总体原则完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。 如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。 如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。 所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。 水位线生成策略(Watermark Strategies)在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间: public SingleOutputStreamOperator assignTimestampsAndWatermarks(WatermarkStrategy watermarkStrategy)具体使用时,直接用DataStream调用该方法即可,与普通的transform方法完全一样。 DataStream stream = env.addSource(new ClickSource());DataStream withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(**);.assignTimestampsAndWatermarks()方法需要传入一个WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy中包含了一个“时间戳分配器” TimestampAssigner和一个“水位线生成器” WatermarkGenerator。 publicinterface WatermarkStrategy extends TimestampAssignerSupplier, WatermarkGeneratorSupplier{ @Override TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context); @Override WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);}TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator接口中,主要又有两个方法:onEvent()和onPeriodicEmit()。onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作onPeriodicEmit:周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。env.getConfig().setAutoWatermarkInterval(60 * 1000L);Flink内置水位线生成器WatermarkStrategy这个接口是一个生成水位线策略的抽象,而Flink提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板。 这两个生成器可以通过调用WatermarkStrategy的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。 有序流对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。 stream.assignTimestampsAndWatermarks( WatermarkStrategy.forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }));上面代码中我们调用.withTimestampAssigner()方法,将数据中的timestamp字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。这样,提取出的数据时间戳,就是我们处理计算的事件时间。 这里需要注意的是,时间戳和水位线的单位,必须都是毫秒。 乱序流由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。 代码示例如下: publicclass WatermarkExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env .addSource(new ClickSource()) // 插入水位线的逻辑 .assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为5sWatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner() { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) ) .print(); env.execute(); }}上面代码中,我们同样提取了timestamp字段作为时间戳,并且以5秒的延迟时间创建了处理乱序流的水位线生成器。 自定义水位线策略一般来说,Flink内置的水位线生成器就可以满足应用需求了。不过有时我们的业务逻辑可能非常复杂,这时对水位线生成的逻辑也有更高的要求,就必须自定义实现水位线策略WatermarkStrategy了。 在WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于WatermarkGenerator的实现。整体说来,Flink有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。 WatermarkGenerator接口中有两个方法,onEvent()和onPeriodicEmit(),前者是在每个事件到来时调用,而后者由框架周期性调用。周期性调用的方法中发出水位线,自然就是周期性生成水位线;而在事件触发的方法中发出水位线,自然就是断点式生成了。两种方式的不同就集中体现在这两个方法的实现上。 周期性水位线生成器(Periodic Generator)周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。 下面是一段自定义周期性生成水位线的代码: import com.atguigu.bean.Event;import org.apache.flink.api.common.eventtime.*;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; // 自定义水位线的产生publicclass CustomPeriodicWatermarkExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env .addSource(new ClickSource()) .assignTimestampsAndWatermarks(new CustomWatermarkStrategy()) .print(); env.execute(); } publicstaticclass CustomWatermarkStrategy implements WatermarkStrategy { @Override public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) { returnnew SerializableTimestampAssigner() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段 } }; } @Override public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { returnnew CustomBoundedOutOfOrdernessGenerator(); } } publicstaticclass CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator { private Long delayTime = 5000L; // 延迟时间 private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳 @Override public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) { // 每来一条数据就调用一次 maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳 } @Override public void onPeriodicEmit(WatermarkOutput output) { // 发射水位线,默认200ms调用一次 output.emitWatermark(new Watermark(maxTs - delayTime - 1L)); } }}我们在onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次。 断点式水位线生成器(Punctuated Generator)断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过onPeriodicEmit()发出水位线。 自定义的断点式水位线生成器代码如下: publicclass PunctuatedGenerator implements WatermarkGenerator { @Override public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {// 只有在遇到特定的itemId时,才发出水位线 if (r.user.equals("Mary")) { output.emitWatermark(new Watermark(r.timestamp - 1)); } } @Override public void onPeriodicEmit(WatermarkOutput output) { // 不需要做任何事情,因为我们在onEvent方法中发射了水位线 }}我们在onEvent()中判断当前事件的user字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。 在自定义数据源中发送水位线我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下: import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.watermark.Watermark;import java.sql.Timestamp;import java.util.Calendar;import java.util.Random; publicclass EmitWatermarkInSourceFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.addSource(new ClickSource()).print(); env.execute(); } // 泛型是数据源中的类型 publicstaticclass ClickSource implements SourceFunction { privateboolean running = true; @Override public void run(SourceContext sourceContext) throws Exception { Random random = new Random(); String[] userArr = {"Mary", "Bob", "Alice"}; String[] urlArr = {"./home", "./cart", "./prod id=1"}; while (running) { long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳 String username = userArr[random.nextInt(userArr.length)]; String url = urlArr[random.nextInt(urlArr.length)]; Event event = new Event(username, url, currTs); // 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段 sourceContext.collectWithTimestamp(event, event.timestamp); // 发送水位线 sourceContext.emitWatermark(new Watermark(event.timestamp - 1L)); Thread.sleep(1000L); } } @Override public void cancel() { running = false; } }}在自定义水位线中生成水位线相比assignTimestampsAndWatermarks方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写Flink的测试程序,测试Flink的各种各样的特性。 10.2.4 水位线的传递在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收刀多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。 如图所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下: (1)上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线”(Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。(2)当有一个新的水位线(第一分区的4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的3,于是当前任务时钟就推进到了3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。(3)再次收到新的水位线(第二分区的7)后,执行同样的处理流程。首先将第二个分区时钟更新为7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。(4)同样道理,当又一次收到新的水位线(第三分区的6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的4,所以当前任务的时钟推进到4,并发出时间戳为4的水位线,广播到下游各个分区任务。水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完天地劫幽城再临归真4-5攻略:第四章归真4-5八回合图文通关教学[多图],天地劫幽城再临归真4-5怎么样八回合内通
2024-03-30