案例来源 https://github.com/apache/flink-training/blob/release-1.14/hourly-tips/README_zh.md
案例介绍
[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:db76afe4-74a3-4d5a-b575-40b2f23e4486
[En]
[TencentCloudSDKException] code:FailedOperation.ServiceIsolate message:service is stopped due to arrears, please recharge your account in Tencent Cloud requestId:329a31d8-acff-4380-87d3-a2d735fc7499
结果输出:
每小时产生一个 HourlyTip对象 记录的数据流。 这个记录应包含该小时结束时的时间戳、 该小时内获得小费最多的司机的 driverId 以及他的实际小费总数。
public class HourlyTip {
/**
* 小时结束时的时间戳
*/
private Long eventTime;
/**
* 司机id driverId
*/
private Long driverId;
/**
* 该小时获得的小费总数
*/
private Float tips;
}
核心代码
// 初始化环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 定义出租车-车费数据源
KafkaSource<taxifare> fareSource = KafkaSource.<taxifare>builder()
.setBootstrapServers("192.168.0.192:9092")
.setTopics("TOPIC_FARE")
.setGroupId("TEST_GROUP")
.setClientIdPrefix("fare") // 避免kafka clientId重复
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new TaxiFareDeserialization())
.build();
DataStreamSource<taxifare> fareStream = env.fromSource(fareSource, WatermarkStrategy.<taxifare>forMonotonousTimestamps().withTimestampAssigner((fare, t) -> fare.getStartTime()), "fare source");
// 按司机分组,对每小时内的数据进行统计,求出每个司机每小时的总小费
SingleOutputStreamOperator<hourlytip> hourlyTipsStream = fareStream.keyBy(TaxiFare::getDriverId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.process(new AddTipsFunction());
/**
* window和windowAll的区别
*
* keyBy后数据分流,window是把不同的key分开聚合成窗口
* 而windowAll是把所有的key都聚合起来,所以windowAll的并行度只能为1,而window可以有多个并行度
*
*/
// 把所有key汇总起来,找出每个小时总小费最多的司机
SingleOutputStreamOperator<hourlytip> hourlyMaxStream = hourlyTipsStream.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).max("tips");
hourlyMaxStream.addSink(new PrintSinkFunction<>());
env.execute("Hourly Tips");
</hourlytip></hourlytip></taxifare></taxifare></taxifare></taxifare>
完整代码
https://github.com/Mr-LuXiaoHua/study-flink
代码入口 com.example.datastream.hourlytips.HourlyTipsJob
Original: https://www.cnblogs.com/luxh/p/16434821.html
Author: CN.programmer.Luxh
Title: Flink-出租车-基于 DataStream API 计算每小时赚取最多小费的司机
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/562525/
转载文章受原作者版权保护。转载请注明原作者出处!