Python中进程的同步和池化(代码实现和图解)

先决条件– Python中的多处理|
本文讨论了与Python中的多处理相关的两个重要概念:

  • 进程之间的同步
  • 进程合并
【Python中进程的同步和池化(代码实现和图解)】进程之间的同步
进程同步被定义为一种机制, 该机制可确保两个或多个并发进程不会同时执行某些特定的程序段, 即关键部分.
关键部分是指程序中访问共享资源的部分。
例如, 在下图中, 3个进程尝试同时访问共享资源或关键部分。
Python中进程的同步和池化(代码实现和图解)

文章图片
对共享资源的并发访问可能导致竞争条件。
当两个或多个进程可以访问共享数据并且它们试图同时更改它们时, 就会发生争用情况。结果, 变量的值可能是不可预测的, 并且取决于进程的上下文切换的时间而变化。
考虑下面的程序以了解竞争条件的概念:
# Python program to illustrate # the concept of race condition # in multiprocessing import multiprocessing# function to withdraw from account def withdraw(balance): for _ in range ( 10000 ): balance.value = https://www.lsbin.com/balance.value - 1# function to deposit to account def deposit(balance): for _ in range ( 10000 ): balance.value = balance.value + 1def perform_transactions():# initial balance (in shared memory) balance = multiprocessing.Value('i' , 100 )# creating new processes p1 = multiprocessing.Process(target = withdraw, args = (balance, )) p2 = multiprocessing.Process(target = deposit, args = (balance, ))# starting processes p1.start() p2.start()# wait until processes are finished p1.join() p2.join()# print final balance print ( "Final balance = {}" . format (balance.value))if __name__ = = "__main__" : for _ in range ( 10 ):# perform same transaction process 10 times perform_transactions()

如果你在上述程序上运行, 你将获得一些非预期的值, 例如:
Final balance = 1311 Final balance = 199 Final balance = 558 Final balance = -2265 Final balance = 1371 Final balance = 1158 Final balance = -577 Final balance = -1300 Final balance = -341 Final balance = 157

在上述程序中, 以初始余额为100进行了10000次提款和10000次存款交易。预期的最终余额为100, 但我们在10次迭代中得到了perform_transactions函数是一些不同的值。
发生这种情况是由于进程同时访问共享数据平衡。余额的这种不可预测性不过是比赛条件.
让我们尝试使用下面给出的序列图更好地理解它。这些是在以上示例中针对单个撤回和存款操作可能产生的不同顺序。
这是一个可能的序列, 由于两个进程读取相同的值并相应地将其写回, 因此会给出错误的答案。
p1 p2 balance
read(balance)
current= 100
100
read(balance)
current= 100
100
balance=current-1 = 99
write(balance)
99
balance=current+ 1 = 101
write(balance)
101
这些是以上场景中需要的2个可能的序列。
p1 p2 balance
read(balance)
current= 100
100
balance=current-1 = 99
write(balance)
99
read(balance)
current= 99
99
balance=current+ 1 = 100
write(balance)
100
p1 p2 balance
read(balance)
current= 100
100
balance=current+ 1 = 101
write(balance)
101
read(balance)
current= 101
101
balance=current-1 = 100
write(balance)
100
使用锁
多处理模块提供了锁上课以应对比赛条件。锁是使用信号操作系统提供的对象。
信号量是一个同步对象, 它控制多个进程在并行编程环境中对公共资源的访问。它只是操作系统(或内核)存储中指定位置的值, 每个进程可以检查然后更改。根据找到的值, 该进程可以使用该资源, 或者会发现该资源已在使用中, 并且必须等待一段时间才能再次尝试。信号量可以是二进制的(0或1), 也可以具有其他值。通常, 使用信号量的进程会检查该值, 然后(如果使用资源)则更改该值以反映该值, 以便后续的信号量用户将知道等待。
考虑下面给出的示例:
# Python program to illustrate # the concept of locks # in multiprocessing import multiprocessing# function to withdraw from account def withdraw(balance, lock): for _ in range ( 10000 ): lock.acquire() balance.value = https://www.lsbin.com/balance.value - 1 lock.release()# function to deposit to account def deposit(balance, lock): for _ in range ( 10000 ): lock.acquire() balance.value = balance.value + 1 lock.release()def perform_transactions():# initial balance (in shared memory) balance = multiprocessing.Value('i' , 100 )# creating a lock object lock = multiprocessing.Lock()# creating new processes p1 = multiprocessing.Process(target = withdraw, args = (balance, lock)) p2 = multiprocessing.Process(target = deposit, args = (balance, lock))# starting processes p1.start() p2.start()# wait until processes are finished p1.join() p2.join()# print final balance print ( "Final balance = {}" . format (balance.value))if __name__ = = "__main__" : for _ in range ( 10 ):# perform same transaction process 10 times perform_transactions()

