Flink: Counting Words with the Unified Stream-Batch API

The DataStream API gets its name from the special DataStream class that is used to represent a collection of data in a Flink program. You can think of them as immutable collections of data that can contain duplicates. This data can either be finite or unbounded; the API that you use to work on them is the same.

The DataStream API supports different runtime execution modes, which you can choose depending on the requirements of your use case and the characteristics of your job.

The DataStream API has a "classic" execution behavior, which we call STREAMING execution mode. This mode is suited for unbounded jobs that require continuous incremental processing and are expected to stay online indefinitely.

In addition, there is a batch-style execution mode that we call BATCH execution mode. This way of executing jobs is more reminiscent of batch-processing frameworks such as MapReduce. It is suited for bounded jobs that have a known, fixed input and do not run continuously.

Apache Flink's unified approach to stream and batch processing means that a DataStream application executed over bounded input produces the same final result regardless of the configured execution mode. It is important to note what "final" means here: a job executing in STREAMING mode might produce incremental updates (think upserts in a database), while a BATCH job would only produce one final result at the end. The final result is the same even though the way it is computed differs, as long as it is interpreted correctly.
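
As a hypothetical illustration: a keyed word count fed the lines "hello flink" and "hello kafka" would print a running total for every incoming record in STREAMING mode, but only one final count per key in BATCH mode (illustrative output only, subtask prefixes omitted):

STREAMING mode (incremental updates):
hello : 1
flink : 1
hello : 2
kafka : 1

BATCH mode (final results only):
hello : 2
flink : 1
kafka : 1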

By enabling BATCH execution, we allow Flink to apply additional optimizations that are only possible when we know that the input is bounded. For example, different join/aggregation strategies can be used, along with a different shuffle implementation that allows more efficient task scheduling and failure recovery behavior. We go into some of the details of the execution behavior below.

BATCH execution mode can only be used for jobs/Flink programs that are bounded. Boundedness is a property of a data source that tells us whether all the input coming from that source is known before execution, or whether new data, potentially indefinitely, will keep showing up. A job, in turn, is bounded if all of its sources are bounded, and unbounded otherwise.
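
For example, reading a fixed set of files is bounded, while a Kafka topic is unbounded by default. The Kafka connector used in the examples below can nevertheless be made bounded by giving it stopping offsets; a minimal sketch (broker address and topic reuse the values from the examples below):

// Unbounded (the default): the source keeps waiting for new records forever.
KafkaSource<String> unboundedSource = KafkaSource.<String>builder()
        .setBootstrapServers("192.168.0.192:9092")
        .setTopics("TOPIC_WORD")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// Bounded: the source stops after consuming up to the latest offsets that
// existed at job start, so BATCH/AUTOMATIC can treat it as bounded input.
KafkaSource<String> boundedSource = KafkaSource.<String>builder()
        .setBootstrapServers("192.168.0.192:9092")
        .setTopics("TOPIC_WORD")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.latest())
        .build();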

Generally speaking, you should use BATCH execution mode when your program is bounded, because it is more efficient. You have to use STREAMING execution mode when your program is unbounded, because only this mode is general enough to deal with continuous data streams.

An obvious exception is when you want to use a bounded job to bootstrap some job state that you then want to use in an unbounded job. For example, run a bounded job in STREAMING mode, take a savepoint, and then restore that savepoint in an unbounded job. This is a very specific use case, and one that might soon become obsolete once we allow producing a savepoint as an additional output of a BATCH execution job.
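
Sketched with the standard Flink CLI (the job id and paths are placeholders), that bootstrap flow looks roughly like this:

$ bin/flink savepoint <jobId> /tmp/flink-savepoints
$ bin/flink run -s /tmp/flink-savepoints/savepoint-<id> unbounded-job.jar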

The execution mode can be configured via the execution.runtime-mode setting. There are three possible values:

  • STREAMING: the classic DataStream execution mode (default)
  • BATCH: batch-style execution on the DataStream API
  • AUTOMATIC: let the system decide based on the boundedness of the sources

This can be configured via command line parameters of bin/flink run …, or programmatically when creating/configuring the StreamExecutionEnvironment.

This is how you can configure the execution mode via the command line:

$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar

And this is how you can configure it when creating/configuring the StreamExecutionEnvironment in code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

We recommend not setting the runtime mode in the program, but instead setting it on the command line when submitting the application. Keeping the application code configuration-free allows more flexibility, because the same application can then be executed in any execution mode.
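
Below is the main body of the batch job (class DataStreamApiWordCountBatch), which reads its input from a text file: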

        ParameterTool parameterFromArgs = ParameterTool.fromArgs(args);
        String input = parameterFromArgs.getRequired("input");

        // Initialize the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Load the data source
        DataStreamSource<String> wordSource = env.readTextFile(input, "UTF-8");

        // Transform: split each line into Word records
        SingleOutputStreamOperator<Word> wordStreamOperator = wordSource.flatMap(new TokenizerFunction());

        // Group by word
        KeyedStream<Word, String> wordKeyedStream = wordStreamOperator.keyBy(new KeySelector<Word, String>() {
            @Override
            public String getKey(Word word) throws Exception {
                return word.getWord();
            }
        });

        // Sum the frequencies
        SingleOutputStreamOperator<Word> sumStream = wordKeyedStream.sum("frequency");

        sumStream.print();

        env.execute("WordCountBatch");
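
Both examples rely on a Word POJO and a TokenizerFunction that are not shown above. A minimal sketch consistent with the calls used there (keyBy on getWord(), sum("frequency")) could look like this:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// A Flink POJO: public no-arg constructor plus getters/setters,
// so that sum("frequency") can address the field by name.
public class Word {
    private String word;
    private long frequency;

    public Word() {}

    public Word(String word, long frequency) {
        this.word = word;
        this.frequency = frequency;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public long getFrequency() { return frequency; }
    public void setFrequency(long frequency) { this.frequency = frequency; }

    @Override
    public String toString() { return word + " : " + frequency; }
}

// Splits each input line into lowercase tokens and emits one Word
// with frequency 1 per token.
public class TokenizerFunction implements FlatMapFunction<String, Word> {
    @Override
    public void flatMap(String line, Collector<Word> out) {
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                out.collect(new Word(token, 1L));
            }
        }
    }
}

The streaming variant of the job (class DataStreamApiWordCountStream) consumes its input from Kafka instead: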
        // Initialize the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the Kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.0.192:9092")
                .setTopics("TOPIC_WORD")
                .setGroupId("TEST_GROUP")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Load the data source (no watermarks needed for this job)
        DataStreamSource<String> kafkaWordSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Word Source");

        // Transform: split each line into Word records
        SingleOutputStreamOperator<Word> wordStreamOperator = kafkaWordSource.flatMap(new KeySelector() {});

        // Group by word
        KeyedStream<Word, String> wordKeyedStream = wordStreamOperator.keyBy(new KeySelector<Word, String>() {
            @Override
            public String getKey(Word word) throws Exception {
                return word.getWord();
            }
        });

        // Sum the frequencies
        SingleOutputStreamOperator<Word> sumStream = wordKeyedStream.sum("frequency");
        sumStream.print();

        env.execute("WordCountStream");
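
To try the streaming job, test lines can be pushed into the topic with the standard Kafka console producer (assuming the broker address and topic from the example above):

$ bin/kafka-console-producer.sh --bootstrap-server 192.168.0.192:9092 --topic TOPIC_WORD
> hello flink
> hello kafka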

The example project contains two entry classes:
com.example.datastream.wordcount.DataStreamApiWordCountBatch  -- reads from a text file and counts words
com.example.datastream.wordcount.DataStreamApiWordCountStream -- consumes from Kafka and counts words

Submit to the Flink cluster for execution (program arguments go after the jar path):
bin/flink run -m 127.0.0.1:8081 -c com.example.datastream.wordcount.DataStreamApiWordCountBatch /opt/apps/study-flink-1.0.jar -input /mnt/data/words.txt
-input specifies the input file path
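
The streaming job can be submitted the same way; it takes no -input argument because the Kafka connection details are hard-coded in the example:
bin/flink run -m 127.0.0.1:8081 -c com.example.datastream.wordcount.DataStreamApiWordCountStream /opt/apps/study-flink-1.0.jar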

Original: https://www.cnblogs.com/luxh/p/16424238.html
Author: CN.programmer.Luxh
Title: Flink: Counting Words with the Unified Stream-Batch API
