package websocket import ( "bufio" "encoding/binary" "fmt" "net" "sync" "time" "git.xdrm.io/go/ws/internal/http/upgrade" ) // Represents a client socket utility (reader, writer, ..) type clientIO struct { sock net.Conn reader *bufio.Reader kill chan<- *client // unregisters client closing bool closingMu sync.Mutex reading sync.WaitGroup writing bool } // Represents all channels that need a client type clientChannelSet struct { receive chan Message send chan Message } // Represents a websocket client type client struct { io clientIO iface *Client ch clientChannelSet status MessageError // close status ; 0 = nothing ; else -> must close } // Create creates a new client func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*client, error) { /* (1) Manage UPGRADE request ---------------------------------------------------------*/ // 1. Parse request req, _ := upgrade.Parse(s) // 3. Build response res := req.BuildResponse() // 4. Write into socket _, err := res.Send(s) if err != nil { return nil, fmt.Errorf("upgrade write: %w", err) } if res.GetStatusCode() != 101 { s.Close() return nil, fmt.Errorf("upgrade failed (HTTP %d)", res.GetStatusCode()) } /* (2) Initialise client ---------------------------------------------------------*/ // 1. Get upgrade data clientURI := req.GetURI() clientProtocol := res.GetProtocol() // 2. Initialise client cli := &client{ io: clientIO{ sock: s, reader: bufio.NewReader(s), kill: serverCh.unregister, }, iface: &Client{ Protocol: string(clientProtocol), Arguments: [][]string{{clientURI}}, }, ch: clientChannelSet{ receive: make(chan Message, 1), send: make(chan Message, 1), }, } /* (3) Find controller by URI ---------------------------------------------------------*/ // 1. Try to find one controller, arguments := ctl.Match(clientURI) // 2. If nothing found -> error if controller == nil { return nil, fmt.Errorf("No controller found, no default controller set") } // 3. Copy arguments cli.iface.Arguments = arguments /* (4) Launch client routines ---------------------------------------------------------*/ // 1. Launch client controller go controller.Fun( cli.iface, // pass the client cli.ch.receive, // the receiver cli.ch.send, // the sender serverCh.broadcast, // broadcast sender ) // 2. Launch message reader go clientReader(cli) // 3. Launc writer go clientWriter(cli) return cli, nil } // reader reads and parses messages from the buffer func clientReader(c *client) { var frag *Message closeStatus := Normal clientAck := true c.io.reading.Add(1) for { // 1. if currently closing -> exit if c.io.closing { fmt.Printf("[reader] killed because closing") break } // 2. Parse message msg, err := readMessage(c.io.reader) if err == ErrUnmaskedFrame || err == ErrReservedBits { closeStatus = ProtocolError } if err != nil { break } // 3. Fail on invalid message msgErr := msg.check(frag != nil) if msgErr != nil { mustClose := false switch msgErr { // Fail case ErrUnexpectedContinuation: closeStatus = None clientAck = false mustClose = true // proper close case ErrCloseFrame: closeStatus = Normal clientAck = true mustClose = true // invalid payload proper close case ErrInvalidPayload: closeStatus = InvalidPayload clientAck = true mustClose = true // any other error -> protocol error default: closeStatus = ProtocolError clientAck = true mustClose = true } if mustClose { break } } // 4. Ping <-> Pong if msg.Type == Ping && c.io.writing { msg.Final = true msg.Type = Pong c.ch.send <- *msg continue } // 5. Store first fragment if frag == nil && !msg.Final { frag = &Message{ Type: msg.Type, Final: msg.Final, Data: msg.Data, Size: msg.Size, } continue } // 6. Store fragments if frag != nil { frag.Final = msg.Final frag.Size += msg.Size frag.Data = append(frag.Data, msg.Data...) if !frag.Final { // continue if not last fragment continue } // check message errors fragErr := frag.check(false) if fragErr == ErrInvalidPayload { closeStatus = InvalidPayload break } else if fragErr != nil { closeStatus = ProtocolError break } msg = frag frag = nil } // 7. Dispatch to receiver if msg.Type == Text || msg.Type == Binary { c.ch.receive <- *msg } } close(c.ch.receive) c.io.reading.Done() // 8. close channel (if not already done) // fmt.Printf("[reader] end\n") c.close(closeStatus, clientAck) } // writer writes into websocket // and is triggered by client.ch.send channel func clientWriter(c *client) { c.io.writing = true // if channel still exists for msg := range c.ch.send { // 2. Send message err := msg.Send(c.io.sock) // 3. Fail on error if err != nil { fmt.Printf(" [writer] %s\n", err) c.io.writing = false break } } c.io.writing = false // 4. close channel (if not already done) // fmt.Printf("[writer] end\n") c.close(Normal, true) } // closes the connection // send CLOSE frame is 'status' is not NONE // wait for the next message (CLOSE acknowledge) if 'clientACK' // then delete client func (c *client) close(status MessageError, clientACK bool) { // 1. Fail if already closing alreadyClosing := false c.io.closingMu.Lock() alreadyClosing = c.io.closing c.io.closing = true c.io.closingMu.Unlock() if alreadyClosing { return } // 2. kill writer' if still running if c.io.writing { close(c.ch.send) } // 3. kill reader if still running c.io.sock.SetReadDeadline(time.Now().Add(time.Second * -1)) c.io.reading.Wait() if status != None { // 3. Build message msg := &Message{ Final: true, Type: Close, Size: 2, Data: make([]byte, 2), } binary.BigEndian.PutUint16(msg.Data, uint16(status)) // 4. Send message msg.Send(c.io.sock) // if err != nil { // fmt.Printf("[close] send error (%s0\n", err) // } } // 2. Wait for client CLOSE if needed if clientACK { c.io.sock.SetReadDeadline(time.Now().Add(time.Millisecond)) /* Wait for message */ readMessage(c.io.reader) // if err != nil || msg.Type != CLOSE { // if err == nil { // fmt.Printf("[close] received OpCode = %d\n", msg.Type) // } else { // fmt.Printf("[close] read error (%v)\n", err) // } // } // fmt.Printf("[close] received ACK\n") } // 3. Close socket c.io.sock.Close() // fmt.Printf("[close] socket closed\n") // 4. Unregister c.io.kill <- c return }