From e89166a3bffad50e4e0a3d8dea05ffc08d5ed410 Mon Sep 17 00:00:00 2001 From: xdrm-brackets Date: Mon, 30 Apr 2018 15:35:30 +0200 Subject: [PATCH] update + opcode management (SOME THINGS TO FIX...) --- cmd/iface/main.go | 9 +-- ws/client.go | 38 +++------ ws/frame.go | 197 +++++++++++++++++++++++++++++++++++++++++++--- ws/frame/types.go | 2 +- ws/private.go | 2 +- ws/types.go | 4 +- 6 files changed, 204 insertions(+), 48 deletions(-) diff --git a/cmd/iface/main.go b/cmd/iface/main.go index 1f88082..e6b4b1f 100644 --- a/cmd/iface/main.go +++ b/cmd/iface/main.go @@ -15,7 +15,7 @@ func main(){ serv := ws.CreateServer("0.0.0.0", 4444) /* (2) Bind default controller */ - serv.BindDefault(func(client *ws.Client, receiver <-chan *ws.Frame, sender chan<- []byte, closer <-chan func()){ + serv.BindDefault(func(client *ws.Client, receiver <-chan ws.Frame, sender chan<- []byte, closer <-chan func()){ fmt.Printf("[default] connected\n") for { @@ -23,9 +23,7 @@ func main(){ select { case receivedFrame := <- receiver: fmt.Printf("[default] received '%s'\n", receivedFrame.Payload.Buffer) - if receivedFrame.Payload.Length > 0 { - sender <- receivedFrame.Payload.Buffer[1:] - } + sender <- receivedFrame.Payload.Buffer case closeFunc := <- closer: fmt.Printf("[default] client with protocol '%s' exited\n", client.Protocol) closeFunc() @@ -38,7 +36,7 @@ func main(){ }) /* (3) Bind to URI */ - err := serv.Bind("/channel/./room/./", func(client *ws.Client, receiver <-chan *ws.Frame, sender chan<- []byte, closer <-chan func()){ + err := serv.Bind("/channel/./room/./", func(client *ws.Client, receiver <-chan ws.Frame, sender chan<- []byte, closer <-chan func()){ fmt.Printf("[uri] connected\n") for { @@ -52,6 +50,7 @@ func main(){ return } } + fmt.Printf("[uri] unexpectedly closed\n") }) diff --git a/ws/client.go b/ws/client.go index 82c1acc..d81e559 100644 --- a/ws/client.go +++ b/ws/client.go @@ -18,8 +18,6 @@ func (c *Client) asyncReader(s *Server) { // Get buffer buf := new(bytes.Buffer) - // Get buffer frame - frame := new(Frame) for { @@ -27,48 +25,32 @@ func (c *Client) asyncReader(s *Server) { var startTime int64 = time.Now().UnixNano() // Try to read frame header - err := frame.ReadHeader(buf, c.sock) - + frame, err := ReadFrame(buf, c.sock) if err != nil { - fmt.Printf(" - Header reading error: %s\n", err) + fmt.Printf("%s\n", err) break } + var elapsed = float32(time.Now().UnixNano()-startTime)/1e3 - // Try to read frame payload - err = frame.ReadPayload(buf, c.sock) - if err != nil { - fmt.Printf(" - Payload reading error: %s\n", err) - break - } - - fmt.Printf("+ elapsed: %.3f us\n", float32(time.Now().UnixNano()-startTime)/1e3) + fmt.Printf("+ elapsed: %.3f us\n", elapsed) // Trigger data channel - c.recvc <- frame + c.recvc <- *frame } fmt.Printf(" - remove client\n") - // close sending channel - close(c.sendc) - // return closing callback c.closec <- func(){ // Remove client from server delete(s.clients, c.sock) - // close channels - close(c.recvc) - // Close socket c.sock.Close() - // close this closing channel - close(c.closec) - } } @@ -85,13 +67,15 @@ func (c *Client) asyncWriter(s *Server){ fmt.Printf("Writing '%s'\n", payload) // Build Frame - f := BuildFromPayload(payload) + f := buildFrame(payload) // Send over socket - c.sock.Write( f.Bytes() ) - - + senderr := f.Send(c.sock) + if senderr != nil { + fmt.Printf("Writing error: %s\n", senderr) + } } + } \ No newline at end of file diff --git a/ws/frame.go b/ws/frame.go index f1f8344..2e17326 100644 --- a/ws/frame.go +++ b/ws/frame.go @@ -1,6 +1,7 @@ package ws import ( + "git.xdrm.io/gws/ws/frame/opcode" "encoding/binary" "git.xdrm.io/gws/ws/frame" "fmt" @@ -10,8 +11,66 @@ import ( ) -// ReadHeader reads the frame header -func (f *Frame) ReadHeader(buf *bytes.Buffer, s net.Conn) error{ + + +// ReadFrame reads the frame from a socket +func ReadFrame(b *bytes.Buffer, s net.Conn) (*Frame, error) { + + f := new(Frame) + + /* (1) Read header + ---------------------------------------------------------*/ + err := f.readHeader(b, s) + if err != nil { + return nil, fmt.Errorf("Header read: %s\n", err) + } + + + /* (2) Read payload + ---------------------------------------------------------*/ + err = f.readPayload(b, s) + if err != nil { + return nil, fmt.Errorf("Payload read: %s\n", err) + } + + + /* (3) Manage OpCode + ---------------------------------------------------------*/ + switch( f.Header.Opc ){ + + case opcode.CLOSE: + fmt.Printf("Opcode: CLOSE\n") + return nil, fmt.Errorf("Closed by client\n") + + case opcode.TEXT: + fmt.Printf("Opcode: TEXT\n") + + case opcode.BINARY: + fmt.Printf("Opcode: BINARY\n") + + case opcode.PING: + fmt.Printf("Opcode: PING\n") + err = buildPong().Send(s) + if err != nil { + return nil, fmt.Errorf("Pong frame: %s\n", err) + } + + + default: + fmt.Printf("Opcode: CLOSE\n") + buildClose().Send(s) + return nil, fmt.Errorf("Unknown Opcode %x\n", f.Header.Opc) + + } + + return f, nil + +} + + +// readHeader reads the frame header +func (f *Frame) readHeader(buf *bytes.Buffer, s net.Conn) error{ + var err error /* (2) Byte 1: FIN and OpCode */ b, err := reader.ReadBytes(s, 1) @@ -62,6 +121,7 @@ func (f *Frame) ReadHeader(buf *bytes.Buffer, s net.Conn) error{ // fmt.Printf(" + Header\n") // fmt.Printf(" + FIN: %t\n", f.Header.Fin) + // fmt.Printf(" + OPC: %x\n", f.Header.Opc) // fmt.Printf(" + MASK?: %t\n", f.Header.Msk != nil) // if f.Header.Msk != nil { // fmt.Printf(" + MASK: %x\n", f.Header.Msk) @@ -73,11 +133,11 @@ func (f *Frame) ReadHeader(buf *bytes.Buffer, s net.Conn) error{ } -// ReadPayload reads the frame payload -func (f *Frame) ReadPayload(b *bytes.Buffer, s net.Conn) error{ +// readPayload reads the frame payload +func (f *Frame) readPayload(buf *bytes.Buffer, s net.Conn) error{ /* (1) Read payload */ - buf, err := reader.ReadBytes(s, uint(f.Payload.Length) ) + b, err := reader.ReadBytes(s, uint(f.Payload.Length) ) if err != nil { return fmt.Errorf("Cannot read payload (%s)", err) } f.Payload.Buffer = make([]byte, 0, f.Payload.Length) @@ -86,7 +146,7 @@ func (f *Frame) ReadPayload(b *bytes.Buffer, s net.Conn) error{ if len(f.Header.Msk) == 4 { - for i, b := range buf{ + for i, b := range b{ mi := i % 4 // mask index @@ -107,15 +167,128 @@ func (f *Frame) ReadPayload(b *bytes.Buffer, s net.Conn) error{ -// BuildFromPayload builds a frame from only a payload +// buildFrame builds a frame from a payload buffer // (as []byte) -func BuildFromPayload(payload []byte) *Frame { - return nil +func buildFrame(b []byte) *Frame { + + return &Frame{ + + Header: frame.Header{ + Fin: true, + Opc: opcode.TEXT, + Msk: nil, + }, + + Payload: frame.Payload{ + Buffer: b, + Length: uint64(len(b)), + }, + + } + } -// Bytes returns the BYTE representation of the frame -// for sending over the network (into socket) -func (f *Frame) Bytes() []byte { +// buildClose builds a closing frame +func buildClose() *Frame{ + return &Frame{ + + Header: frame.Header{ + Fin: true, + Opc: opcode.CLOSE, + Msk: nil, + }, + + Payload: frame.Payload{ + Buffer: []byte{}, + Length: 0, + }, + + } +} + + + +// buildPong builds a PONG frame (responding to PING frame) +func buildPong() *Frame{ + return &Frame{ + + Header: frame.Header{ + Fin: true, + Opc: opcode.PONG, + Msk: nil, + }, + + Payload: frame.Payload{ + Buffer: []byte{}, + Length: 0, + }, + + } +} + + +// Send sends a frame over a socket +func (f Frame) Send(s net.Conn) error { + + written := 0 + + /* (1) Byte 0 : FIN + opcode */ + buf := []byte{ 0x80 + byte(opcode.TEXT) } + w, err := s.Write(buf) + if err != nil { return err } + written += w + + /* (2) Get payload length */ + if f.Payload.Length < 126 { // simple + + buf := []byte{ byte(f.Payload.Length) } + w, err = s.Write(buf) + if err != nil { return err } + written += w + + } else if f.Payload.Length < 0xffff { // extended: 16 bits + + w, err = s.Write( []byte{126} ) + if err != nil { return err } + written += w + + buf := make([]byte, 2) + binary.BigEndian.PutUint16(buf, uint16(f.Payload.Length)) + w, err = s.Write(buf) + if err != nil { return err } + written += w + + } else if f.Payload.Length < 0xffffffffffffffff { // extended: 64 bits + + w, err = s.Write( []byte{127} ) + if err != nil { return err } + written += w + + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, f.Payload.Length) + w, err = s.Write(buf) + if err != nil { return err } + written += w + + } + + /* (3) Payload */ + w, err = s.Write(f.Payload.Buffer) + if err != nil { return err } + + fmt.Printf("[2] written %d bytes\n", written) + + + fmt.Printf(" + Header\n") + fmt.Printf(" + FIN: %t\n", f.Header.Fin) + fmt.Printf(" + OPC: %x\n", f.Header.Opc) + fmt.Printf(" + MASK?: %t\n", f.Header.Msk != nil) + if f.Header.Msk != nil { + fmt.Printf(" + MASK: %x\n", f.Header.Msk) + } + fmt.Printf(" + LEN: %d\n", f.Payload.Length) + fmt.Printf("Total written: %d bytes (%d + %d)\n", written+w, written, w) + return nil } \ No newline at end of file diff --git a/ws/frame/types.go b/ws/frame/types.go index ce7ea06..949d3b6 100644 --- a/ws/frame/types.go +++ b/ws/frame/types.go @@ -15,7 +15,7 @@ type OpCode byte type Header struct { Fin bool Opc OpCode - Msk []byte // len() = 4 if set, else empty + Msk []byte // len() = 4 if set, else nil } // Represents a frame message diff --git a/ws/private.go b/ws/private.go index 688ae78..867b886 100644 --- a/ws/private.go +++ b/ws/private.go @@ -18,7 +18,7 @@ func (s *Server) dispatch(sock net.Conn, u *upgrader.T){ sock: sock, Arguments: [][]string{ []string{ uri } }, Protocol: string(u.Response.GetProtocol()), - recvc: make(chan *Frame, maxChannelBufferLength), + recvc: make(chan Frame, maxChannelBufferLength), sendc: make(chan []byte, maxChannelBufferLength), closec: make(chan func(), maxChannelBufferLength), } diff --git a/ws/types.go b/ws/types.go index f83a9ed..4d3707d 100644 --- a/ws/types.go +++ b/ws/types.go @@ -10,7 +10,7 @@ const maxBufferLength = 4096 const maxChannelBufferLength = 1 // Represents a websocket controller callback function -type ControllerFunc func(*Client, <-chan *Frame, chan<- []byte, <-chan func()) +type ControllerFunc func(*Client, <-chan Frame, chan<- []byte, <-chan func()) // Represents a websocket controller type Controller struct { @@ -28,7 +28,7 @@ type Client struct { Controller *Controller // assigned controller Store struct{} // store (for client implementation-specific data) - recvc chan *Frame // Receive channel + recvc chan Frame // Receive channel sendc chan []byte // sending channel closec chan func() // closing channel }