Pool.apply_async()。get()导致_thread.lock pickle错误

逆水行舟用力撑,一篙松劲退千寻。这篇文章主要讲述Pool.apply_async()。get()导致_thread.lock pickle错误相关的知识,希望能为你提供帮助。
我最近制作了一个python程序,它将从消费者/生产者并行计算策略中受益匪浅。我试图开发一个模块(Class)来简化这种处理策略的实现,但我很快遇到了问题。
我的ProducerConsumer类:

class ProducerConsumer(object): def __init__(self, workers_qt, producer, consumer, min_producer_qt=1): self.producer_functor = producer# Pointer to the producer function self.consumer_functor = consumer# Pointer to the consumer functionself.buffer = deque([])# Thread-safe double-ended queue item for intermediate result bufferself.workers_qt = workers_qt self.min_producer_qt = min_producer_qt# Minimum quantity of active producers (if enough remaining input data)self.producers = []# List of producers async results self.consumers = []# List of consumers async resultsdef produce(self, params, callback=None): result = self.producer_functor(*params)# Execute the producer function if callback is not None: callback()# Call the callback (if there is one) return resultdef consume(self, params, callback=None): result = self.consumer_functor(params)# Execute the producer function if callback is not None: callback()# Call the callback (if there is one) return result# Map a list of producer's input data to a list of consumer's output data def map_result(self, producers_param): result = []# Result container producers_param = deque(producers_param)# Convert input to double-ended queue (for popleft() member)with Pool(self.workers_qt) as p:# Create a worker pool while self.buffer or producers_param or self.consumers or self.producers:# Work remaining # Create consumers if self.buffer and (len(self.producers) > = self.min_producer_qt or not producers_param): consumer_param = self.buffer.popleft()# Pop one set from the consumer param queue if not isinstance(consumer_param, tuple): consumer_param = (consumer_param,)# Force tuple typeself.consumers.append(p.apply_async(func=self.consume, args=consumer_param))# Start new consumer# Create producers elif producers_param: producer_param = producers_param.popleft()# Pop one set from the consumer param queue if not isinstance(producer_param, tuple): producer_param = (producer_param,)# Force tuple typeself.producers.append(p.apply_async(func=self.produce, args=producer_param))# Start new producer# Filter finished async_tasks finished_producers = [r for r in self.producers if r.ready()] if self.producers else [] finished_consumers = [r for r in self.consumers if r.ready()] if self.consumers else []# Remove finished async_tasks from the running tasks list self.producers = [r for r in self.producers if r not in finished_producers] self.consumers = [r for r in self.consumers if r not in finished_consumers]# Extract result from finished async_tasks for r in finished_producers: assert r.ready() self.buffer.append(r.get())# Get the producer result and put it in the buffer for r in finished_consumers: assert r.ready() result.append(r.get())# Get the consumer tesult and put in in the function local result varreturn result

在成员map_result()中,当我尝试“get()”apply_async(...)函数的结果时,我得到以下错误(请注意我正在运行python3):
Traceback (most recent call last): File "ProducerConsumer.py", line 91, in < module> test() File "ProducerConsumer.py", line 85, in test result = pc.map_result(input) File "ProducerConsumer.py", line 64, in map_result self.buffer.append(r.get())# Get the producer result and put it in the buffer File "/usr/lib/python3.5/multiprocessing/pool.py", line 608, in get raise self._value File "/usr/lib/python3.5/multiprocessing/pool.py", line 385, in _handle_tasks put(task) File "/usr/lib/python3.5/multiprocessing/connection.py", line 206, in send self._send_bytes(ForkingPickler.dumps(obj)) File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps cls(buf, protocol).dump(obj) TypeError: can't pickle _thread.lock objects

这里有一些代码来重现我的错误(显然取决于类):
def test_producer(val): return val*12def test_consumer(val): return val/4def test(): pc = ProducerConsumer(4, test_producer, test_consumer) input= [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]# Input for the test of the ProducerConsumer class expected = [0, 3, 6, 9, 15, 18, 21, 23, 27]# Expected output for the test of the ProducerConsumer classresult = pc.map_result(input)print('got: {}'.format(result)) print('expected : {}'.format(expected))if __name__ == '__main__': test()

请注意,在我的类的map_result()成员中,我只有“get()”结果为“ready()”。
【Pool.apply_async()。get()导致_thread.lock pickle错误】根据我对酸洗的了解(我承认并不是那么多),我会说我在成员函数上使用Pool.apply_async(...)的事实可以发挥作用,但我真的想保留如果可以的话,班级结构。
感谢您的帮助!
答案所以,当我也纠正了一些概念错误时,问题已得到纠正:
我的3个缓冲区变量(缓冲区,生成器,消费者)与该类的成员无关,因为它们在语义上绑定到“map_result()”成员本身。
因此补丁正在删除这些成员并将其创建为成员“map_result()”的局部变量。
问题是,即使概念有缺陷,我仍然很难理解为什么工人不能腌制锁(我现在假设的那个)所以......如果有人对发生的事情有明确的解释(或者某些链接)真的很感激。

    推荐阅读