基于Python实现环形队列高效定时器
定时器Python实现代码
import time import redis import multiprocessing class Base: """ redis配置 """ redis_conf = {} """ 环形队列应用redis进行存储 """ _ri = None """ 定时器轮盘大小 """ slot_num = 15 """ 存储环形队列应用的redis缓存key """ cache_key = 'wheel:slot_' def __init__(self, **kwargs): for k in kwargs: if hasattr(self, k): setattr(self, k, kwargs[k]) self._ri = redis.Redis(**self.redis_conf) class Timer(Base): """ 以后slot的下标 """ _current = 0 """ 事件处理 """ event_handler = None def worker(self): """ # TODO 测试每个卡槽有1W事件ID的解决效率 独立过程,散发事件id给事件处理器 :return: """ key = self.cache_key + str(self._current) # 获取以后卡槽中须要触发的事件ID event_ids = self._ri.zrangebyscore(key, 0, 0) # 删除以后卡槽中须要触发的事件ID self._ri.zremrangebyscore(key, 0, 0) # 把以后卡槽剩下的事件ID全副遍历进去,缩小一次残余循环次数 surplus_event_ids = self._ri.zrange(key, 0, -1) for mid in surplus_event_ids: self._ri.zincrby(key, mid, -1) # 把事件ID转交给handler解决 for mid in event_ids: self.event_handler(eid=mid) exit(0) def run(self): """ 启动过程 :return: """ while True: p = multiprocessing.Process(target=self.worker) p.start() time.sleep(1) self._current = int(time.time()) % self.slot_num class TimerEvent(Base): def add(self, event_id, emit_time): """ 增加事件ID到定时器 :param event_id: 事件ID :param emit_time: 触发工夫 :return: """ current_time = int(time.time()) diff = emit_time - current_time if diff > 0: # 计算循环次数 cycle = int(diff / self.slot_num) # 计算要存入的slot的索引 index = (diff % self.slot_num + current_time % self.slot_num) % self.slot_num res = self._ri.zadd(self.cache_key + str(index), str(event_id), cycle) return True if res else False return False # TODO 批量增加同一时间,不同事件ID # TODO 批量增加不同工夫,不同事件ID
通过环形队列实现高效工作触发的设计说明
- redis汇合【slot】
- 以redis多个有法则的键名的有序汇合组成环形数组
key_1 key_2 .... key_n
- 有序汇合
命令
ZADD key score member
有序汇合中蕴含两局部, 一个是score, 一个是member score作为残余循环次数 meber作为事件ID
- python多过程
- 计算以后工夫应该解决的卡槽
以后slot索引 = (以后工夫 % 卡槽总数 + 以后工夫戳 % 卡槽总数) % 卡槽总数
“%”为取余数操作
- 创立独立子过程解决
以后子过程须要疾速读取的残余循环次数为0事件ID
删除以后slot已取出的事件ID
开始把事件ID顺次转交给事件handler解决
利用阐明
- 启动定时器
import Timer import time def event_handler(eid): print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), eid) t = Timer(redis_conf={ 'host': '127.0.0.1', 'port': 6379, 'password': '123456', 'db': 0 }, event_handler=event_handler) times = int(time.time()) print('Current Time is ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(times))) t.run()
- 增加须要延时触发事件ID
import TimerEvent import time te = TimerEvent(redis_conf={ 'host': '127.0.0.1', 'port': 6379, 'password': '123456', 'db': 0 }) times = int(time.time()) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(times))) after_seconds_alert = 20 for x in range(100): te.add(x, times + after_seconds_alert + x) print('Firs Emit will happened at ' + time.strftime( 'Start:%Y-%m-%d %H:%M:%S', time.localtime(times + after_seconds_alert)) )
参考文章 10w定时工作,如何高效触发超时