隐藏

Python并发编程之线程的玩法

发布:2023/10/24 22:47:31作者:管理员 来源:本站 浏览次数:488

一、线程基础以及守护进程


线程是CPU调度的最小单位

全局解释器锁


全局解释器锁GIL(global interpreter lock)


全局解释器锁的出现主要是为了完成垃圾回收机制的回收机制,对不同线程的引用计数的变化记录的更加精准。


全局解释器锁导致了同一个进程中的多个线程只能有一个线程真正被CPU执行。


GIL锁每执行700条指令才会进行一次(轮转)切换(从一个线程切换到另外一个线程)


节省的是IO操作(不占用CPU)的时间,而不是CPU计算的时间,因为CPU的计算速度非常快,大多数情况下,我们没有办法把一条进程中所有的IO操作都规避掉。

threading模块


   import time

   from threading import Thread, current_thread, enumerate, active_count

   def func(i):

       print('start%s' % i, current_thread().ident)  # 函数中获取当前线程id

       time.sleep(1)

       print('end%s' % i)

   if __name__ == '__main__':

       t1 = []

       for i in range(3):

           t = Thread(target=func, args=(i,))

           t.start()

           print(t.ident)  # 查看当前线程id

           t1.append(t)

       print(enumerate(), active_count())

       for t in t1:

           t.join()

   print('所有线程执行完毕')


线程是不能从外部强制终止(terminate),所有的子线程只能是自己执行完代码之后就关闭。


current_thread 获取当前的线程对象


current_thread().ident 或者 线程对象.ident 获取当前线程id。


enumerate返回一个列表,存储了所有活着的线程对象,包括主线程。


active_count返回一个数字,存储了所有活着的线程个数。


【注意】enumerate导入之后,会和内置函数enumerate重名,需要做特殊的处理


   from threading import enumerate as en


   import threading


   threading.enumerate()


面向对象方式开启一个线程


   from threading import Thread

   class MyThread(Thread):

       def __init__(self, a, b):

           super(MyThread, self).__init__()

           self.a = a

           self.b = b

       def run(self):

           print(self.ident)

   t = MyThread(1, 3)

   t.start()  # 开启线程,才在线程中执行run方法

   print(t.ident)


线程之间的数据是共享的


   from threading import Thread

   n = 100

   def func():

       global n

       n -= 1

   t_li = []

   for i in range(100):

       t = Thread(target=func)

       t.start()

       t_li.append(t)

   for t in t_li:

       t.join()

   print(n)


结果是:0

守护线程


   主线程会等待子线程结束之后才结束,为什么?


因为主线程结束,进程就会结束。


   守护线程随着主线程的结束而结束

   守护进程会随着主进程的代码结束而结束,如果主进程代码之后还有其他子进程在运行,守护进程不守护。


   守护线程会随着主线程的结束而结束,如果主线程代码结束之后还有其他子线程在运行,守护线程也守护。


   import time

   from threading import Thread


   def son():

       while True:

           print('in son')

           time.sleep(1)

   def son2():

       for i in range(3):

           print('in son2...')

           time.sleep(1)

   # flag a

   t = Thread(target=son)

   t.daemon = True

   t.start()

   # flag b a-->b用时0s

   Thread(target=son2).start()


为什么守护线程会在主线程的代码结束之后继续守护其他子线程?


答:因为守护进程和守护线程的结束原理不同。守护进程需要主进程来回收资源,守护线程是随着主线程的结束而结束,其他子线程–>主线程结束–>主进程结束–>整个进程中所有的资源都被回收,守护线程也会被回收。

二、线程锁(互斥锁)

线程之间也存在数据不安全


   import dis

   a = 0

   def func():

       global a

       a += 1

   dis.dis(func)  # 得到func方法中的代码翻译成CPU指令

   """ 结果 0 LOAD_GLOBAL 0 (a) 2 LOAD_CONST 1 (1) 4 INPLACE_ADD 6 STORE_GLOBAL 0 (a) 8 LOAD_CONST 0 (None) 10 RETURN_VALUE """


+=、-=、*=、/=、while、if、带返回值的方法(都是先计算后赋值,前提要涉及到全局变量或静态变量) 等都是数据不安全的,append、pop、queue、logging模块等都是数据安全的。


