close() closes channels to free goroutines
reader/writer are no more methods -> functions added reference to server.unregister some more stuff
This commit is contained in:
parent
e56ff09fc8
commit
52067c7640
187
ws/client.go
187
ws/client.go
|
@ -1,7 +1,9 @@
|
|||
package ws
|
||||
|
||||
import (
|
||||
"unicode/utf8"
|
||||
"time"
|
||||
"sync"
|
||||
"bufio"
|
||||
"encoding/binary"
|
||||
"git.xdrm.io/gws/internal/http/upgrade/request"
|
||||
|
@ -14,6 +16,10 @@ type clientIO struct {
|
|||
sock net.Conn
|
||||
reader *bufio.Reader
|
||||
kill chan<- *client // unregisters client
|
||||
closing bool
|
||||
closingMu sync.Mutex
|
||||
reading sync.WaitGroup
|
||||
writing sync.WaitGroup
|
||||
}
|
||||
|
||||
// Represents all channels that need a client
|
||||
|
@ -63,11 +69,11 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli
|
|||
clientProtocol := res.GetProtocol()
|
||||
|
||||
/* (2) Initialise client */
|
||||
instance := &client{
|
||||
cli := &client{
|
||||
io: clientIO{
|
||||
sock: s,
|
||||
reader: bufio.NewReader(s),
|
||||
kill: make(chan<- *client, 1),
|
||||
kill: serverCh.unregister,
|
||||
},
|
||||
|
||||
iface: &Client{
|
||||
|
@ -77,7 +83,7 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli
|
|||
|
||||
ch: clientChannelSet{
|
||||
receive: make(chan Message, 1),
|
||||
send: make(chan *Message, 1),
|
||||
send: make(chan *Message, 2),
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -94,7 +100,7 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli
|
|||
}
|
||||
|
||||
/* (3) Copy arguments */
|
||||
instance.iface.Arguments = arguments
|
||||
cli.iface.Arguments = arguments
|
||||
|
||||
|
||||
|
||||
|
@ -102,93 +108,101 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli
|
|||
---------------------------------------------------------*/
|
||||
/* (1) Launch client controller */
|
||||
go controller.Fun(
|
||||
instance.iface, // pass the client
|
||||
instance.ch.receive, // the receiver
|
||||
instance.ch.send, // the sender
|
||||
cli.iface, // pass the client
|
||||
cli.ch.receive, // the receiver
|
||||
cli.ch.send, // the sender
|
||||
serverCh.broadcast, // broadcast sender
|
||||
)
|
||||
|
||||
/* (2) Launch message reader */
|
||||
go instance.reader()
|
||||
go clientReader(cli)
|
||||
|
||||
/* (3) Launc writer */
|
||||
go instance.writer()
|
||||
go clientWriter(cli)
|
||||
|
||||
return instance, nil
|
||||
return cli, nil
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// reader reads and parses messages from the buffer
|
||||
func (c *client) reader(){
|
||||
func clientReader(c *client){
|
||||
|
||||
errorCode := NORMAL
|
||||
clientAck := true
|
||||
c.io.reading.Add(1)
|
||||
|
||||
for {
|
||||
|
||||
/* (1) If error code -> close */
|
||||
if c.status != NONE {
|
||||
c.close(c.status)
|
||||
/* if currently closing -> exit */
|
||||
if c.io.closing {
|
||||
fmt.Printf("[reader] killed because closing")
|
||||
break
|
||||
}
|
||||
|
||||
/* (2) Wait for available data */
|
||||
// c.io.sock.SetReadDeadline(time.Now().Add(10*time.Microsecond))
|
||||
// _, err := c.io.reader.Peek(1)
|
||||
|
||||
// // timeout -> continune checking
|
||||
// if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
|
||||
// time.Sleep(10*time.Microsecond)
|
||||
// continue
|
||||
// }
|
||||
|
||||
// // another error -> stop reading
|
||||
// if err != nil {
|
||||
// break
|
||||
// }
|
||||
|
||||
// c.io.sock.SetReadDeadline(time.Time{}) // remove timeout
|
||||
|
||||
/* (3) Parse message */
|
||||
/*** Parse message ***/
|
||||
msg, err := readMessage(c.io.reader)
|
||||
if err != nil {
|
||||
// fmt.Printf(" [reader] %s\n", err)
|
||||
c.close(NORMAL)
|
||||
return
|
||||
break
|
||||
}
|
||||
|
||||
/* (4) CLOSE */
|
||||
if msg.Type == CLOSE {
|
||||
c.close(NORMAL)
|
||||
return
|
||||
// fmt.Printf(" [reader] CLOSE ; size %d\n", msg.Size)
|
||||
// if msg.Size >= 2 {
|
||||
// errCode := binary.BigEndian.Uint16(msg.Data[0:2])
|
||||
// fmt.Printf(" ; status %d\n", errCode)
|
||||
// fmt.Printf(" ; msg '%s'\n", msg.Data[2:])
|
||||
// }
|
||||
clientAck = false
|
||||
break
|
||||
|
||||
}
|
||||
|
||||
/* (5) PING size error */
|
||||
if msg.Type == PING && msg.Size > 125 {
|
||||
c.close(PROTOCOL_ERR)
|
||||
return
|
||||
fmt.Printf(" [reader] PING payload too big\n")
|
||||
// fmt.Printf("[reader] PING err\n")
|
||||
errorCode = PROTOCOL_ERR
|
||||
break
|
||||
}
|
||||
|
||||
/* (6) Send PONG */
|
||||
if msg.Type == PING {
|
||||
// fmt.Printf("[reader] PING -> PONG\n")
|
||||
msg.Final = true
|
||||
msg.Type = PONG
|
||||
c.ch.send <- msg
|
||||
continue
|
||||
}
|
||||
|
||||
/* (7) Unknown opcode */
|
||||
if msg.Type != TEXT && msg.Type != BINARY {
|
||||
c.close(PROTOCOL_ERR)
|
||||
return
|
||||
/* (7) Invalid UTF8 */
|
||||
if msg.Type == TEXT && !utf8.Valid(msg.Data) {
|
||||
fmt.Printf(" [reader] invalid utf-8\n")
|
||||
errorCode = INVALID_PAYLOAD
|
||||
break
|
||||
}
|
||||
|
||||
/* (7) Dispatch to receiver */
|
||||
/* (8) Unknown opcode */
|
||||
if msg.Type != TEXT && msg.Type != BINARY {
|
||||
fmt.Printf(" [reader] unknown OpCode %d\n", msg.Type)
|
||||
errorCode = PROTOCOL_ERR
|
||||
break
|
||||
}
|
||||
|
||||
/* (9) Dispatch to receiver */
|
||||
c.ch.receive <- *msg
|
||||
|
||||
}
|
||||
|
||||
/* (8) close channel */
|
||||
c.close(NORMAL)
|
||||
c.io.reading.Done()
|
||||
|
||||
/* (8) close channel (if not already done) */
|
||||
// fmt.Printf("[reader] end\n")
|
||||
c.close(errorCode, clientAck)
|
||||
|
||||
}
|
||||
|
||||
|
@ -196,12 +210,15 @@ func (c *client) reader(){
|
|||
|
||||
// writer writes into websocket
|
||||
// and is triggered by client.ch.send channel
|
||||
func (c *client) writer(){
|
||||
func clientWriter(c *client){
|
||||
|
||||
c.io.writing.Add(1)
|
||||
|
||||
for msg := range c.ch.send {
|
||||
|
||||
/* (1) If empty message -> close properly */
|
||||
if msg == nil {
|
||||
fmt.Printf(" [writer] nil\n")
|
||||
break
|
||||
}
|
||||
|
||||
|
@ -216,8 +233,11 @@ func (c *client) writer(){
|
|||
|
||||
}
|
||||
|
||||
/* (4) proper close */
|
||||
c.close(NORMAL)
|
||||
c.io.writing.Done()
|
||||
|
||||
/* (4) close channel (if not already done) */
|
||||
// fmt.Printf("[writer] end\n")
|
||||
c.close(NORMAL, true)
|
||||
|
||||
}
|
||||
|
||||
|
@ -226,33 +246,80 @@ func (c *client) writer(){
|
|||
|
||||
// close writes the error message (if needed)
|
||||
// and it closes the socket
|
||||
func (c *client) close(status MessageError){
|
||||
// if 'clientACK' is true, reads the next message (CLOSE acknowledge)
|
||||
// before closing the socket
|
||||
func (c *client) close(status MessageError, clientACK bool){
|
||||
|
||||
/* (1) If error status -> send close frame */
|
||||
if status != NONE {
|
||||
/* (1) Fail if already closing */
|
||||
alreadyClosing := false
|
||||
c.io.closingMu.Lock()
|
||||
alreadyClosing = c.io.closing
|
||||
c.io.closing = true
|
||||
c.io.closingMu.Unlock()
|
||||
|
||||
/* Create message */
|
||||
if alreadyClosing {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* (2) kill 'c.reader()' if already running */
|
||||
c.io.sock.SetReadDeadline(time.Now().Add(time.Second*-1))
|
||||
// fmt.Printf("[close] wait read stop\n")
|
||||
c.io.reading.Wait()
|
||||
close(c.ch.receive)
|
||||
// close(c.ch.send)
|
||||
|
||||
|
||||
if status == NONE {
|
||||
status = NORMAL
|
||||
}
|
||||
|
||||
/* (3) Build message */
|
||||
msg := &Message{
|
||||
Final: true,
|
||||
Type: CLOSE,
|
||||
Data: make([]byte, 8),
|
||||
Size: 2,
|
||||
Data: make([]byte, 2),
|
||||
}
|
||||
binary.BigEndian.PutUint16(msg.Data, uint16(status))
|
||||
msg.Data = append(msg.Data, []byte(" close")...)
|
||||
// msg.Data = append(msg.Data, []byte("(closing)")...)
|
||||
msg.Size = uint( len(msg.Data) )
|
||||
|
||||
/* Send message */
|
||||
msg.Send(c.io.sock)
|
||||
/* (4) Send message */
|
||||
err := msg.Send(c.io.sock)
|
||||
if err != nil {
|
||||
fmt.Printf("[close] send error (%s0\n", err)
|
||||
}
|
||||
// fmt.Printf("[close] frame sent\n")
|
||||
|
||||
|
||||
/* (2) Wait for client CLOSE if needed */
|
||||
if clientACK {
|
||||
|
||||
c.io.sock.SetReadDeadline(time.Now().Add(time.Millisecond))
|
||||
|
||||
/* Wait for message */
|
||||
msg, err := readMessage(c.io.reader)
|
||||
if err != nil || msg.Type != CLOSE {
|
||||
if err == nil {
|
||||
fmt.Printf("[close] received OpCode = %d\n", msg.Type)
|
||||
} else {
|
||||
fmt.Printf("[close] read error (%v)\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
// fmt.Printf("[close] received ACK\n")
|
||||
|
||||
}
|
||||
|
||||
/* (2) Close socket */
|
||||
c.io.sock.SetReadDeadline(time.Now())
|
||||
time.Sleep(time.Second * 3)
|
||||
/* (3) Close socket */
|
||||
c.io.sock.Close()
|
||||
|
||||
// fmt.Printf("[close] socket closed\n")
|
||||
|
||||
/* (4) Unregister */
|
||||
c.io.kill <- c
|
||||
|
||||
return
|
||||
|
||||
}
|
Loading…
Reference in New Issue