From ab351de3820c6abc669ed3c394c658ddd55db393 Mon Sep 17 00:00:00 2001 From: xdrm-brackets Date: Thu, 3 May 2018 21:10:02 +0200 Subject: [PATCH] non-blocking reads (+ added peek-er to check connection every 10us) --- cmd/iface/main.go | 38 +++++++++--------------- ws/client.go | 74 ++++++++++++++++++++++++++++------------------- ws/controller.go | 2 +- 3 files changed, 59 insertions(+), 55 deletions(-) diff --git a/cmd/iface/main.go b/cmd/iface/main.go index a7f51f7..a64e02c 100644 --- a/cmd/iface/main.go +++ b/cmd/iface/main.go @@ -15,43 +15,31 @@ func main(){ serv := ws.CreateServer("0.0.0.0", 4444) /* (2) Bind default controller */ - serv.BindDefault(func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message) chan struct{}{ + serv.BindDefault(func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message){ - var closer chan struct{} = make(chan struct{}, 1) + for msg := range receiver { - go func(){ - for msg := range receiver { + // if receive message -> send it back + sender <- &msg + // sender <- nil - // if receive message -> send it back - sender <- &msg - - } - }() - - return closer + } }) /* (3) Bind to URI */ - err := serv.Bind("/channel/./room/./", func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message) chan struct{}{ + err := serv.Bind("/channel/./room/./", func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message){ - var closer chan struct{} = make(chan struct{}, 1) + fmt.Printf("[uri] connected\n") - go func(){ - fmt.Printf("[uri] connected\n") + for msg := range receiver{ - for msg := range receiver{ + fmt.Printf("[uri] received '%s'\n", msg.Data) + sender <- &msg - fmt.Printf("[uri] received '%s'\n", msg.Data) - sender <- &msg + } - } - - fmt.Printf("[uri] unexpectedly closed\n") - }() - - - return closer + fmt.Printf("[uri] unexpectedly closed\n") }) if err != nil { panic(err) } diff --git a/ws/client.go b/ws/client.go index 8ee9e89..7d7cd3e 100644 --- a/ws/client.go +++ b/ws/client.go @@ -1,6 +1,7 @@ package ws import ( + "time" "bufio" "encoding/binary" "git.xdrm.io/gws/internal/http/upgrade/request" @@ -12,14 +13,13 @@ import ( type clientIO struct { sock net.Conn reader *bufio.Reader - kill chan<- *client + kill chan<- *client // unregisters client } // Represents all channels that need a client type clientChannelSet struct{ receive chan Message send chan *Message - closer chan struct{} } // Represents a websocket client @@ -27,7 +27,7 @@ type client struct { io clientIO iface *Client ch clientChannelSet - status MessageError // close status ; 0 = nothing + status MessageError // close status ; 0 = nothing ; else -> must close } @@ -78,7 +78,6 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli ch: clientChannelSet{ receive: make(chan Message, 1), send: make(chan *Message, 1), - closer: make(chan struct{}, 1), }, } @@ -102,24 +101,17 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli /* (4) Launch client routines ---------------------------------------------------------*/ /* (1) Launch client controller */ - instance.ch.closer = controller.Fun( + go controller.Fun( instance.iface, // pass the client instance.ch.receive, // the receiver instance.ch.send, // the sender serverCh.broadcast, // broadcast sender ) - /* (2) Launch close handler */ - go func(){ - for range instance.ch.closer {} - instance.close(NORMAL) - }() - - - /* (3) Launch message reader */ + /* (2) Launch message reader */ go instance.reader() - /* (4) Launc writer */ + /* (3) Launc writer */ go instance.writer() return instance, nil @@ -132,7 +124,30 @@ func (c *client) reader(){ for { - /* (1) Parse message */ + /* (1) If error code -> close */ + if c.status != NONE { + c.close(c.status) + break + } + + /* (2) Wait for available data */ + c.io.sock.SetReadDeadline(time.Now().Add(time.Microsecond*10)) + _, err := c.io.reader.Peek(1) + + // timeout -> continune checking + if neterr, ok := err.(net.Error); ok && neterr.Timeout() { + continue + + } + + // another error -> stop reading + if err != nil { + break + } + + c.io.sock.SetReadDeadline(time.Time{}) // remove timeout + + /* (3) Parse message */ msg, err := readMessage(c.io.reader) if err != nil { // fmt.Printf(" [reader] %s\n", err) @@ -140,20 +155,20 @@ func (c *client) reader(){ return } - /* (2) CLOSE */ + /* (4) CLOSE */ if msg.Type == CLOSE { c.close(NORMAL) return } - /* (3) PING size error */ + /* (5) PING size error */ if msg.Type == PING && msg.Size > 125 { c.close(PROTOCOL_ERR) return } - /* (4) Send PONG */ + /* (6) Send PONG */ if msg.Type == PING { msg.Final = true msg.Type = PONG @@ -161,18 +176,18 @@ func (c *client) reader(){ continue } - /* (5) Unknown opcode */ + /* (7) Unknown opcode */ if msg.Type != TEXT && msg.Type != BINARY { c.close(PROTOCOL_ERR) return } - /* (5) Dispatch to receiver */ + /* (7) Dispatch to receiver */ c.ch.receive <- *msg } - /* (6) close channel */ + /* (8) close channel */ c.close(NORMAL) } @@ -185,17 +200,23 @@ func (c *client) writer(){ for msg := range c.ch.send { + /* (1) If empty message -> close properly */ + if msg == nil { + break + } + + /* (2) Send message */ err := msg.Send(c.io.sock) + /* (3) Fail on error */ if err != nil { fmt.Printf(" [writer] %s\n", err) break } - } - /* (3) proper close */ + /* (4) proper close */ c.close(NORMAL) } @@ -226,14 +247,9 @@ func (c *client) close(status MessageError){ } /* (2) Close socket */ + time.Sleep(time.Second * 3) c.io.sock.Close() - /* (3) Dereference data */ - c.ch.receive = nil - c.ch.send = nil - c.ch.closer = nil - c.iface = nil - c.io.reader = nil /* (4) Unregister */ c.io.kill <- c diff --git a/ws/controller.go b/ws/controller.go index 186b77a..a671cdb 100644 --- a/ws/controller.go +++ b/ws/controller.go @@ -12,7 +12,7 @@ type Client struct { } // Represents a websocket controller callback function -type ControllerFunc func(*Client, <-chan Message, chan<- *Message, chan<- *Message) chan struct{} +type ControllerFunc func(*Client, <-chan Message, chan<- *Message, chan<- *Message) // Represents a websocket controller type Controller struct {