列表中的方法或者字典中的方法去操作全局变量的时候,数据是安全的。


只有一个线程,永远不会出现线程不安全现象。


采用加锁的方式来保证数据安全。


   from threading import Thread, Lock

   n = 0

   def add(lock):

       for i in range(500000):

           global n

           with lock:

               n += 1

   def sub(lock):

       for i in range(500000):

           global n

           with lock:

               n -= 1

   t_li = []

   lock = Lock()

   for i in range(2):

       t1 = Thread(target=add, args=(lock,))

       t1.start()

       t2 = Thread(target=sub, args=(lock,))

       t2.start()

       t_li.append(t1)

       t_li.append(t2)

   for t in t_li:

       t.join()

   print(n)


线程安全的单例模式


   import time

   from threading import Thread, Lock

   class A:

       __instance = None

       lock = Lock()

       def __new__(cls, *args, **kwargs):

           with cls.lock:

               if not cls.__instance:

                   time.sleep(0.00001)

                   cls.__instance = super().__new__(cls)

           return cls.__instance

   def func():

       a = A()

       print(a)

   for i in range(10):

       Thread(target=func).start()


不用考虑加锁的小技巧


   不要操作全局变量

   不要在类中操作静态变量


因为多个线程同时操作全局变量/静态变量,会产生数据不安全现象。

三、线程锁(递归锁)


   from threading import Lock, RLock

   # Lock 互斥锁

   # RLock 递归(recursion)锁

   l = Lock()

   l.acquire()

   print('希望被锁住的代码')

   l.release()

   rl = RLock()  # 在同一个线程中可以被acquire多次

   rl.acquire()

   rl.acquire()

   rl.acquire()

   print('希望被锁住的代码')

   rl.release()

   from threading import Thread, RLock

   def func(i, lock):

       lock.acquire()

       lock.acquire()

       print(i, ':start')

       lock.release()

       lock.release()

       print(i, ':end')

   lock = RLock()

   for i in range(5):

       Thread(target=func, args=(i, lock)).start()


互斥锁与递归锁


递归锁在同一个线程中可以被acquire多次,而互斥锁不行


互斥锁效率高,递归锁效率相对低


多把互斥锁容易产生死锁现象,递归锁可以快速解决死锁

四、死锁


死锁:指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象。


死锁现象是怎么产生的?


答:有多把锁,并且在多个线程中交叉使用。与互斥锁、递归锁无关,都会发生死锁。如果是互斥锁,出现了死锁现象,最快速的解决方案是把所有的互斥锁都改成一把递归锁(noodle_lock = fork_lock = RLock()),程序的效率会降低。


   from threading import Thread, Lock

   import time

   noodle_lock = Lock()

   fork_lock = Lock()

   def eat1(name):

       noodle_lock.acquire()

       print(name, '抢到面了')

       fork_lock.acquire()

       print(name, '抢到叉子了')

       print(name, '吃面')

       time.sleep(0.0001)

       fork_lock.release()

       print(name, '放下叉子了')

       noodle_lock.release()

       print(name, '放下面了')

   def eat2(name):

       fork_lock.acquire()

       print(name, '抢到叉子了')

       noodle_lock.acquire()

       print(name, '抢到面了')

       print(name, '吃面')

       noodle_lock.release()

       print(name, '放下面了')

       fork_lock.release()

       print(name, '放下叉子了')

   Thread(target=eat1, args=('lucy',)).start()

   Thread(target=eat2, args=('jack',)).start()

   Thread(target=eat1, args=('rose',)).start()

   Thread(target=eat2, args=('disen',)).start()


五、队列


队列:线程之间数据安全的容器


线程队列:数据安全,先进先出


原理:加锁 + 链表

Queue


fifo 先进先出的队列

get和put


   import queue

   q = queue.Queue(3)  # fifo 先进先出的队列

   q.put(1)

   q.put(2)

   print(q.get())

   print(q.get())

   1

   2


get_nowait


   import queue

   # from queue import Empty # 不是内置的错误类型,而是queue模块中的错误

   q = queue.Queue()  # fifo 先进先出的队列

   try:

       q.get_nowait()

   except queue.Empty:

       pass

   print('队列为空,继续执行其他代码')


put_nowait


用的很少,因为队列满时,抛异常,数据放不进去,丢失了。

LifoQueue


