• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

关于python:使用Python进行并发编程

python 搞代码 3年前 (2022-02-20) 21次浏览 已收录 0个评论

让计算机程序并发的运行是一个常常被探讨的话题,明天我想讨论一下Python下的各种并发形式。

并发形式

线程([Thread])

多线程简直是每一个程序猿在应用每一种语言时都会首先想到用于解决并发的工具(JS程序员请回避),应用多线程能够无效的利用CPU资源(Python例外)。然而多线程所带来的程序的复杂度也不可避免,尤其是对竞争资源的同步问题。

然而在python中因为应用了全局解释锁(GIL)的起因,代码并不能同时在多核上并发的运行,也就是说,Python的多线程不能并发,很多人会发现应用多线程来改良本人的Python代码后,程序的运行效率却降落了,这是如许蛋疼的一件事呀!实际上应用多线程的编程模型是很艰难的,程序员很容易犯错,这并不是程序员的谬误,因为并行思维是反人类的,咱们大多数人的思维是串行(精神分裂不探讨),而且冯诺依曼设计的计算机架构也是以程序执行为根底的。所以如果你总是不能把你的多线程程序搞定,祝贺你,你是个思维失常的程序猿:)

Python提供两组线程的接口,一组是thread模块,提供根底的,低等级(Low Level)接口,应用Function作为线程的运行体。还有一组是threading模块,提供更容易应用的基于对象的接口(相似于Java),能够继承Thread对象来实现线程,还提供了其它一些线程相干的对象,例如Timer,Lock

应用thread模块的例子

import thread
def worker():
    """thread worker function"""
    print 'Worker'
thread.start_new_thread(worker)

应用threading模块的例子

import threading
def worker():
    """thread worker function"""
    print 'Worker'
t = threading.Thread(target=worker)
t.start()

或者Java Style

import threading
class worker(threading.Thread):
    def __init__(self):
        pass
    def run():
        """thread worker function"""
        print 'Worker'
    
t = worker()
t.start()

过程 (Process)

因为前文提到的全局解释锁的问题,Python下比拟好的并行形式是应用多过程,这样能够十分无效的应用CPU资源,并实现真正意义上的并发。当然,过程的开销比线程要大,也就是说如果你要创立数量惊人的并发过程的话,须要考虑一下你的机器是不是有一颗弱小的心。

Python的mutliprocess模块和threading具备相似的接口。

from multiprocessing import Process

def worker():
    """thread worker function"""
    print 'Worker'
p = Process(target=worker)
p.start()
p.join()

因为线程共享雷同的地址空间和内存,所以线程之间的通信是非常容易的,然而过程之间的通信就要简单一些了。常见的过程间通信有,管道,音讯队列,Socket接口(TCP/IP)等等。

Python的mutliprocess模块提供了封装好的管道和队列,能够不便的在过程间传递音讯。

Python过程间的同步应用锁,这一点喝线程是一样的。

另外,Python还提供了过程池Pool对象,能够不便的治理和控制线程。

近程分布式主机 (Distributed Node)

随着大数据时代的到临,摩尔定理在单机上仿佛曾经失去了成果,数据的计算和解决须要分布式的计算机网络来运行,程序并行的运行在多个主机节点上,曾经是当初的软件架构所必须思考的问题。

近程主机间的过程间通信有几种常见的形式

  • TCP/IP

    TCP/IP是所有近程通信的根底,然而API比拟低级别,应用起来比拟繁琐,所以个别不会思考

  • 近程办法调用 Remote Function Call

    [RPC]

  • 近程对象 Remote Object

    近程对象是更高级别的封装,程序能够想操作本地对象一样去操作一个近程对象在本地的代理。近程对象最广为应用的标准CORBA,CORBA最大的益处是能够在不同语言和平台中进行通信。当让不必的语言和平台还有一些各自的近程对象实现,例如Java的RMI,MS的DCOM

    Python的开源实现,有许多对近程对象的反对

  • Dopy]
  • Fnorb (CORBA)
  • ICE
  • omniORB (CORBA)
  • Pyro
  • YAMI
  • 音讯队列 Message Queue

    比起RPC或者近程对象,音讯是一种更为灵便的通信伎俩,常见的反对Python接口的音讯机制有

  • RabbitMQ
  • ZeroMQ
  • Kafka
  • AWS SQS + BOTO

