From dfeba3140450354c94cd8b13a5b349b7065df821 Mon Sep 17 00:00:00 2001 From: xdrm-brackets Date: Sat, 5 May 2018 18:43:16 +0200 Subject: [PATCH] no more channels with message pointers, only Messages (controller can close a client by closing the 'send' channel) + fixed channel closing (so as goroutines) when ending connections --- cmd/iface/main.go | 10 +++++----- ws/client.go | 34 +++++++++++++++------------------- ws/controller.go | 2 +- ws/server.go | 8 ++++---- 4 files changed, 25 insertions(+), 29 deletions(-) diff --git a/cmd/iface/main.go b/cmd/iface/main.go index 8d0baa6..81beec3 100644 --- a/cmd/iface/main.go +++ b/cmd/iface/main.go @@ -15,27 +15,27 @@ func main(){ serv := ws.CreateServer("0.0.0.0", 4444) /* (2) Bind default controller */ - serv.BindDefault(func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message){ + serv.BindDefault(func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- ws.Message, bc chan<- ws.Message){ for msg := range receiver { // if receive message -> send it back - sender <- &msg - sender <- nil + sender <- msg + // close(sender) } }) /* (3) Bind to URI */ - err := serv.Bind("/channel/./room/./", func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message){ + err := serv.Bind("/channel/./room/./", func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- ws.Message, bc chan<- ws.Message){ fmt.Printf("[uri] connected\n") for msg := range receiver{ fmt.Printf("[uri] received '%s'\n", msg.Data) - sender <- &msg + sender <- msg } diff --git a/ws/client.go b/ws/client.go index 9adef86..308b7dd 100644 --- a/ws/client.go +++ b/ws/client.go @@ -19,13 +19,13 @@ type clientIO struct { closing bool closingMu sync.Mutex reading sync.WaitGroup - writing sync.WaitGroup + writing bool } // Represents all channels that need a client type clientChannelSet struct{ receive chan Message - send chan *Message + send chan Message } // Represents a websocket client @@ -82,8 +82,8 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli }, ch: clientChannelSet{ - receive: make(chan Message, 1), - send: make(chan *Message, 2), + receive: make(chan Message, 1), + send: make(chan Message, 1), }, } @@ -175,7 +175,7 @@ func clientReader(c *client){ // fmt.Printf("[reader] PING -> PONG\n") msg.Final = true msg.Type = PONG - c.ch.send <- msg + c.ch.send <- *msg continue } @@ -198,6 +198,7 @@ func clientReader(c *client){ } + close(c.ch.receive) c.io.reading.Done() /* (8) close channel (if not already done) */ @@ -212,28 +213,23 @@ func clientReader(c *client){ // and is triggered by client.ch.send channel func clientWriter(c *client){ - c.io.writing.Add(1) + c.io.writing = true // if channel still exists for msg := range c.ch.send { - /* (1) If empty message -> close properly */ - if msg == nil { - fmt.Printf(" [writer] nil\n") - break - } - /* (2) Send message */ err := msg.Send(c.io.sock) /* (3) Fail on error */ if err != nil { fmt.Printf(" [writer] %s\n", err) + c.io.writing = false break } } - c.io.writing.Done() + c.io.writing = false /* (4) close channel (if not already done) */ // fmt.Printf("[writer] end\n") @@ -261,14 +257,14 @@ func (c *client) close(status MessageError, clientACK bool){ return } - - - /* (2) kill 'c.reader()' if already running */ + /* (2) kill reader if still running */ c.io.sock.SetReadDeadline(time.Now().Add(time.Second*-1)) - // fmt.Printf("[close] wait read stop\n") c.io.reading.Wait() - close(c.ch.receive) - close(c.ch.send) + + /* (3) kill writer' if still running */ + if c.io.writing { + close(c.ch.send) + } if status == NONE { diff --git a/ws/controller.go b/ws/controller.go index 59d67ed..696ac62 100644 --- a/ws/controller.go +++ b/ws/controller.go @@ -12,7 +12,7 @@ type Client struct { } // Represents a websocket controller callback function -type ControllerFunc func(*Client, <-chan Message, chan<- *Message, chan<- *Message) +type ControllerFunc func(*Client, <-chan Message, chan<- Message, chan<- Message) // Represents a websocket controller type Controller struct { diff --git a/ws/server.go b/ws/server.go index 6ab5666..d276a88 100644 --- a/ws/server.go +++ b/ws/server.go @@ -10,7 +10,7 @@ import ( type serverChannelSet struct{ register chan *client unregister chan *client - broadcast chan *Message + broadcast chan Message } @@ -45,9 +45,9 @@ func CreateServer(host string, port uint16) *Server{ }, ch: serverChannelSet{ - register: make(chan *client, 1), - unregister: make(chan *client, 1), - broadcast: make(chan *Message, 1), + register: make(chan *client, 1), + unregister: make(chan *client, 1), + broadcast: make(chan Message, 1), }, }