使用select.select编写聊天室服务器 《Python网络编程攻略》

#

现实中,大型网络服务器可能要处理几百或几千个客户端同时连接的请求,此时为每个客户端创建单独的线程或进程可能不实际。因为主机的内存可用量和CPU的能力皆有限制。
要处理大量客户端的连接需要更好的技术,那就是Python提供的select模块。
select select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误。
select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的网络接口进行操作,而不是通过Python的解释器。
select()方法接收并监控3个通信列表, 第一个是所有的输入的data,就是指外部发过来的数据,第2个是监控和接收所有要发出去的data(outgoing data),第3个监控错误信息,接下来我们需要创建2个列表来包含输入和输出信息来传给select().
# Sockets from which we expect to read inputs = [ server ]# Sockets to which we expect to write outputs = [ ]# Outgoing message queues (socket:Queue) message_queues = {} while inputs:# Wait for at least one of the sockets to be ready for processing print >>sys.stderr, '\nwaiting for the next event' readable, writable, exceptional = select.select(inputs, outputs, inputs)

当你把inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,我们上面将他们分别赋值为readable,writable,exceptional, 所有在readable list中的socket连接代表有数据可接收(recv),所有在writable list中的存放着你可以对其进行发送(send)操作的socket连接,当连接通信出现error时会把error写到exceptional列表中。
Readable list 【使用select.select编写聊天室服务器 《Python网络编程攻略》】Readable list中的socket 可以有3种可能状态:
- 第一种是如果这个socket是main “server” socket,它负责监听客户端的连接,如果这个main server socket出现在readable里,那代表这是server端已经ready来接收一个新的连接进来了,为了让这个main server能同时处理多个连接,在下面的代码里,我们把这个main server的socket设置为非阻塞模式。
# Handle inputs for s in readable:if s is server: # A "readable" server socket is ready to accept a connection connection, client_address = s.accept() print >>sys.stderr, 'new connection from', client_address connection.setblocking(0) inputs.append(connection)# Give the connection a queue for data we want to send message_queues[connection] = Queue.Queue()

  • 第二种情况是这个socket是已经建立了的连接,它把数据发了过来,这个时候你就可以通过recv()来接收它发过来的数据,然后把接收到的数据放到queue里,这样你就可以把接收到的数据再传回给客户端了。
else: data = s.recv(1024) if data: # A readable client socket has data print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) message_queues[s].put(data) # Add output channel for response if s not in outputs: outputs.append(s)

  • 第三种情况就是这个客户端已经断开了,所以你再通过recv()接收到的数据就为空了,所以这个时候你就可以把这个跟客户端的连接关闭了。
else: # Interpret empty result as closed connection print >>sys.stderr, 'closing', client_address, 'after reading no data' # Stop listening for input on the connection if s in outputs: outputs.remove(s)#既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉 inputs.remove(s)#inputs中也删除掉 s.close()#把这个连接关闭掉# Remove message queue del message_queues[s]

writable list 对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接,那就会认为这个连接还处于非活动状态。
# Handle outputs for s in writable: try: next_msg = message_queues[s].get_nowait() except Queue.Empty: # No messages waiting so stop checking for writability. print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty' outputs.remove(s) else: print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername()) s.send(next_msg)