输出如下:
Final balance = 100 Final balance = 100 Final balance = 100 Final balance = 100 Final balance = 100 Final balance = 100 Final balance = 100 Final balance = 100 Final balance = 100 Final balance = 100

让我们尝试逐步理解上面的代码:
首先, 锁对象使用以下对象创建:
lock = multiprocessing.Lock()

然后,将lock作为目标函数参数传递:
p1 = multiprocessing.Process(target=withdraw, args=(balance, lock)) p2 = multiprocessing.Process(target=deposit, args=(balance, lock))

在目标函数的临界区,我们使用lock.acquire()方法应用锁。一旦获得了锁,其他进程就不能访问它的临界区,直到使用lock.release()方法释放锁。
lock.acquire() balance.value = https://www.lsbin.com/balance.value - 1 lock.release()

正如你在结果中看到的, 最终余额每次都是100(这是预期的最终结果)。
进程之间的池化
让我们考虑一个简单的程序来查找给定列表中的数字平方。
# Python program to find # squares of numbers in a given list def square(n): return (n * n)if __name__ = = "__main__" :# input list mylist = [ 1 , 2 , 3 , 4 , 5 ]# empty list to store result result = []for num in mylist: result.append(square(num))print (result)

输出如下:
[1, 4, 9, 16, 25]

这是一个用于计算给定列表元素平方的简单程序。在多核/多处理器系统中, 请考虑下图以了解上述程序的工作方式:
Python中进程的同步和池化(代码实现和图解)

文章图片
只有一个内核用于程序执行, 其他内核很可能保持空闲状态。
为了利用所有核心, 多处理模块提供了泳池类。的泳池类表示工作进程池。它具有允许以几种不同方式将任务卸载到工作进程的方法。考虑下图:
Python中进程的同步和池化(代码实现和图解)

文章图片
在此, 任务由以下服务器自动在核心/进程之间卸载/分配:泳池目的。用户无需担心明确创建进程。
考虑下面给出的程序:
# Python program to understand # the concept of pool import multiprocessing import osdef square(n): print ( "Worker process id for {0}: {1}" . format (n, os.getpid())) return (n * n)if __name__ = = "__main__" : # input list mylist = [ 1 , 2 , 3 , 4 , 5 ]# creating a pool object p = multiprocessing.Pool()# map list to target function result = p. map (square, mylist)print (result)

输出如下:
Worker process id for 2: 4152 Worker process id for 1: 4151 Worker process id for 4: 4151 Worker process id for 3: 4153 Worker process id for 5: 4152 [1, 4, 9, 16, 25]

让我们尝试逐步理解以上代码:
我们使用以下方法创建一个Pool对象:
p = multiprocessing.Pool()

对于获得更多的任务卸载控制,有一些参数。这些都是:
  • processes:指定工作进程数。
  • maxtasksperchild:指定每个孩子要分配的最大任务数。
可以使用以下参数使池中的所有进程执行一些初始化:
  • initializer:指定工作进程的初始化函数。
  • initargs:要传递给初始化程序的参数。
现在,为了完成某个任务,我们必须将它映射到某个函数。在上面的例子中,我们将mylist映射到square函数。这样mylist的内容和square的定义就会分布在核之间。
result = p.map(square, mylist)

所有工作进程完成任务后, 将返回带有最终结果的列表。
如果发现任何不正确的地方, 或者想分享有关上述主题的更多信息, 请写评论。
首先, 你的面试准备可通过以下方式增强你的数据结构概念:Python DS课程。

    推荐阅读