- 1.概述
- 2.多进程
- 3.子进程
- 4.进程间通信
- 5.多线程
- 6.Lock
- 7.ThreadLocal
- 8.进程VS线程
- 9.分布式进程
- 总结
import osprint("Process (%s) start..." % os.getpid())"""只能在Linux/Unix/Mac上工作pid = os.fork()if pid == 0:print("I am child process (%s) and my parent is %s." % (os.getpid(), os.getppid()))else:print("I (%s) just created a child process (%s)." % (os.getpid(), pid))"""print("Hello.")
# multiprocessing:跨平台多线程模块# process_test.py文件,在交互下python process_test.pyfrom multiprocessing import Processimport osdef run_process(name):print("Run child process %s (%s)..." % (name, os.getpid()))if __name__ == "__main__":print("Parent process %s." % os.getpid())p = Process(target = run_process, args = ("test",))print("Child process will start.")p.start()p.join()# join()方法可以等待子进程结束后再继续往下运行,用于进程间的同步print("Child process end.")
# 结果输出:
Parent process 28340.
Child process will start.
Run child process test (31152)...
Child process end.
# Pool:用进程池批量创建子进程# process.py文件,交互下python process.pyfrom multiprocessing import Poolimport os, time, randomdef long_time_task(name):print('Run task %s (%s)...' % (name, os.getpid()))start = time.time()time.sleep(random.random() * 3)end = time.time()print('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__=='__main__':print('Parent process %s.' % os.getpid())p = Pool(4)for i in range(5):p.apply_async(long_time_task, args=(i,))print('Waiting for all subprocesses done...')p.close()p.join()print('All subprocesses done.')
# 结果输出:
Parent process 31576.
Waiting for all subprocesses done...
Run task 0 (20416)...
Run task 1 (15900)...
Run task 2 (24716)...
Run task 3 (31148)...
Task 2 runs 0.72 seconds.
Run task 4 (24716)...
Task 4 runs 1.03 seconds.
Task 3 runs 1.82 seconds.
Task 1 runs 2.73 seconds.
Task 0 runs 2.82 seconds.
All subprocesses done.
# subprocess模块:启动一个子进程,控制其输入和输出# subprocess_test.py文件,注:文件名不要和模块名相同,否则报错import subprocessprint("$ nslookup")r =["nslookup", ""])print("Exit code:", r)
# 结果输出:
$ nslookup
Exit code: 0
# 子进程需要输入,通过communicate()方法import subprocessprint("$ nslookup")p = subprocess.Popen(["nslookup"], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)output, err = p.communicate(b"set q = mx\\nexit\n")print(output.decode("gbk"))print("Exit code:", p.returncode)
# 结果输出:
$ nslookup
> Unrecognized command: set q = mx
> 服务器
Exit code: 0
# 在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据# queue_test.py文件,交互下python queue_test.pyfrom multiprocessing import Process, Queueimport os, time, randomdef write(q):print("Process to write:%s" % os.getpid())for value in ["W", "I", "L", "L", "A", "R", "D"]:print("Put %s to queue..." % value)q.put(value)time.sleep(random.random())def read(q):print("Process to read:%s" % os.getpid())while True:value ="Get %s from queue." % value)if __name__ == "__main__":# 父进程创建Queue,并传给各个子进程q = Queue()pw = Process(target = write, args = (q,))pr = Process(target = read, args = (q,))# 启动子进程pw,写入pw.start()# 启动子进程pr,读取pr.start()# 等待pw结束pw.join()# pr进程是死循环,无法等待其结束,需要强行终止pr.terminate()
# 结果输出:
Process to write:15720
Process to read:21524
Put W to queue...
Get W from queue.
Put I to queue...
Get I from queue.
Put L to queue...
Get L from queue.
Put L to queue...
Get L from queue.
Put A to queue...
Get A from queue.
Put R to queue...
Get R from queue.
Put D to queue...
Get D from queue.
# 线程库:_thread和threading# 启动一个线程:即把一个函数传入并创建一个Thread实例,然后调用start()开始执行# 任何进程默认启动一个线程,该线程称为主线程,主线程可以启动新的线程# current_thread()函数:返回当前线程的实例;# 主线程实例名字:MainThread;# 子线程名字的创建时指定,如果不指定,则自动给线程命名为Thread-1、Thread-2...import time, threadingdef loop():print("Thread %s is running..." % threading.current_thread().name)n = 0while n < 5:n = n + 1print("Thread %s >>> %s" % (threading.current_thread().name, n))time.sleep(1)print("Thread %s ended." % threading.current_thread().name)print("Thread %s is running..." % threading.current_thread().name)thread1 = threading.Thread(target = loop, name = "LoopThread")thread1.start()thread1.join()print("Thread %s ended." % threading.current_thread().name)
# 结果输出:
Thread MainThread is running...
Thread LoopThread is running...
Thread LoopThread >>> 1
Thread LoopThread >>> 2
Thread LoopThread >>> 3
Thread LoopThread >>> 4
Thread LoopThread >>> 5
Thread LoopThread ended.
Thread MainThread ended.
# 多进程:同一个变量,各自有一份拷贝存在于每个进程中,互不影响;# 多线程:所有变量由所有线程共享,任何一个变量可以被任何一个线程修改;# 多线程同时操作一个变量# 多运行几次,发现结果不为0import time, threadingbalance = 0def change_it(n):global balancebalance = balance + nbalance = balance - ndef run_thread(n):# 线程交替执行,balance结果不一定为0for i in range(2000000):change_it(n)thread1 = threading.Thread(target = run_thread, args = (5,))thread2 = threading.Thread(target = run_thread, args = (8,))thread1.start()thread2.start()thread1.join()thread2.join()print(balance)# 结果输出:# 5(各自不同)
# 确保balance计算正确,需要给change_it()上一把锁# 当线程开始执行change_it()时,该线程获得锁,其他线程不能同时执行change_it(),# 只能等待,直到锁被释放,获得该锁后才能改;# 通过threading.Lock()创建锁import time, threadingbalance = 0lock = threading.Lock()def change_it(n):global balancebalance = balance + nbalance = balance - ndef run_thread(n):for i in range(2000000):lock.acquire()try:change_it(n)finally:# 释放锁lock.release()thread1 = threading.Thread(target = run_thread, args = (5,))thread2 = threading.Thread(target = run_thread, args = (8,))thread1.start()thread2.start()thread1.join()thread2.join()print(balance)# 结果输出:# 0
# 多线程环境下,每个线程有自己的数据;# 一个线程使用自己的局部变量比使用全局变量好;import threading# 创建全局ThreadLocal对象local_school = threading.local()def process_student():# 获取当前线程关联的studentstd = local_school.studentprint("Hello,%s (in %s)" % (std, threading.current_thread().name))def process_thread(name):# 绑定ThreadLocal的studentlocal_school.student = nameprocess_student()thread1 = threading.Thread(target = process_thread, args = ("Willard",), name = "Thread-1")thread2 = threading.Thread(target = process_thread, args = ("WenYu",), name = "Thread-2")thread1.start()thread2.start()thread1.join()thread2.join()
# 结果输出:
# Hello,Willard (in Thread-1)
# Hello,WenYu (in Thread-2)
# 进程和线程优缺点:# 1.要实现多任务,会设计Master-Worker模式,Master负责分配任务,Worker负责执行任务,# 在多任务环境下,通常是一个Master,多个Worker;#a.如果使用多进程实现Master-Worker,主进程即Master,其他进程即Worker;#b.如果使用多线程实现Master-Worker,主线程即Master,其他线程即Worker;# 2.多进程优点:稳定性高,一个子进程崩溃不会影响主进程和其他子进程;# 3.多进程缺点:创建进程的代价大,操作系统能同时运行的进程数有限;# 4.多线程缺点:任何一个线程崩溃,可能直接造成整个进程崩溃;# 线程切换:# 1.依次完成任务的方式称为单任务模型,或批处理任务模型;# 2.任务1先做n分钟,切换到任务2做n分钟,再切换到任务3做n分钟,依此类推,称为多任务模型;# 计算密集型 VS IO密集型# 1.计算密集型任务:要进行大量的计算,消耗CPU资源,如:对视频进行高清解码等;# 2.IO密集型任务:涉及到网络、磁盘IO的任务,均为IO密集型任务;# 3.IO密集型任务消耗CPU少,大部分时间在等待IO操作完成;# 异步IO# 1.事件驱动模型:用单进程单线程模型来执行多任务;# 2.Python语言中,单线程的异步编程模型称为协程;
"""实例:有一个通过Queue通信的多进程程序在同一机器上运行,但现在处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程发布到两台机器上;"""# 交互环境中:python task_master_test.pyimport random, time, queuefrom multiprocessing.managers import BaseManager# 发送任务的队列task_queue = queue.Queue()# 接收结果的队列result_queue = queue.Queue()def return_task_queue():global task_queuereturn task_queuedef return_result_queue():global task_queuereturn task_queue# 从BaseManager继承的QueueManagerclass QueueManager(BaseManager):passif __name__ == "__main__":# 把两个Queue注册到网络上,callable参数关联Queue对象QueueManager.register("get_task_queue", callable = return_task_queue)QueueManager.register("get_result_queue", callable = return_result_queue)# 绑定端口5000,设置验证码"Willard"manager = QueueManager(address = ("", 5000), authkey = b"Willard")# 启动Queuemanager.start()# 获得通过网络访问的Queue对象task = manager.get_task_queue()result = manager.get_result_queue()# 放任务进去for i in range(10):n = random.randint(0, 10000)print("Put task %d..." % n)task.put(n)# 从result队列读取结果print("Try get results...")for i in range(10):r = result.get(timeout = 10)print("Result:%s" % r)# 关闭manager.shutdown()print("Master Exit.")
# task_worker_test.py文件# 交互环境python task_worker_test.pyimport time, sys, queuefrom multiprocessing.managers import BaseManager# 创建QueueManagerclass QueueManager(BaseManager):passQueueManager.register("get_task_queue")QueueManager.register("get_result_queue")# 连接到服务器server_address = ""print("Connect to server %s..." % server_address)# 端口和验证码m = QueueManager(address = (server_address, 5000), authkey = b"Willard")# 网络连接m.connect()# 获取Queue对象task = m.get_task_queue()result = m.get_result_queue()# 从task队列取任务,把结果写入result队列for i in range(10):try:n = task.get(timeout = 1)print("Run task %d * %d..." % (n, n))r = "%d * %d = %d" % (n, n, n * n)time.sleep(1)result.put(r)except Queue.Empty:print("Task queue is empty.")print("Worker Exit.")
总结 【Python3的进程和线程你了解吗】本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注脚本之家的更多内容!
