Python的多任务编程

莫道桑榆晚,为霞尚满天。这篇文章主要讲述Python的多任务编程相关的知识,希望能为你提供帮助。
前言【Python的多任务编程】python程序代码都是按自上而下的顺序加载并执行的,但实际需要代码处理的任务并不都是需要按部就班的顺序执行,通常为提高代码执行的效率,需要多个代码执行任务同时执行,也就是多任务编程的需求。
基本的计算机模型是由CPU、RAM及各种资源(键盘、硬盘、显卡、网卡等)组成,代码的执行过程,实际就是CPU和相关寄存器及RAM之间的相关处理的过程。在单核CPU场景下,一段代码交由CPU执行前,都会处于就绪队列中,CPU执行时很快就会返回该段代码的结果,所以不同进程的代码是轮流由CPU执行的,由于CPU执行速度很快,在表现上仍会被感觉是同时执行的。不同就绪队列间的读入与结果保存被称之为上下文切换,由于进程间切换会产生一定的时间等待及资源的消耗,所以为了减少等待时间和资源的消耗,就引入了线程的设计。线程是当进程的队列被授权占用CPU时,该进程的所有线程队列在共享该进程资源的环境下按优先级由CPU执行。无论是进程还是线程,其队列及资源切换都是由操作系统进行控制的,同时线程的切换也是非常消耗性能的,为了使各线程的调度更节约资源,就出现了协程的设计。协程是在进程或线程环境下执行的,其拥有自己的寄存器上下文和栈,调度是完全由用户控制的,相当于函数方法的调度。对于多任务编程,若要实现代码的多任务高效率执行,我们要明晰如下这几个概念的特点及其区别,才能根据实际需求,选用最佳的多任务编程方法。

  • 并行
    指在同一时刻有多个进程的指令在多个处理器上同时执行。
  • 并发
    是指在同一时刻只能有一个进程的指令执行,但多个进程指令被快速轮换执行,使得在宏观上具有多个进程同时执行的效果。
  • 进程
    进程是程序的运行态,进程间数据共享需要借助外部存储空间。
  • 线程
    线程是进程的组成部分,一个进程可以包含一个或多个线程,同一进程内线程间数据共享属于内部共享。
  • 协程
    协程是一种用户态的轻量级线程,一个进程可以包含一个或多个协程,也可以在一个线程包含一个或多个协程。协程的调度完全由用户控制,同一进程内协程间数据共享属于内部共享。
