本篇文章给大家带来的内容是关于python中进程间数据通讯模块multiprocessing.Manager的介绍,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。
目前开发中有遇到进程间需要共享数据的情况. 所以研究了下multiprocessing.Manager, 主要会以dict为例子, 说明下进程间共享(同一个父进程).
dict使用说明
import multiprocessing# 1. 创建一个Manger对象manager = multiprocessing.Manager()# 2. 创建一个dicttemp_dict = manager.dict()# 3. 创建一个测试程序def test(idx, test_dict): test_dict[idx] = idx# 4. 创建进程池进行测试pool = multiprocessing.Pool(4)for i in range(100): pool.apply_async(test, args=(i, temp_dict))pool.close()pool.join()print(temp_dict)
too simple.
简单的源码分析
这时我们再看一个例子
import multiprocessing# 1. 创建一个Manger对象manager = multiprocessing.Manager()# 2. 创建一个dicttemp_dict = manager.dict()temp_dict['test'] = {}# 3. 创建一个测试程序def test(idx, test_dict): test_dict['test'][idx] = idx# 4. 创建进程池进行测试pool = multiprocessing.Pool(4)for i in range(100): pool.apply_async(test, args=(i, temp_dict))pool.close()pool.join()print(temp_dict)
可以看到输出结果是奇怪的{'test': {}}
如果我们简单修改一下代码
import multiprocessing# 1. 创建一个Manger对象manager = multiprocessing.Manager()# 2. 创建一个dicttemp_dict = manager.dict()temp_dict['test'] = {}# 3. 创建一个测试程序def test(idx, test_dict): row = test_dict['test'] row[idx] = idx test_dict['test'] = row# 4. 创建进程池进行测试pool = multiprocessing.Pool(4)for i in range(100): pool.apply_async(test, args=(i, temp_dict))pool.close()pool.join()print(temp_dict)
这时输出结果就符合预期了.
为了了解这个现象背后的原因, 我简单去读了一下源码, 主要有以下几段代码很关键.
def Manager(): ''' Returns a manager associated with a running server process The managers methods such as `Lock()`, `Condition()` and `Queue()` can be used to create shared objects. ''' from multiprocessing.managers import SyncManager m = SyncManager() m.start() return m ... def start(self, initializer=None, initargs=()): ''' Spawn a server process for this manager object <strong>本文来源gaodai#ma#com搞@@代~&码网</strong> ''' assert self._state.value == State.INITIAL if initializer is not None and not hasattr(initializer, '__call__'): raise TypeError('initializer must be a callable') # pipe over which we will retrieve address of server reader, writer = connection.Pipe(duplex=False) # spawn process which runs a server self._process = Process( target=type(self)._run_server, args=(self._registry, self._address, self._authkey, self._serializer, writer, initializer, initargs), ) ident = ':'.join(str(i) for i in self._process._identity) self._process.name = type(self).__name__ + '-' + ident self._process.start()...
上面代码可以看出, 当我们声明了一个Manager对象的时候, 程序实际在其他进程启动了一个server服务, 这个server是阻塞的, 以此来实现进程间数据安全.
我的理解就是不同进程之间操作都是互斥的, 一个进程向server请求到这部分数据, 再把这部分数据修改, 返回给server, 之后server再去处理其他进程的请求.
回到上面的奇怪现象上, 这个操作test_dict['test'][idx] = idx
实际上在拉取到server上的数据后进行了修改, 但并没有返回给server, 所以temp_dict的数据根本没有变化. 在第二段正常代码, 就相当于先向服务器请求数据, 再向服务器传送修改后的数据. 这样就可以解释这个现象了.