1.1 Python中的线程使用
1.1.1 函数式
1.2 创建threading.Thread的子类来包装一个线程对象
1.2.1 threading.Thread类的使用
1.3 线程优先级队列(Queue)
1.4 thread对象中的一些方法
1.4.1 join方法
1.4.2 setDaemon()方法
1.4.3 isAlive方法
1.4.4 线程名
1.5 线程同步
1.5.1 简单的线程同步
1.5.2 线程同步
1.5.3 使用条件变量保持线程同步
1.5.4 使用队列保持线程同步
Python中使用线程有两种方式:函数或者用类来包装线程对象。
多线程类似于同时执行多个不同程序
优点:
线程在执行过程中与进程还是有区别的。
l 每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。
l 线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
l 每个线程都有他自己的一组CPU寄存器,称为线程的上下文,该上下文反映了线程上次运行该线程的CPU寄存器的状态。
指令指针和堆栈指针寄存器是线程上下文中两个最重要的寄存器,线程总是在进程得到上下文中运行的,这些地址都用于标志拥有线程的进程地址空间中的内存。
l 线程可以被抢占(中断)。
l 在其他线程正在运行时,线程可以暂时搁置(也称为睡眠) -- 这就是线程的退让。
调用thread模块中的start_new_thread()函数来产生新线程。如下例:
1 import time 2 import thread 3 def timer(no, interval): 4 cnt = 0 5 while cnt<10: 6 print 'Thread:(%d) Time:%s/n'%(no, time.ctime()) 7 time.sleep(interval) 8 cnt+=1 9 thread.exit_thread() 10 11 12 def test(): #Use thread.start_new_thread() to create 2 new threads 13 thread.start_new_thread(timer, (1,1)) 14 thread.start_new_thread(timer, (2,2)) 15 16 if __name__=='__main__': 17 test()
上面的例子定义了一个线程函数timer,它打印出10条时间记录后退出,每次打印的间隔由interval参数决定。thread.start_new_thread(function, args[, kwargs])的第一个参数是线程函数(本例中的timer方法),第二个参数是传递给线程函数的参数,它必须是tuple类型,kwargs是可选参数。
线程的结束可以等待线程自然结束,也可以在线程函数中调用thread.exit()或thread.exit_thread()方法。
如下例:
1 import threading 2 import time 3 class timer(threading.Thread): #The timer class is derived from the class threading.Thread 4 def __init__(self, num, interval): 5 threading.Thread.__init__(self) 6 self.thread_num = num 7 self.interval = interval 8 self.thread_stop = False 9 10 def run(self): #Overwrite run() method, put what you want the thread do here 11 while not self.thread_stop: 12 print( 'Thread Object(%d), Time:%s/n' %(self.thread_num, time.ctime()) ) 13 time.sleep(self.interval) 14 def stop(self): 15 self.thread_stop = True 16 17 18 def test(): 19 thread1 = timer(1, 1) 20 thread2 = timer(2, 2) 21 thread1.start() 22 thread2.start() 23 time.sleep(10) 24 thread1.stop() 25 thread2.stop() 26 return 27 28 if __name__ == '__main__': 29 test()
该方法创建自己的线程类,必要时重写threading.Thread类的方法,线程的控制可以由自己定制。
1. 在自己的线程类的__init__里调用threading.Thread.__init__(self, name = threadname)
Threadname为线程的名字
2. run(),通常需要重写,编写代码实现做需要的功能。
3. getName(),获得线程对象名称
4. setName(),设置线程对象名称
5. start(),启动线程
6. jion([timeout]),等待另一线程结束后再运行。
7. setDaemon(bool),设置子线程是否随主线程一起结束,必须在start()之前调用。默认为False。
8. isDaemon(),判断线程是否随主线程一起结束。
9. isAlive(),检查线程是否在运行中。
此外threading模块本身也提供了很多方法和其他的类,可以帮助我们更好的使用和管理线程。
Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
Queue模块中的常用方法:
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]])获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作
实例:
1 #coding=utf-8 2 import queue 3 import threading 4 import time 5 6 exitFlag = 0 7 8 class myThread (threading.Thread): 9 def __init__(self, threadID, name, q): 10 threading.Thread.__init__(self) 11 self.threadID = threadID 12 self.name = name 13 self.q = q 14 def run(self): 15 print( "Starting " + self.name) 16 process_data(self.name, self.q) 17 print( "Exiting " + self.name) 18 19 def process_data(threadName, q): 20 while not exitFlag: 21 queueLock.acquire() 22 if not workQueue.empty(): 23 data = q.get() 24 queueLock.release() 25 print ("%s processing %s" % (threadName, data)) 26 else: 27 queueLock.release() 28 time.sleep(1) 29 30 threadList = ["Thread-1", "Thread-2", "Thread-3"] 31 nameList = ["One", "Two", "Three", "Four", "Five"] 32 queueLock = threading.Lock() 33 workQueue = queue.Queue(10) 34 threads = [] 35 threadID = 1 36 37 # 创建新线程 38 for tName in threadList: 39 thread = myThread(threadID, tName, workQueue) 40 thread.start() 41 threads.append(thread) 42 threadID += 1 43 44 # 填充队列 45 queueLock.acquire() 46 for word in nameList: 47 workQueue.put(word) 48 queueLock.release() 49 50 # 等待队列清空 51 while not workQueue.empty(): 52 pass 53 54 # 通知线程是时候退出 55 exitFlag = 1 56 57 # 等待所有线程完成 58 for t in threads: 59 t.join() 60 print ("Exiting Main Thread")
以前说过多线程,用到threading模块中的Thread对象
start是重载了Thread对象中的run方法,其实作用还是,当执行这个start方法的时候,将运行run方法。
如果一个线程或者一个函数在执行过程中要调用另外一个线程,并且待到其完成以后才能接着执行,那么在调用这个线程时可以使用被调用线程的join方法。
join([timeout])
里面的参数时可选的,代表线程运行的最大时间,即如果超过这个时间,不管这个线程有没有执行完毕,主线程或函数都会接着执行的。
看个例子:
1 #coding=utf-8 2 import threading 3 import time 4 class MyThread(threading.Thread): 5 def __init__(self,id): 6 threading.Thread.__init__(self) 7 self.id=id 8 def run(self): 9 x=0 10 time.sleep(20) 11 print self.id 12 13 14 >>> def func(): 15 t.start() 16 for i in range(5): 17 print i 18 19 20 >>> t=MyThread(2) 21 >>> func() 22 0 23 1 24 2 25 3 26 4 27 >>> 2
可以看到,虽然在func里面线程已经运行,但是函数并没有等线程运行结束在执行,而是先把func执行完毕,打印0到4,然后等sleep(20),20秒结束后,这个MyThread(2),传进去的2才打印出。
1 >>> def func(): 2 t.start() 3 t.join() 4 for i in range(5): 5 print i 6 7 8 >>> t=MyThread(3) 9 >>> func() 10 3 11 0 12 1 13 2 14 3 15 4
而这个呢,是当t.start()运行开始计时,20秒后,打印出id是3,然后func才接着运行,打印出0到4.
这个方法基本和join是相反的。当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是,只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以用setDaemon方法啦,
下面的例子就是设置子线程随主线程的结束而结束:
1 import threading 2 import time 3 4 class myThread(threading.Thread): 5 def __init__(self,threadname): 6 threading.Thread.__init__(self,name=threadname) 7 def run(self): 8 time.sleep(5) 9 10 print self.getName() 11 12 def fun1(): 13 t1.start() 14 print 'fun1 done' 15 16 def fun2(): 17 t2.start() 18 print 'fun2 done' 19 20 t1=myThread('t1') 21 t2=myThread('t2') 22 t2.setDaemon(True) 23 fun1() 24 fun2()
上面这个例子,按照我们设想的输出时:
fun1 done
fun2 done
t1
但是实际上我们在交互模式,主线程只有在python退出时终止,所以结果t2也是被打印出来啦。
当线程创建以后,可以使用Thread对象的isAlive方法查看线程是否运行。
1 >>> import threading 2 >>> import time 3 >>> class myThread(threading.Thread): 4 def __init__(self,id): 5 threading.Thread.__init__(self) 6 self.id=id 7 def run(self): 8 time.sleep(5) 9 print self.id 10 11 12 >>> t=myThread(1) 13 >>> def func(): 14 t.start() 15 print t.isAlive() 16 17 18 >>> func() 19 True 20 >>> 1
当线程创建后可以设置线程名来区分不同的线程,以便对线程进行控制。线程名可以在类的初始化函数中定义,也可以使用Thread对象的setName方法设置。下面是不同的方法来设置线程名。
1 >>> import threading 2 >>> class mythread(threading.Thread): 3 def __init__(self,threadname): 4 threading.Thread.__init__(self,name=threadname) 5 def run(self): 6 print self.getName() 7 8 9 >>> 10 >>> t1=mythread('t1') 11 >>> t1.getName() 12 't1' 13 >>> t1.setName('T') 14 >>> t1.getName() 15 'T' 16 >>> t2=mythread('t2') 17 >>> t2.start() 18 t2 19 >>> 20 21 >>> t2.getName() 22 't2' 23 >>> t2.setName('TT') 24 >>> t2.getName() 25 'TT'
使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。如下:
1 import threading 2 import time 3 class mythread(threading.Thread): 4 def __init__(self,threadname): 5 threading.Thread.__init__(self,name=threadname) 6 def run(self): 7 global x 8 lock.acquire() 9 for i in range(3): 10 x=x+1 11 time.sleep(2) 12 print x 13 lock.release() 14 lock=threading.RLock() 15 t1=[] 16 for i in range(10): 17 t=mythread(str(i)) 18 t1.append(t) 19 x=0 20 for i in t1: 21 i.start() 22 运行结果如下: 23 >>> 3 24 6 25 9 26 12 27 15 28 18 29 21 30 24 31 27 32 30
而如果我们把acquire()和release()去掉,结果就不同了:
30303030303030303030
这是因为每个线程执行后在打印出x之前都要休眠2秒钟,所以在这个过程中,每个线程都被执行了,所以等到休眠结束,打印出的X的值自然就是经过多次运算以后的X的值了。
而第一次,我们把全局变量X放到了acquire()和release()之间,python解释器每次会只允许一个线程对x进行操作,只有这个线程结束对其操作并且休眠结束打印出来以后,才允许下一个线程对x操作,所以输出的X是每次递增的,而且用时间也是比较长的。
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。
l 使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法.
l 对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。如下:
多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。
考虑这样一种情况: 一个列表里所有元素都是0,线程"set"从后向前把所有元素改成1,而线程"print"负责从前往后读取列表并打印。
那么,可能线程"set"开始改的时候,线程"print"便来打印列表了,输出就成了一半0一半1,这就是数据的不同步。为了避免这种情况,引入了锁的概念。
锁有两种状态——锁定和未锁定。每当一个线程比如"set"要访问共享数据时,必须先获得锁定;如果已经有别的线程比如"print"获得锁定了,那么就让线程"set"暂停,也就是同步阻塞;等到线程"print"访问完毕,释放锁以后,再让线程"set"继续。
经过这样的处理,打印列表时要么全部输出0,要么全部输出1,不会再出现一半0一半1的尴尬场面。
实例:
1 #coding=utf-8 2 import threading 3 import time 4 5 class myThread (threading.Thread): 6 def __init__(self, threadID, name, counter): 7 threading.Thread.__init__(self) 8 self.threadID = threadID 9 self.name = name 10 self.counter = counter 11 def run(self): 12 print ("Starting " + self.name) 13 # 获得锁,成功获得锁定后返回True 14 # 可选的timeout参数不填时将一直阻塞直到获得锁定 15 # 否则超时后将返回False 16 threadLock.acquire() 17 print_time(self.name, self.counter, 3) 18 # 释放锁 19 threadLock.release() 20 21 def print_time(threadName, delay, counter): 22 while counter: 23 time.sleep(delay) 24 print ("%s: %s" % (threadName, time.ctime(time.time()))) 25 counter -= 1 26 27 threadLock = threading.Lock() 28 threads = [] 29 30 # 创建新线程 31 thread1 = myThread(1, "Thread-1", 1) 32 thread2 = myThread(2, "Thread-2", 2) 33 34 # 开启新线程 35 thread1.start() 36 thread2.start() 37 38 # 添加线程到线程列表 39 threads.append(thread1) 40 threads.append(thread2) 41 42 # 等待所有线程完成 43 for t in threads: 44 t.join() 45 print( "Exiting Main Thread")
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。
Python的Condition对象提供了对复杂线程同步的支持,使用它可以在某些事件触发之后才处理数据,condition的方法有很多,看下面的例子:
用Condition来实现著名的生产者和消费者的关系:
1 #coding=utf-8 2 import threading 3 class Producer(threading.Thread): 4 def __init__(self,threadname): 5 threading.Thread.__init__(self,name=threadname) 6 def run(self): 7 global x 8 con.acquire() 9 if x==1000000: 10 con.wait() 11 pass 12 else: 13 for i in range(1000000): 14 x=x+1 15 con.notify() 16 print x 17 con.release() 18 19 class Consumer(threading.Thread): 20 def __init__(self,threadname): 21 threading.Thread.__init__(self,name=threadname) 22 def run(self): 23 global x 24 con.acquire() 25 if x==0: 26 con.wait() 27 pass 28 else: 29 for i in range(1000000): 30 x=x-1 31 con.notify() 32 print x 33 con.release() 34 con=threading.Condition() 35 x=0 36 p=Producer('Producer') 37 c=Consumer('Consumer') 38 p.start() 39 c.start() 40 p.join() 41 c.join() 42 print x
结果如下:
>>>
1000000
0
0
同样的例子,使用Queue可以实现多生产者和多消费者的先进先出的队列,每个生产者将数据依次存入队列,而每个消费者则依次从队列中取出数据,看下面的例子:
1 #coding=utf-8 2 import threading 3 import time 4 import Queue 5 6 class Producer(threading.Thread): 7 def __init__(self,threadname): 8 threading.Thread.__init__(self,name=threadname) 9 def run(self): 10 global queue 11 queue.put(self.getName()) 12 print self.getName(),'put',self.getName(),'to queue' 13 14 class Consumer(threading.Thread): 15 def __init__(self,threadname): 16 threading.Thread.__init__(self,name=threadname) 17 def run(self): 18 global queue 19 print self.getName(),'get',self.getName(),'from queue' 20 21 queue=Queue.Queue() 22 plist=[] 23 clist=[] 24 for i in range(10): 25 p=Producer('Producer'+str(i)) 26 plist.append(p) 27 for i in range(10): 28 c=Consumer('Consumer'+str(i)) 29 clist.append(c) 30 31 for i in plist: 32 i.start() 33 i.join() 34 for i in clist: 35 i.start() 36 i.join()
看结果:
Producer0 put Producer0 to queue Producer1 put Producer1 to queue Producer2 put Producer2 to queue Producer3 put Producer3 to queue Producer4 put Producer4 to queue Producer5 put Producer5 to queue Producer6 put Producer6 to queue Producer7 put Producer7 to queue Producer8 put Producer8 to queue Producer9 put Producer9 to queue Consumer0 get Consumer0 from queue Consumer1 get Consumer1 from queue Consumer2 get Consumer2 from queue Consumer3 get Consumer3 from queue Consumer4 get Consumer4 from queue Consumer5 get Consumer5 from queue Consumer6 get Consumer6 from queue Consumer7 get Consumer7 from queue Consumer8 get Consumer8 from queue Consumer9