多线程处理由于Python是动态编译的语言,与C/C++、java等静态语言不同,它是在运行时一句一句代码地边编译边执行的。用C语言实现的Python解释器,通常称为CPython,也是Python环境默认的编译器。在Cpython解释器中,为防止多个线程同时执行同一 Python 的代码段,确保线程数据安全,引入了全局解释器锁(GIL, Global Interpreter Lock)的处理机制, 该机制相当于一个互斥锁,所以即便一个进程下开启了多线程,但同一时刻只能有一个线程被执行。所以Python 的多线程是伪线程,性能并不高,也无法利用CPU多核的优势。
另,GIL并不是Python的特性,他是在实现Python解释器(Cpython)时所引入的一个概念,GIL保护的是解释器级的数据,保护用户自己的数据仍需要自己加锁处理。在默认情况下,由于GIL的存在,为了使多线程(threading)执行效率更高,需要使用join方法对无序的线程进行阻塞,如下代码可以看到区别。
from multiprocessing import Process import threading import os,timel=[] stop=time.time() def work(): global stop time.sleep(2) print(\'===> \',threading.current_thread().name) stop=time.time()def test1(): for i in range(400): p=threading.Thread(target=work,name="test"+str(i)) l.append(p) p.start()def test2(): for i in range(400): p=threading.Thread(target=work,name="test"+str(i)) l.append(p) p.start()for p in l: p.join()if __name__ == \'__main__\': print("CPU Core:",os.cpu_count()) #本机为4核 print("Worker: 400") #测试线程数start=time.time() test1() active_count=threading.active_count() while (active_count> 1): active_count=threading.active_count() continue test1_result=stop-startstart=time.time() l=[] test2() active_count=threading.active_count() while (active_count> 1): active_count=threading.active_count() continueprint(\'Thread run time is %s\' %(test1_result)) print(\'Thread join run time is %s\' %(stop-start))

执行结果如下:
Thread run time is 4.829492807388306 Thread join run time is 2.053645372390747

由上结果可以看到, 多线程时join阻塞后执行效率提高了很多。
多进程与多线程多任务编程的本质是CPU占用方法的调度处理,对于python下多任务处理有多种编程方法可供选择,分别有多进程(multiprocessing)、多线程(threading)及异步协程(Asyncio),在实际使用中该如何选择呢?我们先看如下一段程序的执行效果。
from multiprocessing import Process from threading import Thread import os,timel=[] def work(): res=0 for i in range(100000000): res*=idef test1(): for i in range(4): p=Process(target=work) l.append(p) p.start()def test2(): for i in range(4): p=Thread(target=work) l.append(p) p.start()for p in l: p.join()if __name__ == \'__main__\': print("CPU Core:",os.cpu_count()) #本机为4核 print("Worker: 4") #工作线程或子进程数start=time.time() test1() while (l[len(l)-1].is_alive()): continue stop=time.time() print(\'Process run time is %s\' %(stop-start))start=time.time() l=[] test2() while (l[len(l)-1].is_alive()): continue stop=time.time() print(\'Thread run time is %s\' %(stop-start))

执行结果如下:
CPU Core: 4 Worker: 4 Process run time is 11.030176877975464 Thread run time is 17.0117769241333

从上面的结果,我们可以看到同一个函数用Process及Thread 不同的方法,执行的时间是不同的,为什么会产生这样的差异?
多进程(multiprocessing)方法使用子进程而非线程,其有效地绕过了全局解释器锁GIL(Global Interpreter Lock), 并充分利用了多核CPU的性能,所以在多核CPU环境下,其比多线程方式效率要高。
协程又称为微线程,协程也可被看作是被标注的函数,不同被表注函数的执行和切换就是协程的切换,其完全由编程者自行控制。协程一般是使用 gevent库,在早期这个库用起来比较麻烦,所以在python 3.7以后的版本,对协程的使用方法做了优化。执行代码如下:
import asyncio import timeasync def work(i): await asyncio.sleep(2) print(\'===> \',i)async def main(): start=time.time() l=[] for i in range(400): p=asyncio.create_task(work(i)) l.append(p)for p in l: await pstop=time.time() print(\'run time is %s\' %(stop-start))asyncio.run(main())

执行结果如下:
run time is 2.0228068828582764

另,默认环境下,协程是在单线程模式下执行的异步操作,其并不能发挥多处理器的性能。为了提升执行效率,可以在多进程中执行协程调用方法,代码用例如下:
from multiprocessing import Process import asyncio import os,timel=[] async_result=0 async def work1(): res=0 for i in range(100000000): res*=i# 协程入口 async def async_test(): m=[] for i in range(4): p=asyncio.create_task(work1()) m.append(p)for p in m: await pasync def async_test1(): await asyncio.create_task(work1())def async_run(): asyncio.run(async_test1())# 多进程入口 def test1(): for i in range(4): p=Process(target=async_run) l.append(p) p.start()if __name__ == \'__main__\': print("CPU Core:",os.cpu_count()) #本机为4核 print("Worker: 4") #工作线程或子进程数 start=time.time() asyncio.run(async_test()) stop=time.time()print(\'Asyncio run time is %s\' %(stop-start))start=time.time() test1() while (l[len(l)-1].is_alive()): continue stop=time.time()print(\'Process Asyncio run time is %s\' %(stop-start))

执行结果如下:
CPU Core: 4 Worker: 4 Asyncio run time is 18.89663052558899 Process Asyncio run time is 10.865562438964844

如上结果,在多进程中调用多协程的方法,执行效率明显提高。
多任务编程选择如上所结果是否是就决定一定要选择多进程(multiprocessing)模式呢?我们再看下如下代码:
from multiprocessing import Process from threading import Thread import os,timel=[] # 仅计算 def work1(): res=0 for i in range(100000000): res*=i# 仅输出 def work2(): time.sleep(2) print(\'===> \')# 多进程,仅计算 def test1(): for i in range(4): p=Process(target=work1) l.append(p) p.start()# 多进程,仅输出 def test_1(): for i in range(400): p=Process(target=work2) l.append(p) p.start()# 多线程,仅计算 def test2(): for i in range(4): p=Thread(target=work1) l.append(p) p.start()for p in l: p.join()# 多线程,仅输出 def test_2(): for i in range(400): p=Thread(target=work2) l.append(p) p.start()for p in l: p.join()if __name__ == \'__main__\': print("CPU Core:",os.cpu_count()) #本机为4核start=time.time() test1() while (l[len(l)-1].is_alive()): continue stop=time.time() test_result=stop-startstart=time.time() l=[] test_1() while (l[len(l)-1].is_alive()): continue stop=time.time() test1_result=stop-startstart=time.time() l=[] test2() while (l[len(l)-1].is_alive()): continue stop=time.time() test2_result=stop-start start=time.time() l=[] test_2() while (l[len(l)-1].is_alive()): continue stop=time.time() test3_result=stop-start print(\'Process run time is %s\' %(test_result)) print(\'Process I/O run time is %s\' %(test1_result)) print(\'Thread run time is %s\' %(test2_result)) print(\'Thread I/O run time is %s\' %(stop-start))

执行结果如下:
Process run time is 10.77662968635559 Process I/O run time is 2.9869778156280518 Thread run time is 16.842355012893677 Thread I/O run time is 2.024587869644165

由结果可看,在仅计算的操作时,多进程效率比较高,在仅输出的操作时,多线程的效率比较高,所以在实际使用中要根据实际情况测试决定。通用的建议如下:
  • 多线程(threading)用于IO密集型,如socket,爬虫,web
  • 多进程(multiprocessing)用于计算密集型,如数据分析
参考
  • 多线程
    https://docs.python.org/zh-cn/3/library/threading.html
  • 多进程
    https://docs.python.org/zh-cn/3/library/multiprocessing.html#
  • 协程
    https://docs.python.org/zh-cn/3/library/asyncio-task.html

    推荐阅读