python asynchrous network

2013年07月15日

select,poll,epoll test in python

selecttest.py

import select
import socket
import Queue
#create a socket
server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(False)
#set option reused
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server_address=('localhost',10001)
server.bind(server_address)
server.listen(10)
#sockets from which we except to read
inputs=[server]
#sockets from which we expect to write
outputs=[]
#Outgoing message queues (socket:Queue)
message_queues={}
#A optional parameter for select is TIMEOUT
timeout=20
while inputs:
    print "waiting for next event"
    #first parameter--read list,second parameter--write list,third parameter--error list
    readable,writable,exceptional=select.select(inputs,outputs,inputs,timeout)
    #when timeout reached,select return three empty list
    if not (readable or writable or exceptional):
        print "Time out!"
        break;
    for s in readable:
        if s is server:
            #A "readable" socket is ready to accept a connection
            connection,client_address=s.accept()
            print "  connection from",client_address
            connection.setblocking(0)
            inputs.append(connection)
            message_queues[connection]=Queue.Queue()
        else:
            data=s.recv(1024)
            if data:
                print "received ",data, "from ",s.getpeername()
                message_queues[s].put(data)
                #Add output  channel for response
                if s not in outputs:
                    outputs.append(s)
            else:
                #Interpret empty result as closed connection
                print " closing", client_address
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                #remove message queue
                del message_queues[s]
    for s in writable:
        try:
            next_msg=message_queues[s].get_nowait()
        except Queue.Empty:
            print " ",s.getpeername," queue empty"
            outputs.remove(s)
        else:
            print "sending ",next_msg," to",s.getpeername()
            s.send(next_msg)
    for s in exceptional:
        print " exception condition on ",s.getpeername()
        #stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        #Remove message queue