在近程主机上执行并发和本地的多过程并没有十分大的差别,都须要解决过程间通信的问题。当然对近程过程的治理和协调比起本地要简单。

Python下有许多开源的框架来反对分布式的并发,提供无效的管理手段包含:

  • Celery

    Celery是一个十分成熟的Python分布式框架,能够在分布式的零碎中,异步的执行工作,并提供无效的治理和调度性能。

  • SCOOP

    SCOOP (Scalable COncurrent Operations in Python)提供简略易用的分布式调用接口,应用Future接口来进行并发。

  • Dispy

    相比起Celery和SCOOP,Dispy提供更为轻量级的分布式并行服务

  • PP

    PP (Parallel Python)是另外一个轻量级的Python并行服务

  • Asyncoro

    Asyncoro是另一个利用Generator实现分布式并发的Python框架,

当然还有许多其它的零碎,我没有一一列出

另外,许多的分布式系统多提供了对Python接口的反对,例如Spark

伪线程 (Pseudo-Thread)

还有一种并发伎俩并不常见,咱们能够称之为伪线程,就是看上去像是线程,应用的接口相似线程接口,然而理论应用非线程的形式,对应的线程开销也不存的。

  • greenlet

    greenlet提供轻量级的coroutines来反对过程内的并发。

    greenlet是Stackless的一个副产品,应用tasklet来反对一中被称之为微线程(mirco-thread)的技术,这里是一个应用greenlet的伪线程的例子

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34
    
def test2():
    print 56
    gr1.switch()
    print 78
    
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
    运行以上程序失去如下后果:  
12
56
34

伪线程gr1 switch会打印12,而后调用gr2 switch失去56,而后switch回到gr1,打印34,而后伪线程gr1完结,程序退出,所以78永远不会被打印。通过这个例子咱们能够看出,应用伪线程,咱们能够无效的控制程序的执行流程,然而伪线程并不存在真正意义上的并发。

eventlet,gevent和concurence都是基于greenlet提供并发的。

eventlet是一个提供网络调用并发的Python库,使用者能够以非阻塞的形式来调用阻塞的IO操作。

import eventlet
from eventlet.green import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def fetch(url):
    return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
    print("got body", len(body))

执行后果如下

('got body', 17629)
('got body', 1270)
('got body', 46949)

eventlet为了反对generator的操作对urllib2做了批改,接口和urllib2是统一的。这里的GreenPool和Python的Pool接口统一。

  • gevent

gevent和eventlet相似,

