本文讨论了当使用Python中的multiprocessing模块时,数据共享和进程间消息传递/通信的概念。
在多处理中, 任何新创建的进程都将执行以下操作:
- 独立运行
- 有自己的记忆空间。
import multiprocessing# empty list with global scope
result = []def square_list(mylist):
"""
function to square a given list
"""
global result
# append squares of mylist to global list result
for num in mylist:
result.append(num * num)
# print global list result
print ( "Result(in process p1): {}" . format (result))if __name__ = = "__main__" :
# input list
mylist = [ 1 , 2 , 3 , 4 ]# creating new process
p1 = multiprocessing.Process(target = square_list, args = (mylist, ))
# starting process
p1.start()
# wait until process is finished
p1.join()# print global result list
print ( "Result(in main program): {}" . format (result))
Result(in process p1): [1, 4, 9, 16]
Result(in main program): []
在上面的示例中, 我们尝试打印全局列表的内容结果在两个地方:
- 在square_list函数。因为这个函数是由进程p1调用的,所以只在进程p1的内存空间中改变结果列表。
- 在主程序中完成p1进程后。因为主程序是由另一个进程运行的,所以它的内存空间仍然包含空的结果列表。
文章图片
在流程之间共享数据
共享内存:多处理模块提供数组和值对象在进程之间共享数据。
- 数组:从分配的ctypes数组共享内存.
- 值:从分配的ctypes对象共享内存.
import multiprocessingdef square_list(mylist, result, square_sum):
"""
function to square a given list
"""
# append squares of mylist to result array
for idx, num in enumerate (mylist):
result[idx] = num * num# square_sum value
square_sum.value = https://www.lsbin.com/sum (result)# print result Array
print ("Result(in process p1): {}" . format (result[:]))# print square_sum Value
print ( "Sum of squares(in process p1): {}" . format (square_sum.value))if __name__ = = "__main__" :
# input list
mylist = [ 1 , 2 , 3 , 4 ]# creating Array of int data type with space for 4 integers
result = multiprocessing.Array( 'i' , 4 )# creating Value of int data type
square_sum = multiprocessing.Value( 'i' )# creating new process
p1 = multiprocessing.Process(target = square_list, args = (mylist, result, square_sum))# starting process
p1.start()# wait until process is finished
p1.join()# print result array
print ( "Result(in main program): {}" . format (result[:]))# print square_sum Value
print ( "Sum of squares(in main program): {}" . format (square_sum.value))
Result(in process p1): [1, 4, 9, 16]
Sum of squares(in process p1): 30
Result(in main program): [1, 4, 9, 16]
Sum of squares(in main program): 30
让我们尝试逐行理解上面的代码:
首先,我们像这样创建一个数组result:
result = multiprocessing.Array('i', 4)
- 第一个参数是数据类型。 " i"代表整数, 而" d"代表浮点数据类型。
- 第二个参数是数组大小。在这里, 我们创建一个包含4个元素的数组。
square_sum = multiprocessing.Value('i')
在这里, 我们只需要指定数据类型。可以给该值一个初始值(例如10), 如下所示:
square_sum = multiprocessing.Value('i', 10)
其次,在创建Process对象时传递result和square_sum作为参数。
p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
通过指定数组元素的索引为result数组元素指定一个值。
for idx, num in enumerate(mylist):
result[idx] = num * num
square_sum通过使用它的值值属性:
square_sum.value = https://www.lsbin.com/sum(result)
为了打印result数组元素,我们使用result[:]来打印完整的数组。
print("Result(in process p1): {}".format(result[:]))
square_sum的值简单地打印为:
print("Sum of squares(in process p1): {}".format(square_sum.value))
下图描述了进程如何共享数组和值对象:
文章图片
服务器进程:每当python程序启动时,服务器进程也会启动。从那时起,每当需要一个新进程时,父进程就会连接到服务器,并请求它派生一个新进程。
服务器进程可以保存Python对象,并允许其他进程使用代理来操作它们。
multiprocessing模块提供了一个管理器类来控制服务器进程。因此,管理器提供了一种创建数据的方法,这些数据可以在不同的进程之间共享。
服务器进程管理器比使用共享内存对象更灵活, 因为它们可以支持任意对象类型, 例如列表, 字典, 队列, 值, 数组等。而且, 单个管理器可以由网络上不同计算机上的进程共享。 。但是, 它们比使用共享内存慢。考虑下面给出的示例:
import multiprocessingdef print_records(records):
"""
function to print record(tuples) in records(list)
"""
for record in records:
print ( "Name: {0}\nScore: {1}\n" . format (record[ 0 ], record[ 1 ]))def insert_record(record, records):
"""
function to add a new record to records(list)
"""
records.append(record)
print ( "New record added!\n" )if __name__ = = '__main__' :
with multiprocessing.Manager() as manager:
# creating a list in server process memory
records = manager. list ([( 'Sam' , 10 ), ( 'Adam' , 9 ), ( 'Kevin' , 9 )])
# new record to be inserted in records
new_record = ( 'Jeff' , 8 )# creating new processes
p1 = multiprocessing.Process(target = insert_record, args = (new_record, records))
p2 = multiprocessing.Process(target = print_records, args = (records, ))# running process p1 to insert new record
p1.start()
p1.join()# running process p2 to print records
p2.start()
p2.join()
New record added!Name: Sam
Score: 10Name: Adam
Score: 9Name: Kevin
Score: 9Name: Jeff
Score: 8
让我们尝试理解以上代码:
首先,我们使用以下方法创建一个管理器对象:
with multiprocessing.Manager() as manager:
with语句块下面的所有行都在管理器对象的范围内。
然后,我们在服务器进程内存中创建一个列表记录:
records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin', 9)])
类似地,您可以创建一个字典管manager.dict方法。
- 最后, 我们创建流程p1(在中插入新记录记录清单)和p2(打印记录)并在通过时运行它们记录作为参数之一。
文章图片
进程间通信
有效使用多个流程通常需要它们之间进行某种沟通, 以便可以划分工作并可以汇总结果。
multiprocessing支持两种类型的进程之间的通信通道::
- 队列Queue
- 管道Pipe
注意:multiprocessing.Queue类是queue.Queue的近似克隆。
考虑下面给出的示例程序:
import multiprocessingdef square_list(mylist, q):
"""
function to square a given list
"""
# append squares of mylist to queue
for num in mylist:
q.put(num * num)def print_queue(q):
"""
function to print queue elements
"""
print ( "Queue elements:" )
while not q.empty():
print (q.get())
print ( "Queue is now empty!" )if __name__ = = "__main__" :
# input list
mylist = [ 1 , 2 , 3 , 4 ]# creating multiprocessing Queue
q = multiprocessing.Queue()# creating new processes
p1 = multiprocessing.Process(target = square_list, args = (mylist, q))
p2 = multiprocessing.Process(target = print_queue, args = (q, ))# running process p1 to square list
p1.start()
p1.join()# running process p2 to get queue elements
p2.start()
p2.join()
Queue elements:
1
4
9
16
Queue is now empty!
让我们尝试逐步理解上面的代码:
- 首先, 我们创建一个多处理队列使用:
q = multiprocessing.Queue()
- 然后我们传递空队列qtosquare_list通过进程发挥作用p1。使用插入元素以排队放方法。
q.put(num * num)
- 为了打印队列元素, 我们使用得到直到队列不为空的方法。
while not q.empty(): print(q.get())
文章图片
管道:一个管道只能有两个端点。因此,当只需要双向通信时,它比队列更可取。
multiprocessing模块提供了Pipe()函数,该函数返回由管道连接的一对连接对象。Pipe()返回的两个连接对象表示管道的两端。每个连接对象都有send()和recv()方法。
考虑下面给出的程序:
import multiprocessingdef sender(conn, msgs):
"""
function to send messages to other end of pipe
"""
for msg in msgs:
conn.send(msg)
print ( "Sent the message: {}" . format (msg))
conn.close()def receiver(conn):
"""
function to print the messages received from other
end of pipe
"""
while 1 :
msg = conn.recv()
if msg = = "END" :
break
print ( "Received the message: {}" . format (msg))if __name__ = = "__main__" :
# messages to be sent
msgs = [ "hello" , "hey" , "hru?" , "END" ]# creating a pipe
parent_conn, child_conn = multiprocessing.Pipe()# creating new processes
p1 = multiprocessing.Process(target = sender, args = (parent_conn, msgs))
p2 = multiprocessing.Process(target = receiver, args = (child_conn, ))# running processes
p1.start()
p2.start()# wait until processes finish
p1.join()
p2.join()
Sent the message: hello
Sent the message: hey
Sent the message: hru?
Received the message: hello
Sent the message: END
Received the message: hey
Received the message: hru?
让我们尝试理解上面的代码:
只需使用以下命令即可创建管道:
parent_conn, child_conn = multiprocessing.Pipe()
该函数为管道的两端返回了两个连接对象。
消息使用send方法从管道的一端发送到另一端。
conn.send(msg)
为了接收管道一端的任何消息,我们使用recv方法。
msg = conn.recv()
在上面的程序中, 我们从一端发送消息列表到另一端。在另一端, 我们阅读消息, 直到收到" END"消息。
考虑下面给出的图表, 该图表显示了黑白管和流程之间的关系:
文章图片
注意:如果两个进程(或线程)试图同时从管道的同一端读取或写入管道的同一端, 则管道中的数据可能会损坏。当然, 不存在同时使用管道不同端的进程造成损坏的风险。还应注意, 队列在进程之间进行适当的同步, 但代价是更加复杂。因此, 据说队列是线程和进程安全的!
下一个:
- Python中进程的同步和池化
【Python如何使用多处理(进程间通信)(简介和用法指南|S2)】首先, 你的面试准备可通过以下方式增强你的数据结构概念:Python DS课程。
推荐阅读
- 算法设计(如何使用递归反转栈(Stack)())
- Python中如何实现密码验证(两种方法)
- 如何使用递归实现打印给定总和的所有子集()
- 图的深度优先搜索或DFS算法如何实现()
- C/C++中的函数如何使用(通俗解释和代码示例)
- jQuery如何使用bind()方法(代码示例)
- 移除最小数量的元素,使两个数组中不存在公共元素
- 苏格兰皇家银行面试经验(校园)
- win10正式版安装最新推荐