go-tcpsvr

Go 语言非常适合编写 socket 相关的程序：标准库的 `net` 包直接提供了支持。这里实现一个简单版本的 TCP 服务器通讯逻辑，参考了 leaf 框架的代码。每条消息的格式为“2 字节小端长度前缀 + 消息体”。

// Command go-tcpsvr is a minimal TCP server loosely modelled on the leaf
// framework's network layer.
//
// Wire format: every message is a 2-byte little-endian length prefix followed
// by that many payload bytes. Each accepted connection gets a session with one
// reader goroutine and one writer goroutine.
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// listenAddr is the address Listen binds to.
const listenAddr = "0.0.0.0:8089"

// headerSize is the size in bytes of the length prefix of every frame.
const headerSize = 2

// maxPayload is the largest payload the 2-byte length prefix can describe.
const maxPayload = 1<<16 - 1

// writeQueueSize is the number of pending frames buffered per session.
// A channel's buffer is allocated when the channel is created, so this is
// kept small; the previous 512*1024 slots cost roughly 12 MiB per connection.
const writeQueueSize = 1024

// greeting is the payload posted to every newly accepted connection.
const greeting = "I had seen little of Holmes lately. \nMy marriage had drifted us away from each other."

type (
	// session is one client connection. The embedded mutex guards closeFlag
	// and the closing of writeChan; frames queued on writeChan are written
	// to conn by goSendData.
	session struct {
		sync.Mutex
		conn      net.Conn
		writeChan chan []byte
		closeFlag bool
		owner     *server
	}

	// SessionSet is a set of live sessions.
	SessionSet map[*session]struct{}

	// server owns the listener and every live session. mutexConns guards
	// conns and closed; wgLn tracks the accept loop and wgSession tracks the
	// two goroutines of every session.
	server struct {
		sync.Mutex
		tempDelay  time.Duration
		ln         net.Listener
		wgLn       sync.WaitGroup
		mutexConns sync.Mutex
		conns      SessionSet
		closed     bool
		wgSession  sync.WaitGroup
	}
)

// Close stops accepting, closes every session and blocks until the accept
// loop and all session goroutines have returned. It is safe to call on a
// server whose Listen failed or was never called.
func (sv *server) Close() {
	if sv.ln != nil {
		fmt.Printf("server.Close() address: %v\n", sv.ln.Addr().String())
		// The accept loop observes the close as a non-temporary error.
		sv.ln.Close()
	}
	sv.wgLn.Wait()

	// Detach the set under the lock so that sessions unregistering
	// themselves while we iterate never touch the map being ranged over.
	sv.mutexConns.Lock()
	sv.closed = true
	conns := sv.conns
	sv.conns = nil
	sv.mutexConns.Unlock()

	for c := range conns {
		c.Close()
	}
	sv.wgSession.Wait()
	fmt.Println("server.Close() done")
}

// ProcTempErr sleeps with exponential backoff (5ms doubling up to 1s) after a
// temporary Accept error and remembers the delay for the next failure.
func (sv *server) ProcTempErr() {
	const maxDelay = 1 * time.Second

	delay := sv.tempDelay
	if delay == 0 {
		delay = 5 * time.Millisecond
	} else {
		delay *= 2
	}
	if delay > maxDelay {
		delay = maxDelay
	}
	fmt.Printf("server.ProcTempErr accept retrying in %v\n", delay)
	time.Sleep(delay)
	sv.tempDelay = delay
}

// onNewSession registers conn as a session, starts its reader and writer
// goroutines and queues the greeting. A connection that arrives after Close
// has begun is closed immediately.
func (sv *server) onNewSession(conn net.Conn) {
	fmt.Printf("server.onNewSession remote %v\n", conn.RemoteAddr().String())

	c := &session{
		conn:      conn,
		writeChan: make(chan []byte, writeQueueSize),
		owner:     sv,
	}

	sv.mutexConns.Lock()
	if sv.closed {
		sv.mutexConns.Unlock()
		conn.Close() // best effort; the server is shutting down
		return
	}
	if sv.conns == nil {
		sv.conns = make(SessionSet)
	}
	sv.conns[c] = struct{}{}
	// Add before the goroutines start and under the same lock that Close
	// uses to set closed, so Add can never race with wgSession.Wait.
	sv.wgSession.Add(2)
	sv.mutexConns.Unlock()

	go c.goRecvData(&sv.wgSession)
	go c.goSendData(&sv.wgSession)
	fmt.Println("server.onNewSession done")

	c.PostData(greeting)
}