import gevent
from gevent import socket
urls = ['www.google.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)

print [job.value for job in jobs]

执行后果如下:

['206.169.145.226', '93.184.216.34', '23.235.39.223']

concurence是另外一个利用greenlet提供网络并发的开源库,我没有用过,大家能够本人尝试一下。

实战使用

通常须要用到并发的场合有两种,一种是计算密集型,也就是说你的程序须要大量的CPU资源;另一种是IO密集型,程序可能有大量的读写操作,包含读写文件,收发网络申请等等。

计算密集型

对应计算密集型的利用,咱们选用驰名的蒙特卡洛算法来计算PI值。基本原理如下

蒙特卡洛算法利用统计学原理来模拟计算圆周率,在一个正方形中,一个随机的点落在1/4圆的区域(红色点)的概率与其面积成正比。也就该概率 p = Pi * R*R /4 : R* R , 其中R是正方形的边长,圆的半径。也就是说该概率是圆周率的1/4, 利用这个论断,只有咱们模仿出点落在四分之一圆上的概率就能够晓得圆周率了,为了失去这个概率,咱们能够通过大量的试验,也就是生成大量的点,看看这个点在哪个区域,而后统计出后果。

根本算法如下:

from math import hypot
from random import random

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

这里test办法做了n(tries)次试验,返回落在四分之一圆中的点的个数。判断办法是查看该点到圆心的间隔,如果小于R则是在圆上。

通过大量的并发,咱们能够疾速的运行屡次试验,试验的次数越多,后果越靠近实在的圆周率。

这里给出不同并发办法的程序代码

  • 非并发

    咱们先在单线程,但过程运行,看看性能如何

from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    result = map(test, [tries] * nbFutures)
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • 多线程 thread

    为了应用线程池,咱们用multiprocessing的dummy包,它是对多线程的一个封装。留神这里代码尽管一个字的没有提到线程,但它千真万确是多线程。

    通过测试咱们开(jing)心(ya)的发现,果然不出所料,当线程池为1是,它的运行后果和没有并发时一样,当咱们把线程池数字设置为5时,耗时简直是没有并发的2倍,我的测试数据从5秒到9秒。所以对于计算密集型的工作,还是放弃多线程吧。

from multiprocessing.dummy import Pool

from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(1)
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    p = Pool()
    print("pi = {}".format(calcPi(3000, 4000)))
  • 多过程 multiprocess

    实践上对于计算密集型的工作,应用多过程并发比拟适合,在以下的例子中,过程池的规模设置为5,批改过程池的大小能够看到对后果的影响,当过程池设置为1时,和多线程的后果所需的工夫相似,因为这时候并不存在并发;当设置为2时,响应工夫有了显著的改良,是之前没有并发的一半;然而持续扩充过程池对性能影响并不大,甚至有所降落,兴许我的Apple Air的CPU只有两个核?

    当心,如果你设置一个十分大的过程池,你会遇到 Resource temporarily unavailable的谬误,零碎并不能反对创立太多的过程,毕竟资源是无限的。

from multiprocessing import Pool

from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(5)
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    print("pi = {}".format(calcPi(3000, 4000)))
  • gevent (伪线程)

    不论是gevent还是eventlet,因为不存在理论的并发,响应工夫和没有并发区别不大,这个和测试后果统一。

import gevent
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]
    gevent.joinall(jobs, timeout=2)
    ret = 4. * sum([job.value for job in jobs]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • eventlet (伪线程)
from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    pool = eventlet.GreenPool()
    result = pool.imap(test, [tries] * nbFutures)
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • SCOOP

SCOOP中的Future接口合乎PEP-3148的定义,也就是在Python3中提供的Future接口。

在缺省的SCOOP配置环境下(单机,4个Worker),并发的性能有进步,然而不如两个过程池配置的多过程。

from math import hypot
from random import random
from scoop import futures

import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    expr = futures.map(test, [tries] * nbFutures)
    ret = 4. * sum(expr) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == "__main__":
    print("pi = {}".format(calcPi(3000, 4000)))
  • Celery

工作代码

from celery import Celery

from math import hypot
from random import random
 
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
 
@app.task
def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

客户端代码

from celery import group
from tasks import test

import time

def calcPi(nbFutures, tries):
    ts = time.time()
    result = group(test.s(tries) for i in xrange(nbFutures))().get()
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000, 4000)

应用Celery做并发的测试后果出其不意(环境是单机,4frefork的并发,音讯broker是rabbitMQ),是所有测试用例里最蹩脚的,响应工夫是没有并发的5~6倍。这兴许是因为管制协调的开销太大。对于这样的计算工作,Celery兴许不是一个好的抉择。

  • asyncoro

    Asyncoro的测试后果和非并发保持一致。

import asyncoro

from math import hypot
from random import random
import time

def test(tries):
    yield sum(hypot(random(), random()) < 1 for _ in range(tries))


def calcPi(nbFutures, tries):
    ts = time.time()
    coros = [ asyncoro.Coro(test,t) for t in [tries] * nbFutures]
    ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)

IO密集型

IO密集型的工作是另一种常见的用例,例如网络WEB服务器就是一个例子,每秒钟能解决多少个申请时WEB服务器的重要指标。

咱们就以网页读取作为最简略的例子

from math import hypot
import time
import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def test(url):
    return urllib2.urlopen(url).read()

def testIO(nbFutures):
    ts = time.time()
    map(test, urls * nbFutures)

    span = time.time() - ts
    print "time spend ", span

testIO(10)

在不同并发库下的代码,因为比拟相似,我就不一一列出。大家能够参考计算密集型中代码做参考。

通过测试咱们能够发现,对于IO密集型的工作,应用多线程,或者是多过程都能够无效的进步程序的效率,而应用伪线程性能晋升十分显著,eventlet比没有并发的状况下,响应工夫从9秒进步到0.03秒。同时eventlet/gevent提供了非阻塞的异步调用模式,十分不便。这里举荐应用线程或者伪线程,因为在响应工夫相似的状况下,线程和伪线程耗费的资源更少。

总结

Python提供了不同的并发形式,对应于不同的场景,咱们须要抉择不同的形式进行并发。抉择适合的形式,岂但要对该办法的原理有所理解,还应该做一些测试和试验,数据才是你做抉择的最好参考。


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:关于python:使用Python进行并发编程

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址