原文地址-我的博客
兴许你在数据迷信/AI/机器学习的钻研中头疼于大型数据加载与落盘的速度问题,毕竟IO过程是最磨人工夫的。大家常调侃于python能优化的空间的不多,但事实上咱们能够尽量地做到更好。心愿本文对你的程序有点帮忙。
本文的IO效率晋升的探讨限定在数据迷信畛域内的以numpy.ndarray
为代表的大型数组(张量、矩阵)数据对象的IO问题上。解决问题的伎俩是以多线程/多过程为根底的并行写入/读取。同网络io和一般的小数据量的io问题不同,数据迷信的大矩阵对象往往随同着矩阵的切片等操作,他们对于内存的占用(是否复制、挪动等)不明,更容易陷入内存冗余占用问题,这些都会影响io效率。本文探讨如下几个主题:
- 基于多线程/过程的并行读写办法及性能比照
- 并行IO中留神内存的冗余拷贝景象
- 最佳实际总结
IO情景
本文探讨的IO情景很简略,从磁盘上加载大数据进行解决,再将后果存储。这种状况常见于各类机器学习框架中,对数据的load和dump是最根本要解决的问题。下文中探讨的一些原理和技巧也在pytorch
、tensorflow
等的IO接口中体现。
在数据迷信场景下,要优化读写的效率,能够从以下几个方向动手:
- 从文件编码格局动手,采纳
pkl
等的二进制编码减速读写 - 从读写接口优化动手,采纳DirectIO/零拷贝等的优化
- 分块、分批并行读写,适宜数据绝对独立情景
上述三种办法第一种操作简略,但编码的模式不不便与其余语言/工具兼容。第二种对于Python来讲有点小题大做,而且Python的IO接口不如动态语言那样显式,尽管也能间接采纳os.open(CLONE_FLATS=...)
的最底层接口,但采纳DirectIO
[4]或mmap
之类的优化都须要减少设计老本。第三种办法虽波及多线程/过程,但不波及通信与同步,实际绝对简略。
多线程/多过程并行读写
并行根本逻辑
多过程导致的并行读写逻辑很简略,次要的开销在操作系统对过程的治理上。多线程对并行读写的实践撑持有必要再提一下(针对Cpython), 下图[1]所示的是GIL针对线程IO情景的解决。
上图也显示了多线程的次要开销是各个线程run
阶段的总和以及操作系统对线程的治理开销。
针对Cpython的多线程仍须要留神的是
- Linux下齐全是POSIX-thread, 这意味着调度模式依然是1:1的用户-内核映射关系
- Cpython多线程默认共享解释器中的全局变量
- 线程开释GIL的IO机会是进行底层根本的IO零碎调用后
- 多线程对于调度通信应用信号量、条件变量等办法
规范库接口测评
咱们设计一个小试验对CPython规范库提供的多线程/过程后果的并行写文件效率进行测试:
import os import numpy as np import time from multiprocessing import Process from multiprocessing import Pool from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from threading import Thread from memory_profiler import profile # Time calculator class Benchmark: def __init__(self, text): self.text = text def __enter__(self): self.start = time.time() def __exit__(self, *args): self.end = time.time() print("%s: consume: %s" % (self.text, self.end - self.start)) # Base Task def store_task(data: np.ndarray, output, index): fname = "%s_worker_%s.csv" % (output, index) np.savetxt(fname, data, delimiter='\t') #main data source worker_num = os.cpu_count() big_data = np.random.rand(1000000, 10) task_num = big_data.shape[0] // worker_num # 1. multiprocessing.Porcess @profile def loop_mp(): pool = [] for i in range(worker_num): start = i * task_num end = (i+1) * task_num p = Process(target=store_task, args=(big_data[start: end], 'testdata/', i)) p.start() pool.append(p) for p in pool: p.join() # 2. threading.Thread @profile def mt_thread(): pool = [] for i in range(worker_num): start = i * task_num end = (i+1) * task_num t = Thread(target=store_task, args=(big_data[start: end], 'testdata/thread', i)) t.start() pool.append(t) for p in pool: p.join() # 3. multiprocessing.Pool @profile def mp_pool(): with Pool(processes=worker_num) as pool: tasks = [] for i in range(worker_num): start = i * task_num end = (i+1) * task_num tasks.append( pool.apply_async(store_task_inner, (big_data[start: end], 'testdata/mp_pool', i))) pool.close() pool.join() # 4. ProcessPoolExecutor @profile def loop_pool(): with ProcessPoolExecutor(max_workers=worker_num) as exe: for i in range(worker_num): start = i * task_num end = (i+1) * task_num exe.submit(store_task, big_data[start: end], 'testdata/pool', i) # 5. ThreadPoolExecutor def loop_thread(): with ThreadPoolExecutor(max_workers=worker_num) as exe: for i in range(worker_num): start = i * task_num end = (i+1) * task_num exe.submit(store_task, big_data[start: end], 'testdata/pool_thread', i) # 6. direct @profile def direct(): store_task(big_data, 'testdata/all', 0) if __name__ == '__main__': with Benchmark("loop mp"): loop_mp() with Benchmark("mt thread"): mt_thread() with Benchmark("mp pool"): mp_pool() with Benchmark("loop pool"): loop_pool() with Benchmark("direct"): direct() with Benchmark("Thread"): loop_thread()
从工夫耗费和内存上剖析下各个接口的效率(测试环境MacOS 2.2 GHz 四核Intel Core i7
):
接口 | 耗时 | 内存 |
---|---|---|
multiprocessing.Process |
5.14s | p.start() 产生额定开销,触发参数的复制 |
theading.Thread |
10.34s | 无额定开销 |
multiprocessing.Pool |
4.18s | Pool() 构建额定开销, 参数未产生复制 |
ProcessPoolExecutor |
3.69s | 参数未产生复制 |
ThreadPoolExecutor |
10.82s | 无额定开销 |
direct | 22.04s | 无额定开销 |
工夫开销剖析
直观上看,多过程的接口减速了4-4.5x, 多线程减速了一半的工夫。多线程比多过程要慢的起因比较复杂,原则上切换的开销线程要小于过程,但此例中多线程还波及到线程间调度上的通信,而多过程则独立运行。当然有趣味的敌人也能够抉择asyncio.tasks
基于多路复用的接口比照下,毛病是比拟难找到适宜的非阻塞读写接口。
值得注意的是,多过程的两个接口的速度也有很大差异,Process
的模式比线程池的要慢很多,起因可能是数据拷贝的开销。下节探讨池技术为何防止了数据的拷贝。
内存开销剖析
因为CPython的数据类型的限度,对于多线程threading
和多过程multiprocessing
的数据是否复制不能显式地展示,从原理上讲Thread()
是无需拷贝数据的,Process
是须要拷贝数据的。然而上表中显示multiprcocessing.Pool
和ProcessPoolExecutor
这两个基于线程池的办法未产生数据的拷贝。
代码中的@profile
是一个内存剖析的三方库,但他的后果也不能充分说明实质。
其中Process
的后果是
Line # Mem usage Increment Occurences Line Contents ============================================================ 29 101.3 MiB 101.3 MiB 1 @profile 30 def loop_mp(): 31 101.3 MiB 0.0 MiB 1 pool = [] 32 120.6 MiB 0.0 MiB 9 for i in range(worker_num): 33 120.6 MiB 0.0 MiB 8 start = i * task_num 34 120.6 MiB 0.0 MiB 8 end = (i+1) * task_num 35 120.6 MiB 0.0 MiB 8 p = Process(target=store_task, args=(big_data[start: end], 'testdata/', i)) 36 120.6 MiB 19.3 MiB 8 p.start() 37 120.6 MiB 0.0 MiB 8 pool.append(p) 38 120.6 MiB 0.0 MiB 9 for p in pool: 39 120.6 MiB 0.0 MiB 8 p.join()
显著能够看出 p.start()
产生了数据的拷贝,拷贝的就是big_data[start: end]
理论大小。这与fork
零碎调用差异很大,零碎调用要明确地传入CLONE_FLAGS
来约定子过程与父过程的数据拷贝状况。再来看ProcessPoolExecutor
<code class="bash">Line # Mem usage Increment Occurences Line Contents ============================================================ 68 121.1 MiB 121.1 MiB 1 @profile 69 def loop_pool(): 70 121.1 MiB 0.0 MiB 1 with ProcessPoolExecutor(max_workers=worker_num) as exe: 71 121.2 MiB -0.0 MiB 9 for i in range(worker_num): 72 121.2 MiB 0.0 MiB 8 start = i * task_num 73 121.2 MiB 0.0 MiB 8 end = (i+1) * task_num 74 121.2 MiB 0.1 MiB 8 exe.submit(store_task, big_data[start: end], 'testdata/pool', i)
外表上看没有产生拷贝,但事实如此吗?因为exe.submit
毕竟不是间接触发了Process()
的构建,想弄明确这个问题还得深究Pool
技术的原理。
对于Cpython的源码解析,曾经不少Pythonista做了大量工作。从[2]的参考看到ProcessPoolExecutor
的封装逻辑是
<code class="bash">|======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | => | | => | Call Q | => | | | | +----------+ | | +-----------+ | | | | | ... | | | | ... | | | | | | 6 | | | | 5, call() | | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +-----------+ | | | | | 6: call() | | | | ... | | | | | | future | | | | 4, result | | | | | | ... | | | | 3, except | | | + −+----------+ +------------+ +--------+ +-----------+ +---------+
这个流程是否似曾相识?没错,他与之前文章[[C++造轮子] 基于pthread线程池](http://zhikai.pro/post/103)中…:
- 应用队列保护工作task
- Pool随同着空过程的创立
- 有专门的治理线程来负责Pool的治理与监控
那么具体到参数数据拷贝上便是Queue.put()
与Queue.get()
的操作是否产生数据拷贝了。multiporcessing.Queue
是多过程通信的一种重要接口,他是基于共享内存的,参数数据的传递不产生拷贝,这对于大的ndarray
对象而言是极其重要的。
ndarray
的对象拷贝
Python世界里所有皆对象。 — Py圈名言
面对企业级大数据时,Python程序呈现的内存/显存占用率过高往往不是那么容易查明起因。动静援用类型+gc给python的内存治理带来了不便,但不必要的数据拷贝产生情景还是要尽量避免。
切片与组合
切片和组合是在以numpy
为代表的向量/矩阵/张量运算库的罕用操作,他们底层是否产生复制很难剖析:
import numpy as np A = np.random.rand(1 << 8, 1 << 8) B = A[:16] del A ## can not release A's mem, for B's reference print(A) ## error, the ref A has not exist yet,however its mem still exist C = np.random.rand(1 << 4, 1 << 8) D = np.concatenate([B, C], axis=1) ## D is a copy of B+C memory
对于concatenate
次要看内存散布决定是否产生复制[6]:
<code class="bash">00 04 08 0C 10 14 18 1C 20 24 28 2C | | | | | | | | | | | | [data1 ][foo ][data2 ][bar ][concat(data1, data2) ] data1 & data2 displayed in different place, concat them can only cover a new place.
切片同样是看内存散布,基于row和column的内存排列是不同的,具体的能够应用order=['C', 'F']
决定数组是按行在内存排列还是按列。[7] 还有一种方法是探索切片最终是否转换成slice(start, offset, stride)
的模式,如是则为view, 不能则大概率是copy, 例如诸多的fancy_index
模式都是copy, [:]
其实就是slice(None, None, None)
,它也是copy.[8]
切片到底是view还是copy在小数据量时无需care,但数据规模达到与内存下限时,大型的ndarray切片操作肯定要小心了.
过程创立时的复制
咱们心愿把数据切片后传递给子过程, 同时咱们心愿这份数据不产生复制,各个过程共享这一大型ndarray
。首先从上一章明确的是,采纳multiprocessing.Process(target=func, args=(ndarray[start:offset]))
创立子过程的形式是肯定会复制ndarray的。其实这里次要用到的技术是multiprocessing
的共享内存办法。
Python3.8之后新减少了shared_memeory
, 给之前各种共享内存的形式做了一个对立的繁难应用接口。咱们应用share_memory革新一下上节的代码:
from multiprocessing import shared_memory def store_task_sha_v2(start, end, output, index, sha_name, shape, dtype): fname = "%s_worker_%s.csv" % (output, index) exist_sham = shared_memory.SharedMemory(name=sha_name) data = np.ndarray(shape, dtype=dtype, buffer=exist_sham.buf) print(sha_name, data.shape, index) np.savetxt(fname, data[start: end], delimiter='\t') del data exist_sham.close() @profile def mp_pool_sha(): shm = shared_memory.SharedMemory(create=True, size=big_data.nbytes) b = np.ndarray(big_data.shape, dtype=big_data.dtype, buffer=shm.buf) b[:] = big_data[:] print(b.shape) with ProcessPoolExecutor(max_workers=worker_num) as pool: tasks = [] for i in range(worker_num): start = i * task_num end = (i+1) * task_num tasks.append( pool.submit(store_task_sha_v2, start, end, 'testdata/mp_pool_sha', i , shm.name, b.shape, b.dtype)) for t in tasks: # Note! 在这里捕捉异样,ProcessPoolExecutor举荐这么应用! try: print(t.result()) except Exception as e: print(f'{e}') del b shm.close() shm.unlink()
代码简单了不少,但逻辑很简略: 共享缓冲区申请->映射local-ndarray对象->放数据进入共享缓存区->其余过程读写->敞开缓存区。share_memeory
的益处还有他能够随时申请local-variable进行共享。
最佳实际总结
并行读文件加载ndarray
退出你的训练数据很大,须要流解决(训练),间接应用torch.datasets
等模块加载,他们封装好了并行流处理过程。
如果须要一次性载入RAM解决(如KNN等算法)则能够采纳分块并行读:
def parallize_load(file, total_num, worker_num): """Load embedding file parallelization @emb_file: source filename @total_num: total lines @worker_num: parallelize process num return: np.ndaary """ def load_from_txt(emb, start, n_rows, arr_list): data = np.loadtxt(emb, skiprows=start, max_rows=n_rows) arr_list.append(data) worker_load_num = total_num // worker_num pool = [] with Manager() as manager: arr_list = manager.list([]) for index in range(worker_num): s = index * worker_load_num if index != worker_num - 1: e = worker_load_num else: e = total_num - (worker_load_num * index) p = Process(target=load_from_txt, args=(emb_file, s, e, arr_list)) pool.append(p) p.start() for p in pool: p.join() arr = np.concatenate(arr_list) return arr source_total_num = sum(1 for line in open("souce_big_file", "rb")) source_emb_data = parallize_load("souce_big_file", source_total_num, worker_num)
这基本上是worker_num
X 倍的减速。
并行写入实际
- 尽量避免对large-ndarray对象的切片、组合操作。
- 尽量避免应用
for-loop
, 多用矩阵运算 - 写入文件多过程效率更高,逻辑更简洁,但要时刻留神过程间数据不要产生复制
- 尽可能采纳三方库的io接口如
np.savetxt
,df.to_csv
等,他们可能对异样、分chunk写入等方面都有优化 - 写入字符串时,能尽量地拼接
'\t'.join(List[])
, 就不要应用for ele in List: fp.write("%s\t%s\n" % (ele))
More work
本文探讨的对象只局限于host-device
的RAM和disk, 对于更常见的GPU-mem,对于Python诸多三方库的接口来讲可就太苦楚了,他们往往都省略了调配-申请-调度-通信-销毁的过程,呈现OOM异样后排查只能靠指标察看。于此,接下来能够持续钻研下显存的最佳实际。
最初,兴许本文的内容会让你很惊讶,因为对Python做优化是一件出力不讨好的事件。但不得不说这些方法在我目前的工作中,在肯定资源的constrain下解决了原程序的很多问题。当然目前支流的机器学习算法流程都基于流解决,一次性地过大占用很少呈现了,但也有存在embedding读写等须要用到手动读写的中央。
- [1] Understanding GIL
- [2] Lib/concurrent/futures/process.py
- [3] Python 之路
- [4] Direct IO in Python
- [5] Python Doc: 17.2.1.5. Sharing state between processes
- [6] In-place numpy array concatenation? #13279
- [7] Numpy: views vs copy by slicing
- [8] Views versus copies in NumPy