后进先出的队列,也就是栈。last in first out


   from queue import LifoQueue

   lq = LifoQueue()

   lq.put(1)

   lq.put(2)

   print(lq.get())

   print(lq.get())

   2

   1


PriorityQueue


优先级队列,按照放入数据的第一位数值从小到大输出


   from queue import PriorityQueue

   priq = PriorityQueue()

   priq.put((2, 'lucy'))

   priq.put((0, 'rose'))

   priq.put((1, 'jack'))

   print(priq.get())

   print(priq.get())

   print(priq.get())

   (0, 'rose')

   (1, 'jack')

   (2, 'lucy')


三种队列使用场景


先进先出:用于处理服务类任务(买票任务)


后进先出:算法中用的比较多


优先级队列:比如,VIP制度,VIP用户优先;

六、相关面试题

请聊聊进程队列的特点和实现原理


特点:实现进程之间的通信;数据安全;先进先出。


实现原理:基于管道 + 锁 实现的,管道是基于文件级别的socket + pickle 实现的。

你了解生产者消费者模型吗,如何实现


了解


为什么了解?工作经验


   采集图片/爬取音乐:由于要爬取大量的数据,想提高爬取效率

       有用过一个生产者消费者模型,这个模型是我自己写的,消息中间件,用的是xxx(redis),我获取网页的过程作为生产者,分析网页,获取所有歌曲歌曲链接的过程作为消费者。

       自己写监控,或者是自己写邮件报警系统,监控程序作为生产者,一旦发现有问题的程序,就需要把要发送的邮件信息交给消息中间件redis,消费者就从中间件中取值,然后来处理发邮件的逻辑。


什么时候用过?


   项目 或者 例子,结合上面一起


在python中实现生产者消费者模型可以用哪些机制


   消息中间件

       celery(分布式框架):定时发短信的任务


从你的角度说说进程在计算机中扮演什么角色


进程用来管理一个运行中的程序的资源,是资源分配的最小单位


进程与进程之间内存是隔离的


进程是由操作系统负责调度的,并且多个进程之间是一种竞争关系,所以我们应该对进程的三状态时刻关注,尽量减少进程中的IO操作,或者在进程里面开线程来规避IO,让我们写的程序在运行的时候能够更多的占用CPU资源。

为什么线程之间的数据不安全


线程之间数据共享


多线程的情况下,


   如果在计算某一个变量的时候,还要进行赋值操作,这个过程不是由一条完整的CPU指令完成的;

       如果在判断某个bool表达式之后,再做某些操作,这个过程也不是由一条完整的CPU指令完成的;

       在中间发生了GIL锁的切换(时间片的轮转),可能会导致数据不安全。


读程序,请确认执行到最后number的长度是否一定为 1


   import threading

   import time

   # loop = 1E7 # 10000000.

   loop = int(1E7)  # 10000000

   def _add(loop: int = 1):

       global numbers

       for _ in range(loop):

           numbers.append(0)

   def _sub(loop: int = 1):

       global numbers

       for _ in range(loop):

           while not numbers:

               time.sleep(1E-8)

           numbers.pop()

   numbers = [0]

   ta = threading.Thread(target=_add, args=(loop,))

   ts = threading.Thread(target=_sub, args=(loop,))

   # ts1 = threading.Thread(target=_sub, args=(loop,))

   ta.start()

   ts.start()

   # ts1.start()

   ta.join()

   ts.join()

   # ts1.join()


因为只开启了一个进行pop操作的线程,如果开启多个pop操作的线程,必须在while前面加锁,因为可能有两个线程,一个执行了while not numbers,发生了GIL的切换,另外一个线程执行完了代码,numbers刚好没有了数据,导致结果一个pop成功,一个pop不成功。


所以number长度一定为1,如果把注释去了,不一定为1

读程序,请确认执行到最后number的长度是否一定为 1


   import threading

   import time

   loop = int(1E7)

   def _add(loop: int = 1):

       global numbers

       for _ in range(loop):

           numbers.append(0)

   def _sub(loop: int = 1):

       global numbers

       for _ in range(loop):

           while not numbers:

               time.sleep(1E-8)

           numbers.pop()

   numbers = [0]

   ta = threading.Thread(target=_add, args=(loop,))

   ts = threading.Thread(target=_sub, args=(loop,))

   ta.start()

   ta.join()

   ts.start()

   ts.join()


