Flink问题总结

  • 多线程提交多个flinkJob
  • flink从kafka接收脏数据后一直restarting
  • springboot项目日志与flink日志冲突

  • 如果代码只是flinkJob代码,要保证创建多个线程的这个线程(main)在代码执行结束前完成flinkJob的提交(main线程sleep 10s),否则main线程退出,提交flinkJob的线程还没有完成提交那么这些线程就会退出。在main线程退出后,这些子线程也会退出(为什么会退出?)

  • 如果是异步提交,即使main线程退出,子线程也会执行完再退出,在flink集群上提交没问题,在本地同样可能失败,也需要在job提交之间加上间隔500ms
  • springboot的applicationRunner多线程提交没有问题

flink从kafka读取数据后做了一个map,也增加了异常保护,但只要是脏数据就会使flinkJob重启

原因:map方法所在的类没有实现Serializable接口,在并行度不是1的情况下需要传输算子

2022-04-13 20:39:57 Task.java WARN Source: Custom Source -> Map -> (Flat Map, Flat Map) (2/8) (9d545157586a67d28091360912033fc3) switched from RUNNING to FAILED.

java.lang.UnsupportedOperationException
        at java.base/java.util.ImmutableCollections.uoe(ImmutableCollections.java:71)
        at java.base/java.util.ImmutableCollections$AbstractImmutableCollection.add(ImmutableCollections.java:75)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787)

删除logback依赖或者flink的slf4j-log4j12

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.slf4j.impl.Log4jLoggerFactory loaded from file:/opt/flink-1.10.1/lib/slf4j-log4j12-1.7.15.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.slf4j.impl.Log4jLoggerFactory
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.slf4j.impl.Log4jLoggerFactory loaded from file:/opt/flink-1.10.1/lib/slf4j-log4j12-1.7.15.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml: org.slf4j.impl.Log4jLoggerFactory
        at org.springframework.util.Assert.instanceCheckFailed(Assert.java:702)

Original: https://www.cnblogs.com/bingmous/p/16142391.html
Author: Bingmous
Title: Flink问题总结

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/565467/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球