// onSessionLost removes conn from the live set if it is still registered.
func (sv *server) onSessionLost(conn *session) {
	fmt.Println("server.onSessionLost")
	sv.mutexConns.Lock()
	defer sv.mutexConns.Unlock()
	if _, ok := sv.conns[conn]; ok {
		delete(sv.conns, conn)
		fmt.Println("server.onSessionLost done")
	}
}

// runAccept is the accept loop. The caller must have called sv.wgLn.Add(1)
// before starting it. Temporary errors are retried with backoff; any other
// error (including the listener being closed) ends the loop.
func (sv *server) runAccept() {
	defer sv.wgLn.Done()
	defer fmt.Println("server.runAccept done")

	for {
		conn, err := sv.ln.Accept()
		if err != nil {
			var ne net.Error
			// Temporary is deprecated but is still how transient accept
			// failures are reported.
			if errors.As(err, &ne) && ne.Temporary() {
				sv.ProcTempErr()
				continue
			}
			fmt.Printf("Accept err: %v\n", err.Error())
			return
		}
		sv.tempDelay = 0 // a successful accept resets the backoff
		sv.onNewSession(conn)
	}
}

// listenOn binds addr and starts the accept loop.
func (sv *server) listenOn(addr string) error {
	ln, err := net.Listen("tcp4", addr)
	if err != nil {
		return err
	}
	sv.ln = ln

	sv.mutexConns.Lock()
	sv.conns = make(SessionSet)
	sv.mutexConns.Unlock()

	sv.wgLn.Add(1)
	go sv.runAccept()
	return nil
}

// Listen binds the default address and starts accepting in the background.
// On failure it logs the error and leaves the server without a listener.
func (sv *server) Listen() {
	if err := sv.listenOn(listenAddr); err != nil {
		fmt.Println("listen fail: ", err)
		return
	}
	fmt.Printf("server.Listen address: %v done\n", sv.ln.Addr().String())
}

// PostData frames data and queues it for sending. It never blocks: the
// message is dropped (and logged) when the session is closed, when the write
// queue is full, or when data does not fit the 2-byte length prefix.
func (ss *session) PostData(data string) {
	if len(data) > maxPayload {
		fmt.Printf("session.PostData drop: payload of %v bytes exceeds limit %v\n", len(data), maxPayload)
		return
	}

	msg := make([]byte, headerSize+len(data))
	binary.LittleEndian.PutUint16(msg, uint16(len(data)))
	copy(msg[headerSize:], data)

	ss.Lock()
	defer ss.Unlock()
	if ss.closeFlag {
		return
	}
	// Non-blocking send: blocking here would hold the session lock and
	// prevent innorCloseSock from ever closing a stalled connection.
	select {
	case ss.writeChan <- msg:
		fmt.Printf("session.PostData address: %v, data size: %v\n", ss.conn.RemoteAddr().String(), len(msg))
	default:
		fmt.Printf("session.PostData drop: write queue full, address: %v\n", ss.conn.RemoteAddr().String())
	}
}

// Close closes the session; it is safe to call more than once.
func (ss *session) Close() {
	fmt.Println("session.Close")
	ss.innorCloseSock()
}

// innorCloseSock closes the connection and the write queue exactly once.
func (ss *session) innorCloseSock() {
	ss.Lock()
	defer ss.Unlock()
	if ss.closeFlag {
		return
	}
	ss.closeFlag = true
	ss.conn.Close()
	close(ss.writeChan)
	fmt.Println("session innorCloseSock done")
}

// goRecvData reads frames until the connection fails, then closes the
// session. The caller must have accounted for this goroutine in wg.
func (ss *session) goRecvData(wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		data, err := procData(ss.conn)
		if err != nil {
			fmt.Printf("session.goRecvData procData fail, err: %v\n", err)
			break
		}
		ss.handleData(data)
	}
	ss.innorCloseSock()
}

// goSendData writes queued frames until the queue is closed or a write
// fails, then unregisters and closes the session. The caller must have
// accounted for this goroutine in wg.
func (ss *session) goSendData(wg *sync.WaitGroup) {
	defer wg.Done()
	for b := range ss.writeChan {
		if b == nil {
			fmt.Println("session.goSendData b is nil")
			break
		}
		if _, err := ss.conn.Write(b); err != nil {
			fmt.Printf("session.goSendData Write fail, err: %v\n", err)
			break
		}
	}
	ss.owner.onSessionLost(ss)
	ss.innorCloseSock()
}

