From 52067c764049c18141bf92285b92e052df878450 Mon Sep 17 00:00:00 2001 From: xdrm-brackets Date: Sat, 5 May 2018 17:38:07 +0200 Subject: [PATCH] close() closes channels to free goroutines reader/writer are no more methods -> functions added reference to server.unregister some more stuff --- ws/client.go | 197 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 132 insertions(+), 65 deletions(-) diff --git a/ws/client.go b/ws/client.go index 32685fe..cf2f226 100644 --- a/ws/client.go +++ b/ws/client.go @@ -1,7 +1,9 @@ package ws import ( + "unicode/utf8" "time" + "sync" "bufio" "encoding/binary" "git.xdrm.io/gws/internal/http/upgrade/request" @@ -14,6 +16,10 @@ type clientIO struct { sock net.Conn reader *bufio.Reader kill chan<- *client // unregisters client + closing bool + closingMu sync.Mutex + reading sync.WaitGroup + writing sync.WaitGroup } // Represents all channels that need a client @@ -63,11 +69,11 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli clientProtocol := res.GetProtocol() /* (2) Initialise client */ - instance := &client{ + cli := &client{ io: clientIO{ sock: s, reader: bufio.NewReader(s), - kill: make(chan<- *client, 1), + kill: serverCh.unregister, }, iface: &Client{ @@ -77,7 +83,7 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli ch: clientChannelSet{ receive: make(chan Message, 1), - send: make(chan *Message, 1), + send: make(chan *Message, 2), }, } @@ -94,7 +100,7 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli } /* (3) Copy arguments */ - instance.iface.Arguments = arguments + cli.iface.Arguments = arguments @@ -102,93 +108,101 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli ---------------------------------------------------------*/ /* (1) Launch client controller */ go controller.Fun( - instance.iface, // pass the client - instance.ch.receive, // the receiver - instance.ch.send, // the sender + cli.iface, // pass the client + cli.ch.receive, // the receiver + cli.ch.send, // the sender serverCh.broadcast, // broadcast sender ) /* (2) Launch message reader */ - go instance.reader() + go clientReader(cli) /* (3) Launc writer */ - go instance.writer() + go clientWriter(cli) - return instance, nil + return cli, nil } + + // reader reads and parses messages from the buffer -func (c *client) reader(){ +func clientReader(c *client){ + + errorCode := NORMAL + clientAck := true + c.io.reading.Add(1) for { - /* (1) If error code -> close */ - if c.status != NONE { - c.close(c.status) + /* if currently closing -> exit */ + if c.io.closing { + fmt.Printf("[reader] killed because closing") break } - /* (2) Wait for available data */ - // c.io.sock.SetReadDeadline(time.Now().Add(10*time.Microsecond)) - // _, err := c.io.reader.Peek(1) - - // // timeout -> continune checking - // if neterr, ok := err.(net.Error); ok && neterr.Timeout() { - // time.Sleep(10*time.Microsecond) - // continue - // } - - // // another error -> stop reading - // if err != nil { - // break - // } - - // c.io.sock.SetReadDeadline(time.Time{}) // remove timeout - - /* (3) Parse message */ + /*** Parse message ***/ msg, err := readMessage(c.io.reader) if err != nil { // fmt.Printf(" [reader] %s\n", err) - c.close(NORMAL) - return + break } /* (4) CLOSE */ if msg.Type == CLOSE { - c.close(NORMAL) - return + // fmt.Printf(" [reader] CLOSE ; size %d\n", msg.Size) + // if msg.Size >= 2 { + // errCode := binary.BigEndian.Uint16(msg.Data[0:2]) + // fmt.Printf(" ; status %d\n", errCode) + // fmt.Printf(" ; msg '%s'\n", msg.Data[2:]) + // } + clientAck = false + break } /* (5) PING size error */ if msg.Type == PING && msg.Size > 125 { - c.close(PROTOCOL_ERR) - return + fmt.Printf(" [reader] PING payload too big\n") + // fmt.Printf("[reader] PING err\n") + errorCode = PROTOCOL_ERR + break } /* (6) Send PONG */ if msg.Type == PING { + // fmt.Printf("[reader] PING -> PONG\n") msg.Final = true msg.Type = PONG c.ch.send <- msg continue } - /* (7) Unknown opcode */ - if msg.Type != TEXT && msg.Type != BINARY { - c.close(PROTOCOL_ERR) - return + /* (7) Invalid UTF8 */ + if msg.Type == TEXT && !utf8.Valid(msg.Data) { + fmt.Printf(" [reader] invalid utf-8\n") + errorCode = INVALID_PAYLOAD + break } - /* (7) Dispatch to receiver */ + /* (8) Unknown opcode */ + if msg.Type != TEXT && msg.Type != BINARY { + fmt.Printf(" [reader] unknown OpCode %d\n", msg.Type) + errorCode = PROTOCOL_ERR + break + } + + /* (9) Dispatch to receiver */ c.ch.receive <- *msg } - /* (8) close channel */ - c.close(NORMAL) + c.io.reading.Done() + + /* (8) close channel (if not already done) */ + // fmt.Printf("[reader] end\n") + c.close(errorCode, clientAck) } @@ -196,12 +210,15 @@ func (c *client) reader(){ // writer writes into websocket // and is triggered by client.ch.send channel -func (c *client) writer(){ +func clientWriter(c *client){ + + c.io.writing.Add(1) for msg := range c.ch.send { /* (1) If empty message -> close properly */ if msg == nil { + fmt.Printf(" [writer] nil\n") break } @@ -216,8 +233,11 @@ func (c *client) writer(){ } - /* (4) proper close */ - c.close(NORMAL) + c.io.writing.Done() + + /* (4) close channel (if not already done) */ + // fmt.Printf("[writer] end\n") + c.close(NORMAL, true) } @@ -226,33 +246,80 @@ func (c *client) writer(){ // close writes the error message (if needed) // and it closes the socket -func (c *client) close(status MessageError){ +// if 'clientACK' is true, reads the next message (CLOSE acknowledge) +// before closing the socket +func (c *client) close(status MessageError, clientACK bool){ - /* (1) If error status -> send close frame */ - if status != NONE { + /* (1) Fail if already closing */ + alreadyClosing := false + c.io.closingMu.Lock() + alreadyClosing = c.io.closing + c.io.closing = true + c.io.closingMu.Unlock() - /* Create message */ - msg := &Message{ - Final: true, - Type: CLOSE, - Data: make([]byte, 8), + if alreadyClosing { + return + } + + + + /* (2) kill 'c.reader()' if already running */ + c.io.sock.SetReadDeadline(time.Now().Add(time.Second*-1)) + // fmt.Printf("[close] wait read stop\n") + c.io.reading.Wait() + close(c.ch.receive) + // close(c.ch.send) + + + if status == NONE { + status = NORMAL + } + + /* (3) Build message */ + msg := &Message{ + Final: true, + Type: CLOSE, + Size: 2, + Data: make([]byte, 2), + } + binary.BigEndian.PutUint16(msg.Data, uint16(status)) + // msg.Data = append(msg.Data, []byte("(closing)")...) + msg.Size = uint( len(msg.Data) ) + + /* (4) Send message */ + err := msg.Send(c.io.sock) + if err != nil { + fmt.Printf("[close] send error (%s0\n", err) + } + // fmt.Printf("[close] frame sent\n") + + + /* (2) Wait for client CLOSE if needed */ + if clientACK { + + c.io.sock.SetReadDeadline(time.Now().Add(time.Millisecond)) + + /* Wait for message */ + msg, err := readMessage(c.io.reader) + if err != nil || msg.Type != CLOSE { + if err == nil { + fmt.Printf("[close] received OpCode = %d\n", msg.Type) + } else { + fmt.Printf("[close] read error (%v)\n", err) + } } - binary.BigEndian.PutUint16(msg.Data, uint16(status)) - msg.Data = append(msg.Data, []byte(" close")...) - msg.Size = uint( len(msg.Data) ) - /* Send message */ - msg.Send(c.io.sock) + // fmt.Printf("[close] received ACK\n") } - /* (2) Close socket */ - c.io.sock.SetReadDeadline(time.Now()) - time.Sleep(time.Second * 3) + /* (3) Close socket */ c.io.sock.Close() - + // fmt.Printf("[close] socket closed\n") /* (4) Unregister */ c.io.kill <- c + return + } \ No newline at end of file