select 实例 服务器端示例代码
#_*_coding:utf-8_*_ __author__ = 'Alex Li'import select import socket import sys import queue# Create a TCP/IP socket server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False)# Bind the socket to the port server_address = ('localhost', 10000) print(sys.stderr, 'starting up on %s port %s' % server_address) server.bind(server_address)# Listen for incoming connections server.listen(5)# Sockets from which we expect to read inputs = [ server ]# Sockets to which we expect to write outputs = [ ]message_queues = {} while inputs:# Wait for at least one of the sockets to be ready for processing print( '\nwaiting for the next event') readable, writable, exceptional = select.select(inputs, outputs, inputs) # Handle inputs for s in readable:if s is server: # A "readable" server socket is ready to accept a connection connection, client_address = s.accept() print('new connection from', client_address) connection.setblocking(False) inputs.append(connection)# Give the connection a queue for data we want to send message_queues[connection] = queue.Queue() else: data = s.recv(1024) if data: # A readable client socket has data print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) ) message_queues[s].put(data) # Add output channel for response if s not in outputs: outputs.append(s) else: # Interpret empty result as closed connection print('closing', client_address, 'after reading no data') # Stop listening for input on the connection if s in outputs: outputs.remove(s)#既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉 inputs.remove(s)#inputs中也删除掉 s.close()#把这个连接关闭掉# Remove message queue del message_queues[s] # Handle outputs for s in writable: try: next_msg = message_queues[s].get_nowait() except queue.Empty: # No messages waiting so stop checking for writability. print('output queue for', s.getpeername(), 'is empty') outputs.remove(s) else: print( 'sending "%s" to %s' % (next_msg, s.getpeername())) s.send(next_msg) # Handle "exceptional conditions" for s in exceptional: print('handling exceptional condition for', s.getpeername() ) # Stop listening for input on the connection inputs.remove(s) if s in outputs: outputs.remove(s) s.close()# Remove message queue del message_queues[s]

客户端示例代码
__author__ = 'jieli' import socket import sysmessages = [ 'This is the message. ', 'It will be sent ', 'in parts.', ] server_address = ('localhost', 10000)# Create a TCP/IP socket socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ]# Connect the socket to the port where the server is listening print >>sys.stderr, 'connecting to %s port %s' % server_address for s in socks: s.connect(server_address)for message in messages:# Send messages on both sockets for s in socks: print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message) s.send(message)# Read responses on both sockets for s in socks: data = https://www.it610.com/article/s.recv(1024) print>>sys.stderr, '%s: received "%s"' % (s.getsockname(), data) if not data: print >>sys.stderr, 'closing socket', s.getsockname() s.close()

运行结果参考:
http://pymotw.com/2/select/
http://www.cnblogs.com/alex3714/p/4372426.html#top
实现方法
  1. 本例将客户端和服务器端代码写在一个脚本里,运行时指定不同的–name参数区别运行的是服务器或客户端:当传入–name=server时,脚本启动聊天室服务器;当传入的是其他参数,如client1、client2时,则运行的是客户端。
  2. 聊天室服务器端口通过–port指定。
  3. 对大型的应用程序而言,最好在不同模块中编写服务器和客户端。
