• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

07_线程池

python 搞java代码 3年前 (2022-05-21) 24次浏览 已收录 0个评论

1.为什么用线程池

    1.启动一个新线程的消耗较高且涉及与操作系统的交互,尤其是程序中需要创建大量生存期很短暂的线程,而使用线程池可以很好地提升性能

    2.线程池则是创建指定线程数量等待执行事件,当该事件执行结束后该线程并不会死亡,而是回到线程池中变成空闲状态等待执行下一个事件

    3.当系统中包含有大量的并发线程时,会导致系统性能急剧下降甚至导致解释器崩溃,而使用线程池可以有效地控制系统中并发线程的数量

2.线程池的使用步骤

    1.threadpool 模块实现线程池不推荐继续使用,此处推荐是用 concurrent.futures 模块中的 ThreadPoolExecutor 类实现线程池

    2.调用 ThreadPoolExecutor 类的构造器创建一个线程池,定义一个普通函数作为线程任务

    3.调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池

3.语法概述

<span style="color: #0000ff">from</span> concurrent.futures <span style="color: #0000ff">import</span><span style="color: #000000"> ThreadPoolExecutor

thread_pool </span>= ThreadPoolExecutor(max_workers=5)  <span style="color: #008000">#</span><span style="color: #008000"> 创建指定进程数量进程池并返回进程池对象</span>
 future = thread_pool.submit(fn, *args, **kwargs)  <span style="color: #008000">#</span><span style="color: #008000"> 将fn函数提交给线程池,并返回一个 future 对象</span>
<span style="color: #000000">    参数:
        </span>*<span style="color: #000000">args 代表传给 fn 函数的参数
        </span>*<span style="color: #000000">kwargs 代表以关键字参数的形式为 fn 函数传入参数
</span><span style="color: #008000">#</span><span style="color: #008000"> 该函数类似于内建函数 map(func, *iterables) 只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理</span>
thread_pool.map(func, *iterables, timeout=None, chunksize=1)  <span style="color: #008000">#</span><span style="color: #008000"> 提交多个任务给池中,等效for + submit</span>
thread_pool.shutdown(wait=True)  <span style="color: #008000">#</span><span style="color: #008000"> 等待池内所有任务执行完毕后关闭线程池</span>
<span style="color: #000000">
future.cancel()  </span><span style="color: #008000">#</span><span style="color: #008000"> 取消该future代表的线程任务,如果该任务正在执行不可取消则该方法返回False,否则程序会取消该任务并返回True</span>
future.cancelled()  <span style="color: #008000">#</span><span style="color: #008000"> 返回future代表的线程任务是否被成功取消</span>
future.running()  <span style="color: #008000">#</span><span style="color: #008000"> 如果该future代表的线程任务正在执行不可被取消,该方法返回True</span>
future.done()  <span style="color: #008000">#</span><span style="color: #008000"> 如果该funture代表的线程任务被成功取消或执行完成,则该方法返回True</span>
future.result(timeout=None) <span style="color: #008000">#</span><span style="color: #008000"> 获取该future代表的线程任务最后返回的结果,如果future代表的线程任务还未完成该方法将会阻塞当前线程</span>
<span style="color: #000000">    参数: timeout 指定最多阻塞等待多少秒
future.exception(timeout</span>=None)  <span style="color: #008000">#</span><span style="color: #008000"> 获取该future代表的线程任务所引发的异常,如果该任务成功完成没有异常则该方法返回None</span>
future.add_done_callback(fn)  <span style="color: #008000">#</span><span style="color: #008000"> 为该future代表的线程任务注册一个回调函数,当该任务成功完成时程序会自动触发该fn函数</span>

www#gaodaima.com来源gaodai#ma#com搞*代#码网搞代码

4.线程池的基本使用

<span style="color: #0000ff">import</span><span style="color: #000000"> time
</span><span style="color: #0000ff">from</span> concurrent.futures <span style="color: #0000ff">import</span><span style="color: #000000"> ThreadPoolExecutor