polltest.py

  
    
    import socket
    import select
    import Queue
    server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    server.setblocking(False)
    server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    server_address=("localhost",10001)    
    print "Starting up on %s port %s" % server_address
    server.bind(server_address)
    server.listen(5)
    message_queues={}
    #The timeout value is represente in milliseconds,instead of seconds
    timeout=100
    #Create a limit for the next
    READ_ONLY=(select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
    READ_WRITE=(READ_ONLY|select.POLLOUT)    
    #set up the poller
    poller=select.poll()
    poller.register(server,READ_ONLY)
    #map file descriptors to socket objects
    fd_to_socket={server.fileno():server,}
    while True:
        print "Waiting for the next event"
        events=poller.poll(timeout)
        print "*"*20
        print len(events)
        print events
        print "*"*20
        for fd,flag in events:
            s=fd_to_socket[fd]
            if flag & (select.POLLIN|select.POLLPRI):
                if s is server:
                    #A readable socket is ready to accept a connection
                    connection,client_address=s.accept()
                    print "Connection ",client_address
                    connection.setblocking(False)    
                    fd_to_socket[connection.fileno()]=connection
                    poller.register(connection,READ_ONLY)    
                    #Give the connection a queue to send data
                    message_queues[connection]=Queue.Queue
                else:
                    data=s.recv(1024)
                    if data:
                        # A readable client socket has data
                        print " received %s from %s " % (data,s.getpeername())
                        message_queues[s].put(data)
                        poller.modify(s,READ_WRITE)
                    else:
                        #Close the connection
                        print " closing ",s.getpeername()
                        #Stop listening for input on the connection
                        poller.unregister(s)
                        s.close()
                        del message_queues[s]
            elif flag & select.POLLHUP:
                #A client that "Hang up",to be closed.
                print "closing ",s.getpeername,"(HUP)"
                poller.unregister(s)
                s.close()
            elif flag & select.POLLOUT:
                #SOCKET is ready to send DATA,if there is any to send
                try:
                    next_msg=message_queues[s].get_nowait()
                except Queue.Empty:
                    #No messages waiting so stop checking
                    print s.getpeername," queue empty"
                    poller.modify(s,READ_ONLY)
                else:
                    print " sending %s to %s" %(next_msg,s.getpeername())
                    s.send(next_msg)
            elif flag & select.POLLERR:
                #Any events with POLLER cause the server to close the sockets
                print " exception on",s.getpeername()
                poller.unregister(s)
                s.close()
                del message_queues[s]

epolltest.py

    import socket,logging
    import select,errno
    logger=logging.getLogger("network-server")    
    def InitLog():
        logger.setLevel(logging.DEBUG)    
        fh=logging.FileHandler("network-server.log")
        fh.setLevel(logging.DEBUG)
        ch=logging.StreamHandler()
        ch.setLevel(logging.DEBUG)    
        formatter=logging.Formatter("%(asctime)s - %(name)s - %(levelname)s -%(message)s")
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)    
        logger.addHandler(fh)
        logger.addHandler(ch)    
    if __name__=="__main__":
        InitLog()    
        try:
            listen_fd=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        except socket.error,msg:
            logger.error("create a socket failed")    
        try:
            listen_fd.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
        except socket.error,msg:
            logger.error("setsocketopt error")    
        try:
            listen_fd.bind(('',10001))
        except socket.error,msg:
            logger.error("listen file id bind ip error")    
        try:
            listen_fd.listen(10)
        except socket.error,msg:
            logger.error(msg)
        try:
            epoll_fd=select.epoll()
            epoll_fd.register(listen_fd.fileno(),select.EPOLLIN)
        except select.error,msg:
            logger.error(msg)        
        connections={}
        addresses={}
        datalist={}
        while True:
            epoll_list = epoll_fd.poll()
            for fd,events in epoll_list:
                if fd==listen_fd.fileno():
                    conn,addr=listen_fd.accept()
                    logger.debug("accept connction from  %s,%d,fd = %d" %(addr[0],addr[1],conn.fileno()))
                    conn.setblocking(0)
                    epoll_fd.register(conn.fileno(),select.EPOLLIN|select.EPOLLET)
                    connections[conn.fileno()]=conn 
                    addresses[conn.fileno()]=addr
                elif select.EPOLLIN  & events:
                    datas=''
                    while True:
                        try:
                            data=connections[fd].recv(10)
                            if not data and not datas:
                                epoll_fd.unregister(fd)
                                connections[fd].close()
                                logger.debug("%s,%d closed" % (addresses[fd][0],addresses[fd][1]))                                
                                break
                            else:
                                datas+=data 
                        except socket.error,msg:
                            if msg.errno==errno.EAGAIN:
                                logger.debug("%s receive %s" % (fd,datas))
                                datalist[fd]=datas
                                epoll_fd.modify(fd,select.EPOLLET|select.EPOLLOUT)
                                break
                            else:
                                epoll_fd.unregister(fd)
                                connections[fd].close()
                                logger.error(msg)
                                break
                elif select.EPOLLHUP & events:
                    epoll_fd.unregister(fd)
                    connections[fd].close()
                    logger.debug("%s ,%d closed" % (addresses[fd][0],addresses[fd][1]))
                elif select.EPOLLOUT & events:
                    sendLen=0
                    while True:
                        sendLen += connections[fd].send(datalist[fd][sendLen:])
                        if sendLen == len(datalist[fd]):
                             break
                    epoll_fd.modify(fd,select.EPOLLIN|select.EPOLLET)
                else:
                    continue

commontest.py(client code)

    import socket
    messages = ["This is the message",
             "It will be send",
             "in parts"]
    print "Connect to the server"    
    server_address=("localhost",10001)    
    #Create a TCP/IP sock
    socks=[]
    for i in range(10):
         socks.append(socket.socket(socket.AF_INET,socket.SOCK_STREAM))    
    for s in socks:
        s.connect(server_address)    
    counter=0
    for message in messages:
        for s in socks:
            counter+=1
            print " %s sending %s "%(s.getpeername(),message+" version "+str(counter))
            s.send(message+" version "+str(counter))
        for s in socks:
            data=s.recv(1024)
            print " %s received %s " % (s.getpeername(),data)
            if not data:
                print "closing socket",s.getpeername()
                s.close()  

benchmark
针对epoll的使用有一篇很好的博客
针对asyncore的介绍和使用有很好的两篇博客:
blog1
blog2