Flink-出租车-基于 DataStream API 计算每小时赚取最多小费的司机

案例来源 https://github.com/apache/flink-training/blob/release-1.14/hourly-tips/README_zh.md

案例介绍

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:db76afe4-74a3-4d5a-b575-40b2f23e4486

[En]

[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:329a31d8-acff-4380-87d3-a2d735fc7499

结果输出:
每小时产生一个 HourlyTip对象 记录的数据流。 这个记录应包含该小时结束时的时间戳、 该小时内获得小费最多的司机的 driverId 以及他的实际小费总数。

public class HourlyTip {

    /**
     * 小时结束时的时间戳
     */
    private Long eventTime;

    /**
     * 司机id driverId
     */
    private Long driverId;

    /**
     * 该小时获得的小费总数
     */
    private Float tips;

}

核心代码

       // 初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 定义出租车-车费数据源
        KafkaSource<taxifare> fareSource = KafkaSource.<taxifare>builder()
                .setBootstrapServers("192.168.0.192:9092")
                .setTopics("TOPIC_FARE")
                .setGroupId("TEST_GROUP")
                .setClientIdPrefix("fare") // &#x907F;&#x514D;kafka clientId&#x91CD;&#x590D;
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new TaxiFareDeserialization())
                .build();

        DataStreamSource<taxifare> fareStream = env.fromSource(fareSource, WatermarkStrategy.<taxifare>forMonotonousTimestamps().withTimestampAssigner((fare, t) -> fare.getStartTime()), "fare source");

        // &#x6309;&#x53F8;&#x673A;&#x5206;&#x7EC4;&#xFF0C;&#x5BF9;&#x6BCF;&#x5C0F;&#x65F6;&#x5185;&#x7684;&#x6570;&#x636E;&#x8FDB;&#x884C;&#x7EDF;&#x8BA1;&#xFF0C;&#x6C42;&#x51FA;&#x6BCF;&#x4E2A;&#x53F8;&#x673A;&#x6BCF;&#x5C0F;&#x65F6;&#x7684;&#x603B;&#x5C0F;&#x8D39;
        SingleOutputStreamOperator<hourlytip> hourlyTipsStream = fareStream.keyBy(TaxiFare::getDriverId)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .process(new AddTipsFunction());

        /**
         *  window&#x548C;windowAll&#x7684;&#x533A;&#x522B;
         *
         *  keyBy&#x540E;&#x6570;&#x636E;&#x5206;&#x6D41;&#xFF0C;window&#x662F;&#x628A;&#x4E0D;&#x540C;&#x7684;key&#x5206;&#x5F00;&#x805A;&#x5408;&#x6210;&#x7A97;&#x53E3;
         *      &#x800C;windowAll&#x662F;&#x628A;&#x6240;&#x6709;&#x7684;key&#x90FD;&#x805A;&#x5408;&#x8D77;&#x6765;&#xFF0C;&#x6240;&#x4EE5;windowAll&#x7684;&#x5E76;&#x884C;&#x5EA6;&#x53EA;&#x80FD;&#x4E3A;1&#xFF0C;&#x800C;window&#x53EF;&#x4EE5;&#x6709;&#x591A;&#x4E2A;&#x5E76;&#x884C;&#x5EA6;
         *
         */

        // &#x628A;&#x6240;&#x6709;key&#x6C47;&#x603B;&#x8D77;&#x6765;&#xFF0C;&#x627E;&#x51FA;&#x6BCF;&#x4E2A;&#x5C0F;&#x65F6;&#x603B;&#x5C0F;&#x8D39;&#x6700;&#x591A;&#x7684;&#x53F8;&#x673A;
        SingleOutputStreamOperator<hourlytip> hourlyMaxStream = hourlyTipsStream.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).max("tips");

        hourlyMaxStream.addSink(new PrintSinkFunction<>());

        env.execute("Hourly Tips");
</hourlytip></hourlytip></taxifare></taxifare></taxifare></taxifare>

完整代码

https://github.com/Mr-LuXiaoHua/study-flink

&#x4EE3;&#x7801;&#x5165;&#x53E3;  com.example.datastream.hourlytips.HourlyTipsJob

Original: https://www.cnblogs.com/luxh/p/16434821.html
Author: CN.programmer.Luxh
Title: Flink-出租车-基于 DataStream API 计算每小时赚取最多小费的司机

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/562525/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球