澳门新萄京python中临盆者与客商形式,Python全栈
分类:www.澳门新萄京赌场

Python queue队列

在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产⽣数据的模块,就形象地称为生产者;⽽而处理数据的模块,就称为消费者。

本节内容

作用:

单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。

  1. 进程与线程的概念
  2. Python threading 模块
  3. GIL——global interpreter lock
  4. Mutex互斥锁(线程锁)
  5. Semaphore信号量
  6. Events事件
  7. Queue队列

   解耦:使程序直接实现松耦合,修改一个函数,不会有串联关系。

Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先⼊先出)队列Queue,LIFO(后⼊先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语(可以理解为原⼦操作,即要么不做,要么就做完),能够在多线程中直接使⽤。可以使⽤队列来实现线程间的同步。

 

   提高处理效率:FIFO = 现进先出,LIFO = 后入先出。

⽣产者消费者模式的说明:

1、进程与线程的概念

程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。

在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。正是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。

 

在线程世界⾥,⽣产者就是⽣产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果⽣产者处理速度很快,⽽消费者处理速度很慢,那么⽣产者就必须等待消费者处理完,才能继续⽣产数据。同样的道理,如果消费者的处理能⼒⼤于⽣产者,那么消费者就必须等待⽣产者。为了解决这个问题于是引⼊了⽣产者和消费者模式。

有了进程为什么还要线程?

进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。但进程还是有很多缺陷的,主要体现为:

  • 进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。

  • 进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。

队列:

⽣产者消费者模式是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。

什么是进程(process)?

对各种资源管理的集合,称之为进程。各类资源包括:内存的调用、网卡的调用等等。

每一个应用程序就是一个进程。

一个进程至少包含一个线程。

正在运行的进程都有唯一的一个ID号,即PID。如下图:

澳门新萄京 1

操作系统调用进程的时候,不会用进程名称进行调用,而是用唯一的PID进行调用。

  队列可以并发的派多个线程,对排列的线程处理,并切每个需要处理线程只需要将请求的数据放入队列容器的内存中,线程不需要等待,当排列完毕处理完数据后,线程在准时来取数据即可。请求数据的线程只与这个队列容器存在关系,处理数据的线程down掉不会影响到请求数据的线程,队列会派给其他线程处理这分数据,它实现了解耦,提高效率。队列内会有一个有顺序的容器,列表与这个容器是有区别的,列表中数据虽然是排列的,但数据被取走后还会保留,而队列中这个容器的数据被取后将不会保留。当必须在多个线程之间安全地交换信息时,队列在线程编程中特别有用。

这个阻塞队列就是⽤来给⽣产者和消费者解耦的。纵观⼤多数设计模式,都会找⼀个第三者出来进⾏解耦

什么是线程(thread)?

线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并发的执行不同的任务。

所以线程可以简单的理解为:  线程 = 一堆指令

【注意】:进程要操作CPU,必须先创建一个线程。

总结:进程就是各种资源的集合,线程是一堆指令。进程包含一个或多个线程,进程的操作是要靠线程进行的。

 

进程、线程与内存的关系:

  进程的内存空间相对独立,进程之间不能互相访问对方的内存空间。

  所有在同一个进程里的线程是共享同一块内存空间的。

 

 

进程与线程的区别?

Q1:进程快还是线程快?

A:这没有可比性,进程是资源的集合,线程是一堆指令,进程想要执行也是依靠线程进行的。

Q2:启动一个进程快还是启动一个线程快?

A:肯定是启动一个线程快了。启动进程需要向OS申请各种资源,但是启动线程就是生成一堆指令,一下子就出来了。

区别:

  1、线程共享内存空间,进程的内存是相互独立的。

  2、同一个父进程创建的子进程之间,内存空间相互独立。但同一个父进程创建的子线程之间,内存空间共享。

  3、同一个进程的线程之间可以直接交流;两个进程之间想通信,必须通过一个中间代理。

  4、创建新线程很简单,创建新进程需要对其父进程进行克隆

  5、一个线程可以控制和操作同一个进程里的其他线程,但进程只能操作子进程。

  6、对主线程的修改,可能会影响同一个进程里的其他线程的行为。但是对父进程的修改不会影响其他子进程。

 

Python四种类型的队例:

什么时候使用多线程:

  I/O操作不占CPU

  计算占用CPU

python多线程不适合CPU密集操作型的任务。适合I/O操作密集型的任务。

因为python多线程是伪多线程,其实是在不同的线程之间进行切换,切换就要保持当前线程状态,读取下一线程状态,这些操作会增加CPU负载。所以对于本身就是CPU密集型的任务而言并不适合多线程。

 

Queue:FIFO 即first in first out 先进先出

LifoQueue:LIFO 即last in first out 后进先出

PriorityQueue:优先队列,级别越低,越优先
deque:双边队列


导入三种队列,包

from queue import Queue,LifoQueue,PriorityQueue

2、Python threading模块

线程有2种调用方式,如下:

直接调用(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading
import time
 
def sayhi(num): #定义每个线程要运行的函数
 
    print("running on number:%s" %num)
 
    time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
    t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
 
    t1.start() #启动线程
    t2.start() #启动另一个线程
 
    print(t1.getName()) #获取线程名
    print(t2.getName())

继承式调用(不推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time
 
 
class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num = num
 
    def run(self):#定义每个线程要运行的函数
 
        print("running on number:%s" %self.num)
 
        time.sleep(3)
 
if __name__ == '__main__':
 
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

这里会出现一个问题:

在用time模块统计一个主线程运行时间时,由于多线程独立运行,所以这里的计时器只能统计主线程运行时间,这个时间不包括子线程运行时间,即不管子线程是否执行完毕,只要主线程执行完毕,计时器就得出结果。

在使用time模块计时时需要注意这个坑。。。。。。

如果非要在主线程中计算所有线程执行的时间的话,可以先让主线程等待子线程的执行结果,然后再计算时间。这里用到的方法是.join()

例如,我想等待t1这个子线程的执行结果,就在主线程中使用指令:  t1.join()

 

如果用循环的方式启动子线程,会出现一个问题,没办法对子线程进行命名,或者说没办法给子线程门牌号。这样不利于后段程序的调用。

这时只需要在实例化子线程时使用一个临时变量,然后通过.append()方法,把每个子线程对应的对象加入之前设置好的列表中即可。

然后用for循环遍历存放子线程的列表,即可按顺序取出之前新建的子线程对象。

 

主线程就是程序本身,自己是看不到的。但是可以通过一个命令来证明,.py文件就是主线程。

  print(threading.current_thread())   #打印当前线程

如果结果中出现 MainThread就是表示这个线程是主线程。没有则表示是子线程。

 

 

守护线程

守护线程相当于主线程的仆人。当主线程执行完毕后,不管守护线程是否执行完毕,程序都会退出。

主程序退出前,会等待所有的非守护线程执行完毕再退出。

 t.setDaemon(True)  #把t这个子线程设置为守护线程。

上面这条指令一定要在子线程开始之前,即t.star()指令之前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#_*_coding:utf-8_*_
__author__ = 'Alex Li'
 
import time
import threading
 
 
def run(n):
 
    print('[%s]------running----n' % n)
    time.sleep(2)
    print('--done--')
 
def main():
    for in range(5):
        = threading.Thread(target=run,args=[i,])
        t.start()
        t.join(1)
        print('starting thread', t.getName())
 
 
= threading.Thread(target=main,args=[])
m.setDaemon(True#将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
m.start()
m.join(timeout=2)
print("---main thread done----")

  

Queue 先进先出队列:

澳门新萄京python中临盆者与客商形式,Python全栈开垦。3、Python GIL(Global Interpreter Lock)  

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

上面的核心意思就是,无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行。

所以我们之前说的多线程其实是伪多线程,本质也是CPU在代码部分的上下文切换。只是这种切换太快了。

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C 是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C ,Visual C 等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

这篇文章透彻的剖析了GIL对python多线程的影响: 

 

#基本FIFO队列  先进先出 FIFO即First in First Out,先进先出
#maxsize设置队列中,数据上限,小于或等于0则不限制,容器中大于这个数则阻塞,直到队列中的数据被消掉
q = Queue(maxsize=0)

#写入队列数据
q.put(0)
q.put(1)
q.put(2)

#输出当前队列所有数据
print(q.queue)
#删除队列数据,并返回该数据
q.get()
#输也所有队列数据
print(q.queue)

# 输出:
# deque([0, 1, 2])
# deque([1, 2])

4、Mutex互斥锁(线程锁)

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import threading
 
def addNum():
    global num #在每个线程中都获取这个全局变量
    print('--get num:',num )
    time.sleep(1)
    num  -=1 #对此公共变量进行-1操作
 
num = 100  #设定一个共享变量
thread_list = []
for in range(100):
    = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
 
for in thread_list: #等待所有线程执行完毕
    t.join()
 
 
print('final num:', num )

正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0。这个原因很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处理完的结果是99,但此时B线程运算完的结果也是99,两个线程同时将CPU运算的结果再赋值给num变量后,结果就都是99。所以每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。 

*注:python3.x上的结果总是正确的,可能是自动加了锁。

所以多个线程同时修改同一份数据时,必须加互斥锁即Mutex。

加锁版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
import threading
 
def addNum():
    global num #在每个线程中都获取这个全局变量
    print('--get num:',num )
    time.sleep(1)
    lock.acquire() #修改数据前加锁
    num  -=1 #对此公共变量进行-1操作
    lock.release() #修改后释放
 
num = 100  #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for in range(100):
    = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
 
for in thread_list: #等待所有线程执行完毕
    t.join()
 
print('final num:', num )

 

GIL VS Lock 

Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 这里的lock是用户级的lock,跟那个GIL没关系 。原因如下图:

澳门新萄京 2

加入GIL主要的原因是为了降低程序的开发的复杂度,比如现在写的python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时自己写的程序里的线程和 py解释器自己的线程是并发运行的,假设自己的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题,  这可以说是Python早期版本的遗留问题。

 

RLock(递归锁)

就是在一个大锁中还要再包含子锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading,time
 
def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num  =1
    lock.release()
    return num
def run2():
    print("grab the second part data")
    lock.acquire()
    global  num2
    num2 =1
    lock.release()
    return num2
def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res,res2)
 
 
if __name__ == '__main__':
 
    num,num2 = 0,0
    lock = threading.RLock()
    for in range(10):
        = threading.Thread(target=run3)
        t.start()
 
while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num,num2)

上述代码中,如果用lock ``= threading.Lock()代替l``ock ``= threading.RLock(),会出现程序把解锁的钥匙弄混,导致死循环,永远出不去的情况。

所以当连续锁很多次时,就需要使用递归锁。

 

 

5、Semaphore(信号量)

互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,当之前启动的线程完成后,新的线程才能启动、执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import threading,time
 
def run(n):
    semaphore.acquire()  #获得一个信号量
    time.sleep(1)
    print("run the thread: %sn" %n)
    semaphore.release()  #释放一个信号量
 
if __name__ == '__main__':
    semaphore  = threading.BoundedSemaphore(5#最多允许5个线程同时运行
    for in range(20):
        = threading.Thread(target=run,args=(i,))
        t.start()
 
while threading.active_count() != 1:
    pass #print threading.active_count()
else:
    print('----all threads done---')

 

LifoOueue 后进先出队列:

6、Events事件

事件是一个简单的同步对象,事件代表一个内置的标志位,并且线程可以等待标志位被设置为真或清除标志位。

调用事件前,需要先实例化一个事件对象。

event = threading.Event()

事件的4个方法:
  1、event.wait()  # 一个客户端线程可以等待事件标志位被设置为真。此时wait()使得线程处于阻塞状态
  2、event.set()  #一个服务线程可以设置标志位或清空标志位。 set()方法是设置标志位。
  3、event.clear()  #clear()方法是清空标志位

  4、event.isSet()  #isSet方法是判断标志位是否被设置。

如果事件标志位被设置,等待方法什么也不会做。

如果事件标志位被清空,等待方法将会处于阻塞状态,直到事件标志位被再一次设置。

多个线程可以同时等待一个相同的事件标志位的设置。

澳门新萄京, 

通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import threading,time
import random
def light():
    if not event.isSet():
        event.set() #wait就不阻塞 #绿灯状态
    count = 0
    while True:
        if count < 10:
            print('33[42;1m--green light on---33[0m')
        elif count <13:
            print('33[43;1m--yellow light on---33[0m')
        elif count <20:
            if event.isSet():
                event.clear()
            print('33[41;1m--red light on---33[0m')
        else:
            count = 0
            event.set() #打开绿灯
        time.sleep(1)
        count  =1
def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if  event.isSet(): #绿灯
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for in range(3):
        = threading.Thread(target=car,args=(i,))
        t.start()

 

#LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上
lq = LifoQueue(maxsize=0)

#队列写入数据
lq.put(0)
lq.put(1)
lq.put(2)

#输出队列所有数据
print(lq.queue)
#删除队尾数据,并返回该数据
lq.get()
#输出队列所有数据
print(lq.queue)

#输出:
# [0, 1, 2]
# [0, 1]

7、Queue队列 

queue队列的作用:

  1、提高运行效率

  2、程序的解耦

队列的本质:一个有顺序的容器

其他的容器还有 列表、字典等。

队列与列表的区别:

  数据从列表中取出后,数据依然存在于列表之中。但是数据从队列中取出后,原始数据会被从队列中删除。即对队列而言,数据只有一份,取走就没。

队列在线程编程中是十分有用的,尤其是在信息必须被安全的交换在多个线程之间的时候。

队列的种类:

class queue.``Queue(maxsize=0) #先入先出

class queue.``LifoQueue(maxsize=0) #last in fisrt out 

class queue.``PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

队列的方法:

Queue.``qsize()

Queue.``empty() #return True if empty  

Queue.``full() # return True if full 

Queue.``put(itemblock=Truetimeout=None)

Queue.``put_nowait(item)

Queue.``get(block=Truetimeout=None)

Queue.``get_nowait()

Queue.``task_done()

Queue.``join() block直到queue被消费完毕

  

 

用队列的生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

生产者消费者模型目的就是解耦,而队列只是用来解耦的一个工具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import threading
import queue
 
def producer():
    for in range(10):
        q.put("骨头 %s" % i )
 
    print("开始等待所有的骨头被取走...")
    q.join()
    print("所有的骨头被取完了...")
 
 
def consumer(n):
 
    while q.qsize() >0:
 
        print("%s 取到" %n  , q.get())
        q.task_done() #告知这个任务执行完了
 
 
= queue.Queue()
 
 
 
= threading.Thread(target=producer,)
p.start()
 
c1 = consumer("Gavin")

  

 

优先队列:

 

# 存储数据时可设置优先级的队列
# 优先级设置数越小等级越高
pq = PriorityQueue(maxsize=0)

#写入队列,设置优先级
pq.put((9,'a'))
pq.put((7,'c'))
pq.put((1,'d'))

#输出队例全部数据
print(pq.queue)

#取队例数据,可以看到,是按优先级取的。
pq.get()
pq.get()
print(pq.queue)

#输出:
[(9, 'a')]

 

双边队列:

#双边队列
dq = deque(['a','b'])

#增加数据到队尾
dq.append('c')
#增加数据到队左
dq.appendleft('d')

#输出队列所有数据
print(dq)
#移除队尾,并返回
print(dq.pop())
#移除队左,并返回
print(dq.popleft())
#输出:
deque(['d', 'a', 'b', 'c'])
c
d

 

 

生产消费模型:

#生产消费模型
qq = Queue(maxsize=10)

def product(name):
    count = 1
    while True:
        q.put('步枪{}'.format(count))
        print('{}生产步枪{}支'.format(name,count))
        count =1
        time.sleep(0.3)

def cousume(name):
    while True:
        print('{}装备了{}'.format(name,q.get()))
        time.sleep(0.3)

        q.task_done()


#部队线程
p = threading.Thread(target=product,args=('张三',))
k = threading.Thread(target=cousume,args=('李四',))
w = threading.Thread(target=cousume,args=('王五',))

p.start()
k.start()
w.start()

 

 

本文由澳门新萄京发布于www.澳门新萄京赌场,转载请注明出处:澳门新萄京python中临盆者与客商形式,Python全栈

上一篇:澳门新萄京内建函数getattr工厂情势,类民用成员 下一篇:没有了
猜你喜欢
热门排行
精彩图文