From c3121df33fd0d9b541e7c6c9079ab63907b57801 Mon Sep 17 00:00:00 2001 From: xdrm-brackets Date: Sat, 28 Apr 2018 17:51:05 +0200 Subject: [PATCH] updated structure (break ws.* apart) + channel management --- cmd/iface/main.go | 35 ++++++--- ws/client.go | 89 ++++++++++++++++++++++ ws/frame.go | 104 +++++++++++++++++++++++++ ws/private.go | 188 ++++++---------------------------------------- ws/public.go | 22 ++---- ws/server.go | 31 ++++---- 6 files changed, 264 insertions(+), 205 deletions(-) create mode 100644 ws/client.go create mode 100644 ws/frame.go diff --git a/cmd/iface/main.go b/cmd/iface/main.go index bed2342..acf80d5 100644 --- a/cmd/iface/main.go +++ b/cmd/iface/main.go @@ -15,25 +15,42 @@ func main(){ serv := ws.CreateServer("0.0.0.0", 4444) /* (2) Bind default controller */ - err := serv.BindDefault(func(c *ws.Client, f chan *ws.Frame){ + serv.BindDefault(func(client *ws.Client, receiver <-chan *ws.Frame, sender chan<- []byte, closer <-chan func()){ fmt.Printf("[default] connected\n") - for frame := range f { - fmt.Printf("[default] received '%s'\n", frame.Buf) + for { + + select { + case receivedFrame := <- receiver: + fmt.Printf("[default] received '%s'\n", receivedFrame.Payload.Buffer) + sender <- receivedFrame.Payload.Buffer[1:] + case closeFunc := <- closer: + fmt.Printf("[default] client with protocol '%s' exited\n", client.Protocol) + closeFunc() + return + } } - fmt.Printf("[default] closed\n") + + fmt.Printf("[default] unexpectedly closed\n") }) - if err != nil { panic(err) } /* (3) Bind to URI */ - err = serv.Bind("/channel/./room/./", func(c *ws.Client, f chan *ws.Frame){ + err := serv.Bind("/channel/./room/./", func(client *ws.Client, receiver <-chan *ws.Frame, sender chan<- []byte, closer <-chan func()){ fmt.Printf("[uri] connected\n") - for frame := range f { - fmt.Printf("[uri] received '%s'\n", frame.Buf) + for { + + select { + case receivedFrame := <- receiver: + fmt.Printf("[uri] received '%s'\n", receivedFrame.Payload.Buffer) + case closeFunc := <- closer: + fmt.Printf("[uri] client with protocol '%s' exited\n", client.Protocol) + closeFunc() + return + } } - fmt.Printf("[uri] closed\n") + fmt.Printf("[uri] unexpectedly closed\n") }) if err != nil { panic(err) } diff --git a/ws/client.go b/ws/client.go new file mode 100644 index 0000000..07c4c04 --- /dev/null +++ b/ws/client.go @@ -0,0 +1,89 @@ +package ws + +import ( + "fmt" + "time" + "bytes" +) + + + + + + +// asyncReader reads a websocket frame chunk by chunk +// for a given client +func (c *Client) asyncReader(s *Server) { + + // Get buffer + buf := new(bytes.Buffer) + + // Get buffer frame + frame := new(Frame) + + + for { + + var startTime int64 = time.Now().UnixNano() + + // Try to read frame header + err := frame.ReadHeader(buf, c.sock) + + if err != nil { + fmt.Printf(" - Header reading error: %s\n", err) + break + } + + + // Try to read frame payload + err = frame.ReadPayload(buf, c.sock) + if err != nil { + fmt.Printf(" - Payload reading error: %s\n", err) + break + } + + fmt.Printf("+ elapsed: %.3f us\n", float32(time.Now().UnixNano()-startTime)/1e3) + + // Trigger data channel + c.recvc <- frame + + } + + fmt.Printf(" - remove client\n") + + // close sending channel + close(c.sendc) + + // return closing callback + c.closec <- func(){ + + // Remove client from server + delete(s.clients, c.sock) + + // close channels + close(c.recvc) + + // Close socket + c.sock.Close() + + // close this closing channel + close(c.closec) + + } + +} + + + +// asyncWriter writes into websocket +// and is triggered by client.sendc channel +func (c *Client) asyncWriter(s *Server){ + + + for buffer := range c.sendc { + + fmt.Printf("Writing '%s'\n", buffer) + + } + +} \ No newline at end of file diff --git a/ws/frame.go b/ws/frame.go new file mode 100644 index 0000000..935e331 --- /dev/null +++ b/ws/frame.go @@ -0,0 +1,104 @@ +package ws + +import ( + "encoding/binary" + "git.xdrm.io/gws/ws/frame" + "fmt" + "git.xdrm.io/gws/internal/ws/reader" + "net" + "bytes" +) + + +// ReadHeader reads the frame header +func (f *Frame) ReadHeader(buf *bytes.Buffer, s net.Conn) error{ + + /* (2) Byte 1: FIN and OpCode */ + b, err := reader.ReadBytes(s, 1) + if err != nil { return fmt.Errorf("Cannot read byte Fin nor OpCode (%s)", err) } + + f.Header.Fin = b[0] & 0x80 == 0x80 + f.Header.Opc = frame.OpCode( b[0] & 0x0f ) + + /* (3) Byte 2: Mask and Length[0] */ + b, err = reader.ReadBytes(s, 1) + if err != nil { return fmt.Errorf("Cannot read byte if has Mask nor Length (%s)", err) } + + // if mask, byte array not nil + if b[0] & 0x80 == 0x80 { + f.Header.Msk = []byte{} + } + + // payload length + f.Payload.Length = uint64( b[0] & 0x7f ) + + /* (4) Extended payload */ + if f.Payload.Length == 127 { + + bx, err := reader.ReadBytes(s, 8) + if err != nil { return fmt.Errorf("Cannot read payload extended length of 64 bytes (%s)", err) } + + f.Payload.Length = binary.BigEndian.Uint64(bx) + + } else if f.Payload.Length == 126 { + + bx, err := reader.ReadBytes(s, 2) + if err != nil { return fmt.Errorf("Cannot read payload extended length of 16 bytes (%s)", err) } + + f.Payload.Length = uint64( binary.BigEndian.Uint16(bx) ) + + } + + /* (5) Masking key */ + if f.Header.Msk != nil { + + bx, err := reader.ReadBytes(s, 4) + if err != nil { return fmt.Errorf("Cannot read mask or 32 bytes (%s)", err) } + + f.Header.Msk = make([]byte, 4) + copy(f.Header.Msk, bx) + + } + + // fmt.Printf(" + Header\n") + // fmt.Printf(" + FIN: %t\n", f.Header.Fin) + // fmt.Printf(" + MASK?: %t\n", f.Header.Msk != nil) + // if f.Header.Msk != nil { + // fmt.Printf(" + MASK: %x\n", f.Header.Msk) + // } + // fmt.Printf(" + LEN: %d\n", f.Payload.Length) + + return nil + +} + + +// ReadPayload reads the frame payload +func (f *Frame) ReadPayload(b *bytes.Buffer, s net.Conn) error{ + + /* (1) Read payload */ + buf, err := reader.ReadBytes(s, uint(f.Payload.Length) ) + if err != nil { return fmt.Errorf("Cannot read payload (%s)", err) } + + f.Payload.Buffer = make([]byte, 0, f.Payload.Length) + + /* (2) Unmask payload */ + if len(f.Header.Msk) == 4 { + + + for i, b := range buf{ + + mi := i % 4 // mask index + + f.Payload.Buffer = append(f.Payload.Buffer, b ^ f.Header.Msk[mi]) + + } + + } + + // fmt.Printf(" + Payload\n") + // fmt.Printf(" + Read: %d\n", len(f.Payload.Buffer)) + + return nil + +} \ No newline at end of file diff --git a/ws/private.go b/ws/private.go index 7142a01..688ae78 100644 --- a/ws/private.go +++ b/ws/private.go @@ -1,12 +1,6 @@ package ws import ( - "bytes" - "time" - "encoding/binary" - "git.xdrm.io/gws/ws/frame" - "fmt" - "git.xdrm.io/gws/internal/ws/reader" "git.xdrm.io/gws/upgrader" "net" ) @@ -16,17 +10,18 @@ func (s *Server) dispatch(sock net.Conn, u *upgrader.T){ /* (1) Build client ---------------------------------------------------------*/ - /* (1) Add socket */ - client := new(Client) - client.soc = sock - - /* (2) Add uri */ + /* (1) Get request URI */ uri := u.Request.GetURI() - client.arg = make([][]string, 1) - client.arg[0] = []string{ uri } - /* (3) Add protocol */ - client.pro = string(u.Response.GetProtocol()) + + client := &Client{ + sock: sock, + Arguments: [][]string{ []string{ uri } }, + Protocol: string(u.Response.GetProtocol()), + recvc: make(chan *Frame, maxChannelBufferLength), + sendc: make(chan []byte, maxChannelBufferLength), + closec: make(chan func(), maxChannelBufferLength), + } /* (2) Search for a controller (from URI) @@ -34,7 +29,7 @@ func (s *Server) dispatch(sock net.Conn, u *upgrader.T){ var controller *Controller = nil /* (1) Iterate over URI controllers */ - for _, c := range s.ctl { + for _, c := range s.controllers { /* (2) If matches */ if c.uri.Match(uri) { @@ -43,9 +38,7 @@ func (s *Server) dispatch(sock net.Conn, u *upgrader.T){ match := c.uri.GetAllMatch() /* (4) Add them to the 'arg' attribute */ - for _, m := range match { - client.arg = append(client.arg, m) - } + client.Arguments = append(client.Arguments, match...) /* (5) Mark that we have a controller */ controller = c @@ -55,9 +48,9 @@ func (s *Server) dispatch(sock net.Conn, u *upgrader.T){ } - /* (6) If no controller found */ - if controller == nil && s.def != nil { - controller = s.def + /* (6) If no controller found -> try default */ + if controller == nil && s.defaultController != nil { + controller = s.defaultController } /* (7) If no controller -> close socket */ @@ -69,156 +62,23 @@ func (s *Server) dispatch(sock net.Conn, u *upgrader.T){ /* (3) Manage client+controller ---------------------------------------------------------*/ - /* (1) Add client to server */ - s.cli = append(s.cli, client) + /* (1) Add controller to client */ + client.Controller = controller - /* (2) Add controller to client */ - client.ctl = controller + /* (2) Add client to server */ + s.clients[sock] = client /* (3) Bind controller */ - go controller.fun(client, controller.dat) + go controller.fun(client, client.recvc, client.sendc, client.closec) - /* (4) Launch continuous frame reader */ - go client.awaitFrame() + /* (4) Launch asynchronous frame writer */ + go client.asyncWriter(s) + /* (5) Run asynchronous frame reader */ + client.asyncReader(s) } -// awaitFrame reads a websocket frame chunk by chunk -// for a given client -func (c *Client) awaitFrame() { - - // Get buffer - buf := new(bytes.Buffer) - - // Get buffer frame - frame := new(Frame) - for { - - var startTime int64 = time.Now().UnixNano() - - // Try to read frame header - err := frame.ReadHeader(buf, c.soc) - - if err != nil { - fmt.Printf(" - Header reading error: %s\n", err) - break - } - - - // Try to read frame payload - err = frame.ReadPayload(buf, c.soc) - if err != nil { - fmt.Printf(" - Payload reading error: %s\n", err) - break - } - - fmt.Printf("+ elapsed: %.3f us\n", float32(time.Now().UnixNano()-startTime)/1e3) - - // Trigger data channel - c.ctl.dat <- frame - - } - - fmt.Printf(" - error; disconnecting\n") - close(c.ctl.dat) - -} - - -// ReadHeader reads the frame header -func (f *Frame) ReadHeader(buf *bytes.Buffer, s net.Conn) error{ - - /* (2) Byte 1: FIN and OpCode */ - b, err := reader.ReadBytes(s, 1) - if err != nil { return fmt.Errorf("Cannot read byte Fin nor OpCode (%s)", err) } - - f.H.Fin = b[0] & 0x80 == 0x80 - f.H.Opc = frame.OpCode( b[0] & 0x0f ) - - /* (3) Byte 2: Mask and Length[0] */ - b, err = reader.ReadBytes(s, 1) - if err != nil { return fmt.Errorf("Cannot read byte if has Mask nor Length (%s)", err) } - - // if mask, byte array not nil - if b[0] & 0x80 == 0x80 { - f.H.Msk = []byte{} - } - - // payload length - f.Len = uint64( b[0] & 0x7f ) - - /* (4) Extended payload */ - if f.Len == 127 { - - bx, err := reader.ReadBytes(s, 8) - if err != nil { return fmt.Errorf("Cannot read payload extended length of 64 bytes (%s)", err) } - - f.Len = binary.BigEndian.Uint64(bx) - - } else if f.Len == 126 { - - bx, err := reader.ReadBytes(s, 2) - if err != nil { return fmt.Errorf("Cannot read payload extended length of 16 bytes (%s)", err) } - - f.Len = uint64( binary.BigEndian.Uint16(bx) ) - - } - - /* (5) Masking key */ - if f.H.Msk != nil { - - bx, err := reader.ReadBytes(s, 4) - if err != nil { return fmt.Errorf("Cannot read mask or 32 bytes (%s)", err) } - - f.H.Msk = make([]byte, 4) - copy(f.H.Msk, bx) - - } - - // fmt.Printf(" + Header\n") - // fmt.Printf(" + FIN: %t\n", f.H.Fin) - // fmt.Printf(" + MASK?: %t\n", f.H.Msk != nil) - // if f.H.Msk != nil { - // fmt.Printf(" + MASK: %x\n", f.H.Msk) - // } - // fmt.Printf(" + LEN: %d\n", f.Len) - - return nil - -} - - -// ReadPayload reads the frame payload -func (f *Frame) ReadPayload(b *bytes.Buffer, s net.Conn) error{ - - /* (1) Read payload */ - buf, err := reader.ReadBytes(s, uint(f.Len) ) - if err != nil { return fmt.Errorf("Cannot read payload (%s)", err) } - - f.Buf = make([]byte, 0, f.Len) - - /* (2) Unmask payload */ - if len(f.H.Msk) == 4 { - - - for i, b := range buf{ - - mi := i % 4 // mask index - - f.Buf = append(f.Buf, b ^ f.H.Msk[mi]) - - } - - } - - // fmt.Printf(" + Payload\n") - // fmt.Printf(" + Read: %d\n", len(f.Buf)) - - return nil - -} - diff --git a/ws/public.go b/ws/public.go index 4abab79..cc32c97 100644 --- a/ws/public.go +++ b/ws/public.go @@ -1,25 +1,19 @@ package ws import () +import "net" // CreateServer creates a server for a specific HOST and PORT func CreateServer(host string, port uint16) *Server{ - inst := new(Server) - - inst.adr = make([]byte, len(host)) - copy(inst.adr, []byte(host)) - - inst.prt = port - - inst.cli = make([]*Client, 0) - - inst.def = nil - - inst.ctl = make([]*Controller, 0) - - return inst + return &Server{ + addr: []byte(host), + port: port, + clients: make(map[net.Conn]*Client, 0), + defaultController: nil, + controllers: make([]*Controller, 0), + } } diff --git a/ws/server.go b/ws/server.go index 9c36426..a224a47 100644 --- a/ws/server.go +++ b/ws/server.go @@ -11,13 +11,12 @@ import ( // BindDefault binds a default controller // it will be called if the URI does not // match another controller -func (s *Server) BindDefault(c ControllerFunc) error{ - s.def = new(Controller); - s.def.uri = nil - s.def.fun = c - s.def.dat = make(chan *Frame, maxChannelBufferLength) +func (s *Server) BindDefault(c ControllerFunc){ - return nil + s.defaultController = &Controller{ + uri: nil, + fun: c, + } } // Bind binds a controller to an URI scheme @@ -28,13 +27,10 @@ func (s *Server) Bind(uri string, c ControllerFunc) error { if err != nil { return fmt.Errorf("Cannot build URI: %s", err) } /* (2) Create controller */ - ctl := new(Controller) - ctl.uri = uriScheme - ctl.fun = c - ctl.dat = make(chan *Frame, maxChannelBufferLength) - - /* (3) Add to server */ - s.ctl = append(s.ctl, ctl) + s.controllers = append(s.controllers, &Controller{ + uri: uriScheme, + fun: c, + } ) return nil @@ -49,13 +45,14 @@ func (s *Server) Launch() error { /* (1) Listen socket ---------------------------------------------------------*/ /* (1) Build full url */ - url := fmt.Sprintf("%s:%d", s.adr, s.prt) + url := fmt.Sprintf("%s:%d", s.addr, s.port) /* (3) Bind listen socket */ - s.soc, err = net.Listen("tcp", url) + s.sock, err = net.Listen("tcp", url) if err != nil { return fmt.Errorf("Listen socket: %s", err) } + defer s.sock.Close() fmt.Printf("+ listening on %s\n", url) @@ -65,7 +62,7 @@ func (s *Server) Launch() error { for { // {1} Wait for connection // - sock, err := s.soc.Accept() + sock, err := s.sock.Accept() fmt.Printf(" + new client\n") if err != nil { fmt.Printf(" - error: %s\n", err) @@ -91,6 +88,4 @@ func (s *Server) Launch() error { } - return nil - } \ No newline at end of file