掘金 后端 ( ) • 2024-03-31 10:26

报错详情:

Exception iterating responses: Can't pickle local object 'Servicer.Build.<locals>.run.<locals>.t'

调用堆栈

  File "/home/rtx2080ti/code/xhspark/src/knowledge/servicer/views.py", line 337, in Build
    run()
  File "/home/rtx2080ti/code/xhspark/src/knowledge/servicer/views.py", line 317, in run
    p.apply(func=t, args=['a',])
  File "/home/rtx2080ti/anaconda3/envs/torch/lib/python3.11/multiprocessing/pool.py", line 360, in apply
    return self.apply_async(func, args, kwds).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/rtx2080ti/anaconda3/envs/torch/lib/python3.11/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/home/rtx2080ti/anaconda3/envs/torch/lib/python3.11/multiprocessing/pool.py", line 540, in _handle_tasks
    put(task)
  File "/home/rtx2080ti/anaconda3/envs/torch/lib/python3.11/multiprocessing/connection.py", line 205, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/rtx2080ti/anaconda3/envs/torch/lib/python3.11/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'Servicer.Build.<locals>.run.<locals>.t'

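代码复现

注意:`Pool.apply_async` 不会直接抛出这个序列化错误——错误只会保存在返回的 `AsyncResult` 中,需要调用它的 `.get()`,或者在提交任务时传入 `error_callback` 才能看到。上面的堆栈来自内部调用了 `.get()` 的 `Pool.apply`。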

'''
Author: BeYoung
Date: 2024-03-08 14:05:20
LastEditTime: 2024-03-30 18:07:43
'''
import multiprocessing
import time
import asyncio

    
async def sleep2(nums, delay=2):
    """Wait *delay* seconds (default 2) on the event loop, then return *nums* unchanged."""
    await asyncio.sleep(delay)
    return nums


class A:
    """Reproduction of "Can't pickle local object" with a task function nested in a method.

    ``sleep2`` and ``A`` live at module level (outside the ``__main__`` guard) so
    that ``A.main`` itself can be pickled and looked up by the child process under
    any start method; only the nested ``func`` is left unpicklable, which is the
    point of this reproduction.
    """

    @staticmethod
    def main(q):
        """Submit 50 tasks to a process pool, then put the sentinel ``"OK"`` on *q*.

        Every submission fails because ``func`` below is a local object. The
        failure is reported through ``error_callback``; without it,
        ``apply_async`` would keep the error inside its ``AsyncResult`` and
        nothing would be printed.

        Args:
            q: queue that receives each successful task result and, last, ``"OK"``.
        """

        def callback(num):
            # Runs in the pool's result-handler thread of this process; it is
            # never sent to a worker, so being a local function is fine here.
            q.put(num)
            print(f"callback {num}")

        def on_error(exc):
            # Surfaces the pickling failure (AttributeError on Python 3.11).
            print(f"error {exc!r}")

        # Local function: its qualified name is 'A.main.<locals>.func', which
        # pickle cannot resolve by name, so sending it to a worker fails.
        def func(num):
            nums = list(range(num))
            # asyncio.run() creates and closes its own event loop; no need to
            # create (and leak) another one with set_event_loop/new_event_loop.
            res = asyncio.run(sleep2(nums))
            print(res)
            return num

        try:
            with multiprocessing.Pool() as pool:
                for i in range(50):
                    pool.apply_async(
                        func, args=[i], callback=callback, error_callback=on_error
                    )
                pool.close()
                pool.join()
        finally:
            # Always send the sentinel so the parent's blocking q.get() can finish.
            q.put("OK")


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=A.main, args=[q])
    p.start()

    try:
        # Drain results until the sentinel arrives; the child sends it last,
        # so every earlier item has been consumed before we join.
        while True:
            r = q.get()
            print(f"main {r}")
            if r == "OK":
                break
    except BaseException:
        # Bailing out early (e.g. Ctrl+C): don't leave the child running.
        p.terminate()
        raise
    finally:
        p.join()
        p.close()
    

原因

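func 是定义在 main 方法内部的局部函数,其限定名为 `A.main.<locals>.func`。multiprocessing 需要用 pickle 把任务函数发送给工作进程,而 pickle 序列化函数时并不保存函数体,只记录“模块名 + 限定名”,由接收方按这个名字重新查找。局部函数无法通过模块属性访问到,因此无法被序列化,于是报出 “Can't pickle local object”。注意,这与 main 是否执行完毕、局部变量是否被销毁无关——序列化发生时 main 仍在运行。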

解决方式

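将 func 移出 main 的局部作用域,定义成可以按名字访问到的对象,例如类属性 `A.func`,或者更常见的模块顶层函数。这样 pickle 才能按“模块名 + 限定名”找到它。

另外,在默认使用 spawn 启动方式的平台上(Windows,以及 Python 3.8 起的 macOS),子进程会重新导入主模块,但不会执行 `if __name__ == "__main__":` 分支里的代码。因此,被子进程调用的函数和类必须定义在该分支之外,否则只能在 fork 启动方式下运行。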

'''
Author: BeYoung
Date: 2024-03-08 14:05:20
LastEditTime: 2024-03-30 18:22:55
'''
import multiprocessing
import time
import asyncio

    
async def sleep2(nums, delay=2):
    """Wait *delay* seconds (default 2) on the event loop, then return *nums* unchanged."""
    await asyncio.sleep(delay)
    return nums


class A:
    """Fixed version: the pool task is a class attribute, so pickle can send it by name.

    ``sleep2`` and ``A`` live at module level (outside the ``__main__`` guard):
    with the spawn start method the child re-imports the main module without
    running the guarded code, so anything it must look up has to be defined here.
    """

    @staticmethod
    def func(num, delay=2):
        """Worker task: await ``sleep2(list(range(num)), delay)``, print the list, return *num*.

        Its qualified name is ``A.func``, which pickle can resolve in the worker.
        """
        nums = list(range(num))
        # asyncio.run() creates, runs and closes its own event loop; the former
        # set_event_loop(new_event_loop()) call only leaked one loop per task.
        res = asyncio.run(sleep2(nums, delay))
        print(res)
        return num

    @staticmethod
    def main(q):
        """Run 50 ``A.func`` tasks in a process pool, then put the sentinel ``"OK"`` on *q*.

        Args:
            q: queue that receives each task's return value and, last, ``"OK"``.
        """

        def callback(num):
            # Runs in the pool's result-handler thread of this process; it is
            # never sent to a worker, so being a local function is fine here.
            q.put(num)
            print(f"callback {num}")

        def on_error(exc):
            # Without an error_callback, apply_async keeps worker/pickling
            # errors inside its AsyncResult and they are never reported.
            print(f"error {exc!r}")

        try:
            with multiprocessing.Pool() as pool:
                for i in range(50):
                    pool.apply_async(
                        A.func, args=[i], callback=callback, error_callback=on_error
                    )
                pool.close()
                pool.join()
        finally:
            # Always send the sentinel so the parent's blocking q.get() can finish.
            q.put("OK")


if __name__ == "__main__":
    q = multiprocessing.Queue()
    # Pass the queue explicitly instead of relying on a global that only
    # exists in the parent (and in forked children).
    p = multiprocessing.Process(target=A.main, args=[q])
    p.start()

    try:
        # Drain results until the sentinel arrives; the child sends it last,
        # so every earlier item has been consumed before we join.
        while True:
            r = q.get()
            print(f"main {r}")
            if r == "OK":
                break
    except BaseException:
        # Bailing out early (e.g. Ctrl+C): don't leave the child running.
        p.terminate()
        raise
    finally:
        p.join()
        p.close()