ws/client.go

347 lines
6.7 KiB
Go

package websocket
import (
"bufio"
"encoding/binary"
"fmt"
"net"
"sync"
"time"
"git.xdrm.io/go/ws/internal/http/upgrade/request"
)
// Represents a client socket utility (reader, writer, ..)
type clientIO struct {
sock net.Conn
reader *bufio.Reader
kill chan<- *client // unregisters client
closing bool
closingMu sync.Mutex
reading sync.WaitGroup
writing bool
}
// Represents all channels that need a client
type clientChannelSet struct {
receive chan Message
send chan Message
}
// Represents a websocket client
type client struct {
io clientIO
iface *Client
ch clientChannelSet
status MessageError // close status ; 0 = nothing ; else -> must close
}
// Create creates a new client
func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*client, error) {
/* (1) Manage UPGRADE request
---------------------------------------------------------*/
/* (1) Parse request */
req, _ := request.Parse(s)
/* (3) Build response */
res := req.BuildResponse()
/* (4) Write into socket */
_, err := res.Send(s)
if err != nil {
return nil, fmt.Errorf("Upgrade write error: %s", err)
}
if res.GetStatusCode() != 101 {
s.Close()
return nil, fmt.Errorf("Upgrade error (HTTP %d)\n", res.GetStatusCode())
}
/* (2) Initialise client
---------------------------------------------------------*/
/* (1) Get upgrade data */
clientURI := req.GetURI()
clientProtocol := res.GetProtocol()
/* (2) Initialise client */
cli := &client{
io: clientIO{
sock: s,
reader: bufio.NewReader(s),
kill: serverCh.unregister,
},
iface: &Client{
Protocol: string(clientProtocol),
Arguments: [][]string{[]string{clientURI}},
},
ch: clientChannelSet{
receive: make(chan Message, 1),
send: make(chan Message, 1),
},
}
/* (3) Find controller by URI
---------------------------------------------------------*/
/* (1) Try to find one */
controller, arguments := ctl.Match(clientURI)
/* (2) If nothing found -> error */
if controller == nil {
return nil, fmt.Errorf("No controller found, no default controller set\n")
}
/* (3) Copy arguments */
cli.iface.Arguments = arguments
/* (4) Launch client routines
---------------------------------------------------------*/
/* (1) Launch client controller */
go controller.Fun(
cli.iface, // pass the client
cli.ch.receive, // the receiver
cli.ch.send, // the sender
serverCh.broadcast, // broadcast sender
)
/* (2) Launch message reader */
go clientReader(cli)
/* (3) Launc writer */
go clientWriter(cli)
return cli, nil
}
// reader reads and parses messages from the buffer
func clientReader(c *client) {
var frag *Message
closeStatus := Normal
clientAck := true
c.io.reading.Add(1)
for {
/* (1) if currently closing -> exit */
if c.io.closing {
fmt.Printf("[reader] killed because closing")
break
}
/* (2) Parse message */
msg, err := readMessage(c.io.reader)
if err == ErrUnmaskedFrame || err == ErrReservedBits {
closeStatus = ProtocolError
}
if err != nil {
break
}
/* (3) Fail on invalid message */
msgErr := msg.check(frag != nil)
if msgErr != nil {
mustClose := false
switch msgErr {
// Fail
case ErrUnexpectedContinuation:
closeStatus = None
clientAck = false
mustClose = true
// proper close
case ErrCloseFrame:
closeStatus = Normal
clientAck = true
mustClose = true
// invalid payload proper close
case ErrInvalidPayload:
closeStatus = InvalidPayload
clientAck = true
mustClose = true
// any other error -> protocol error
default:
closeStatus = ProtocolError
clientAck = true
mustClose = true
}
if mustClose {
break
}
}
/* (4) Ping <-> Pong */
if msg.Type == Ping && c.io.writing {
msg.Final = true
msg.Type = Pong
c.ch.send <- *msg
continue
}
/* (5) Store first fragment */
if frag == nil && !msg.Final {
frag = &Message{
Type: msg.Type,
Final: msg.Final,
Data: msg.Data,
Size: msg.Size,
}
continue
}
/* (6) Store fragments */
if frag != nil {
frag.Final = msg.Final
frag.Size += msg.Size
frag.Data = append(frag.Data, msg.Data...)
if !frag.Final { // continue if not last fragment
continue
}
// check message errors
fragErr := frag.check(false)
if fragErr == ErrInvalidPayload {
closeStatus = InvalidPayload
break
} else if fragErr != nil {
closeStatus = ProtocolError
break
}
msg = frag
frag = nil
}
/* (7) Dispatch to receiver */
if msg.Type == Text || msg.Type == Binary {
c.ch.receive <- *msg
}
}
close(c.ch.receive)
c.io.reading.Done()
/* (8) close channel (if not already done) */
// fmt.Printf("[reader] end\n")
c.close(closeStatus, clientAck)
}
// writer writes into websocket
// and is triggered by client.ch.send channel
func clientWriter(c *client) {
c.io.writing = true // if channel still exists
for msg := range c.ch.send {
/* (2) Send message */
err := msg.Send(c.io.sock)
/* (3) Fail on error */
if err != nil {
fmt.Printf(" [writer] %s\n", err)
c.io.writing = false
break
}
}
c.io.writing = false
/* (4) close channel (if not already done) */
// fmt.Printf("[writer] end\n")
c.close(Normal, true)
}
// closes the connection
// send CLOSE frame is 'status' is not NONE
// wait for the next message (CLOSE acknowledge) if 'clientACK'
// then delete client
func (c *client) close(status MessageError, clientACK bool) {
/* (1) Fail if already closing */
alreadyClosing := false
c.io.closingMu.Lock()
alreadyClosing = c.io.closing
c.io.closing = true
c.io.closingMu.Unlock()
if alreadyClosing {
return
}
/* (2) kill writer' if still running */
if c.io.writing {
close(c.ch.send)
}
/* (3) kill reader if still running */
c.io.sock.SetReadDeadline(time.Now().Add(time.Second * -1))
c.io.reading.Wait()
if status != None {
/* (3) Build message */
msg := &Message{
Final: true,
Type: Close,
Size: 2,
Data: make([]byte, 2),
}
binary.BigEndian.PutUint16(msg.Data, uint16(status))
/* (4) Send message */
msg.Send(c.io.sock)
// if err != nil {
// fmt.Printf("[close] send error (%s0\n", err)
// }
}
/* (2) Wait for client CLOSE if needed */
if clientACK {
c.io.sock.SetReadDeadline(time.Now().Add(time.Millisecond))
/* Wait for message */
readMessage(c.io.reader)
// if err != nil || msg.Type != CLOSE {
// if err == nil {
// fmt.Printf("[close] received OpCode = %d\n", msg.Type)
// } else {
// fmt.Printf("[close] read error (%v)\n", err)
// }
// }
// fmt.Printf("[close] received ACK\n")
}
/* (3) Close socket */
c.io.sock.Close()
// fmt.Printf("[close] socket closed\n")
/* (4) Unregister */
c.io.kill <- c
return
}