一定为1,因为是同步的。

读程序,请确认执行到最后number是否一定为 0


   import threading

   loop = int(1E7)

   def _add(loop: int = 1):

       global numbers

       for _ in range(loop):

           numbers += 1

   def _sub(loop: int = 1):

       global numbers

       for _ in range(loop):

           numbers -= 1

   numbers = 0

   ta = threading.Thread(target=_add, args=(loop,))

   ts = threading.Thread(target=_sub, args=(loop,))

   ta.start()

   ta.join()

   ts.start()

   ts.join()


一定等于0,因为是同步的。

读程序,请确认执行到最后number是否一定为 0


   import threading

   loop = int(1E7)

   def _add(loop: int = 1):

       global numbers

       for _ in range(loop):

           numbers += 1

   def _sub(loop: int = 1):

       global numbers

       for _ in range(loop):

           numbers -= 1

   numbers = 0

   ta = threading.Thread(target=_add, args=(loop,))

   ts = threading.Thread(target=_sub, args=(loop,))

   ta.start()

   ts.start()

   ta.join()

   ts.join()


不一定为0,因为是异步的且存在 += 操作

七、判断数据是否安全


是否数据共享,是同步还是异步(数据共享并且异步的情况下)


   +=、-=、*=、/=、a = 计算之后赋值给变量

   if、while 条件,这两个判断是由多个线程完成的


这两种情况下,数据不安全。

八、进程池 & 线程池


以前,有多少个任务就开多少个进程或线程。


什么是池


要在程序开始的时候,还没有提交任务,先创建几个线程或者进程,放在一个池子里,这就是池


为什么要用池


如果先开好进程/线程,那么有任务之后就可以直接使用这个池中的数据了;并且开好的进程/线程会一直存在在池中,可以被多个任务反复利用,这样极大的减少了开启/关闭/调度进程/调度线程的时间开销。


池中的线程/进程个数控制了操作系统需要调用的任务个数,控制池中的单位,有利于提高操作系统的效率,减轻操作系统的负担。


   from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

   # threading模块 没有提供池

   # multiprocessing模块 仿照threading增加了Pool(逐渐被淘汰)

   # concurrent.futures模块 线程池,进程池都能够用相似的方式开启/使用

   ThreadPoolExecutor()  # 参数代表开启多少个线程,线程的个数一般起cpu个数*4(或者*5)

   ProcessPoolExecutor()  # 参数代表开启多少个进程,进程的个数一般起cpu个数+1


创建线程池并提交任务


   from concurrent.futures import ThreadPoolExecutor

   from threading import current_thread

   import time

   def func(a, b):

       print(current_thread().ident, a, b)

       time.sleep(1)

   tp = ThreadPoolExecutor(4)  # 创建线程池对象

   for i in range(20):

       # tp.submit(func, i, i + 1)

       # 向池中提交任务

       tp.submit(func, a=i, b=i + 1)  # 位置传参,关键字传参都可以


创建进程池并提交任务


   from concurrent.futures import ProcessPoolExecutor

   import os

   import time

   def func(a, b):

       print(os.getpid(), 'start', a, b)

       time.sleep(1)

       print(os.getpid(), 'end', a, b)

   if __name__ == '__main__':

       tp = ProcessPoolExecutor(4)  # 创建进程池对象

       for i in range(20):

           # tp.submit(func, i, i + 1)

           # 向池中提交任务

           tp.submit(func, a=i, b=i + 1)  # 位置传参,关键字传参都可以


获取任务结果


   from concurrent.futures import ProcessPoolExecutor

   import os

   import time

   def func(a, b):

       print(os.getpid(), 'start', a, b)

       time.sleep(1)

       print(os.getpid(), 'end', a, b)

       return a * b

   if __name__ == '__main__':

       tp = ProcessPoolExecutor(4)  # 创建进程池对象

       future_d = { }

       for i in range(20):  # 异步非阻塞的

           ret = tp.submit(func, a=i, b=i + 1)  # future未来对象

           # print(ret) # <Future at 0x1ad918e1148 state=running>

           # print(ret.result()) # 这样需要等待,同步的

           future_d[i] = ret

       for key in future_d:  # 同步阻塞的

           print(key, future_d[key].result())


