• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

python中进程间数据通讯模块multiprocessing.Manager的介绍

python 搞代码 4年前 (2022-01-09) 35次浏览 已收录 0个评论

本篇文章给大家带来的内容是关于python中进程间数据通讯模块multiprocessing.Manager的介绍,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

目前开发中有遇到进程间需要共享数据的情况. 所以研究了下multiprocessing.Manager, 主要会以dict为例子, 说明下进程间共享(同一个父进程).

dict使用说明

import multiprocessing# 1. 创建一个Manger对象manager = multiprocessing.Manager()# 2. 创建一个dicttemp_dict = manager.dict()# 3. 创建一个测试程序def test(idx, test_dict):    test_dict[idx] = idx# 4. 创建进程池进行测试pool = multiprocessing.Pool(4)for i in range(100):    pool.apply_async(test, args=(i, temp_dict))pool.close()pool.join()print(temp_dict)

too simple.

简单的源码分析

这时我们再看一个例子

import multiprocessing# 1. 创建一个Manger对象manager = multiprocessing.Manager()# 2. 创建一个dicttemp_dict = manager.dict()temp_dict['test'] = {}# 3. 创建一个测试程序def test(idx, test_dict):    test_dict['test'][idx] = idx# 4. 创建进程池进行测试pool = multiprocessing.Pool(4)for i in range(100):    pool.apply_async(test, args=(i, temp_dict))pool.close()pool.join()print(temp_dict)

可以看到输出结果是奇怪的{'test': {}}
如果我们简单修改一下代码

import multiprocessing# 1. 创建一个Manger对象manager = multiprocessing.Manager()# 2. 创建一个dicttemp_dict = manager.dict()temp_dict['test'] = {}# 3. 创建一个测试程序def test(idx, test_dict):    row = test_dict['test']    row[idx] = idx    test_dict['test'] = row# 4. 创建进程池进行测试pool = multiprocessing.Pool(4)for i in range(100):    pool.apply_async(test, args=(i, temp_dict))pool.close()pool.join()print(temp_dict)

这时输出结果就符合预期了.

为了了解这个现象背后的原因, 我简单去读了一下源码, 主要有以下几段代码很关键.

def Manager():    '''    Returns a manager associated with a running server process    The managers methods such as `Lock()`, `Condition()` and `Queue()`    can be used to create shared objects.    '''    from multiprocessing.managers import SyncManager    m = SyncManager()    m.start()    return m    ...    def start(self, initializer=None, initargs=()):        '''        Spawn a server process for this manager object      <strong>本文来源gaodai#ma#com搞@@代~&码网</strong>  '''        assert self._state.value == State.INITIAL        if initializer is not None and not hasattr(initializer, '__call__'):            raise TypeError('initializer must be a callable')        # pipe over which we will retrieve address of server        reader, writer = connection.Pipe(duplex=False)        # spawn process which runs a server        self._process = Process(            target=type(self)._run_server,            args=(self._registry, self._address, self._authkey,                  self._serializer, writer, initializer, initargs),            )        ident = ':'.join(str(i) for i in self._process._identity)        self._process.name = type(self).__name__  + '-' + ident        self._process.start()...

上面代码可以看出, 当我们声明了一个Manager对象的时候, 程序实际在其他进程启动了一个server服务, 这个server是阻塞的, 以此来实现进程间数据安全.
我的理解就是不同进程之间操作都是互斥的, 一个进程向server请求到这部分数据, 再把这部分数据修改, 返回给server, 之后server再去处理其他进程的请求.

回到上面的奇怪现象上, 这个操作test_dict['test'][idx] = idx实际上在拉取到server上的数据后进行了修改, 但并没有返回给server, 所以temp_dict的数据根本没有变化. 在第二段正常代码, 就相当于先向服务器请求数据, 再向服务器传送修改后的数据. 这样就可以解释这个现象了.

进程间数据安全


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:python中进程间数据通讯模块multiprocessing.Manager的介绍

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址