kafka controller 处理事件

KafkaController 需要处理各种各样的事件,事件统一投递到队列里面,由一个线程进行消费。

// kafka.controller.ControllerEventManager

  private val putLock = new ReentrantLock()
  // 存放事件的队列
  private val queue = new LinkedBlockingQueue[ControllerEvent]
  // 消费事件的线程
  private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)

  // 使用 putLock 加锁,这其实是一个多余的动作,LinkedBlockingQueue put 方法内部本身就有锁
  def put(event: ControllerEvent): Unit = inLock(putLock) {
    queue.put(event)
  }

  // kafka.controller.ControllerEventManager.ControllerEventThread#doWork
  // 线程执行的 run 方法内部逻辑
    override def doWork(): Unit = {
      // 从队列中取出事件
      queue.take() match {
        case KafkaController.ShutdownEventThread => initiateShutdown()
        case controllerEvent =>
          _state = controllerEvent.state

          try {
            rateAndTimeMetrics(state).time {
              // 执行事件逻辑
              controllerEvent.process()
            }
          } catch {
            case e: Throwable => error(s"Error processing event $controllerEvent", e)
          }

          try eventProcessedListener(controllerEvent)
          catch {
            case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
          }

          _state = ControllerState.Idle
      }
    }

这本来是一个很简单的逻辑,但是 inLock 这个地方有点故事。最近在面试的时候,有被问到”在使用 ReentrantLock 的时候,如何保证 lock 和 unlock 成对?”

我在 kafka 中看到了 scala 的解法:

def put(event: ControllerEvent): Unit = inLock(putLock) {
    queue.put(event)
  }

  /**
   * Execute the given function inside the lock
   */
  def inLock[T](lock: Lock)(fun: => T): T = {
    lock.lock()
    try {
      fun
    } finally {
      lock.unlock()
    }
  }

那么 java 如何做呢?使用 Function 和 Consumer 试了半天,结果发现用 Runnable 最合适

LinkedBlockingQueue queue = new LinkedBlockingQueue<>();
    ReentrantLock outLock = new ReentrantLock();

    void inLock(ReentrantLock lock, Runnable fun) {
        lock.lock();
        try {
            fun.run();
        } finally {
            lock.unlock();
        }
    }

    public void put(Object msg) {
        inLock(outLock, () -> {
            try {
                queue.put(msg);
            } catch (InterruptedException e) {
            }
        });
    }

Original: https://www.cnblogs.com/allenwas3/p/13176269.html
Author: 偶尔发呆
Title: kafka controller 处理事件

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/534017/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球