</span><span style="color: #0000ff">def</span><span style="color: #000000"> func(num):
    time.sleep(</span>1<span style="color: #000000">)
    </span><span style="color: #0000ff">print</span>(<span style="color: #800000">"</span><span style="color: #800000">num is %s</span><span style="color: #800000">"</span> %<span style="color: #000000"> num)
    </span><span style="color: #0000ff">return</span> num *<span style="color: #000000"> num


</span><span style="color: #0000ff">def</span><span style="color: #000000"> main():
    ret_list </span>=<span style="color: #000000"> list()
    executor </span>= ThreadPoolExecutor(max_workers=5)  <span style="color: #008000">#</span><span style="color: #008000"> 指定线程池中的线程数量</span>
    <span style="color: #0000ff">for</span> i <span style="color: #0000ff">in</span> range(5<span style="color: #000000">):
        future </span>= executor.submit(func, i)  <span style="color: #008000">#</span><span style="color: #008000"> 异步提交任务,返回一个未来对象future</span>
<span style="color: #000000">        ret_list.append(future)
    executor.shutdown(True)  </span><span style="color: #008000">#</span><span style="color: #008000"> 等待池内所有任务执行完毕回收完资源后才继续</span>

    <span style="color: #0000ff">for</span> ret <span style="color: #0000ff">in</span><span style="color: #000000"> ret_list:
        ret </span>= ret.result()  <span style="color: #008000">#</span><span style="color: #008000"> 获取该future代表的线程任务最后返回的结果</span>
        <span style="color: #0000ff">print</span><span style="color: #000000">(ret)


</span><span style="color: #0000ff">if</span> <span style="color: #800080">__name__</span> == <span style="color: #800000">"</span><span style="color: #800000">__main__</span><span style="color: #800000">"</span><span style="color: #000000">:
    main()
</span><span style="color: #800000">"""</span><span style="color: #800000">执行结果
    num is 1
    num is 0
    num is 3
    num is 2
    num is 4
    0
    1
    4
    9
    16
</span><span style="color: #800000">"""</span>

5.线程池的多任务提交

<span style="color: #0000ff">from</span> concurrent.futures <span style="color: #0000ff">import</span><span style="color: #000000"> ThreadPoolExecutor
</span><span style="color: #0000ff">import</span><span style="color: #000000"> time


</span><span style="color: #0000ff">def</span><span style="color: #000000"> func(num):
    sum </span>=<span style="color: #000000"> 0
    </span><span style="color: #0000ff">for</span> i <span style="color: #0000ff">in</span><span style="color: #000000"> range(num):
        sum </span>+= i ** 2
    <span style="color: #0000ff">print</span><span style="color: #000000">(sum)


t </span>= ThreadPoolExecutor(20<span style="color: #000000">)
start </span>=<span style="color: #000000"> time.time()
t.map(func, range(</span>1000))  <span style="color: #008000">#</span><span style="color: #008000"> 提交多个任务给池中,等效于 for + submit</span>
<span style="color: #000000">t.shutdown()
</span><span style="color: #0000ff">print</span>(time.time() - start)

6.线程池的返回值

<span style="color: #0000ff">from</span> concurrent.futures <span style="color: #0000ff">import</span><span style="color: #000000"> ThreadPoolExecutor
</span><span style="color: #0000ff">import</span><span style="color: #000000"> time


</span><span style="color: #0000ff">def</span><span style="color: #000000"> func(num):
    sum </span>=<span style="color: #000000"> 0
    </span><span style="color: #008000">#</span><span style="color: #008000"> time.sleep(5)</span>
    <span style="color: #008000">#</span><span style="color: #008000"> print(num)  # 异步的效果</span>
    <span style="color: #0000ff">for</span> i <span style="color: #0000ff">in</span><span style="color: #000000"> range(num):
        sum </span>+= i ** 2
    <span style="color: #0000ff">return</span><span style="color: #000000"> sum


t </span>= ThreadPoolExecutor(20<span style="color: #000000">)

</span><span style="color: #008000">#</span><span style="color: #008000"> 下列代码是用map的方式提交多个任务,对应拿结果的方法是__next__()返回的是一个生成器对象</span>
res = t.map(func, range(1000<span style="color: #000000">))
t.shutdown()
</span><span style="color: #0000ff">print</span>(res.<span style="color: #800080">__next__</span><span style="color: #000000">())
</span><span style="color: #0000ff">print</span>(res.<span style="color: #800080">__next__</span><span style="color: #000000">())
</span><span style="color: #0000ff">print</span>(res.<span style="color: #800080">__next__</span><span style="color: #000000">())
</span><span style="color: #0000ff">print</span>(res.<span style="color: #800080">__next__</span><span style="color: #000000">())

</span><span style="color: #008000">#</span><span style="color: #008000"> 下列代码是用for + submit提交多个任务的方式,对应拿结果的方法是result</span>
t = ThreadPoolExecutor(20<span style="color: #000000">)
res_l </span>=<span style="color: #000000"> []
</span><span style="color: #0000ff">for</span> i <span style="color: #0000ff">in</span> range(1000<span style="color: #000000">):
    re </span>=<span style="color: #000000"> t.submit(func, i)
    res_l.append(re)
t.shutdown()
[</span><span style="color: #0000ff">print</span>(i.result()) <span style="color: #0000ff">for</span> i <span style="color: #0000ff">in</span><span style="color: #000000"> res_l]
</span><span style="color: #008000">#</span><span style="color: #008000"> 在Pool进程池中拿结果,是用get方法,在ThreadPoolExecutor里边拿结果是用result方法</span>

7.线程池中子线程调用回调函数

<span style="color: #0000ff">from</span> threading <span style="color: #0000ff">import</span><span style="color: #000000"> current_thread
</span><span style="color: #0000ff">from</span> concurrent.futures <span style="color: #0000ff">import</span><span style="color: #000000"> ThreadPoolExecutor
</span><span style="color: #008000">#</span><span style="color: #008000"> 在线程池中,回调函数是子线程调用的,和父线程没有关系</span>
<span style="color: #0000ff">from</span> concurrent.futures <span style="color: #0000ff">import</span><span style="color: #000000"> ProcessPoolExecutor
</span><span style="color: #008000">#</span><span style="color: #008000"> 不管是ProcessPoolExecutor的进程池 还是Pool的进程池,回调函数都是父进程调用的,和子进程没有关系</span>
<span style="color: #0000ff">import</span><span style="color: #000000"> os


</span><span style="color: #0000ff">def</span><span style="color: #000000"> func(num):
    sum </span>=<span style="color: #000000"> 0
    </span><span style="color: #0000ff">for</span> i <span style="color: #0000ff">in</span><span style="color: #000000"> range(num):
        sum </span>+= i ** 2
    <span style="color: #0000ff">print</span>(<span style="color: #800000">"</span><span style="color: #800000">这是在子线程中</span><span style="color: #800000">"</span>, current_thread())  <span style="color: #008000">#</span><span style="color: #008000"> current_thread()查看线程标识,类似于进程中的getpid()</span>
    <span style="color: #0000ff">return</span><span style="color: #000000"> sum


</span><span style="color: #0000ff">def</span><span style="color: #000000"> call_back_fun(res):
    </span><span style="color: #008000">#</span><span style="color: #008000"> print(res.result(), os.getpid())</span>
    <span style="color: #0000ff">print</span>(<span style="color: #800000">"</span><span style="color: #800000">这是在回调函数中</span><span style="color: #800000">"</span><span style="color: #000000">, res.result(), current_thread())
    </span><span style="color: #008000">#</span><span style="color: #008000"> print(os.getpid())</span>


<span style="color: #0000ff">if</span> <span style="color: #800080">__name__</span> == <span style="color: #800000">"</span><span style="color: #800000">__main__</span><span style="color: #800000">"</span><span style="color: #000000">:
    </span><span style="color: #0000ff">print</span><span style="color: #000000">(os.getpid())
    t </span>= ThreadPoolExecutor(20)  <span style="color: #008000">#</span><span style="color: #008000"> 线程池</span>
    <span style="color: #008000">#</span><span style="color: #008000"> t = ProcessPoolExecutor(20)  # 进程池</span>
    <span style="color: #0000ff">for</span> i <span style="color: #0000ff">in</span> range(10<span style="color: #000000">):
        t.submit(func, i).add_done_callback(call_back_fun)
    t.shutdown()
    </span><span style="color: #0000ff">print</span>(<span style="color: #800000">"</span><span style="color: #800000">这是在主线程中</span><span style="color: #800000">"</span>, current_thread())

8.进程池和线程池效率对比

<span style="color: #0000ff">from</span> concurrent.futures <span style="color: #0000ff">import</span><span style="color: #000000"> ThreadPoolExecutor, ProcessPoolExecutor
</span><span style="color: #0000ff">from</span> multiprocessing <span style="color: #0000ff">import</span><span style="color: #000000"> Pool

</span><span style="color: #008000">#</span><span style="color: #008000"> concurrent.futures 这个模块是异步调用的机制</span><span style="color: #008000">
#</span><span style="color: #008000"> concurrent.futures 提交任务都是用submit</span><span style="color: #008000">
#</span><span style="color: #008000"> for + submit 多个任务的提交</span><span style="color: #008000">
#</span><span style="color: #008000"> shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务</span>

<span style="color: #008000">#</span><span style="color: #008000"> from multiprocessing import Pool.apply / apply_async</span>
<span style="color: #0000ff">import</span><span style="color: #000000"> time


</span><span style="color: #0000ff">def</span><span style="color: #000000"> func(num):
    sum </span>=<span style="color: #000000"> 0
    </span><span style="color: #0000ff">for</span> i <span style="color: #0000ff">in</span><span style="color: #000000"> range(num):
        </span><span style="color: #0000ff">for</span> j <span style="color: #0000ff">in</span><span style="color: #000000"> range(i):
            </span><span style="color: #0000ff">for</span> x <span style="color: #0000ff">in</span><span style="color: #000000"> range(j):
                sum </span>+= x ** 2
    <span style="color: #008000">#</span><span style="color: #008000"> print(sum)</span>


<span style="color: #0000ff">if</span> <span style="color: #800080">__name__</span> == <span style="color: #800000">"</span><span style="color: #800000">__main__</span><span style="color: #800000">"</span><span style="color: #000000">:
    </span><span style="color: #008000">#</span><span style="color: #008000"> pool的进程池的效率演示</span>
    p = Pool(5<span style="color: #000000">)
    start </span>=<span style="color: #000000"> time.time()
    </span><span style="color: #0000ff">for</span> i <span style="color: #0000ff">in</span> range(100<span style="color: #000000">):
        p.apply_async(func, args</span>=<span style="color: #000000">(i,))
    p.close()
    p.join()
    </span><span style="color: #0000ff">print</span>(<span style="color: #800000">"</span><span style="color: #800000">Pool进程池的效率时间是%s</span><span style="color: #800000">"</span> % (time.time() - start))  <span style="color: #008000">#</span><span style="color: #008000"> 0.51</span>

    <span style="color: #008000">#</span><span style="color: #008000"> 多进程的效率演示</span>
    tp = ProcessPoolExecutor(5<span style="color: #000000">)
    start </span>=<span style="color: #000000"> time.time()
    </span><span style="color: #0000ff">for</span> i <span style="color: #0000ff">in</span> range(100<span style="color: #000000">):
        tp.submit(func, i)
    tp.shutdown()  </span><span style="color: #008000">#</span><span style="color: #008000"> 等效于进程池中的 close + join</span>
    <span style="color: #0000ff">print</span>(<span style="color: #800000">"</span><span style="color: #800000">ProcessPoolExecutor进程池的消耗时间为%s</span><span style="color: #800000">"</span> % (time.time() - start))  <span style="color: #008000">#</span><span style="color: #008000"> 0.49</span>

    <span style="color: #008000">#</span><span style="color: #008000"> 多线程的效率</span>
    tp = ThreadPoolExecutor(20<span style="color: #000000">)
    start </span>=<span style="color: #000000"> time.time()
    </span><span style="color: #0000ff">for</span> i <span style="color: #0000ff">in</span> range(100<span style="color: #000000">):
        tp.submit(func, i)
    tp.shutdown()  </span><span style="color: #008000">#</span><span style="color: #008000"> 等效于 进程池中的 close + join</span>
    <span style="color: #0000ff">print</span>(<span style="color: #800000">"</span><span style="color: #800000">ThreadPoolExecutor线程池的消耗时间为%s</span><span style="color: #800000">"</span> % (time.time() - start))  <span style="color: #008000">#</span><span style="color: #008000"> 1.40</span>

<span style="color: #008000">#</span><span style="color: #008000"> 结果: 针对计算密集的程序来说</span><span style="color: #008000">
#</span><span style="color: #008000">     不管是Pool的进程池还是ProcessPoolExecutor()的进程池,执行效率相当</span><span style="color: #008000">
#</span><span style="color: #008000">     ThreadPoolExecutor 的效率要差很多</span><span style="color: #008000">
#</span><span style="color: #008000">     所以当计算密集时,使用多进程</span>

搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:07_线程池
喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址