tp对象的map


map 只适合传递简单的参数,并且必须是一个可迭代的类型


   from concurrent.futures import ProcessPoolExecutor

   import os

   import time

   def func(a):

       print(os.getpid(), 'start', a[0], a[1])

       time.sleep(1)

       print(os.getpid(), 'end', a[0], a[1])

       return a[0] * a[1]

   if __name__ == '__main__':

       tp = ProcessPoolExecutor(4)

       ret = tp.map(func, ((i, i + 1) for i in range(20)))  # 一般函数只接收一个参数,要想传入多个,使用元组方式

       for r in ret:

           print(r)


回调函数


当有一个结果需要进行处理时,都会绑定一个回调函数来处理,除非是得到所有结果之后才做处理,我们使用 把结果存入列表 遍历列表 的方式。


回调函数效率最高的。


   from concurrent.futures import ThreadPoolExecutor

   from threading import current_thread

   import time

   def func(a, b):

       print(current_thread().ident, 'start', a, b)

       time.sleep(1)

       print(current_thread().ident, 'end', a)

       return a * b

   if __name__ == '__main__':

       tp = ThreadPoolExecutor(4)

       future_d = { }

       for i in range(20):  # 异步非阻塞的

           ret = tp.submit(func, a=i, b=i + 1)

           future_d[i] = ret

       for key in future_d:  # 同步阻塞的

           print(key, future_d[key].result())


上述代码,打印结果是按照顺序(0,1,2,3……),并不是谁先结束就打印谁。


使用回调函数以后,谁先执行完就打印谁,代码如下:


   from concurrent.futures import ThreadPoolExecutor

   from threading import current_thread

   import time

   def func(a, b):

       print(current_thread().ident, 'start', a, b)

       time.sleep(1)

       print(current_thread().ident, 'end', a)

       return a, a * b

   def print_func(ret):  # 异步阻塞 每个任务都是各自阻塞各自,谁先执行完谁先打印

       print(ret.result())

   if __name__ == '__main__':

       tp = ThreadPoolExecutor(4)

       for i in range(20):  # 异步非阻塞的

           ret = tp.submit(func, a=i, b=i + 1)  # [ret0, ret1, ..., ret19]

           ret.add_done_callback(print_func)  # 异步阻塞 [print_func, print_func,...,print_func]

           # 回调机制

           # 回调函数 给ret对象绑定一个回调函数,等待ret对应的任务有了结果之后立即调用print_func函数

           # 就可以对结果立即进行处理,而不用按照顺序接收结果处理结果


ret这个任务会在执行完毕的瞬间立即触发print_func函数,并且把任务的返回值对象传递到print_func做参数。

回调函数的例子


   from concurrent.futures import ThreadPoolExecutor

   import requests

   def get_page(url):  # 访问网页,获取网页源代码,用线程池中的线程来操作

       respone = requests.get(url)

       if respone.status_code == 200:

           return { 'url': url, 'text': respone.text}

   def parse_page(res):  # 获取到字典结果之后,计算网页源代码的长度,把'https://www.baidu.com : 长度值'写到文件里,线程任务执行完毕之后绑定回调函数

       res = res.result()

       parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))

       with open('db.txt', 'a') as f:

           f.write(parse_res)

   if __name__ == '__main__':

       urls = [

           'https://www.baidu.com',

           'https://www.python.org',

           'https://www.openstack.org',

           'https://www.tencent.com/zh-cn',

           'http://www.sina.com.cn/'

       ]

       tp = ThreadPoolExecutor(4)

       for url in urls:

           ret = tp.submit(get_page, url)

           ret.add_done_callback(parse_page)  # 谁先回来谁就先把结果写进文件

   # 不用回调函数:

       # 按照顺序获取网页,baidu python openstack tencent sina

       # 也只能按照顺序写

   # 用上了回调函数

       # 按照顺序获取网页,baidu python openstack tencent sina

       # 哪一个网页先返回结果,就先执行哪个网页对应的回调函数(parse_page)


进程池线程池的应用场景

进程池:


场景:高计算的场景,没有IO操作(没有文件操作,没有数据库操作,没有网络操作,没有input);


进程的个数:[cpu_count*1, cpu_count*2]

线程池:


场景:爬虫


线程的个数:一般根据IO的比例定制,cpu_count*5