Python 并发编程有很多方法,多线程的标准库 threading,concurrency,协程 asyncio,当然还有 grequests 这种异步库,每一个都可以实现上述需求,下面一一用代码实现一下,本文的代码可以直接运行,给你以后的并发编程作为参考:
队列+多线程
定义一个大小为 400 的队列,然后开启 200 个线程,每个线程都是不断的从队列中获取 url 并访问。
主线程读取文件中的 url 放入队列中,然后等待队列中所有的元素都被接收和处理完毕。代码如下:
from threading import Thread import sys from queue import Queue import requests concurrent = 200 def doWork(): while True: url = q.get() status, url = getStatus(url) doSomethingWithResult(status, url) q.task_done() def getStatus(ourl): try: res = requests.get(ourl) return res.status_code, ourl except: return "error", ourl def doSomethingWithResult(status, url): print(status, url) q = Queue(concurrent * 2) for i in range(concurrent): t = Thread(target=doWork) t.daemon = True t.start() try: for url in open("urllist.txt"): q.put(url.strip()) q.join() except KeyboardInterrupt: sys.exit(1)
运行结果如下:
有没有 get 到新技能?
线程池
如果你使用线程池,推荐使用更高级的 concurrent.futures 库:
import concurrent.futures import requests out = [] CONNECTIONS = 100 TIMEOUT = 5 urls = [] with open("urllist.txt") as reader: for url in reader: urls.append(url.strip()) def load_url(url, timeout): ans = requests.get(url, timeout=timeout) return ans.status_code with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor: future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls) for future in concurrent.futures.as_completed(future_to_url): try:<p>本文来源gao!%daima.com搞$代*!码9网(</p> data = future.result() except Exception as exc: data = str(type(exc)) finally: out.append(data) print(data)
协程 + aiohttp
协程也是并发非常常用的工具了:
import asyncio from aiohttp import ClientSession, ClientConnectorError async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple: try: resp = await session.request(method="GET", url=url, **kwargs) except ClientConnectorError: return (url, 404) return (url, resp.status) async def make_requests(urls: set, **kwargs) -> None: async with ClientSession() as session: tasks = [] for url in urls: tasks.append( fetch_html(url=url, session=session, **kwargs) ) results = await asyncio.gather(*tasks) for result in results: print(f'{result[1]} - {str(result[0])}') if __name__ == "__main__": import sys assert sys.version_info >= (3, 7), "Script requires Python 3.7+." with open("urllist.txt") as infile: urls = set(map(str.strip, infile)) asyncio.run(make_requests(urls=urls))