Flink's Async Operator: Principles and Usage

1. Introduction

Flink is built for high throughput and low latency. But when one stage of a Flink pipeline has to talk to an external system, the unpredictable latency of those calls can drag down the whole cluster. In that case an async operator lets the operator fire off the next request without waiting for the previous call to return, so slow external calls no longer block the stream.
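To make the problem concrete, here is a minimal sketch (not from the original article) of the synchronous alternative: a MapFunction that blocks on every external call. With parallelism 1 and a 5-second call, throughput is capped at one record per 5 seconds, which is exactly what AsyncDataStream is meant to avoid. The externalCall placeholder is an assumption.

import org.apache.flink.api.common.functions.RichMapFunction;

/**
 * A deliberately bad baseline: every record blocks the task thread
 * until the external call returns, so throughput = 1 / call latency.
 */
public class SyncLookupMap extends RichMapFunction<String, String> {

    @Override
    public String map(String value) throws Exception {
        // Placeholder for a real external call (HTTP, Redis, DB, ...).
        Thread.sleep(5000);
        return "Output value: " + value;
    }
}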

2. What This Article Covers

  • Source-code analysis of the async operator
  • Why the async operator can still guarantee ordering
  • How to build a custom async lookup join for Flink SQL

3. Test Code for the Async Operator

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Simulates a slow asynchronous operation (adapted from an example found online).
 */
public class AsyncIODemo implements Serializable {

    private final ExecutorService executorService = Executors.newFixedThreadPool(4);

    public CompletableFuture<String> pullData(final String source) {

        CompletableFuture<String> completableFuture = new CompletableFuture<>();

        executorService.submit(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /**
             * After sleeping for a few seconds, complete the future with a result string.
             */
            completableFuture.complete("Output value: " + source);
        });

        return completableFuture;
    }
}
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Driver program (adapted from an example found online).
 */
public class AsyncTest {

    public static void main(String[] args) throws Exception {
        /**
         * Get the Flink execution environment and set the parallelism to 1
         * so the behaviour is easier to observe.
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        /**
         * Build a DataStreamSource from a fixed sequence of elements.
         */
        DataStreamSource<String> stream = env.fromElements("11", "22", "33", "44");

        /**
         * Use AsyncDataStream to build an asynchronous *ordered* stream. As the name suggests,
         * the calls are asynchronous yet the output order is preserved; the source-code
         * analysis below explains why.
         */
        SingleOutputStreamOperator<String> asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction<String, String>() {
            @Override
            public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
                /**
                 * Kick off the simulated asynchronous request, which returns a CompletableFuture.
                 */
                CompletableFuture<String> future = new AsyncIODemo().pullData(input);
                /**
                 * Register a callback for when the future completes; once the result is available,
                 * call resultFuture.complete to actually emit the data.
                 */
                future.whenCompleteAsync((d, t) -> {
                    resultFuture.complete(Arrays.asList(d));
                });
            }
            // Set the maximum async call timeout to 10 seconds.
        }, 10, TimeUnit.SECONDS);
        asyncStream.print();
        env.execute();
    }
}
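If you run the job as written (a sketch of the expected behaviour, not output captured for the article), all four pullData calls are fired back to back and run concurrently on the demo's 4-thread pool, so the job finishes in roughly 5 seconds instead of 4 x 5 seconds, and because orderedWait is used the records are printed in input order, roughly:

Output value: 11
Output value: 22
Output value: 33
Output value: 44

With unorderedWait the same job would print the results in whatever order the futures happen to complete.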

4. Source-Code Analysis of the Async Operator

4.1 AsyncDataStream

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;

import java.util.concurrent.TimeUnit;

/**
 * A helper class to apply an {@link AsyncFunction} to a data stream.
 *
 * <pre>{@code
 * DataStream<String> input = ...
 * AsyncFunction<String, Tuple2<String, String>> asyncFunc = ...
 *
 * AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100);
 * }</pre>
 */
@PublicEvolving
public class AsyncDataStream {

    /** Output mode of the asynchronous operation: ordered or unordered. */
    public enum OutputMode {
        ORDERED,
        UNORDERED
    }

    private static final int DEFAULT_QUEUE_CAPACITY = 100;

    /**
     * flag_2: adds an AsyncWaitOperator.
     *
     * @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added.
     * @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}.
     * @param timeout for the asynchronous operation to complete
     * @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside.
     * @param mode Processing mode for {@link AsyncWaitOperator}.
     * @param <IN> Input type.
     * @param <OUT> Output type.
     * @return A new {@link SingleOutputStreamOperator}
     */
    private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            int bufSize,
            OutputMode mode) {

        TypeInformation<OUT> outTypeInfo =
                TypeExtractor.getUnaryOperatorReturnType(
                        func,
                        AsyncFunction.class,
                        0,
                        1,
                        new int[] {1, 0},
                        in.getType(),
                        Utils.getCallLocationName(),
                        true);

        /**
            An AsyncWaitOperatorFactory is created here.
        */
        AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
                new AsyncWaitOperatorFactory<>(
                        in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);

        return in.transform("async wait operator", outTypeInfo, operatorFactory);
    }

    /**
     * Adds an AsyncWaitOperator. The output stream is unordered.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param capacity The max number of async i/o operations that can be triggered
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
    }

    /**
     * Adds an AsyncWaitOperator. The output stream is unordered.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {
        return addOperator(
                in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.UNORDERED);
    }

    /**
     * flag_1: adds an AsyncWaitOperator. Records are emitted in the same order as the input records.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param capacity The max number of async i/o operations that can be triggered
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
    }

    /**
     * Adds an AsyncWaitOperator. Records are emitted in the same order as the input records.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {

        return addOperator(
                in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED);
    }
}

Starting from the test code, the call chain is AsyncDataStream.orderedWait -> addOperator, and addOperator news up an AsyncWaitOperatorFactory. At this point the forward reading can pause, because there is no need to go further: a class named XxxFactory is almost always the factory pattern, and what the factory produces is Xxx, so this one is the factory that produces AsyncWaitOperator instances. The natural next step is to set a breakpoint on the first line of the AsyncWaitOperator constructor and see when it gets hit. Why read the code this way? Because the Flink source we are reading does not execute as one linear flow; the architecture looks roughly like this:

[Figure: Flink architecture diagram]

The code does not all run on one node. Even when debugging locally, the job runs inside a simulated local cluster, and the different "nodes" are simulated with threads: each node is represented and executed by its own thread. So mindlessly setting breakpoints and stepping from the entry point will never reveal the whole picture. One code-reading trick is to make educated guesses from whatever clues experience offers; here the clue is the naming convention of the class.

4.2 AsyncWaitOperator

Set breakpoints in every public method and in the constructor of AsyncWaitOperator, then run the program in debug mode.


This constructor clearly runs on a separate task thread (it shows up under the sink task in this job), not on the thread we started from; if we had kept stepping linearly the way we did before, we could have searched forever without landing here.

public AsyncWaitOperator(
            @Nonnull AsyncFunction<IN, OUT> asyncFunction,
            long timeout,
            int capacity,
            @Nonnull AsyncDataStream.OutputMode outputMode,
            @Nonnull ProcessingTimeService processingTimeService,
            @Nonnull MailboxExecutor mailboxExecutor) {
    super(asyncFunction);

    setChainingStrategy(ChainingStrategy.ALWAYS);

    Preconditions.checkArgument(
            capacity > 0, "The number of concurrent async operation should be greater than 0.");
    this.capacity = capacity;

    this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

    this.timeout = timeout;

    this.processingTimeService = Preconditions.checkNotNull(processingTimeService);

    this.mailboxExecutor = mailboxExecutor;
}

Looking at the constructor body, it is all plain initialisation and not very nutritious. This brings up another code-reading trick: grab the big and let go of the small, don't stop for flowers by the roadside. Skip unimportant initialisation and argument-validation code and focus on the main flow.

Resume execution and run to the next breakpoint.

@Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);

        this.inStreamElementSerializer =
                new StreamElementSerializer<>(
                        getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

        switch (outputMode) {
            case ORDERED:
                queue = new OrderedStreamElementQueue<>(capacity);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue<>(capacity);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }

        this.timestampedCollector = new TimestampedCollector<>(super.output);
    }

At a glance, the switch statement is the useful part: based on outputMode it instantiates either an ordered queue or an unordered queue. Recalling the orderedWait and unorderedWait methods on AsyncDataStream, it is easy to guess that this queue is the key to whether the output stays ordered; a small usage sketch contrasting the two modes follows below. Nothing else worth lingering on here, so run on to the next breakpoint!
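For reference, a minimal sketch of the user-facing choice that ends up selecting one of the two queues. It reuses the stream variable and the anonymous AsyncFunction from the test code in section 3 (referred to here as asyncFunc, an assumed name):

// ORDERED mode  -> OrderedStreamElementQueue: results leave the operator in input order.
SingleOutputStreamOperator<String> ordered =
        AsyncDataStream.orderedWait(stream, asyncFunc, 10, TimeUnit.SECONDS, 100);

// UNORDERED mode -> UnorderedStreamElementQueue: results are emitted as soon as they complete,
// which lowers latency but gives up ordering between watermarks.
SingleOutputStreamOperator<String> unordered =
        AsyncDataStream.unorderedWait(stream, asyncFunc, 10, TimeUnit.SECONDS, 100);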


The next stop is state initialisation, nothing to dwell on; skip ahead to the next breakpoint.

@Override
    public void open() throws Exception {
        super.open();

        this.isObjectReuseEnabled = getExecutionConfig().isObjectReuseEnabled();

        if (recoveredStreamElements != null) {
            for (StreamElement element : recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    processElement(element.<IN>asRecord());
                } else if (element.isWatermark()) {
                    processWatermark(element.asWatermark());
                } else if (element.isLatencyMarker()) {
                    processLatencyMarker(element.asLatencyMarker());
                } else {
                    throw new IllegalStateException(
                            "Unknown record type "
                                    + element.getClass()
                                    + " encountered while opening the operator.");
                }
            }
            recoveredStreamElements = null;
        }
    }

The block that loops over recoveredStreamElements looks like it starts processing data, but the name gives it away: "recovered" means this only runs for elements restored from state, i.e. recovery logic. We still have not found the code that processes normal records, so this is another roadside flower; ignore it and move on to the next breakpoint.

@Override
public void processElement(StreamRecord<IN> record) throws Exception {
    StreamRecord<IN> element;
    // copy the element avoid the element is reused
    if (isObjectReuseEnabled) {
        //noinspection unchecked
        element = (StreamRecord<IN>) inStreamElementSerializer.copy(record);
    } else {
        element = record;
    }

    // add element first to the queue
    final ResultFuture<OUT> entry = addToWorkQueue(element);

    final ResultHandler resultHandler = new ResultHandler(element, entry);

    // register a timeout for the entry if timeout is configured
    if (timeout > 0L) {
        resultHandler.registerTimeout(getProcessingTimeService(), timeout);
    }

    userFunction.asyncInvoke(element.getValue(), resultHandler);
}

As the method name suggests, this is where records are actually processed; hitting the breakpoint repeatedly shows that every record passes through this method exactly once.


Its argument is a single element from the source stream. Next, let's look at the addToWorkQueue method.

/**
    Adds the given stream element to the operator's stream-element queue. The call blocks
    (yielding to the mailbox) until the element has been added.
*/
private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
            throws InterruptedException {

    Optional<ResultFuture<OUT>> queueEntry;
    while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
        mailboxExecutor.yield();
    }

    return queueEntry.get();
}

This method puts the source element into the queue created earlier, in this example an ordered queue, OrderedStreamElementQueue, and returns a ResultFuture object. We need to see what that object is.

4.3 ResultFuture

@PublicEvolving
public interface ResultFuture<OUT> {
    /**
     * Completes the result future with a collection of results, which are then emitted.
     */
    void complete(Collection<OUT> result);

    /**
     * Completes the result future exceptionally with the given error.
     */
    void completeExceptionally(Throwable error);
}
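A side note on completeExceptionally: a user AsyncFunction should call it when the external request fails, otherwise the entry just sits in the queue until the timeout fires. A sketch of how the callback in the test code above could handle failures (same variable names as the test code):

future.whenCompleteAsync((result, error) -> {
    if (error != null) {
        // Propagate the failure to the async operator; by default this fails the job.
        resultFuture.completeExceptionally(error);
    } else {
        resultFuture.complete(Arrays.asList(result));
    }
});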

Now let's see how tryPut wraps an element into a ResultFuture object.

4.4 OrderedStreamElementQueue

@Internal
public final class OrderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final Queue<StreamElementQueueEntry<OUT>> queue;

    public OrderedStreamElementQueue(int capacity) {
        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
    }

    @Override
    public boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

    @Override
    public void emitCompletedElement(TimestampedCollector<OUT> output) {
        if (hasCompletedElements()) {
            final StreamElementQueueEntry<OUT> head = queue.poll();
            head.emitResult(output);
        }
    }

    @Override
    public List<StreamElement> values() {
        List<StreamElement> list = new ArrayList<>(this.queue.size());
        for (StreamElementQueueEntry e : queue) {
            list.add(e.getInputElement());
        }
        return list;
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
        if (queue.size() < capacity) {
            StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);

            queue.add(queueEntry);

            LOG.debug(
                    "Put element into ordered stream element queue. New filling degree "
                            + "({}/{}).",
                    queue.size(),
                    capacity);

            return Optional.of(queueEntry);
        } else {
            LOG.debug(
                    "Failed to put element into ordered stream element queue because it "
                            + "was full ({}/{}).",
                    queue.size(),
                    capacity);

            return Optional.empty();
        }
    }

    private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
        if (streamElement.isRecord()) {
            return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
        }
        if (streamElement.isWatermark()) {
            return new WatermarkQueueEntry<>((Watermark) streamElement);
        }
        throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
    }
}

Focus on the tryPut and createEntry part near the bottom: a new StreamElementQueueEntry is created and added to the queue, so the next class to look at is StreamRecordQueueEntry.

4.5 StreamRecordQueueEntry

@Internal
class StreamRecordQueueEntry<OUT> implements StreamElementQueueEntry<OUT> {
    @Nonnull private final StreamRecord<?> inputRecord;

    private Collection<OUT> completedElements;

    StreamRecordQueueEntry(StreamRecord<?> inputRecord) {
        this.inputRecord = Preconditions.checkNotNull(inputRecord);
    }

    @Override
    public boolean isDone() {
        return completedElements != null;
    }

    @Nonnull
    @Override
    public StreamRecord<?> getInputElement() {
        return inputRecord;
    }

    @Override
    public void emitResult(TimestampedCollector<OUT> output) {
        output.setTimestamp(inputRecord);
        for (OUT r : completedElements) {
            output.collect(r);
        }
    }

    @Override
    public void complete(Collection<OUT> result) {
        this.completedElements = Preconditions.checkNotNull(result);
    }
}

At this point we have a rough picture: as the program runs, each record is wrapped in a StreamRecordQueueEntry and put, blocking if the queue is full, into the OrderedStreamElementQueue. The entries in this queue have a few tell-tale methods: isDone, which from the name simply says whether the entry has completed; emitResult, where anyone who has written a Flink program recognises output.collect(r) as emitting data downstream; and complete, an action method that null-checks the passed-in results and assigns them to the completedElements field.
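To make that life cycle concrete, here is a stripped-down, self-contained sketch of the same pattern. It is not Flink code: the class and method names deliberately mirror the real ones above, but everything else is a simplification.

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/** Minimal, non-Flink illustration of the queue-entry life cycle. */
class MiniEntry {
    final String input;
    List<String> completed;                // null until complete() is called

    MiniEntry(String input) { this.input = input; }

    boolean isDone()              { return completed != null; }
    void complete(List<String> r) { this.completed = r; }
    void emitResult()             { completed.forEach(System.out::println); }
}

class MiniOrderedQueue {
    private final Queue<MiniEntry> queue = new ArrayDeque<>();

    MiniEntry put(String input) {          // like tryPut: wrap the input and enqueue it
        MiniEntry e = new MiniEntry(input);
        queue.add(e);
        return e;
    }

    void emitCompletedHead() {             // like emitCompletedElement: only the head may leave
        while (!queue.isEmpty() && queue.peek().isDone()) {
            queue.poll().emitResult();
        }
    }
}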

Back to the main line in processElement:

// first add the element to the queue
final ResultFuture<OUT> entry = addToWorkQueue(element);
final ResultHandler resultHandler = new ResultHandler(element, entry);

// if a timeout is configured, register a timeout for this entry; here it is the 10 s from the test code
if (timeout > 0L) {
    resultHandler.registerTimeout(getProcessingTimeService(), timeout);
}

userFunction.asyncInvoke(element.getValue(), resultHandler);

Note the last line above: it calls asyncInvoke, which takes us back into the asyncInvoke we overrode in the test code.

/**
 * Use AsyncDataStream to build an asynchronous *ordered* stream. As the name suggests,
 * the calls are asynchronous yet the output order is preserved; the source-code
 * analysis explains why.
 */
SingleOutputStreamOperator<String> asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction<String, String>() {
    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
        /**
         * Kick off the simulated asynchronous request, which returns a CompletableFuture.
         */
        CompletableFuture<String> future = new AsyncIODemo().pullData(input);
        /**
         * Register a callback for when the future completes; once the result is available,
         * call resultFuture.complete to actually emit the data.
         */
        future.whenCompleteAsync((d, t) -> {
            resultFuture.complete(Arrays.asList(d));
        });
    }
    // Set the maximum async call timeout to 10 seconds.
}, 10, TimeUnit.SECONDS);

Now set a breakpoint on the resultFuture.complete(Arrays.asList(d)) line in the test code above and step into it: we land in the complete method of the inner class org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.ResultHandler.

private void outputCompletedElement() {
    /**
        Check whether the OrderedStreamElementQueue has completed elements; see the code above:
        @Override
        public boolean hasCompletedElements() {
            return !queue.isEmpty() && queue.peek().isDone();
        }
        It simply peeks at the head StreamRecordQueueEntry of the queue and calls its isDone method:
        @Override
        public boolean isDone() {
            return completedElements != null;
        }
        i.e. it checks whether the completedElements field is non-null. Since the previous step
        already assigned it, isDone returns true here.
    */
    if (queue.hasCompletedElements()) {
        /**
            Call the OrderedStreamElementQueue's emitCompletedElement method:

            @Override
            public void emitCompletedElement(TimestampedCollector<OUT> output) {
                if (hasCompletedElements()) {
                    final StreamElementQueueEntry<OUT> head = queue.poll();
                    head.emitResult(output);
                }
            }
            It removes the head StreamElementQueueEntry from the queue and calls its emitResult method:
            @Override
            public void emitResult(TimestampedCollector<OUT> output) {
                output.setTimestamp(inputRecord);
                for (OUT r : completedElements) {
                    output.collect(r);
                }
            }
            This is where collect is called in a loop to actually emit the data downstream.
        */
        queue.emitCompletedElement(timestampedCollector);
        // if there are more completed elements, emit them with subsequent mails
        if (queue.hasCompletedElements()) {
            try {
                mailboxExecutor.execute(
                        this::outputCompletedElement,
                        "AsyncWaitOperator#outputCompletedElement");
            } catch (RejectedExecutionException mailboxClosedException) {
                // This exception can only happen if the operator is cancelled which means all
                // pending records can be safely ignored since they will be processed one more
                // time after recovery.

                LOG.debug(
                        "Attempt to complete element is ignored since the mailbox rejected the execution.",
                        mailboxClosedException);
            }
        }
    }
}

/** A handler for the results of a specific input record. */
private class ResultHandler implements ResultFuture<OUT> {
    /** Optional timeout timer used to signal the timeout to the AsyncFunction. */
    private ScheduledFuture<?> timeoutTimer;
    /** Record for which this result handler exists. Used only to report errors. */
    private final StreamRecord<IN> inputRecord;
    /**
     * The handle received from the queue to update the entry. Should only be used to inject the
     * result; exceptions are handled here.
     */
    private final ResultFuture<OUT> resultFuture;
    /**
     * A guard against ill-written AsyncFunction. Additional (parallel) invocations of {@link
     * #complete(Collection)} or {@link #completeExceptionally(Throwable)} will be ignored. This
     * guard also helps for cases where proper results and timeouts happen at the same time.
     */
    private final AtomicBoolean completed = new AtomicBoolean(false);

    ResultHandler(StreamRecord<IN> inputRecord, ResultFuture<OUT> resultFuture) {
        this.inputRecord = inputRecord;
        this.resultFuture = resultFuture;
    }

    @Override
    public void complete(Collection<OUT> results) {
        Preconditions.checkNotNull(
                results, "Results must not be null, use empty collection to emit nothing");

        // CAS the completed flag; if that fails the entry was already completed, so just return
        if (!completed.compareAndSet(false, true)) {
            return;
        }

        processInMailbox(results);
    }

    private void processInMailbox(Collection<OUT> results) {
        // move further processing into the mailbox thread
        mailboxExecutor.execute(
                () -> processResults(results),
                "Result in AsyncWaitOperator of input %s",
                results);
    }

    private void processResults(Collection<OUT> results) {
        /**
            If the timeout timer is not null, cancel the scheduled timeout task: we are already
            inside the completion path, the data has been processed, so the timeout for this
            record is no longer needed.
        */
        if (timeoutTimer != null) {
            // canceling in mailbox thread avoids
            // https://issues.apache.org/jira/browse/FLINK-13635
            timeoutTimer.cancel(true);
        }

        /**
            This calls the complete method of StreamRecordQueueEntry, assigning the results to its
            completedElements field; see the StreamRecordQueueEntry class above.
        */
        resultFuture.complete(results);
        // now jump back to outputCompletedElement at the top of this listing
        outputCompletedElement();
    }

}

Start reading from the complete method of the ResultHandler class above; the inline comments cover the details. Summarised, the steps are:

  1. Cancel the timeout timer registered for this ResultHandler.
  2. Call StreamRecordQueueEntry.complete to assign the results to its completedElements field.
  3. Check whether the head StreamRecordQueueEntry of the OrderedStreamElementQueue now has a non-null completedElements field (i.e. is done).
  4. If it is, call the queue's emitCompletedElement method, which removes the head StreamElementQueueEntry and calls its emitResult method to actually emit the data downstream.

So every time a CompletableFuture's complete callback fires, the operator checks whether the head of the queue is done; only when the head is done is it removed and its data emitted. Inside processElement the async operator has already wrapped every incoming record into a StreamElementQueueEntry and enqueued it in arrival order. Even though the asynchronous calls finish in arbitrary order, emission always happens from the head of the queue, so the overall output strictly follows the processElement order, which is exactly the order in which the upstream delivered the records.
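A quick trace with the MiniEntry/MiniOrderedQueue sketch from section 4.5 (again an illustration, not Flink code) shows why out-of-order completion still yields in-order output:

public class OrderingTrace {
    public static void main(String[] args) {
        MiniOrderedQueue q = new MiniOrderedQueue();
        MiniEntry e11 = q.put("11");
        MiniEntry e22 = q.put("22");

        // "22" finishes first, but it is not the head, so nothing is emitted yet.
        e22.complete(java.util.List.of("Output value: 22"));
        q.emitCompletedHead();               // prints nothing

        // once the head "11" completes, both are emitted, in input order.
        e11.complete(java.util.List.of("Output value: 11"));
        q.emitCompletedHead();               // prints 11 then 22
    }
}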

5. A Custom AsyncLookupFunction for Flink SQL

To use an external data source from Flink SQL you normally pull in a flink-sql-connector-xx style jar. For example, if a Kafka-backed stream table needs to join a Redis dimension table, that dimension lookup is usually done with a lookup join. Most examples available online are synchronous lookup joins; in some scenarios you need the asynchronous variant to get higher throughput. For a complete implementation see the code: https://gitee.com/rongdi/flinksql-connector-redis
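For orientation, a rough sketch of what such an async lookup join looks like from the SQL side. The 'redis' connector name and its options here are placeholders for whatever the linked repository actually registers; the FOR SYSTEM_TIME AS OF lookup-join syntax itself is standard Flink SQL, and whether the lookup runs synchronously or asynchronously is decided by the connector's table source implementation.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class AsyncLookupJoinSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Dimension table served by the custom connector; the connector name and
        // options are placeholders, check the linked repository for the real ones.
        tEnv.executeSql(
                "CREATE TABLE dim_redis (\n"
                        + "  k STRING,\n"
                        + "  v STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'redis'\n"
                        + ")");

        // Stream table (in practice Kafka; datagen here keeps the sketch self-contained);
        // a processing-time attribute is required for the lookup join.
        tEnv.executeSql(
                "CREATE TABLE events (\n"
                        + "  k STRING,\n"
                        + "  proc_time AS PROCTIME()\n"
                        + ") WITH (\n"
                        + "  'connector' = 'datagen'\n"
                        + ")");

        // Standard lookup-join syntax: enrich each event with the dimension row as of processing time.
        tEnv.executeSql(
                "SELECT e.k, d.v\n"
                        + "FROM events AS e\n"
                        + "JOIN dim_redis FOR SYSTEM_TIME AS OF e.proc_time AS d\n"
                        + "ON e.k = d.k")
            .print();
    }
}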

Original: https://www.cnblogs.com/rongdi/p/16789730.html
Author: 码小D
Title: Flink的异步算子的原理及使用
