• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

Python 使用生成器代替线程的方法

python 搞代码 4年前 (2022-01-08) 16次浏览 已收录 0个评论

这篇文章主要介绍了Python 使用生成器代替线程的方法,文中讲解非常细致,代码帮助大家更好的理解和学习,感兴趣的朋友可以了解下

问题

你想使用生成器(协程)替代系统线程来实现并发。这个有时又被称为用户级线程或绿色线程。

解决方案

要使用生成器实现自己的并发,你首先要对生成器函数和 yield 语句有深刻理解。 yield 语句会让一个生成器挂起它的执行,这样就可以编写一个调度器, 将生成器当做某种“任务”并使用任务协作切换来替换它们的执行。 要演示这种思想,考虑下面两个使用简单的 yield 语句的生成器函数:

 # Two simple generator functions def countdown(n): while n > 0: print('T-minus', n) yield n -= 1 print('Blastoff!') def countup(n): x = 0 while x <n: print('counting up', x) yield x +=1

这些函数在内部使用yield语句,下面是一个实现了简单任务调度器的代码:

 from collections import deque class TaskScheduler: def __init__(self): self._task_queue = deque() def new_task(self, task): ''' Admit a newly started task to the scheduler ''' self._task_queue.append(task) def run(self): ''' Run until there are no more tasks ''' while self._task_queue: task = self._task_queue.popleft() try: # Run until the next yield statement next(task) self._task_queue.append(task) except StopIteration: # Generator is no longer executing pass # Example use sched = TaskScheduler() sched.new_task(countdown(10)) sched.new_task(countdown(5)) sched.new_task(countup(15)) sched.run()

TaskScheduler 类在一个循环中运行生成器集合――每个都运行到碰到yield语句为止。 运行这个例子,输出如下:

T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2

到此为止,我们实际上已经实现了一个“操作系统”的最小核心部分。来源gao.dai.ma.com搞@代*码网 生成器函数就是任务,而yield语句是任务挂起的信号。 调度器循环检查任务列表直到没有任务要执行为止。

实际上,你可能想要使用生成器来实现简单的并发。 那么,在实现actor或网络服务器的时候你可以使用生成器来替代线程的使用。

下面的代码演示了使用生成器来实现一个不依赖线程的actor:

 from collections import deque class ActorScheduler: def __init__(self): self._actors = {}     # Mapping of names to actors self._msg_queue = deque()  # Message queue def new_actor(self, name, actor): ''' Admit a newly started actor to the scheduler and give it a name ''' self._msg_queue.append((actor,None)) self._actors[name] = actor def send(self, name, msg): ''' Send a message to a named actor ''' actor = self._actors.get(name) if actor: self._msg_queue.append((actor,msg)) def run(self): ''' Run as long as there are pending messages. ''' while self._msg_queue: actor, msg = self._msg_queue.popleft() try: actor.send(msg) except StopIteration: pass # Example use if __name__ == '__main__': def printer(): while True: msg = yield print('Got:', msg) def counter(sched): while True: # Receive the current count n = yield if n == 0: break # Send to the printer task sched.send('printer', n) # Send the next count to the counter task (recursive) sched.send('counter', n-1) sched = ActorScheduler() # Create the initial actors sched.new_actor('printer', printer()) sched.new_actor('counter', counter(sched)) # Send an initial message to the counter to initiate sched.send('counter', 10000) sched.run()

完全弄懂这段代码需要更深入的学习,但是关键点在于收集消息的队列。 本质上,调度器在有需要发送的消息时会一直运行着。 计数生成器会给自己发送消息并在一个递归循环中结束。

下面是一个更加高级的例子,演示了使用生成器来实现一个并发网络应用程序:

 from collections import deque from select import select # This class represents a generic yield event in the scheduler class YieldEvent: def handle_yield(self, sched, task): pass def handle_resume(self, sched, task): pass # Task Scheduler class Scheduler: def __init__(self): self._numtasks = 0    # Total num of tasks self._ready = deque()  # Tasks ready to run self._read_waiting = {} # Tasks waiting to read self._write_waiting = {} # Tasks waiting to write # Poll for I/O events and restart waiting tasks def _iopoll(self): rset,wset,eset = select(self._read_waiting, self._write_waiting,[]) for r in rset: evt, task = self._read_waiting.pop(r) evt.handle_resume(self, task) for w in wset: evt, task = self._write_waiting.pop(w) evt.handle_resume(self, task) def new(self,task): ''' Add a newly started task to the scheduler ''' self._ready.append((task, None)) self._numtasks += 1 def add_ready(self, task, msg=None): ''' Append an already started task to the ready queue. msg is what to send into the task when it resumes. ''' self._ready.append((task, msg)) # Add a task to the reading set def _read_wait(self, fileno, evt, task): self._read_waiting[fileno] = (evt, task) # Add a task to the write set def _write_wait(self, fileno, evt, task): self._write_waiting[fileno] = (evt, task) def run(self): ''' Run the task scheduler until there are no tasks ''' while self._numtasks: if not self._ready: self._iopoll() task, msg = self._ready.popleft() try: # Run the coroutine to the next yield r = task.send(msg) if isinstance(r, YieldEvent): r.handle_yield(self, task) else: raise RuntimeError('unrecognized yield event') except StopIteration: self._numtasks -= 1 # Example implementation of coroutine-based socket I/O class ReadSocket(YieldEvent): def __init__(self, sock, nbytes): self.sock = sock self.nbytes = nbytes def handle_yield(self, sched, task): sched.

以上就是Python 使用生成器代替线程的方法的详细内容,更多请关注gaodaima搞代码网其它相关文章!


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:Python 使用生成器代替线程的方法

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址