diff --git a/cmd/iface/main.go b/cmd/iface/main.go index 2dcbf2e..aca887f 100644 --- a/cmd/iface/main.go +++ b/cmd/iface/main.go @@ -1,8 +1,10 @@ package main import ( + "git.xdrm.io/gws/ws/message" + "git.xdrm.io/gws/ws/controller" "time" - "git.xdrm.io/gws/ws" + "git.xdrm.io/gws/ws/server" "fmt" ) @@ -12,19 +14,18 @@ func main(){ startTime := time.Now().UnixNano() /* (1) Bind WebSocket server */ - serv := ws.CreateServer("0.0.0.0", 4444) + serv := server.Create("0.0.0.0", 4444) /* (2) Bind default controller */ - serv.BindDefault(func(client *ws.Client, receiver <-chan ws.Frame, sender chan<- []byte, closer <-chan func()){ + serv.BindDefault(func(client *controller.ClientInterface, receiver <-chan message.T, sender chan<- message.T, closer <-chan func()){ fmt.Printf("[default] connected\n") for { select { - case receivedFrame := <- receiver: - fmt.Printf("[default] received '%s'\n", receivedFrame.Payload.Buffer) - sender <- receivedFrame.Payload.Buffer - fmt.Printf("[default] sent\n") + case msg := <- receiver: + fmt.Printf("[default] received '%s'\n", msg.Data) + sender <- msg case closeFunc := <- closer: fmt.Printf("[default] client with protocol '%s' exited\n", client.Protocol) closeFunc() @@ -37,14 +38,14 @@ func main(){ }) /* (3) Bind to URI */ - err := serv.Bind("/channel/./room/./", func(client *ws.Client, receiver <-chan ws.Frame, sender chan<- []byte, closer <-chan func()){ + err := serv.Bind("/channel/./room/./", func(client *controller.ClientInterface, receiver <-chan message.T, sender chan<- message.T, closer <-chan func()){ fmt.Printf("[uri] connected\n") for { select { - case receivedFrame := <- receiver: - fmt.Printf("[uri] received '%s'\n", receivedFrame.Payload.Buffer) + case msg := <- receiver: + fmt.Printf("[uri] received '%s'\n", msg.Data) case closeFunc := <- closer: fmt.Printf("[uri] client with protocol '%s' exited\n", client.Protocol) closeFunc() diff --git a/ws/client.go b/ws/client.go deleted file mode 100644 index e1e2639..0000000 --- a/ws/client.go +++ /dev/null @@ -1,77 +0,0 @@ -package ws - -import ( - "fmt" - "time" -) - - - - - - -// asyncReader reads a websocket frame chunk by chunk -// for a given client -func (c *Client) asyncReader(s *Server) { - - - for { - - var startTime int64 = time.Now().UnixNano() - - // Try to read frame header - frame, err := ReadFrame(c) - if err != nil { - fmt.Printf("[read.err] %s\n", err) - break - } - - var elapsed = float32(time.Now().UnixNano()-startTime)/1e3 - - fmt.Printf("+ elapsed: %.3f us\n", elapsed) - - // Trigger data channel - c.recvc <- *frame - - } - - fmt.Printf(" - remove client\n") - - // return closing callback - c.closec <- func(){ - - // Remove client from server - s.clientsMutex.Lock() - delete(s.clients, c.conn.sock) - s.clientsMutex.Unlock() - - // Close socket - c.conn.sock.Close() - - } - -} - - - -// asyncWriter writes into websocket -// and is triggered by client.sendc channel -func (c *Client) asyncWriter(s *Server){ - - - for payload := range c.sendc { - - // Build Frame - f := buildFrame(payload) - - // Send over socket - senderr := f.Send(&c.conn) - if senderr != nil { - fmt.Printf("Writing error: %s\n", senderr) - } - - - } - - -} \ No newline at end of file diff --git a/ws/client/public.go b/ws/client/public.go new file mode 100644 index 0000000..548e3c7 --- /dev/null +++ b/ws/client/public.go @@ -0,0 +1,173 @@ +package client + +import ( + "git.xdrm.io/gws/ws/controller" + "git.xdrm.io/gws/ws/message" + "git.xdrm.io/gws/upgrader" + "net" + "fmt" +) + + + +// Create creates a new client +func Create(s net.Conn, ctl controller.Set, unregister chan <-*T) (*T, error){ + + /* (1) Manage UPGRADE request + ---------------------------------------------------------*/ + upgrader, err := upgrader.Upgrade(s) + if err != nil { + s.Close() + return nil, fmt.Errorf("Upgrade error: %s\n", err) + } + + if upgrader.Response.GetStatusCode() != 101 { + s.Close() + return nil, fmt.Errorf("Upgrade error (HTTP %d)\n", upgrader.Response.GetStatusCode()) + } + + + /* (2) Initialise client + ---------------------------------------------------------*/ + /* (1) Get upgrade data */ + clientURI := upgrader.Request.GetURI() + clientProtocol := upgrader.Response.GetProtocol() + + /* (2) Initialise client */ + instance := &T{ + IO: ClientIO{ + Sock: s, + }, + + iface: &controller.ClientInterface{ + Protocol: string(clientProtocol), + Arguments: [][]string{ []string{ clientURI } }, + }, + + Ch: ClientChannelSet{ + Receive: make(chan message.T, 1), + Send: make(chan message.T, 1), + Close: make(chan func(), 1), + }, + } + + + + /* (3) Find controller by URI + ---------------------------------------------------------*/ + /* (1) Try to find one */ + controller, arguments := ctl.Match(clientURI); + + /* (2) If nothing found -> error */ + if controller == nil { + return nil, fmt.Errorf("No controller found, no default controller set\n") + } + + /* (3) Copy data */ + instance.controller = controller + instance.iface.Arguments = arguments + + + + /* (4) Launch client routines + ---------------------------------------------------------*/ + /* (1) Launch client controller */ + go instance.controller.Fun( + instance.iface, // pass the client + instance.Ch.Receive, // the receiver + instance.Ch.Send, // the sender + instance.Ch.Close, // the closer + ) + + /* (2) Launch message reader */ + go instance.reader(unregister) + + /* (3) Launc writer */ + go instance.writer(unregister) + + + return instance, nil + +} + + +// reader reads and parses messages from the buffer +func (c *T) reader(unregister chan <-*T){ + + fmt.Printf("[reader] start\n"); + + for { + + /* (1) Parse message */ + msg, err := message.Read(c.IO.Sock) + if err != nil { + fmt.Printf(" [reader] %s\n", err) + break + } + fmt.Printf(" [reader] ok\n"); + + /* (2) If CLOSE */ + if msg.Type == message.CLOSE { + + fmt.Printf(" [reader] CLOSE\n") + break + + } else if msg.Type == message.PING { + + fmt.Printf(" [reader] PING\n") + msg.Final = true + msg.Type = message.PONG + c.Ch.Send <- *msg + fmt.Printf(" [reader] sent PONG back\n") + continue + + } + + /* (3) Dispatch to receiver */ + c.Ch.Receive <- *msg + + } + + fmt.Printf("[reader] end\n") + + // return closing callback + c.Ch.Close <- func(){ + unregister <- c + } + +} + + + +// writer writes into websocket +// and is triggered by client.ch.send channel +func (c *T) writer(unregister chan <-*T){ + + fmt.Printf("[writer] start\n"); + + for message := range c.Ch.Send { + + c.IO.wriMu.Lock() + + err := message.Send(c.IO.Sock) + + c.IO.wriMu.Unlock() + + if err != nil { + fmt.Printf(" [writer] %s\n", err) + break; + } + + fmt.Printf(" [writer] ok\n") + + } + + + fmt.Printf("[writer] end\n") + + // return closing callback + c.Ch.Close <- func(){ + unregister <- c + } + +} \ No newline at end of file diff --git a/ws/client/types.go b/ws/client/types.go new file mode 100644 index 0000000..312aab7 --- /dev/null +++ b/ws/client/types.go @@ -0,0 +1,29 @@ +package client + +import ( + "git.xdrm.io/gws/ws/controller" + "git.xdrm.io/gws/ws/message" + "sync" + "net" +) + +// Represents a client socket utility (reader, writer, ..) +type ClientIO struct { + Sock net.Conn + wriMu sync.Mutex +} + +// Represents all channels that need a client +type ClientChannelSet struct{ + Receive chan message.T + Send chan message.T + Close chan func() +} + +// Represents a websocket client +type T struct { + IO ClientIO + iface *controller.ClientInterface + controller *controller.T // assigned controller + Ch ClientChannelSet +} \ No newline at end of file diff --git a/ws/controller/public.go b/ws/controller/public.go new file mode 100644 index 0000000..f34667f --- /dev/null +++ b/ws/controller/public.go @@ -0,0 +1,40 @@ +package controller + + + +// Match finds a controller for a given URI +// also it returns the matching string patterns +func (s *Set) Match(uri string) (*T, [][]string){ + + /* (1) Initialise argument list */ + arguments := [][]string{ []string{ uri } } + + + /* (2) Try each controller */ + for _, c := range s.Uri { + + /* 1. If matches */ + if c.URI.Match(uri) { + + /* Extract matches */ + match := c.URI.GetAllMatch() + + /* Add them to the 'arg' attribute */ + arguments = append(arguments, match...) + + /* Mark that we have a controller */ + return c, arguments + + } + + } + + /* (3) If no controller found -> set default controller */ + if s.Def != nil { + return s.Def, arguments + } + + /* (4) If default is NIL, return empty controller */ + return nil, arguments + +} \ No newline at end of file diff --git a/ws/controller/types.go b/ws/controller/types.go new file mode 100644 index 0000000..c970491 --- /dev/null +++ b/ws/controller/types.go @@ -0,0 +1,28 @@ +package controller + +import ( + "git.xdrm.io/gws/ws/message" + "git.xdrm.io/gws/internal/uri/parser" +) + +// Represents available information about a client +type ClientInterface struct { + Protocol string // choosen protocol (Sec-WebSocket-Protocol) + Arguments [][]string // URI parameters, index 0 is full URI, then matching groups + Store struct{} // store (for client implementation-specific data) +} + +// Represents a websocket controller callback function +type Func func(*ClientInterface, <-chan message.T, chan<- message.T, <-chan func()) + +// Represents a websocket controller +type T struct { + URI *parser.Scheme // uri scheme + Fun Func // controller function +} + +// Represents a controller set +type Set struct { + Def *T // default controller + Uri []*T // uri controllers +} diff --git a/ws/frame.go b/ws/frame.go deleted file mode 100644 index 172e733..0000000 --- a/ws/frame.go +++ /dev/null @@ -1,278 +0,0 @@ -package ws - -import ( - "bufio" - "git.xdrm.io/gws/ws/frame/opcode" - "encoding/binary" - "git.xdrm.io/gws/ws/frame" - "fmt" - "git.xdrm.io/gws/internal/ws/reader" - "net" -) - - - - -// ReadFrame reads the frame from a socket -func ReadFrame(c *Client) (*Frame, error) { - - f := new(Frame) - - /* (1) Read header - ---------------------------------------------------------*/ - err := f.readHeader(c.conn.br) - if err != nil { - return nil, err - } - - - /* (2) Read payload - ---------------------------------------------------------*/ - err = f.readPayload(c.conn.br) - if err != nil { - return nil, err - } - - - /* (3) Manage OpCode - ---------------------------------------------------------*/ - switch( f.Header.Opc ){ - - case opcode.CLOSE: - fmt.Printf("Opcode: CLOSE\n") - return nil, fmt.Errorf("Closed by client\n") - - case opcode.TEXT: - fmt.Printf("Opcode: TEXT\n") - - case opcode.BINARY: - fmt.Printf("Opcode: BINARY\n") - - case opcode.PING: - fmt.Printf("Opcode: PING\n") - err = buildPong().Send(&c.conn) - if err != nil { - return nil, fmt.Errorf("Pong frame: %s\n", err) - } - - - default: - fmt.Printf("Opcode: CLOSE\n") - buildClose().Send(&c.conn) - return nil, fmt.Errorf("Unknown Opcode %x\n", f.Header.Opc) - - } - - return f, nil - -} - - -// readHeader reads the frame header -func (f *Frame) readHeader(br *bufio.Reader) error{ - var err error - - /* (2) Byte 1: FIN and OpCode */ - b, err := reader.ReadBytes(br, 1) - if err != nil { return err } - - f.Header.Fin = b[0] & 0x80 == 0x80 - f.Header.Opc = frame.OpCode( b[0] & 0x0f ) - - /* (3) Byte 2: Mask and Length[0] */ - b, err = reader.ReadBytes(br, 1) - if err != nil { return err } - - // if mask, byte array not nil - if b[0] & 0x80 == 0x80 { - f.Header.Msk = []byte{} - } - - // payload length - f.Payload.Length = uint64( b[0] & 0x7f ) - - /* (4) Extended payload */ - if f.Payload.Length == 127 { - - bx, err := reader.ReadBytes(br, 8) - if err != nil { return err } - - f.Payload.Length = binary.BigEndian.Uint64(bx) - - } else if f.Payload.Length == 126 { - - bx, err := reader.ReadBytes(br, 2) - if err != nil { return err } - - f.Payload.Length = uint64( binary.BigEndian.Uint16(bx) ) - - } - - /* (5) Masking key */ - if f.Header.Msk != nil { - - bx, err := reader.ReadBytes(br, 4) - if err != nil { return err } - - f.Header.Msk = make([]byte, 4) - copy(f.Header.Msk, bx) - - } - - // fmt.Printf(" + Header\n") - // fmt.Printf(" + FIN: %t\n", f.Header.Fin) - // fmt.Printf(" + OPC: %x\n", f.Header.Opc) - // fmt.Printf(" + MASK?: %t\n", f.Header.Msk != nil) - // if f.Header.Msk != nil { - // fmt.Printf(" + MASK: %x\n", f.Header.Msk) - // } - // fmt.Printf(" + LEN: %d\n", f.Payload.Length) - - return nil - -} - - -// readPayload reads the frame payload -func (f *Frame) readPayload(br *bufio.Reader) error{ - - /* (1) Read payload */ - b, err := reader.ReadBytes(br, int(f.Payload.Length) ) - if err != nil { return fmt.Errorf("Cannot read payload (%s)", err) } - - f.Payload.Buffer = make([]byte, 0, f.Payload.Length) - - /* (2) Unmask payload */ - if len(f.Header.Msk) == 4 { - - - for i, b := range b{ - - mi := i % 4 // mask index - - f.Payload.Buffer = append(f.Payload.Buffer, b ^ f.Header.Msk[mi]) - - } - - } - - // fmt.Printf(" + Payload\n") - // fmt.Printf(" + Read: %d\n", len(f.Payload.Buffer)) - - return nil - -} - - - - - -// buildFrame builds a frame from a payload buffer -// (as []byte) -func buildFrame(b []byte) *Frame { - - return &Frame{ - - Header: frame.Header{ - Fin: true, - Opc: opcode.TEXT, - Msk: nil, - }, - - Payload: frame.Payload{ - Buffer: b, - Length: uint64(len(b)), - }, - - } - -} - - -// buildClose builds a closing frame -func buildClose() *Frame{ - return &Frame{ - - Header: frame.Header{ - Fin: true, - Opc: opcode.CLOSE, - Msk: nil, - }, - - Payload: frame.Payload{ - Buffer: []byte{}, - Length: 0, - }, - - } -} - - - -// buildPong builds a PONG frame (responding to PING frame) -func buildPong() *Frame{ - return &Frame{ - - Header: frame.Header{ - Fin: true, - Opc: opcode.PONG, - Msk: nil, - }, - - Payload: frame.Payload{ - Buffer: []byte{}, - Length: 0, - }, - - } -} - - -// Send sends a frame over a socket -func (f Frame) Send(c *Conn) error { - - frameHeader := make([]byte, 0, maxHeaderLength) - - /* (1) Byte 0 : FIN + opcode */ - frameHeader = append(frameHeader, 0x80 | byte(opcode.TEXT) ) - - /* (2) Get payload length */ - if f.Payload.Length < 126 { // simple - - frameHeader = append(frameHeader, byte(f.Payload.Length) ) - - } else if f.Payload.Length < 0xffff { // extended: 16 bits - - - frameHeader = append(frameHeader, 126) - - buf := make([]byte, 2) - binary.BigEndian.PutUint16(buf, uint16(f.Payload.Length)) - frameHeader = append(frameHeader, buf...) - - } else if f.Payload.Length < 0xffffffffffffffff { // extended: 64 bits - - frameHeader = append(frameHeader, 127) - - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, f.Payload.Length) - frameHeader = append(frameHeader, buf...) - - } - - /* (3) Add payload */ - writeBuffer := net.Buffers{frameHeader, f.Payload.Buffer[:f.Payload.Length]} - fmt.Printf("[send]\n") - _, err := writeBuffer.WriteTo(c.sock) - fmt.Printf("[/send] ") - if err != nil { return err } - - - // fmt.Printf(" + Header\n") - // fmt.Printf(" + FIN: %t\n", f.Header.Fin) - // fmt.Printf(" + OPC: %x\n", f.Header.Opc) - // fmt.Printf(" + MASK?: %t\n", f.Header.Msk != nil) - // fmt.Printf(" + LEN: %d\n", f.Payload.Length) - - return nil -} \ No newline at end of file diff --git a/ws/message/public.go b/ws/message/public.go new file mode 100644 index 0000000..8d2ac46 --- /dev/null +++ b/ws/message/public.go @@ -0,0 +1,183 @@ +package message + +import ( + "net" + "encoding/binary" + "fmt" +) + + + + +// Read reads a message form reader +func Read(socket net.Conn) (*T, error){ + + m := new(T) + + var buffer []byte; + + /* (2) Byte 1: FIN and OpCode */ + buffer = make([]byte, 1) + _, err := socket.Read(buffer) + if err != nil { return nil, err } + + + m.Final = bool( buffer[0] & 0x80 == 0x80 ) + m.Type = byte( buffer[0] & 0x0f ) + fmt.Printf(" + read\n") + fmt.Printf(" - final: %t\n", m.Final) + fmt.Printf(" - Type: %x\n", m.Type) + + /* (3) Byte 2: Mask and Length[0] */ + buffer = make([]byte, 1) + _, err = socket.Read(buffer) + if err != nil { return nil, err } + + // if mask, byte array not nil + var mask []byte = nil; + if buffer[0] & 0x80 == 0x80 { + mask = make([]byte, 0) + } + fmt.Printf(" - has mask ? %t\n", mask != nil) + + // payload length + m.Size = uint( buffer[0] & 0x7f ) + fmt.Printf(" - size/flag: %d\n", m.Size) + + /* (4) Extended payload */ + if m.Size == 127 { + + buffer := make([]byte, 8) + _, err := socket.Read(buffer) + if err != nil { return nil, err } + + m.Size = uint( binary.BigEndian.Uint64(buffer) ) + + } else if m.Size == 126 { + + buffer := make([]byte, 2) + _, err := socket.Read(buffer) + if err != nil { return nil, err } + + m.Size = uint( binary.BigEndian.Uint16(buffer) ) + + } + + fmt.Printf(" - final size: %d\n", m.Size) + + /* (5) Masking key */ + if mask != nil { + + buffer := make([]byte, 4) + _, err := socket.Read(buffer) + if err != nil { return nil, err } + + mask = make([]byte, 4) + copy(mask, buffer) + + } + fmt.Printf(" - mask: %x\n", mask) + + /* (6) Read payload */ + buffer = make([]byte, int(m.Size)) + _, err = socket.Read(buffer) + if err != nil { return nil, fmt.Errorf("Cannot read payload (%s)", err) } + + m.Data = make([]byte, 0, m.Size) + fmt.Printf(" - raw data: '%s'\n", buffer) + + /* (7) Unmask payload */ + if len(mask) == 4 { + + for i, b := range buffer{ + + mi := i % 4 // mask index + + m.Data = append(m.Data, b ^ mask[mi]) + + } + + } + + + fmt.Printf(" - unmasked data: '%s'\n", m.Data) + + return m, nil + +} + + + + +// Send sends a frame over a socket +func (m T) Send(socket net.Conn) error { + + var bi uint = 0; + + buffer := make([]byte, 0) + fmt.Printf(" + write\n") + fmt.Printf(" - final: %t\n", m.Final) + fmt.Printf(" - type: %x\n", m.Type) + + /* (1) Byte 0 : FIN + opcode */ + buffer = append(buffer, 0x80 | TEXT ) + fmt.Printf(" > byte[%d]: '%x'\n", bi, 0x80 | TEXT); bi++ + fmt.Printf(" ? '%x'\n", buffer[bi-1]); + + /* (2) Get payload length */ + if m.Size < 126 { // simple + + buffer = append(buffer, byte(m.Size) ) + fmt.Printf(" - size: %d\n", m.Size) + fmt.Printf(" > byte[%d]: '%.2x'\n", bi, m.Size); bi++ + fmt.Printf(" ? '%.2x'\n", buffer[bi-1]); + + } else if m.Size < 0xffff { // extended: 16 bits + + buffer = append(buffer, 126) + + buf := make([]byte, 2) + binary.BigEndian.PutUint16(buf, uint16(m.Size)) + buffer = append(buffer, buf...) + fmt.Printf(" - size flag: %d\n", 126) + fmt.Printf(" > byte[%d]: '%.2x'\n", bi, 126); bi++ + fmt.Printf(" ? '%x'\n", buffer[bi-1]); + fmt.Printf(" - size: %d\n", m.Size) + fmt.Printf(" > byte[%d.%d]: '%4.x'\n", bi, bi+2-1, m.Size); bi+=2 + fmt.Printf(" ? '%.4x'\n", buffer[bi-2:bi]); + + } else if m.Size < 0xffffffffffffffff { // extended: 64 bits + + buffer = append(buffer, 127) + + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(m.Size)) + buffer = append(buffer, buf...) + fmt.Printf(" - size flag: %d\n", 127) + fmt.Printf(" > byte[%d]: '%.2x'\n", bi, 127); bi++ + fmt.Printf(" ? '%x'\n", buffer[bi-1]); + fmt.Printf(" - size: %d\n", m.Size) + fmt.Printf(" > byte[%d.%d]: '%16.x'\n", bi, bi+8-1, m.Size); bi+=8 + fmt.Printf(" ? '%.16x'\n", buffer[bi-8:bi]); + + } + + /* (3) Write header */ + // fmt.Printf(" - header: %x\n", buffer) + // _, err := socket.Write(buffer) + // if err != nil { return err } + + /* (4) Add payload */ + buffer = append(buffer, m.Data...) + fmt.Printf(" - data: '%s'\n", m.Data) + fmt.Printf(" - data: %x\n", m.Data) + fmt.Printf(" > byte[%d.%d]: '%x'\n", bi, bi+m.Size-1, buffer[bi:]); bi+=m.Size + + /* (4) Send message */ + nbw, err := socket.Write(buffer) + fmt.Printf(" - buffer: '%x'\n", buffer) + fmt.Printf(" - written: %d / %d\n", nbw, bi) + if err != nil { return err } + + return nil +} \ No newline at end of file diff --git a/ws/message/types.go b/ws/message/types.go new file mode 100644 index 0000000..27641af --- /dev/null +++ b/ws/message/types.go @@ -0,0 +1,20 @@ +package message + + +// Lists websocket message types +const ( + TEXT = 0x1 + BINARY = 0x2 + CLOSE = 0x8 + PING = 0x9 + PONG = 0xa +); + + +// Represents a websocket message +type T struct { + Type byte + Data []byte + Size uint + Final bool +} \ No newline at end of file diff --git a/ws/private.go b/ws/private.go deleted file mode 100644 index 2ad5d28..0000000 --- a/ws/private.go +++ /dev/null @@ -1,90 +0,0 @@ -package ws - -import ( - "bufio" - "git.xdrm.io/gws/upgrader" - "net" -) - -// dispatch finds a controller for a client -func (s *Server) dispatch(sock net.Conn, u *upgrader.T){ - - /* (1) Build client - ---------------------------------------------------------*/ - /* (1) Get request URI */ - uri := u.Request.GetURI() - - - client := &Client{ - conn: Conn{ - sock: sock, - br: bufio.NewReader(sock), - }, - Arguments: [][]string{ []string{ uri } }, - Protocol: string(u.Response.GetProtocol()), - recvc: make(chan Frame, maxChannelBufferLength), - sendc: make(chan []byte, maxChannelBufferLength), - closec: make(chan func(), maxChannelBufferLength), - } - - - /* (2) Search for a controller (from URI) - ---------------------------------------------------------*/ - var controller *Controller = nil - - /* (1) Iterate over URI controllers */ - for _, c := range s.controllers { - - /* (2) If matches */ - if c.uri.Match(uri) { - - /* (3) Extract matches */ - match := c.uri.GetAllMatch() - - /* (4) Add them to the 'arg' attribute */ - client.Arguments = append(client.Arguments, match...) - - /* (5) Mark that we have a controller */ - controller = c - break - - } - - } - - /* (6) If no controller found -> try default */ - if controller == nil && s.defaultController != nil { - controller = s.defaultController - } - - /* (7) If no controller -> close socket */ - if controller == nil { - sock.Close() - return - } - - - /* (3) Manage client+controller - ---------------------------------------------------------*/ - /* (1) Add controller to client */ - client.Controller = controller - - /* (2) Add client to server */ - s.clientsMutex.Lock() - s.clients[sock] = client - s.clientsMutex.Unlock() - - /* (3) Bind controller */ - go controller.fun(client, client.recvc, client.sendc, client.closec) - - /* (4) Launch asynchronous frame writer */ - go client.asyncWriter(s) - - /* (5) Run asynchronous frame reader */ - go client.asyncReader(s) - -} - - - - diff --git a/ws/public.go b/ws/public.go deleted file mode 100644 index cc32c97..0000000 --- a/ws/public.go +++ /dev/null @@ -1,19 +0,0 @@ -package ws - -import () -import "net" - - - -// CreateServer creates a server for a specific HOST and PORT -func CreateServer(host string, port uint16) *Server{ - - return &Server{ - addr: []byte(host), - port: port, - clients: make(map[net.Conn]*Client, 0), - defaultController: nil, - controllers: make([]*Controller, 0), - } - -} diff --git a/ws/server.go b/ws/server.go deleted file mode 100644 index 69fd935..0000000 --- a/ws/server.go +++ /dev/null @@ -1,91 +0,0 @@ -package ws - -import ( - "git.xdrm.io/gws/upgrader" - "net" - "fmt" - "git.xdrm.io/gws/internal/uri/parser" -) - - -// BindDefault binds a default controller -// it will be called if the URI does not -// match another controller -func (s *Server) BindDefault(c ControllerFunc){ - - s.defaultController = &Controller{ - uri: nil, - fun: c, - } -} - -// Bind binds a controller to an URI scheme -func (s *Server) Bind(uri string, c ControllerFunc) error { - - /* (1) Build URI parser */ - uriScheme, err := parser.Build(uri) - if err != nil { return fmt.Errorf("Cannot build URI: %s", err) } - - /* (2) Create controller */ - s.controllers = append(s.controllers, &Controller{ - uri: uriScheme, - fun: c, - } ) - - return nil - -} - - -// Launch launches the websocket server -func (s *Server) Launch() error { - - var err error - - /* (1) Listen socket - ---------------------------------------------------------*/ - /* (1) Build full url */ - url := fmt.Sprintf("%s:%d", s.addr, s.port) - - /* (3) Bind listen socket */ - s.sock, err = net.Listen("tcp", url) - if err != nil { - return fmt.Errorf("Listen socket: %s", err) - } - defer s.sock.Close() - - fmt.Printf("+ listening on %s\n", url) - - - /* (2) For each incoming connection (client) - ---------------------------------------------------------*/ - for { - - // {1} Wait for connection // - sock, err := s.sock.Accept() - fmt.Printf(" + new client\n") - if err != nil { - fmt.Printf(" - error: %s\n", err) - continue - } - - // {2} Upgrade request // - upgrader, err := upgrader.Upgrade(sock) - if err != nil { - fmt.Printf(" - upgrade error: %s\n", err) - sock.Close() - continue - } - if upgrader.Response.GetStatusCode() != 101 { - fmt.Printf(" - upgrade bad request (status code %d)\n", upgrader.Response.GetStatusCode()) - sock.Close() - continue - } - - // {3} Dispatch to controllers // - go s.dispatch(sock, upgrader) - - - } - -} \ No newline at end of file diff --git a/ws/server/public.go b/ws/server/public.go new file mode 100644 index 0000000..17e3d37 --- /dev/null +++ b/ws/server/public.go @@ -0,0 +1,154 @@ +package server + +import ( + "git.xdrm.io/gws/ws/message" + "git.xdrm.io/gws/ws/controller" + "git.xdrm.io/gws/ws/client" + "net" + "fmt" + "git.xdrm.io/gws/internal/uri/parser" +) + +// CreateServer creates a server for a specific HOST and PORT +func Create(host string, port uint16) *T{ + + return &T{ + addr: []byte(host), + port: port, + + clients: make(map[net.Conn]*client.T, 0), + + ctl: controller.Set{ + Def: nil, + Uri: make([]*controller.T, 0), + }, + + ch: ServerChannelSet{ + register: make(chan *client.T, 1), + unregister: make(chan *client.T, 1), + broadcast: make(chan message.T, 1), + }, + } + +} + + +// BindDefault binds a default controller +// it will be called if the URI does not +// match another controller +func (s *T) BindDefault(f controller.Func){ + + s.ctl.Def = &controller.T{ + URI: nil, + Fun: f, + } + +} + + +// Bind binds a controller to an URI scheme +func (s *T) Bind(uri string, f controller.Func) error { + + /* (1) Build URI parser */ + uriScheme, err := parser.Build(uri) + if err != nil { return fmt.Errorf("Cannot build URI: %s", err) } + + /* (2) Create controller */ + s.ctl.Uri = append(s.ctl.Uri, &controller.T{ + URI: uriScheme, + Fun: f, + } ) + + return nil + +} + + +// Launch launches the websocket server +func (s *T) Launch() error { + + var err error + + /* (1) Listen socket + ---------------------------------------------------------*/ + /* (1) Build full url */ + url := fmt.Sprintf("%s:%d", s.addr, s.port) + + /* (2) Bind socket to listen */ + s.sock, err = net.Listen("tcp", url) + if err != nil { + return fmt.Errorf("Listen socket: %s", err) + } + + defer s.sock.Close() + + fmt.Printf("+ listening on %s\n", url) + + /* (3) Launch scheduler */ + go s.scheduler() + + + + /* (2) For each incoming connection (client) + ---------------------------------------------------------*/ + for { + + /* (1) Wait for client */ + sock, err := s.sock.Accept() + if err != nil { + break + } + + /* (2) Try to create client */ + cli, err := client.Create(sock, s.ctl, s.ch.unregister) + if err != nil { + fmt.Printf(" - %s\n", err) + continue + } + + /* (3) Register client */ + s.ch.register <- cli + + } + + return nil + +} + + +// Scheduler schedules clients registration and broadcast +func (s *T) scheduler(){ + + for { + + select { + + /* (1) New client */ + case client := <- s.ch.register: + + fmt.Printf(" + client (sock: %p)\n", client.IO.Sock) + s.clients[client.IO.Sock] = client + + /* (2) New client */ + case client := <- s.ch.unregister: + + fmt.Printf(" - client (sock: %p)\n", client.IO.Sock) + delete(s.clients, client.IO.Sock) + client.IO.Sock.Close() + + /* (3) Broadcast */ + case message := <- s.ch.broadcast: + + fmt.Printf(" + broadcast\n") + + for _, c := range s.clients{ + c.Ch.Send <- message + } + + } + + } + + fmt.Printf("+ server stopped\n") + +} diff --git a/ws/server/types.go b/ws/server/types.go new file mode 100644 index 0000000..88a3bda --- /dev/null +++ b/ws/server/types.go @@ -0,0 +1,30 @@ +package server + +import ( + "git.xdrm.io/gws/ws/message" + "git.xdrm.io/gws/ws/controller" + "git.xdrm.io/gws/ws/client" + "net" +) + +// Represents all channels that need a server +type ServerChannelSet struct{ + register chan *client.T + unregister chan *client.T + broadcast chan message.T +} + + +// Represents a websocket server +type T struct { + sock net.Listener // listen socket + addr []byte // server listening ip/host + port uint16 // server listening port + + clients map[net.Conn]*client.T // clients + + ctl controller.Set // controllers + + ch ServerChannelSet +} + diff --git a/ws/spec/const.go b/ws/spec/const.go new file mode 100644 index 0000000..b190920 --- /dev/null +++ b/ws/spec/const.go @@ -0,0 +1,5 @@ +package spec + +const maxBufferLength = 4096 +const maxHeaderLength = 2 + 8 + 4 +const maxChannelBufferLength = 1