// procData reads one frame from conn and returns its payload.
func procData(conn net.Conn) ([]byte, error) {
	header := make([]byte, headerSize)
	if _, err := io.ReadFull(conn, header); err != nil {
		fmt.Printf("procData io.ReadFull header fail, err: %v\n", err.Error())
		return nil, err
	}

	msgLen := binary.LittleEndian.Uint16(header)
	fmt.Printf("procData header get size: %v\n", msgLen)

	msgData := make([]byte, msgLen)
	if _, err := io.ReadFull(conn, msgData); err != nil {
		fmt.Printf("procData io.ReadFull body fail, err: %v\n", err.Error())
		return nil, err
	}
	fmt.Printf("procData get data, size: %v\n", len(msgData))
	return msgData, nil
}

// handleData logs a received payload as text.
func (ss *session) handleData(data []byte) {
	fmt.Printf("session.handleData remote address: %v, recv data size: %v\n", ss.conn.RemoteAddr().String(), len(data))
	fmt.Println(string(data))
}

func main() {
	svr := &server{}
	svr.Listen()
	if svr.ln == nil {
		// Listen already logged the reason.
		os.Exit(1)
	}

	// Shut down on Ctrl-C or SIGTERM. os.Kill cannot be trapped, so it is
	// not registered.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	<-c

	svr.Close()
}

【go-tcpsvr】测试代码:
#! /usr/bin/python3
# coding: utf-8
"""Load-test client for go-tcpsvr.

Opens 100 connections to the server; each one sends a single framed message
and logs every framed message it receives.

Wire format (must match the Go server): a 2-byte little-endian unsigned
length prefix followed by that many payload bytes.

NOTE(review): the published listing lost the text between '<' and '>' in two
places (the struct format strings and the code between them). Those parts are
reconstructed here from the server's framing; confirm against the author's
original if it is available.

NOTE: asyncore was removed from the standard library in Python 3.12; run this
script with Python 3.11 or earlier.
"""
import asyncore
import logging
import socket
import struct

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%a, %d %b %Y %H:%M:%S')

msgstr = '''One night---it was on the twentieth of March,1888---I was returning from a journey to a patient. '''

# Length prefix: unsigned 16-bit, little-endian.
_HEADER = struct.Struct("<H")


class Client(asyncore.dispatcher):
    """One connection to the server with buffered, framed send and receive."""

    def __init__(self, _host, _port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((_host, _port))
        self.recv_buf_ = b''
        self.send_buf_ = b''
        self.post_data(msgstr)

    def post_data(self, _raw_buf):
        """Frame ``_raw_buf`` (str or bytes) and append it to the send buffer.

        Raises ValueError if the payload does not fit the 2-byte length prefix.
        """
        payload = _raw_buf.encode('utf-8') if isinstance(_raw_buf, str) else _raw_buf
        if len(payload) > 0xFFFF:
            raise ValueError("payload of %d bytes exceeds the 65535-byte frame limit" % len(payload))
        self.send_buf_ = self.send_buf_ + _HEADER.pack(len(payload)) + payload

    def handle_read(self):
        """Buffer incoming bytes and dispatch every complete frame."""
        self.recv_buf_ = self.recv_buf_ + self.recv(4096)
        while len(self.recv_buf_) >= _HEADER.size:
            (stream_size,) = _HEADER.unpack(self.recv_buf_[:_HEADER.size])
            frame_end = _HEADER.size + stream_size
            if len(self.recv_buf_) < frame_end:
                # Body not fully received yet; wait for more data.
                return
            content = self.recv_buf_[_HEADER.size:frame_end]
            self.recv_buf_ = self.recv_buf_[frame_end:]
            self.proc_data(content)

    def writable(self):
        """Tell asyncore to poll for writability only when data is pending."""
        return len(self.send_buf_) > 0

    # The listing called this method by the misspelled name; keep it as an alias.
    writeable = writable

    def handle_write(self):
        """Send as much of the pending buffer as the socket accepts."""
        if not self.writable():
            return
        sent = self.send(self.send_buf_)
        logging.info("handle_write, sent: %d" % sent)
        self.send_buf_ = self.send_buf_[sent:]

    def proc_data(self, content):
        """Handle one received payload (bytes); here it is only logged."""
        logging.debug("proc_data content: {}".format(content))


if __name__ == "__main__":
    logging.info("robot is launch")
    clients = []
    for i in range(0, 100):
        clients.append(Client('127.0.0.1', 8089))
    asyncore.loop()

    推荐阅读