KafkaController 需要处理各种各样的事件,事件统一投递到队列里面,由一个线程进行消费。
// kafka.controller.ControllerEventManager
private val putLock = new ReentrantLock()
// 存放事件的队列
private val queue = new LinkedBlockingQueue[ControllerEvent]
// 消费事件的线程
private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
// 使用 putLock 加锁,这其实是一个多余的动作,LinkedBlockingQueue put 方法内部本身就有锁
def put(event: ControllerEvent): Unit = inLock(putLock) {
queue.put(event)
}
// kafka.controller.ControllerEventManager.ControllerEventThread#doWork
// 线程执行的 run 方法内部逻辑
override def doWork(): Unit = {
// 从队列中取出事件
queue.take() match {
case KafkaController.ShutdownEventThread => initiateShutdown()
case controllerEvent =>
_state = controllerEvent.state
try {
rateAndTimeMetrics(state).time {
// 执行事件逻辑
controllerEvent.process()
}
} catch {
case e: Throwable => error(s"Error processing event $controllerEvent", e)
}
try eventProcessedListener(controllerEvent)
catch {
case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
}
_state = ControllerState.Idle
}
}
这本来是一个很简单的逻辑,但是 inLock 这个地方有点故事。最近在面试的时候,有被问到”在使用 ReentrantLock 的时候,如何保证 lock 和 unlock 成对?”
我在 kafka 中看到了 scala 的解法:
def put(event: ControllerEvent): Unit = inLock(putLock) {
queue.put(event)
}
/**
* Execute the given function inside the lock
*/
def inLock[T](lock: Lock)(fun: => T): T = {
lock.lock()
try {
fun
} finally {
lock.unlock()
}
}
那么 java 如何做呢?使用 Function 和 Consumer 试了半天,结果发现用 Runnable 最合适
LinkedBlockingQueue queue = new LinkedBlockingQueue<>();
ReentrantLock outLock = new ReentrantLock();
void inLock(ReentrantLock lock, Runnable fun) {
lock.lock();
try {
fun.run();
} finally {
lock.unlock();
}
}
public void put(Object msg) {
inLock(outLock, () -> {
try {
queue.put(msg);
} catch (InterruptedException e) {
}
});
}
Original: https://www.cnblogs.com/allenwas3/p/13176269.html
Author: 偶尔发呆
Title: kafka controller 处理事件
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/534017/
转载文章受原作者版权保护。转载请注明原作者出处!