no more channels with message pointers, only Messages (controller can close a client by closing the 'send' channel)

+ fixed channel closing (so as goroutines) when ending connections
This commit is contained in:
xdrm-brackets 2018-05-05 18:43:16 +02:00
parent 4f85752052
commit dfeba31404
4 changed files with 25 additions and 29 deletions

View File

@ -15,27 +15,27 @@ func main(){
serv := ws.CreateServer("0.0.0.0", 4444) serv := ws.CreateServer("0.0.0.0", 4444)
/* (2) Bind default controller */ /* (2) Bind default controller */
serv.BindDefault(func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message){ serv.BindDefault(func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- ws.Message, bc chan<- ws.Message){
for msg := range receiver { for msg := range receiver {
// if receive message -> send it back // if receive message -> send it back
sender <- &msg sender <- msg
sender <- nil // close(sender)
} }
}) })
/* (3) Bind to URI */ /* (3) Bind to URI */
err := serv.Bind("/channel/./room/./", func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message){ err := serv.Bind("/channel/./room/./", func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- ws.Message, bc chan<- ws.Message){
fmt.Printf("[uri] connected\n") fmt.Printf("[uri] connected\n")
for msg := range receiver{ for msg := range receiver{
fmt.Printf("[uri] received '%s'\n", msg.Data) fmt.Printf("[uri] received '%s'\n", msg.Data)
sender <- &msg sender <- msg
} }

View File

@ -19,13 +19,13 @@ type clientIO struct {
closing bool closing bool
closingMu sync.Mutex closingMu sync.Mutex
reading sync.WaitGroup reading sync.WaitGroup
writing sync.WaitGroup writing bool
} }
// Represents all channels that need a client // Represents all channels that need a client
type clientChannelSet struct{ type clientChannelSet struct{
receive chan Message receive chan Message
send chan *Message send chan Message
} }
// Represents a websocket client // Represents a websocket client
@ -82,8 +82,8 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli
}, },
ch: clientChannelSet{ ch: clientChannelSet{
receive: make(chan Message, 1), receive: make(chan Message, 1),
send: make(chan *Message, 2), send: make(chan Message, 1),
}, },
} }
@ -175,7 +175,7 @@ func clientReader(c *client){
// fmt.Printf("[reader] PING -> PONG\n") // fmt.Printf("[reader] PING -> PONG\n")
msg.Final = true msg.Final = true
msg.Type = PONG msg.Type = PONG
c.ch.send <- msg c.ch.send <- *msg
continue continue
} }
@ -198,6 +198,7 @@ func clientReader(c *client){
} }
close(c.ch.receive)
c.io.reading.Done() c.io.reading.Done()
/* (8) close channel (if not already done) */ /* (8) close channel (if not already done) */
@ -212,28 +213,23 @@ func clientReader(c *client){
// and is triggered by client.ch.send channel // and is triggered by client.ch.send channel
func clientWriter(c *client){ func clientWriter(c *client){
c.io.writing.Add(1) c.io.writing = true // if channel still exists
for msg := range c.ch.send { for msg := range c.ch.send {
/* (1) If empty message -> close properly */
if msg == nil {
fmt.Printf(" [writer] nil\n")
break
}
/* (2) Send message */ /* (2) Send message */
err := msg.Send(c.io.sock) err := msg.Send(c.io.sock)
/* (3) Fail on error */ /* (3) Fail on error */
if err != nil { if err != nil {
fmt.Printf(" [writer] %s\n", err) fmt.Printf(" [writer] %s\n", err)
c.io.writing = false
break break
} }
} }
c.io.writing.Done() c.io.writing = false
/* (4) close channel (if not already done) */ /* (4) close channel (if not already done) */
// fmt.Printf("[writer] end\n") // fmt.Printf("[writer] end\n")
@ -261,14 +257,14 @@ func (c *client) close(status MessageError, clientACK bool){
return return
} }
/* (2) kill reader if still running */
/* (2) kill 'c.reader()' if already running */
c.io.sock.SetReadDeadline(time.Now().Add(time.Second*-1)) c.io.sock.SetReadDeadline(time.Now().Add(time.Second*-1))
// fmt.Printf("[close] wait read stop\n")
c.io.reading.Wait() c.io.reading.Wait()
close(c.ch.receive)
close(c.ch.send) /* (3) kill writer' if still running */
if c.io.writing {
close(c.ch.send)
}
if status == NONE { if status == NONE {

View File

@ -12,7 +12,7 @@ type Client struct {
} }
// Represents a websocket controller callback function // Represents a websocket controller callback function
type ControllerFunc func(*Client, <-chan Message, chan<- *Message, chan<- *Message) type ControllerFunc func(*Client, <-chan Message, chan<- Message, chan<- Message)
// Represents a websocket controller // Represents a websocket controller
type Controller struct { type Controller struct {

View File

@ -10,7 +10,7 @@ import (
type serverChannelSet struct{ type serverChannelSet struct{
register chan *client register chan *client
unregister chan *client unregister chan *client
broadcast chan *Message broadcast chan Message
} }
@ -45,9 +45,9 @@ func CreateServer(host string, port uint16) *Server{
}, },
ch: serverChannelSet{ ch: serverChannelSet{
register: make(chan *client, 1), register: make(chan *client, 1),
unregister: make(chan *client, 1), unregister: make(chan *client, 1),
broadcast: make(chan *Message, 1), broadcast: make(chan Message, 1),
}, },
} }