vnpy源码阅读学习(6):事件引擎

看完了 MainEngine的代码,大概有一个了解以后。我们在初始化 MainEngine的时候,要传入或者实例化一个事件引擎对象。

代码基本结构

按照惯例,我把所有的方法体折叠,只保留类和方法,先大概对这个类要实现的功能有一个基本的了解。然后再逐个深入。

class EventEngine:
    def __init__(self, interval: int = 1):
        pass

    def _run(self):
        pass

    def _process(self, event: Event):
        pass

    def _run_timer(self):
        pass

    def start(self):
        pass

    def stop(self):
        pass

    def put(self, event: Event):
        pass

    def register(self, type: str, handler: HandlerType):
        pass

    def unregister(self, type: str, handler: HandlerType):
        pass

    def register_general(self, handler: HandlerType):
        pass

    def unregister_general(self, handler: HandlerType):
        pass

这个 EventEngine看起来要比 MainEngin有点货,至少从方法名字上很难一目了然的猜测到方法的作用。脑海中浮过VC的消息循环,或者C#的事件委托,又或者PyQt5的事件和插槽,不会是和那个机制类似的东东吧。

__init__(self, interval: int = 1)

    def __init__(self, interval: int = 1):
"""
        Timer event is generated every 1 second by default, if
        interval not specified.

"""
        self._interval = interval
        self._queue = Queue()
        self._active = False
        self._thread = Thread(target=self._run)
        self._timer = Thread(target=self._run_timer)
        self._handlers = defaultdict(list)
        self._general_handlers = []

从初始化的代码中可以看到, EventEngine用到了两个线程 Thread同时初始化了一个队列 __queue

register

    def register(self, type: str, handler: HandlerType):
"""
        Register a new handler function for a specific event type. Every
        function can only be registered once for each event type.

"""
        handler_list = self._handlers[type]
        if handler not in handler_list:
            handler_list.append(handler)

我们找到 HandlerType的定义

HandlerType = Callable[[Event], None]

基本上可以理解到注册一个可以被回调的方法,就是把这个方法的type和方法传给EventEngine然后,EventEngine会把这个回调方法保存到_handlers[type]中的数组中。

unregister

    def unregister(self, type: str, handler: HandlerType):
"""
        Unregister an existing handler function from event engine.

"""
        handler_list = self._handlers[type]

        if handler in handler_list:
            handler_list.remove(handler)

        if not handler_list:
            self._handlers.pop(type)

正好和register相反的方法,把一个事件从列表中移除。

register_general

    def register_general(self, handler: HandlerType):
"""
        Register a new handler function for all event types. Every
        function can only be registered once for each event type.

"""
        if handler not in self._general_handlers:
            self._general_handlers.append(handler)

这个方法相当于把一个不指定type的回调方法放入_general_handlers,跟他相反的方法:unregister_general

put

    def put(self, event: Event):
"""
        Put an event object into event queue.

"""
        self._queue.put(event)

这个是传入一个Event,然后把这个Event放入到一个队列中去
我们先来看看Event这个类。

class Event:
"""
    Event object consists of a type string which is used
    by event engine for distributing event, and a data
    object which contains the real data.

"""

    def __init__(self, type: str, data: Any = None):
""""""
        self.type = type
        self.data = data

从以上的代码,我们可以基本猜测在vnpy的事件引擎中,过来一个类型的数据,然后把这个数据存放到队列。然后再通过某种机制,定时的去调用那些通过regisiter进来的EventHandler。而什么类型的数据进来,给什么样的Handler回调。应该是通过Type去对应的。接下来,我们继续深入。

start和stop

    def start(self):
"""
        Start event engine to process events and generate timer events.

"""
        self._active = True
        self._thread.start()
        self._timer.start()

    def stop(self):
"""
        Stop event engine.

"""
        self._active = False
        self._timer.join()
        self._thread.join()

设置_active状态,启动或者阻塞线程。

核心部分的代码

    def _run(self):
        while self._active:
            try:
                event = self._queue.get(block=True, timeout=1)
                self._process(event)
            except Empty:
                pass

    def _process(self, event: Event):

        if event.type in self._handlers:
            [handler(event) for handler in self._handlers[event.type]]

        if self._general_handlers:
            [handler(event) for handler in self._general_handlers]

    def _run_timer(self):

        while self._active:
            sleep(self._interval)
            event = Event(EVENT_TIMER)
            self.put(event)

通过核心代码代码,我们豁然开朗。原来就是启动了两个线程,一个线程不断的循环把所有接受到的 Event取出来发送给需要它的 Handler进行回调,另外一个线程不断定时成Event,然后这个Event可以定时回调某些 Handler

总结

我总结成一个图片

vnpy源码阅读学习(6):事件引擎

Original: https://www.cnblogs.com/bbird/p/12460898.html
Author: bbird2018
Title: vnpy源码阅读学习(6):事件引擎

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/685135/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球