程序代码
''' Created on 2017-2-28@author: lenovo ''' import select import socket import sys import signal import cPickle import struct import argparseSERVER_HOST = 'localhost' CHAT_SERVER_NAME = 'server'# Some utilities def send(channel, *args): buffer = cPickle.dumps(args) value = https://www.it610.com/article/socket.htonl(len(buffer)) size = struct.pack("L",value) channel.send(size) channel.send(buffer)def receive(channel): size = struct.calcsize("L") size = channel.recv(size)#socket.recv(bufsize[, flags]) #Receive data from the socket. The return value is a string representing the data received. #The maximum amount of data to be received at once is specified by bufsize. See the Unix manual page recv(2) for the meaning of the optional argument flags; it defaults to zero. #Note:For best match with hardware and network realities, the value of bufsize should be a relatively small power of 2, for example, 4096.try: size = socket.ntohl(struct.unpack("L",size)[0]) #socket.ntohl(x) #Convert 32-bit positive integers from network to host byte order. #On machines where the host byte order is the same as network byte order, #this is a no-op; otherwise, it performs a 4-byte swap operation.except struct.error, e: return '' buf = "" while len(buf) < size: buf += channel.recv(size - len(buf)) return cPickle.loads(buf)[0]class ChatServer(object): """An example chat server using select""" def __init__(self,port,backlog=5): self.clients = 0 self.clientmap = {} self.outputs = [] #list out sockets self.server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind((SERVER_HOST,port)) print 'Server listening to port: %s...' %port self.server.listen(backlog) # Catch keyboard interrupts signal.signal(signal.SIGINT, self.sighandler)def sighandler(self,signum,frame): """Clean up client output""" # Close the server print 'Shutting down server...' # close existing client sockets for output in self.outputs: output.close() self.server.close()def get_client_name(self,client): """Return the name of the client""" info = self.clientmap[client] host,name = info[0][0],info[1] return '@'.join((name,host))def run(self): # define input source. inputs = [self.server,sys.stdin] self.outputs = [] running = True while running: try: readable,writeable,exceptional = select.select(inputs, self.outputs, []) except select.error, e: breakfor sock in readable: if sock == self.server: # handle the server socket client,address = self.server.accept() print "Chat server: got connection %d from %s" %(client.fileno(),address) # read the login name cname = receive(client).split('NAME: ')[1] # Compute client name and send back self.clients+= 1 send(client, 'CLIENT: ' + str(address[0])) inputs.append(client) self.clientmap[client] = (address,cname) # Send joining information to other clients msg = "\n(Connected: New client (%d) from %s)" % (self.clients,self.get_client_name(client)) for output in self.outputs: send(output,msg) self.outputs.append(client)elif sock == sys.stdin: #handle standard inut junk = sys.stdin.readline() running = Falseelse: # Handle all other sockets try: data = receive(sock) if data: #send as new client's message... msg = '\n#[' + self.get_client_name(sock) +']>>' + data # send data to all except ourself for output in self.outputs: if output != sock: send(output,msg) else: print "Chat server: %d hung up" % sock.fileno() self.clients -= 1 sock.close() inputs.remove(sock) self.outputs.remove(sock)#sending client leaving information to others msg = "\n(Now hung up: Client from %s)" % self.get_client_name(sock) for output in self.outputs: send(output,msg) except socket.error, e: #remove inputs.remove(sock) self.outputs.remove(sock) self.server.close()class ChatClient(object): """ A command ine chat client using select """ def __init__(self,name,port,host=SERVER_HOST): self.name = name self.connected = False self.host = host self.port = port #Initial prompt self.prompt = '[' + '@'.join((name,socket.gethostname().split('.')[0])) + ']>' # Connect to server at port try: self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) self.sock.connect((host,self.port)) print "New connected to chat server @ port %d" %self.port self.connected = True #Send my name... send(self.sock,'NAME: ' + self.name) data = https://www.it610.com/article/receive(self.sock) #Contains client address, set it addr = data.split('CLIENT: ')[1] self.prompt = '[' +'@'.join((self.name,addr)) +']>' except socket.error, e: print "Failed to connect to chat server @ port %d" %self.port sys.exit(1)def run(self): """ Chat client main loop""" while self.connected: try: sys.stdout.write(self.prompt) sys.stdout.flush() #wait for input fromstdin and socket readable,writeable,exceptional = select.select([0,self.sock], [], []) for sock in readable: if sock == 0: data = sys.stdin.readline().strip() if data: send(self.sock,data) elif sock == self.sock: data = receive(self.sock) if not data: print 'Client shutting down.' self.connected = False break else: sys.stdout.write(data + '\n') sys.stdout.flush() except KeyboardInterrupt: print " CLient interrupted. """ self.sock.close() break if __name__ == "__main__": parser = argparse.ArgumentParser(description = 'Socket Server Example with Select') parser.add_argument('--name', action="store",dest="name",required=True) parser.add_argument('--port',action="store",dest="port",type=int,required=True) given_args=parser.parse_args() port=given_args.port name=given_args.name if name == CHAT_SERVER_NAME: server = ChatServer(port) server.run() else: client = ChatClient(name=name,port=port) client.run()

运行结果: 使用select.select编写聊天室服务器 《Python网络编程攻略》
文章图片

    推荐阅读