posts - 210, comments - 61, trackbacks - 0, articles - 0
   :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

Python实现的一个消息总线+计划任务

Posted on 2020-01-21 14:28 魔のkyo 阅读(63) 评论(0)  编辑 收藏 引用 所属分类: Python

import threading
import time
import queue
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class ScheduledItem:
    """One-shot task entry that Driver.schedule() puts into its priority queue.

    ``order=True`` generates the comparison methods; since ``cb`` is declared
    with ``compare=False``, items are compared by ``time_sec`` alone.
    """
    # Time at which the task becomes due; Driver compares it with its epoch_sec.
    time_sec: float
    # Callable invoked when the task is due; excluded from comparisons.
    cb: Any = field(compare=False)

class Driver:
    """Message bus combined with one-shot and periodic task scheduling.

    Nothing runs on its own: the owner calls run() repeatedly (typically from
    a main loop). Each call refreshes the driver clock, then delivers queued
    asynchronous messages, due one-shot tasks and due periodic tasks, in that
    order.
    """

    def __init__(self):
        """Create an empty driver whose clock starts at the current time."""
        # One-shot tasks (ScheduledItem); the queue yields the smallest first.
        self.scheduled = queue.PriorityQueue()
        self.scheduled_every_lock = threading.Lock()
        # Periodic tasks: dicts with "next_sec", "interval" and "cb" keys.
        self.scheduled_every = []
        self.callbacks_lock = threading.Lock()
        # Maps a topic (or message type) to the set of receiver callbacks.
        self.callbacks = {}
        # (obj, topic) pairs waiting to be delivered by the next run().
        self.async_queue = queue.Queue()
        # Driver clock in seconds; refreshed at the start of every run().
        self.epoch_sec = time.time()

    def get_epoch(self) -> float:
        """Return the driver's current time in seconds.

        This is the value sampled by __init__ or by the most recent run(),
        not a fresh reading of the system clock.
        """
        return self.epoch_sec

    def run(self) -> None:
        """Run one pass of the main logic; usually called from the main loop."""
        self.epoch_sec = time.time()
        self._do_async()
        self._do_schedule()
        self._do_schedule_every()

    def schedule(self, cb, time_sec) -> None:
        """Schedule a one-shot task.

        ``cb`` is called as ``cb(epoch_sec)`` by the first run() whose clock
        is no longer earlier than ``time_sec``.
        """
        self.scheduled.put_nowait(ScheduledItem(time_sec, cb))

    def schedule_every(self, cb, interval_sec) -> None:
        """Schedule a repeating task.

        ``cb`` is called as ``cb(epoch_sec)``; the first call becomes due
        ``interval_sec`` after the driver's current epoch.

        Raises:
            ValueError: if ``interval_sec`` is not greater than zero. Such an
                interval would never advance past the clock and would make
                the catch-up loop in run() spin forever.
        """
        if not interval_sec > 0:
            raise ValueError(
                f"interval_sec must be greater than 0, got {interval_sec!r}")
        with self.scheduled_every_lock:
            self.scheduled_every.append({
                "next_sec": self.epoch_sec + interval_sec,
                "interval": interval_sec,
                "cb": cb,
            })

    def add_receiver(self, topic_or_type, cb):
        """Register ``cb`` as a receiver for a topic or message type.

        Returns ``cb`` unchanged.
        """
        # "with" guarantees the lock is released even if hashing the topic or
        # the callback raises.
        with self.callbacks_lock:
            self.callbacks.setdefault(topic_or_type, set()).add(cb)
        return cb

    def remove_receiver(self, topic_or_type, cb) -> None:
        """Unregister ``cb`` from a topic or message type; no-op if absent."""
        with self.callbacks_lock:
            receivers = self.callbacks.get(topic_or_type)
            if receivers is not None:
                receivers.discard(cb)

    def send(self, obj, topic=None) -> None:
        """Deliver ``obj`` synchronously to every receiver of ``topic``.

        When ``topic`` is None the type of ``obj`` is used as the topic.
        """
        if topic is None:
            topic = type(obj)
        # Snapshot the receivers under the lock, then call them outside it so
        # a receiver may add or remove receivers itself.
        with self.callbacks_lock:
            receivers = list(self.callbacks.get(topic, ()))
        for cb in receivers:
            cb(obj)

    def send_async(self, obj, topic=None) -> None:
        """Queue ``obj`` for delivery during the next run()."""
        self.async_queue.put_nowait((obj, topic))

    def _do_async(self) -> None:
        """Deliver every queued asynchronous message."""
        while True:
            try:
                obj, topic = self.async_queue.get_nowait()
            except queue.Empty:
                break
            self.send(obj, topic)

    def _do_schedule(self) -> None:
        """Run every one-shot task whose due time has been reached."""
        while True:
            try:
                item = self.scheduled.get_nowait()
            except queue.Empty:
                break
            if item.time_sec > self.epoch_sec:
                # Earliest task is not due yet: put it back and stop.
                self.scheduled.put_nowait(item)
                break
            item.cb(self.epoch_sec)

    def _do_schedule_every(self) -> None:
        """Run periodic tasks, once for every interval that has elapsed."""
        due = []
        with self.scheduled_every_lock:
            for task in self.scheduled_every:
                # Catch up: a task that fell several intervals behind is
                # queued once per missed interval.
                while self.epoch_sec >= task["next_sec"]:
                    due.append(task["cb"])
                    task["next_sec"] += task["interval"]
        # Callbacks run outside the lock so they may call schedule_every().
        for cb in due:
            cb(self.epoch_sec)

def bind(func, *args, **kw):
    """Return a callable that invokes ``func`` with arguments pre-applied.

    Positional arguments given at call time are appended after the bound
    ones. Call-time keyword arguments are passed alongside the bound ones; a
    keyword supplied in both places raises TypeError instead of overriding.
    """
    def bound(*call_args, **call_kw):
        return func(*args, *call_args, **kw, **call_kw)

    return bound


if __name__ == '__main__':
    # Demo: a periodic task publishes "Hello" on "mytopic" once per second,
    # and the receiver registered on that topic prints it.
    class Receiver:
        def onMsg(self, msg):
            print(f"Receiver.onMsg {msg}")

    receiver = Receiver()

    driver = Driver()
    driver.add_receiver("mytopic", bind(Receiver.onMsg, receiver))
    driver.schedule_every(
        # The comma matters: "Hello""mytopic" would be concatenated into one
        # string and dispatched on type str, which has no receiver.
        lambda _: driver.send("Hello", "mytopic"),
        interval_sec=1)

    # Main loop: drive the bus roughly every 49 ms.
    while True:
        driver.run()
        time.sleep(0.049)
只有注册用户登录后才能发表评论。