众所周知,CPU是计算机的核心,它承担了所有的计算任务。而操作系统是计算机的管理者,是一个大管家,它负责任务的调度,资源的分配和管理,统领整个计算机硬件。应用程序是具有某种功能的程序,程序运行与操作系统之上
在很早的时候计算机并没有线程这个概念,但是随着时代的发展,只用进程来处理程序出现很多的不足。如当一个进程堵塞时,整个程序会停止在堵塞处,并且如果频繁的切换进程,会浪费系统资源。所以线程出现了
线程是能拥有资源和独立运行的最小单位,也是程序执行的最小单位。一个进程可以拥有多个线程,而且属于同一个进程的多个线程间会共享该进行的资源
进程时一个具有一定功能的程序在一个数据集上的一次动态执行过程。进程由程序,数据集合和进程控制块三部分组成。程序用于描述进程要完成的功能,是控制进程执行的指令集;数据集合是程序在执行时需要的数据和工作区;程序控制块(PCB)包含程序的描述信息和控制信息,是进程存在的唯一标志
在Python中,通过两个标准库 thread和 Threading提供对线程的支持,threading对 thread进行了封装。threading模块中提供了 Thread,Lock, RLOCK, Condition等组件
在Python中线程和进程的使用就是通过Thread这个类。这个类在我们的thread和threading模块中。我们一般通过threading导入
默认情况下,只要在解释器中,如果没有报错,则说明线程可用
from threading import ThreadThread.run(self) # 线程启动时运行的方法,由该方法调用 target 参数所指定的函数Thread.start(self) # 启动线程,start 方法就是去调用 run 方法Thread.terminate(self) # 强制终止线程Thread.join(self, timeout) # 阻塞调用,主线程进行等待Thread.setDaemon(self, daemonic) # 将子线程设置为守护线程Thread.getName(self, name) # 获取线程名称Thread.setName(self, name) # 设置线程名称| 参数 | 说明 |
|---|---|
| target | 表示调用对象,即子线程要执行的任务 |
| name | 子线程的名称 |
| args | 传入 target 函数中的位置参数,是一个元组,参数后必须添加逗号 |
import time, queue, threadingclass MyThread(threading.Thread): def __init__(self): super().__init__() self.daemon = True # 开启守护模式 self.queue = queue.Queue(3) # 开启队列对象,存储三个任务 self.start() # 实例化的时候直接启动线程,不需要手动启动线程 def run(self) -> None: # run方法线程自带的方法,内置方法,在线程运行时会自动调用 while True: # 不断处理任务 func, args, kwargs = self.queue.get() func(*args, **kwargs) # 调用函数执行任务 元组不定长记得一定要拆包 self.queue.task_done() # 解决一个任务就让计数器减一,避免阻塞 # 生产者模型 def submit_tasks(self, func, args=(), kwargs={}): # func为要执行的任务,加入不定长参数使用(默认使用默认参数) self.queue.put((func, args, kwargs)) # 提交任务 # 重写join方法 def join(self) -> None: self.queue.join() # 查看队列计时器是否为0 任务为空 为空关闭队列 def f2(*args, **kwargs): time.sleep(2) print("任务2完成", args, kwargs)# 实例化线程对象mt = MyThread()# 提交任务mt.submit_tasks(f2, args=("aa", "aasd"), kwargs={"a": 2, "s": 3})# 让主线程等待子线程结束再结束mt.join()守护模式:
- 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束
def f2(i): time.sleep(2) print("任务2完成", i)lis = []for i in range(5): t = Thread(target=f2, args=(i,)) t.start() # 启动 5 个线程 lis.append(t)for i in lis: i.join() # 线程等待现在我们程序代码中,有多个线程, 并且在这个几个线程中都会去 操作同一部分内容,那么如何实现这些数据的共享呢?
这时,可以使用 threading库里面的锁对象 Lock 去保护
Lock 对象的acquire方法 是申请锁
每个线程在操作共享数据对象之前,都应该申请获取操作权,也就是调用该共享数据对象对应的锁对象的acquire方法,如果线程A 执行了 acquire()方法,别的线程B 已经申请到了这个锁, 并且还没有释放,那么 线程A的代码就在此处 等待 线程B 释放锁,不去执行后面的代码。
直到线程B 执行了锁的 release 方法释放了这个锁, 线程A 才可以获取这个锁,就可以执行下面的代码了
如:
import threadingvar = 1# 添加互斥锁,并且拿到锁lock = threading.Lock()# 定义两个线程要用做的任务def func1(): global var # 声明全局变量 for i in range(1000000): lock.acquire() # 操作前上锁 var += i lock.release() # 操作完后释放锁 def func2(): global var # 声明全局变量 for i in range(1000000): lock.acquire() # 操作前上锁 var -= i lock.release() # 操作完后释放锁 # 创建2个线程t1 = threading.Thread(target=func1)t2 = threading.Thread(target=func2)t1.start()t2.start()t1.join()t2.join()print(var)到在使用多线程时,如果数据出现和自己预期不符的问题,就可以考虑是否是共享的数据被调用覆盖的问题
使用
threading库里面的锁对象Lock去保护
Python中的多进程是通过multiprocessing包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象。这个进程对象的方法和线程对象的方法差不多也有start(), run(), join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置daemon属性来完成的
import timefrom multiprocessing import Processclass MyProcess(Process): # 继承Process类 def __init__(self, target, args=(), kwargs={}): super(MyProcess, self).__init__() self.daemon = True # 开启守护进程 self.target = target self.args = args self.kwargs = kwargs self.start() # 自动开启进程 def run(self): self.target(*self.args, **self.kwargs)def fun(*args, **kwargs): print(time.time()) print(args[0])if __name__ == '__main__': lis = [] for i in range(5): p = MyProcess(fun, args=(1, )) lis.append(p) for i in lis: i.join() # 让进程等待守护模式:
- 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束
import timefrom multiprocessing import Processdef fun(*args, **kwargs): print(time.time()) print(args[0])if __name__ == '__main__': lis = [] for i in range(5): p = Process(target=fun, args=(1, )) lis.append(p) for i in lis: i.join() # 让进程等待其使用方法和线程的那个 Lock 使用方法类似
Manager的作用是提供多进程共享的全局变量,Manager()方法会返回一个对象,该对象控制着一个服务进程,该进程中保存的对象运行其他进程使用代理进行操作
Manager支持的类型有:list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array语法:
from multiprocessing import Process, Lock, Managerdef f(n, d, l, lock): lock.acquire() d[str(n)] = n l[n] = -99 lock.release()if __name__ == '__main__': lock = Lock() with Manager() as manager: d = manager.dict() # 空字典 l = manager.list(range(10)) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] # 启动10个进程,不同的进程对d和l中的不同元素进行操作 for i in range(10): p = Process(target=f, args=(i, d, l, lock)) p.start() p.join() print(d) print(l)线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池
如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定
Exectuor 提供了如下常用方法:
程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表
Future 提供了如下方法:
from multiprocessing import cpu_count # cpu核心数模块,其可以获取 CPU 核心数n = cpu_count() # 获取cpu核心数使用线程池来执行线程任务的步骤如下:
from concurrent.futures import ThreadPoolExecutorimport threadingimport time# 定义一个准备作为线程任务的函数def action(max): my_sum = 0 for i in range(max): print(threading.current_thread().name + ' ' + str(i)) my_sum += i return my_sum# 创建一个包含2条线程的线程池pool = ThreadPoolExecutor(max_workers=2)# 向线程池提交一个task, 50会作为action()函数的参数future1 = pool.submit(action, 50)# 向线程池再提交一个task, 100会作为action()函数的参数future2 = pool.submit(action, 100)def get_result(future): print(future.result()) # 为future1添加线程完成的回调函数future1.add_done_callback(get_result)# 为future2添加线程完成的回调函数future2.add_done_callback(get_result)# 判断future1代表的任务是否结束print(future1.done())time.sleep(3)# 判断future2代表的任务是否结束print(future2.done())# 查看future1代表的任务返回的结果print(future1.result())# 查看future2代表的任务返回的结果print(future2.result())# 关闭线程池pool.shutdown() # 序可以使用 with 语句来管理线程池,这样即可避免手动关闭线程池最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
也可以低于 CPU 核心数
使用线程池来执行线程任务的步骤如下:
关于进程的开启代码一定要放在if __name__ == '__main__':代码之下,不能放到函数中或其他地方
开启进程的技巧
from concurrent.futures import ProcessPoolExecutorpool = ProcessPoolExecutor(max_workers=cpu_count()) # 根据cpu核心数开启多少个进程开启进程的数量最好低于最大 CPU 核心数
本文来自博客园,作者:A-L-Kun,转载请注明原文链接:https://www.cnblogs.com/liuzhongkun/p/15950124.html