IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用如下场合:
(1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。 (2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。 (3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。 (4)如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。 (5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。 与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。
Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。
windows python : 只支持 select 方法 mac python : 只支持 select 方法 Linux python : 支持 select poll epoll 三种方法
说了一大堆,估计大家也被整晕了,我们还是直接上示例吧~~
std.py
#f = file() , obj = socket(), sys.stdin = 终端输入 #select.select监听用户输入,如果用户输入内容,select 会感知 sys.sdtin 改变,将改变的文件句柄保存至列表,并将列表作为select第一个参数返回,如果用户未输入内容,select 第一个参数 = [], import select import threading import sys while True: readable, writeable, error = select.select([sys.stdin,],[],[],1) if sys.stdin in readable: print 'select get stdin',sys.stdin.readline()
执行结果:
go go go #等待用户输入,用户输入: go go go select get stdin go go go ok #用户输入 ok select get stdin ok
回顾之前socket参数之 -----> sk.setblocking(bool)
是否阻塞(默认True), <阻塞> ,如果设置False, <不阻塞> ,那么accept和recv时一旦无数据,则报错。 不阻塞> 阻塞>
请看如下示例:
server.py
import socket import time sk1 = socket.socket() sk1.bind(('127.0.0.1',8001)) sk1.listen(5) sk1.setblocking(False) #这里设置False,<不阻塞> while True: try: #获取异常,如果不做这一步,获取不到数据则报错 conn,addr = sk1.accept() conn.close() print addr except Exception,e: print e time.sleep(2)
在客户端执行,如果客户端没有请求(用浏览器输入: http://127.0.0.1:8001 ),则报错,如有输入,则获取到值,如下:
Errno 35] Resource temporarily unavailable #没有客户端请求时,报错 [Errno 35] Resource temporarily unavailable [Errno 35] Resource temporarily unavailable [Errno 35] Resource temporarily unavailable [('127.0.0.1', 62393) #获取到值 ('127.0.0.1', 62394) [Errno 35] Resource temporarily unavailable ('127.0.0.1', 62395)
select 示例1:
select.py
import socket import time import select sk1 = socket.socket() sk1.bind(('127.0.0.1',8001)) sk1.listen(5) sk1.setblocking(False) while True: #readable_list :如果有客户端连接,则有值,否则空列表 readable_list, writeable_list, error_list = select.select([sk1,],[],[],2) #不是空列表,select感知,sk1获取到值 for r in readable_list: #不是空列表,循环执行 conn,addr = r.accept() print addr
执行: python select.py
在浏览器输入: http://127.0.0.1:8001
执行结果:(服务器端打印日志)
('127.0.0.1', 62205) ('127.0.0.1', 62206)
接下来,再多监听一个端口(监听多端口):
import select import socket sk1 = socket.socket() sk1.bind(('127.0.0.1',8001)) #监听8001端口 sk1.listen(5) sk1.setblocking(False) #新监听8002端 sk2 = socket.socket() sk2.bind(('127.0.0.1',8002)) #监听8002端口 -->新监听的端口 sk2.listen(5) sk2.setblocking(False) while True: readable_list, writeable_list, error_list = select.select([sk1,sk2],[],[],1) #新增:[sk1,sk2] for r in readable_list: conn,addr = r.accept() print addr
再次在浏览器器输入新监听的端口,你会发现原来服务端只支持处理一个客户端请求,如今可以支持处理多个客户端请求:
http://127.0.0.1:8002,执行结果如下 :
('127.0.0.1', 62222) ('127.0.0.1', 62223)
select 示例2:
服务端:server.py
sk1 = socket.socket() sk1.bind(('127.0.0.1',8001)) sk1.listen(5) sk1.setblocking(False) inputs = [sk1,] #将上例中select.select([sk1,]...) 中[sk1,] 赋值给 inputs #sk1 , conn 都是socket对象(文件描述符) while True: readable_list, writeable_list, error_list = select.select(inputs,[],[],1) #请看这里的修改 time.sleep(2) #因为执行处理速度太快,这里sleep 2s print "inputs:",inputs #打印inputs ,方便执行时观察变化 print "res:",readable_list #打印readable_list ,方便执行时查看变化 for r in readable_list: if r == sk1: #判断是服务端还是客户端,如果是服务端才进行下面的操作,因为客户端没有accept方法 conn,addr = r.accept() inputs.append(conn) print addr else: #如果是客户端,接受和返回数据 client_data = r.recv(1024) r.sendall(client_data)
上面的server端详解如下:
#第一次请求进来(第一个客户端进来,只是连接,没有操作): readable_list = [sk1,], 第一次执行完后: inputs = [sk1,] ---> inputs = [sk1,conn1] #第二次请求进来(第二个客户端进来,也只是连接,没有操作):readable_list = [sk1,] ,执行完后:inputs = [sk1,conn1,] ---> inputs = [sk1,conn1,conn2] #如果第一个客户端发送一条数据,服务端的socket<sk1>不变,只是客户端的socket<即conn1>变化 :readable_list = [conn1,] , inputs = [sk1,conn1,conn2] #<conn1 应该是conn,这里conn1 代表第一个客户端进来> ---- #第一个参数,监听的句柄序列 #如果第二参数有参数,即只要不是空列表,select就能感知,然后writeabled_list就能获取值 #第三个参数监听描述符,监听是否出错,如果出错,则摘除 #第四个参数,阻塞时间,如 1秒(这个如果不写,select会阻塞住,直到监听的描述符发生变化才继续往下执行) readable_list, writeable_list, error_list = select.select(inputs,[],[],1)
客户端: client.py
import select import socket client = socket.socket() client.connect(('127.0.0.1',8001)) client.settimeout(10) while True: client_input = raw_input('please input:').strip() client.sendall(client_input) server_data = client.recv(1024) print server_data client.close()
服务端执行结果:
inputs: [<socket._socketobject object at 0x104cbbf30>] res: [] inputs: [<socket._socketobject object at 0x104cbbf30>] res: []
客户端连接之后,服务端日志(但是还没有输入)
inputs: [<socket._socketobject object at 0x104cbbf30>] res: [<socket._socketobject object at 0x104cbbf30>] ('127.0.0.1', 62815) inputs: [<socket._socketobject object at 0x104cbbf30>, <socket._socketobject object at 0x104cbbfa0>] res: []
第二个客户端端连接:
inputs: [<socket._socketobject object at 0x104cbbf30>] res: [<socket._socketobject object at 0x104cbbf30>] ('127.0.0.1', 62815) inputs: [<socket._socketobject object at 0x104cbbf30>, <socket._socketobject object at 0x104cbbfa0>] res: []
在客户端输入(其中一个client):
inputs: [<socket._socketobject object at 0x104cbbf30>, <socket._socketobject object at 0x104cbbfa0>, <socket._socketobject object at 0x104cfc050>] res: [] inputs: [<socket._socketobject object at 0x104cbbf30>, <socket._socketobject object at 0x104cbbfa0>, <socket._socketobject object at 0x104cfc050>] res: [<socket._socketobject object at 0x104cbbfa0>] inputs: [<socket._socketobject object at 0x104cbbf30>, <socket._socketobject object at 0x104cbbfa0>, <socket._socketobject object at 0x104cfc050>] res: []
我们知道,上面的例子,如客户端中途断开,在服务端连接并没有释放对于上面的例子,做如下修改(下面的例子,客户端断开后,直接释放)
server.py
import select import socket import time sk1 = socket.socket() sk1.bind(('127.0.0.1',8001)) sk1.listen(5) sk1.setblocking(False) inputs = [sk1,] outputs = [] #这里修改 while True: readable_list, writeable_list, error_list = select.select(inputs,outputs,[],2) #情况这里修改 time.sleep(2) print "inputs:",inputs print "res:",readable_list print "wri",writeable_list for r in readable_list: if r == sk1: conn,addr = r.accept() inputs.append(conn) outputs.append(conn) #append句柄 print addr else: client_data = r.recv(1024) if client_data: r.sendall(client_data) else: inputs.remove(r) #如果没有收到客户端端数据,则移除客户端句柄
执行结果如下:
连个客户端连接:
断开其中一个客户端
此时,发现断开的那个客户端被自动剔除
下面是讲解 readable_list, writeable_list 分离写的示例
import select import socket import time sk1 = socket.socket() sk1.bind(('127.0.0.1',8001)) sk1.listen(5) sk1.setblocking(False) inputs = [sk1,] outputs = [] while True: readable_list, writeable_list, error_list = select.select(inputs,outputs,[],1) #文件描述符可读 readable_list 只有变化,感知 #文件描述符可写 writeable_list 只要有,感知 time.sleep(2) print "inputs:",inputs print "res:",readable_list print "wri",writeable_list for r in readable_list: if r == sk1: conn,addr = r.accept() inputs.append(conn) print addr else: client_data = r.recv(1024) if client_data: outputs.append(r) for w in writeable_list: w.sendall('1234') outputs.remove(w)
import Queue q = Queue.Queue() q.put(1) q.put(2) q.put(3) print '第一个:',q.get() print '第二个:',q.get()
执行结果:
import Queue q = Queue.Queue() q.put(1) #put数据到队列 q.put(2) q.put(3) print '第一个:',q.get() #队列是先进先出,这里get了两次,则分别取出了1,2 print '第二个:',q.get()
如果队列没有数据,这时get数据,则会等待
import Queue q = Queue.Queue() q.get() q.put(1) q.put(2) q.put(3)
上面的方式没有结果:(处于等待,没有输出)
下面的例子,如果没有数据,通过 get_nowait(),不会等待,但是会报错
import Queue q = Queue.Queue() q.get_nowait() q.get() q.put(1) q.put(2)
结果如下:(报错)
/usr/bin/python /Users/yangallen214/PycharmProjects/ob_11/day10/que_demo.py Traceback (most recent call last): File "/Users/yangallen214/PycharmProjects/ob_11/day10/que_demo.py", line 20, in <module> q.get_nowait() File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/Queue.py", line 190, in get_nowait return self.get(False) File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/Queue.py", line 165, in get raise Empty Queue.Empty
通过try获取异常:
import Queue q = Queue.Queue() try: q.get_nowait() q.get() q.put(1) q.put(2) q.put(3) except Queue.Empty: print "err"
上面的put 时也可以使用 q.put_nowait() 方法,这里不多说
import select import socket import Queue sk1 = socket.socket() sk1.bind(('127.0.0.1',8001)) sk1.listen(5) sk1.setblocking(False) inputs = [sk1,] outputs = [] message = {} #message = { # 'c1':队列, # 'c2':队列,【b,bb,bbb】 #} while True: readable_list, writeable_list, error_list = select.select(inputs,outputs,[],1) #文件描述符可读 readable_list 只有变化,感知 #文件描述符可写 writeable_list 只要有,感知 for r in readable_list: if r == sk1: conn,addr = r.accept() inputs.append(conn) message[conn] = Queue.Queue() else: client_data = r.recv(1024) if client_data: #获取数据 outputs.append(r) #在指定队列中插入数据 message[r].put(client_data) else: inputs.remove(r) for w in writeable_list: #去指定队列取数据 try: data = message[w].get_nowait() w.sendall(data) except Queue.Empty: pass outputs.remove(w) del message[w] #如果异常时移除
select、多路复用 先讲这么多,下次继续更新...
更多连接: http://www.cnblogs.com/wupeiqi