From a31269118fed4ed042dbade51fdd4dad428fc087 Mon Sep 17 00:00:00 2001 From: xdrm-brackets Date: Thu, 3 May 2018 19:31:09 +0200 Subject: [PATCH] BIG UPDATE; brand new code structure (1 public package) + read by chunks if needed --- cmd/gwstester/tester.go | 61 -------- cmd/iface/main.go | 52 +++---- internal/ws/reader/reader.go | 26 ---- ws/client.go | 235 +++++++++++++++++++++++++++++ ws/client/public.go | 164 -------------------- ws/client/types.go | 29 ---- ws/controller.go | 65 ++++++++ ws/controller/public.go | 40 ----- ws/controller/types.go | 28 ---- ws/message.go | 195 ++++++++++++++++++++++++ ws/message/public.go | 139 ----------------- ws/message/types.go | 23 --- ws/{server/public.go => server.go} | 91 +++++++---- ws/server/types.go | 30 ---- 14 files changed, 577 insertions(+), 601 deletions(-) delete mode 100644 cmd/gwstester/tester.go delete mode 100644 internal/ws/reader/reader.go create mode 100644 ws/client.go delete mode 100644 ws/client/public.go delete mode 100644 ws/client/types.go create mode 100644 ws/controller.go delete mode 100644 ws/controller/public.go delete mode 100644 ws/controller/types.go create mode 100644 ws/message.go delete mode 100644 ws/message/public.go delete mode 100644 ws/message/types.go rename ws/{server/public.go => server.go} (54%) delete mode 100644 ws/server/types.go diff --git a/cmd/gwstester/tester.go b/cmd/gwstester/tester.go deleted file mode 100644 index 1f8f292..0000000 --- a/cmd/gwstester/tester.go +++ /dev/null @@ -1,61 +0,0 @@ -package main - -import ( - "git.xdrm.io/gws/upgrader" - "fmt" - "net" - "os" - "time" -) - -func main() { - - /* (1) Create listening socket - ---------------------------------------------------------*/ - /* (1) Create socket */ - lsock, err := net.Listen("tcp", ":4444") - if err != nil { - panic(err) - } - - /* (2) Accept clients */ - for { - - sock, err := lsock.Accept() - if err != nil { - os.Stderr.WriteString(fmt.Sprintf("Connection error: %s\n", err)) - return - } - - go manageClient(sock) - - } - -} - -func manageClient(sock net.Conn) { - - startTime := time.Now().UnixNano() - - defer sock.Close() - - for { - - fmt.Printf("+ new client\n") - - fmt.Printf(" + upgrade\n") - err := upgrader.Upgrade(sock) - - if err != nil { - fmt.Printf(" + error: %s\n", err) - return - } - fmt.Printf(" + 2-way exchange\n") - - break; - - } - - fmt.Printf("+ elapsed: %1.1f us\n", float32(time.Now().UnixNano()-startTime)/1e3) - -} diff --git a/cmd/iface/main.go b/cmd/iface/main.go index e7559f1..a7f51f7 100644 --- a/cmd/iface/main.go +++ b/cmd/iface/main.go @@ -1,10 +1,8 @@ package main import ( - "git.xdrm.io/gws/ws/message" - "git.xdrm.io/gws/ws/controller" + "git.xdrm.io/gws/ws" "time" - "git.xdrm.io/gws/ws/server" "fmt" ) @@ -14,48 +12,46 @@ func main(){ startTime := time.Now().UnixNano() /* (1) Bind WebSocket server */ - serv := server.Create("0.0.0.0", 4444) + serv := ws.CreateServer("0.0.0.0", 4444) /* (2) Bind default controller */ - serv.BindDefault(func(client *controller.ClientInterface, receiver <-chan message.T, sender chan<- message.T, closer <-chan func()){ + serv.BindDefault(func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message) chan struct{}{ - for { + var closer chan struct{} = make(chan struct{}, 1) - select { + go func(){ + for msg := range receiver { // if receive message -> send it back - case msg := <- receiver: - sender <- msg - - // if received closer, close after doing stuff - case closeCallback := <- closer: - closeCallback() - break + sender <- &msg } + }() - } + return closer }) /* (3) Bind to URI */ - err := serv.Bind("/channel/./room/./", func(client *controller.ClientInterface, receiver <-chan message.T, sender chan<- message.T, closer <-chan func()){ + err := serv.Bind("/channel/./room/./", func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message) chan struct{}{ - fmt.Printf("[uri] connected\n") - for { + var closer chan struct{} = make(chan struct{}, 1) + + go func(){ + fmt.Printf("[uri] connected\n") + + for msg := range receiver{ + + fmt.Printf("[uri] received '%s'\n", msg.Data) + sender <- &msg - select { - case msg := <- receiver: - fmt.Printf("[uri] received '%s'\n", msg.Data) - sender <- msg - case closeFunc := <- closer: - fmt.Printf("[uri] client with protocol '%s' exited\n", client.Protocol) - closeFunc() - return } - } - fmt.Printf("[uri] unexpectedly closed\n") + fmt.Printf("[uri] unexpectedly closed\n") + }() + + + return closer }) if err != nil { panic(err) } diff --git a/internal/ws/reader/reader.go b/internal/ws/reader/reader.go deleted file mode 100644 index e7bf0e1..0000000 --- a/internal/ws/reader/reader.go +++ /dev/null @@ -1,26 +0,0 @@ -package reader - -import ( - "io" - "bufio" -) - -// Maximum chunk size -const MaxChunkSize = 4096 - - -// Read reads a chunk of n bytes -// err is io.EOF when done -func ReadBytes(br *bufio.Reader, n int) ([]byte, error){ - - buf, err := br.Peek(n) - - if err == io.EOF && len(buf) < n && n > 0 { - err = io.ErrUnexpectedEOF - } - - br.Discard(len(buf)) - - return buf, err - -} \ No newline at end of file diff --git a/ws/client.go b/ws/client.go new file mode 100644 index 0000000..09a0eaf --- /dev/null +++ b/ws/client.go @@ -0,0 +1,235 @@ +package ws + +import ( + "bufio" + "encoding/binary" + "git.xdrm.io/gws/upgrader" + "net" + "fmt" +) + +// Represents a client socket utility (reader, writer, ..) +type clientIO struct { + sock net.Conn + reader *bufio.Reader + kill chan<- *client +} + +// Represents all channels that need a client +type clientChannelSet struct{ + receive chan Message + send chan *Message + closer chan struct{} +} + +// Represents a websocket client +type client struct { + io clientIO + iface *Client + ch clientChannelSet + status MessageError // close status ; 0 = nothing +} + + + + +// Create creates a new client +func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*client, error){ + + /* (1) Manage UPGRADE request + ---------------------------------------------------------*/ + upgrader, err := upgrader.Upgrade(s) + if err != nil { + s.Close() + return nil, fmt.Errorf("Upgrade error: %s\n", err) + } + + if upgrader.Response.GetStatusCode() != 101 { + s.Close() + return nil, fmt.Errorf("Upgrade error (HTTP %d)\n", upgrader.Response.GetStatusCode()) + } + + + /* (2) Initialise client + ---------------------------------------------------------*/ + /* (1) Get upgrade data */ + clientURI := upgrader.Request.GetURI() + clientProtocol := upgrader.Response.GetProtocol() + + /* (2) Initialise client */ + instance := &client{ + io: clientIO{ + sock: s, + reader: bufio.NewReader(s), + kill: make(chan<- *client, 1), + }, + + iface: &Client{ + Protocol: string(clientProtocol), + Arguments: [][]string{ []string{ clientURI } }, + }, + + ch: clientChannelSet{ + receive: make(chan Message, 1), + send: make(chan *Message, 1), + closer: make(chan struct{}, 1), + }, + } + + + + /* (3) Find controller by URI + ---------------------------------------------------------*/ + /* (1) Try to find one */ + controller, arguments := ctl.Match(clientURI); + + /* (2) If nothing found -> error */ + if controller == nil { + return nil, fmt.Errorf("No controller found, no default controller set\n") + } + + /* (3) Copy arguments */ + instance.iface.Arguments = arguments + + + + /* (4) Launch client routines + ---------------------------------------------------------*/ + /* (1) Launch client controller */ + instance.ch.closer = controller.Fun( + instance.iface, // pass the client + instance.ch.receive, // the receiver + instance.ch.send, // the sender + serverCh.broadcast, // broadcast sender + ) + + /* (2) Launch close handler */ + go func(){ + for range instance.ch.closer {} + instance.close(NORMAL) + }() + + + /* (3) Launch message reader */ + go instance.reader() + + /* (4) Launc writer */ + go instance.writer() + + return instance, nil + +} + + +// reader reads and parses messages from the buffer +func (c *client) reader(){ + + for { + + /* (1) Parse message */ + msg, err := readMessage(c.io.reader) + if err != nil { + // fmt.Printf(" [reader] %s\n", err) + c.close(NORMAL) + return + } + + /* (2) CLOSE */ + if msg.Type == CLOSE { + c.close(NORMAL) + return + + } + + /* (3) PING size error */ + if msg.Type == PING && msg.Size > 125 { + c.close(PROTOCOL_ERR) + return + } + + /* (4) Send PONG */ + if msg.Type == PING { + msg.Final = true + msg.Type = PONG + c.ch.send <- msg + continue + } + + /* (5) Unknown opcode */ + if msg.Type != TEXT && msg.Type != BINARY { + c.close(PROTOCOL_ERR) + return + } + + /* (5) Dispatch to receiver */ + c.ch.receive <- *msg + + } + + /* (6) close channel */ + c.close(NORMAL) + +} + + + +// writer writes into websocket +// and is triggered by client.ch.send channel +func (c *client) writer(){ + + for msg := range c.ch.send { + + err := msg.Send(c.io.sock) + + if err != nil { + fmt.Printf(" [writer] %s\n", err) + break + } + + + } + + /* (3) proper close */ + c.close(NORMAL) + +} + + + + +// close writes the error message (if needed) +// and it closes the socket +func (c *client) close(status MessageError){ + + /* (1) If error status -> send close frame */ + if status != NONE { + + /* Create message */ + msg := &Message{ + Final: true, + Type: CLOSE, + Data: make([]byte, 8), + } + binary.BigEndian.PutUint16(msg.Data, uint16(status)) + msg.Data = append(msg.Data, []byte("Closing")...) + msg.Size = uint( len(msg.Data) ) + + /* Send message */ + msg.Send(c.io.sock) + + } + + /* (2) Close socket */ + c.io.sock.Close() + + /* (3) Dereference data */ + c.ch.receive = nil + c.ch.send = nil + c.ch.closer = nil + c.iface = nil + c.io.reader = nil + + /* (4) Unregister */ + c.io.kill <- c + +} \ No newline at end of file diff --git a/ws/client/public.go b/ws/client/public.go deleted file mode 100644 index 0505667..0000000 --- a/ws/client/public.go +++ /dev/null @@ -1,164 +0,0 @@ -package client - -import ( - "git.xdrm.io/gws/ws/controller" - "git.xdrm.io/gws/ws/message" - "git.xdrm.io/gws/upgrader" - "net" - "fmt" -) - - - -// Create creates a new client -func Create(s net.Conn, ctl controller.Set, unregister chan <-*T) (*T, error){ - - /* (1) Manage UPGRADE request - ---------------------------------------------------------*/ - upgrader, err := upgrader.Upgrade(s) - if err != nil { - s.Close() - return nil, fmt.Errorf("Upgrade error: %s\n", err) - } - - if upgrader.Response.GetStatusCode() != 101 { - s.Close() - return nil, fmt.Errorf("Upgrade error (HTTP %d)\n", upgrader.Response.GetStatusCode()) - } - - - /* (2) Initialise client - ---------------------------------------------------------*/ - /* (1) Get upgrade data */ - clientURI := upgrader.Request.GetURI() - clientProtocol := upgrader.Response.GetProtocol() - - /* (2) Initialise client */ - instance := &T{ - IO: ClientIO{ - Sock: s, - }, - - iface: &controller.ClientInterface{ - Protocol: string(clientProtocol), - Arguments: [][]string{ []string{ clientURI } }, - }, - - Ch: ClientChannelSet{ - Receive: make(chan message.T, 1), - Send: make(chan message.T, 1), - Close: make(chan func(), 1), - }, - } - - - - /* (3) Find controller by URI - ---------------------------------------------------------*/ - /* (1) Try to find one */ - controller, arguments := ctl.Match(clientURI); - - /* (2) If nothing found -> error */ - if controller == nil { - return nil, fmt.Errorf("No controller found, no default controller set\n") - } - - /* (3) Copy data */ - instance.controller = controller - instance.iface.Arguments = arguments - - - - /* (4) Launch client routines - ---------------------------------------------------------*/ - /* (1) Launch client controller */ - go instance.controller.Fun( - instance.iface, // pass the client - instance.Ch.Receive, // the receiver - instance.Ch.Send, // the sender - instance.Ch.Close, // the closer - ) - - /* (2) Launch message reader */ - go instance.reader(unregister) - - /* (3) Launc writer */ - go instance.writer(unregister) - - - return instance, nil - -} - - -// reader reads and parses messages from the buffer -func (c *T) reader(unregister chan <-*T){ - - for { - - /* (1) Parse message */ - msg, err := message.Read(c.IO.Sock) - if err != nil { - fmt.Printf(" [reader] %s\n", err) - break - } - - /* (2) If CLOSE */ - if msg.Type == message.CLOSE { - - // fmt.Printf(" [reader] CLOSE\n") - break - - } else if msg.Type == message.PING { - - // fmt.Printf(" [reader] PING\n") - msg.Final = true - msg.Type = message.PONG - c.Ch.Send <- *msg - // fmt.Printf(" [reader] sent PONG back\n") - continue - - } else if msg.Size == 0 && ( msg.Type == message.TEXT || msg.Type == message.BINARY ) { - break - } - - /* (3) Dispatch to receiver */ - c.Ch.Receive <- *msg - - } - - // return closing callback - c.Ch.Close <- func(){ - - unregister <- c - } - -} - - - -// writer writes into websocket -// and is triggered by client.ch.send channel -func (c *T) writer(unregister chan <-*T){ - - for message := range c.Ch.Send { - - c.IO.wriMu.Lock() - - err := message.Send(c.IO.Sock) - - c.IO.wriMu.Unlock() - - if err != nil { - fmt.Printf(" [writer] %s\n", err) - break; - } - - } - - // return closing callback - c.Ch.Close <- func(){ - unregister <- c - } - -} \ No newline at end of file diff --git a/ws/client/types.go b/ws/client/types.go deleted file mode 100644 index 312aab7..0000000 --- a/ws/client/types.go +++ /dev/null @@ -1,29 +0,0 @@ -package client - -import ( - "git.xdrm.io/gws/ws/controller" - "git.xdrm.io/gws/ws/message" - "sync" - "net" -) - -// Represents a client socket utility (reader, writer, ..) -type ClientIO struct { - Sock net.Conn - wriMu sync.Mutex -} - -// Represents all channels that need a client -type ClientChannelSet struct{ - Receive chan message.T - Send chan message.T - Close chan func() -} - -// Represents a websocket client -type T struct { - IO ClientIO - iface *controller.ClientInterface - controller *controller.T // assigned controller - Ch ClientChannelSet -} \ No newline at end of file diff --git a/ws/controller.go b/ws/controller.go new file mode 100644 index 0000000..186b77a --- /dev/null +++ b/ws/controller.go @@ -0,0 +1,65 @@ +package ws + +import ( + "git.xdrm.io/gws/internal/uri/parser" +) + +// Represents available information about a client +type Client struct { + Protocol string // choosen protocol (Sec-WebSocket-Protocol) + Arguments [][]string // URI parameters, index 0 is full URI, then matching groups + Store struct{} // store (for client implementation-specific data) +} + +// Represents a websocket controller callback function +type ControllerFunc func(*Client, <-chan Message, chan<- *Message, chan<- *Message) chan struct{} + +// Represents a websocket controller +type Controller struct { + URI *parser.Scheme // uri scheme + Fun ControllerFunc // controller function +} + +// Represents a controller set +type ControllerSet struct { + Def *Controller // default controller + Uri []*Controller // uri controllers +} + + +// Match finds a controller for a given URI +// also it returns the matching string patterns +func (s *ControllerSet) Match(uri string) (*Controller, [][]string){ + + /* (1) Initialise argument list */ + arguments := [][]string{ []string{ uri } } + + + /* (2) Try each controller */ + for _, c := range s.Uri { + + /* 1. If matches */ + if c.URI.Match(uri) { + + /* Extract matches */ + match := c.URI.GetAllMatch() + + /* Add them to the 'arg' attribute */ + arguments = append(arguments, match...) + + /* Mark that we have a controller */ + return c, arguments + + } + + } + + /* (3) If no controller found -> set default controller */ + if s.Def != nil { + return s.Def, arguments + } + + /* (4) If default is NIL, return empty controller */ + return nil, arguments + +} \ No newline at end of file diff --git a/ws/controller/public.go b/ws/controller/public.go deleted file mode 100644 index f34667f..0000000 --- a/ws/controller/public.go +++ /dev/null @@ -1,40 +0,0 @@ -package controller - - - -// Match finds a controller for a given URI -// also it returns the matching string patterns -func (s *Set) Match(uri string) (*T, [][]string){ - - /* (1) Initialise argument list */ - arguments := [][]string{ []string{ uri } } - - - /* (2) Try each controller */ - for _, c := range s.Uri { - - /* 1. If matches */ - if c.URI.Match(uri) { - - /* Extract matches */ - match := c.URI.GetAllMatch() - - /* Add them to the 'arg' attribute */ - arguments = append(arguments, match...) - - /* Mark that we have a controller */ - return c, arguments - - } - - } - - /* (3) If no controller found -> set default controller */ - if s.Def != nil { - return s.Def, arguments - } - - /* (4) If default is NIL, return empty controller */ - return nil, arguments - -} \ No newline at end of file diff --git a/ws/controller/types.go b/ws/controller/types.go deleted file mode 100644 index c970491..0000000 --- a/ws/controller/types.go +++ /dev/null @@ -1,28 +0,0 @@ -package controller - -import ( - "git.xdrm.io/gws/ws/message" - "git.xdrm.io/gws/internal/uri/parser" -) - -// Represents available information about a client -type ClientInterface struct { - Protocol string // choosen protocol (Sec-WebSocket-Protocol) - Arguments [][]string // URI parameters, index 0 is full URI, then matching groups - Store struct{} // store (for client implementation-specific data) -} - -// Represents a websocket controller callback function -type Func func(*ClientInterface, <-chan message.T, chan<- message.T, <-chan func()) - -// Represents a websocket controller -type T struct { - URI *parser.Scheme // uri scheme - Fun Func // controller function -} - -// Represents a controller set -type Set struct { - Def *T // default controller - Uri []*T // uri controllers -} diff --git a/ws/message.go b/ws/message.go new file mode 100644 index 0000000..026609e --- /dev/null +++ b/ws/message.go @@ -0,0 +1,195 @@ +package ws + +import ( + "bytes" + "bufio" + "io" + "net" + "encoding/binary" +) + +// Maximum Header Size = Final/OpCode + isMask/Length + Length + Mask +const maximumHeaderSize = 1 + 1 + 8 + 4 + +// Lists websocket close status +type MessageError uint16 + +const ( + NONE MessageError = 0 + NORMAL MessageError = 1000 + GOING_AWAY MessageError = 1001 + PROTOCOL_ERR MessageError = 1002 + UNACCEPTABLE_OPCODE MessageError = 1003 + INVALID_PAYLOAD MessageError = 1007 // utf8 + MESSAGE_TOO_LARGE MessageError = 1009 +) + +// Lists websocket message types +type MessageType byte + +const ( + CONTINUATION MessageType = 0x0 + TEXT MessageType = 0x1 + BINARY MessageType = 0x2 + CLOSE MessageType = 0x8 + PING MessageType = 0x9 + PONG MessageType = 0xa +); + + +// Represents a websocket message +type Message struct { + Type MessageType + Data []byte + Size uint + Final bool +} + + +// receive reads a message form reader +func readMessage(reader *bufio.Reader) (*Message, error){ + + var err error + var tmpBuf []byte + var mask []byte + var cursor int + + m := new(Message) + + /* (2) Byte 1: FIN and OpCode */ + tmpBuf = make([]byte, 1) + _, err = reader.Read(tmpBuf) + if err != nil { return nil, err } + + + m.Final = bool( tmpBuf[0] & 0x80 == 0x80 ) + m.Type = MessageType( tmpBuf[0] & 0x0f ) + + /* (3) Byte 2: Mask and Length[0] */ + tmpBuf = make([]byte, 1) + _, err = reader.Read(tmpBuf) + if err != nil { return nil, err } + + // if mask, byte array not nil + if tmpBuf[0] & 0x80 == 0x80 { + mask = make([]byte, 0) + } + + // payload length + m.Size = uint( tmpBuf[0] & 0x7f ) + + /* (4) Extended payload */ + if m.Size == 127 { + + tmpBuf = make([]byte, 8) + _, err := reader.Read(tmpBuf) + if err != nil { return nil, err } + + m.Size = uint( binary.BigEndian.Uint64(tmpBuf) ) + + } else if m.Size == 126 { + + tmpBuf = make([]byte, 2) + _, err := reader.Read(tmpBuf) + if err != nil { return nil, err } + + m.Size = uint( binary.BigEndian.Uint16(tmpBuf) ) + + } + + /* (5) Masking key */ + if mask != nil { + + tmpBuf = make([]byte, 4) + _, err := reader.Read(tmpBuf) + if err != nil { return nil, err } + + mask = make([]byte, 4) + copy(mask, tmpBuf) + + } + + /* (6) Read payload by chunks */ + m.Data = make([]byte, int(m.Size)) + + // If empty payload + if m.Size <= 0 { + return m, nil + } + + cursor = 0 + + // {1} While we have data to read // + for uint(cursor) < m.Size { + + // {2} Try to read (at least 1 byte) // + nbread, err := io.ReadAtLeast(reader, m.Data[cursor:m.Size], 1) + if err != nil { + return nil, err + } + + // {3} Unmask data // + if mask != nil { + for i, l := cursor, cursor+nbread ; i < l ; i++ { + + mi := i % 4 // mask index + m.Data[i] = m.Data[i] ^ mask[mi] + + } + } + + // {4} Update cursor // + cursor += nbread + + } + + return m, nil + +} + + + + +// Send sends a frame over a socket +func (m Message) Send(socket net.Conn) error { + + header := make([]byte, 0, maximumHeaderSize) + + /* (1) Byte 0 : FIN + opcode */ + header = append(header, 0x80 | byte(TEXT) ) + + /* (2) Get payload length */ + if m.Size < 126 { // simple + + header = append(header, byte(m.Size) ) + + } else if m.Size < 0xffff { // extended: 16 bits + + header = append(header, 126) + + buf := make([]byte, 2) + binary.BigEndian.PutUint16(buf, uint16(m.Size)) + header = append(header, buf...) + + } else if m.Size < 0xffffffffffffffff { // extended: 64 bits + + header = append(header, 127) + + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(m.Size)) + header = append(header, buf...) + + } + + buffer := bytes.NewBuffer(header) + + /* (3) Add payload */ + _, err := buffer.Write(m.Data) + if err != nil { return err } + + /* (4) Send over socket */ + _, err = io.Copy(socket, buffer) + if err != nil { return err } + + return nil +} \ No newline at end of file diff --git a/ws/message/public.go b/ws/message/public.go deleted file mode 100644 index c16ffb8..0000000 --- a/ws/message/public.go +++ /dev/null @@ -1,139 +0,0 @@ -package message - -import ( - "net" - "encoding/binary" - "fmt" -) - - - - -// Read reads a message form reader -func Read(socket net.Conn) (*T, error){ - - m := new(T) - - var buffer []byte; - - /* (2) Byte 1: FIN and OpCode */ - buffer = make([]byte, 1) - _, err := socket.Read(buffer) - if err != nil { return nil, err } - - - m.Final = bool( buffer[0] & 0x80 == 0x80 ) - m.Type = Type( buffer[0] & 0x0f ) - - /* (3) Byte 2: Mask and Length[0] */ - buffer = make([]byte, 1) - _, err = socket.Read(buffer) - if err != nil { return nil, err } - - // if mask, byte array not nil - var mask []byte = nil; - if buffer[0] & 0x80 == 0x80 { - mask = make([]byte, 0) - } - - // payload length - m.Size = uint( buffer[0] & 0x7f ) - - /* (4) Extended payload */ - if m.Size == 127 { - - buffer := make([]byte, 8) - _, err := socket.Read(buffer) - if err != nil { return nil, err } - - m.Size = uint( binary.BigEndian.Uint64(buffer) ) - - } else if m.Size == 126 { - - buffer := make([]byte, 2) - _, err := socket.Read(buffer) - if err != nil { return nil, err } - - m.Size = uint( binary.BigEndian.Uint16(buffer) ) - - } - - /* (5) Masking key */ - if mask != nil { - - buffer := make([]byte, 4) - _, err := socket.Read(buffer) - if err != nil { return nil, err } - - mask = make([]byte, 4) - copy(mask, buffer) - - } - - /* (6) Read payload */ - buffer = make([]byte, int(m.Size)) - _, err = socket.Read(buffer) - if err != nil { return nil, fmt.Errorf("Cannot read payload (%s)", err) } - - m.Data = make([]byte, 0, m.Size) - - /* (7) Unmask payload */ - if len(mask) == 4 { - - for i, b := range buffer{ - - mi := i % 4 // mask index - - m.Data = append(m.Data, b ^ mask[mi]) - - } - - } - - return m, nil - -} - - - - -// Send sends a frame over a socket -func (m T) Send(socket net.Conn) error { - - buffer := make([]byte, 0) - - /* (1) Byte 0 : FIN + opcode */ - buffer = append(buffer, 0x80 | byte(TEXT) ) - - /* (2) Get payload length */ - if m.Size < 126 { // simple - - buffer = append(buffer, byte(m.Size) ) - - } else if m.Size < 0xffff { // extended: 16 bits - - buffer = append(buffer, 126) - - buf := make([]byte, 2) - binary.BigEndian.PutUint16(buf, uint16(m.Size)) - buffer = append(buffer, buf...) - - } else if m.Size < 0xffffffffffffffff { // extended: 64 bits - - buffer = append(buffer, 127) - - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, uint64(m.Size)) - buffer = append(buffer, buf...) - - } - - /* (3) Add payload */ - buffer = append(buffer, m.Data...) - - /* (4) Send over socket */ - _, err := socket.Write(buffer) - if err != nil { return err } - - return nil -} \ No newline at end of file diff --git a/ws/message/types.go b/ws/message/types.go deleted file mode 100644 index 07390cd..0000000 --- a/ws/message/types.go +++ /dev/null @@ -1,23 +0,0 @@ -package message - - -// Lists websocket message types -type Type byte - -const ( - CONTINUATION Type = 0x0 - TEXT Type = 0x1 - BINARY Type = 0x2 - CLOSE Type = 0x8 - PING Type = 0x9 - PONG Type = 0xa -); - - -// Represents a websocket message -type T struct { - Type Type - Data []byte - Size uint - Final bool -} \ No newline at end of file diff --git a/ws/server/public.go b/ws/server.go similarity index 54% rename from ws/server/public.go rename to ws/server.go index 898b692..c44a796 100644 --- a/ws/server/public.go +++ b/ws/server.go @@ -1,32 +1,53 @@ -package server +package ws import ( - "git.xdrm.io/gws/ws/message" - "git.xdrm.io/gws/ws/controller" - "git.xdrm.io/gws/ws/client" "net" "fmt" "git.xdrm.io/gws/internal/uri/parser" ) -// CreateServer creates a server for a specific HOST and PORT -func Create(host string, port uint16) *T{ +// Represents all channels that need a server +type serverChannelSet struct{ + register chan *client + unregister chan *client + broadcast chan *Message +} - return &T{ + +// Represents a websocket server +type Server struct { + sock net.Listener // listen socket + addr []byte // server listening ip/host + port uint16 // server listening port + + clients map[net.Conn]*client + + ctl ControllerSet // controllers + + ch serverChannelSet +} + + + + +// CreateServer creates a server for a specific HOST and PORT +func CreateServer(host string, port uint16) *Server{ + + return &Server{ addr: []byte(host), port: port, - clients: make(map[net.Conn]*client.T, 0), + clients: make(map[net.Conn]*client, 0), - ctl: controller.Set{ + ctl: ControllerSet{ Def: nil, - Uri: make([]*controller.T, 0), + Uri: make([]*Controller, 0), }, - ch: ServerChannelSet{ - register: make(chan *client.T, 1), - unregister: make(chan *client.T, 1), - broadcast: make(chan message.T, 1), + ch: serverChannelSet{ + register: make(chan *client, 1), + unregister: make(chan *client, 1), + broadcast: make(chan *Message, 1), }, } @@ -36,9 +57,9 @@ func Create(host string, port uint16) *T{ // BindDefault binds a default controller // it will be called if the URI does not // match another controller -func (s *T) BindDefault(f controller.Func){ +func (s *Server) BindDefault(f ControllerFunc){ - s.ctl.Def = &controller.T{ + s.ctl.Def = &Controller{ URI: nil, Fun: f, } @@ -47,14 +68,14 @@ func (s *T) BindDefault(f controller.Func){ // Bind binds a controller to an URI scheme -func (s *T) Bind(uri string, f controller.Func) error { +func (s *Server) Bind(uri string, f ControllerFunc) error { /* (1) Build URI parser */ uriScheme, err := parser.Build(uri) if err != nil { return fmt.Errorf("Cannot build URI: %s", err) } /* (2) Create controller */ - s.ctl.Uri = append(s.ctl.Uri, &controller.T{ + s.ctl.Uri = append(s.ctl.Uri, &Controller{ URI: uriScheme, Fun: f, } ) @@ -65,7 +86,7 @@ func (s *T) Bind(uri string, f controller.Func) error { // Launch launches the websocket server -func (s *T) Launch() error { +func (s *Server) Launch() error { var err error @@ -99,15 +120,19 @@ func (s *T) Launch() error { break } - /* (2) Try to create client */ - cli, err := client.Create(sock, s.ctl, s.ch.unregister) - if err != nil { - fmt.Printf(" - %s\n", err) - continue - } + go func(){ - /* (3) Register client */ - s.ch.register <- cli + /* (2) Try to create client */ + cli, err := buildClient(sock, s.ctl, s.ch) + if err != nil { + fmt.Printf(" - %s\n", err) + return + } + + /* (3) Register client */ + s.ch.register <- cli + + }() } @@ -117,7 +142,7 @@ func (s *T) Launch() error { // Scheduler schedules clients registration and broadcast -func (s *T) scheduler(){ +func (s *Server) scheduler(){ for { @@ -127,22 +152,22 @@ func (s *T) scheduler(){ case client := <- s.ch.register: // fmt.Printf(" + client\n") - s.clients[client.IO.Sock] = client + s.clients[client.io.sock] = client /* (2) New client */ case client := <- s.ch.unregister: // fmt.Printf(" - client\n") - delete(s.clients, client.IO.Sock) - client.IO.Sock.Close() + s.clients[client.io.sock] = nil + delete(s.clients, client.io.sock) /* (3) Broadcast */ - case message := <- s.ch.broadcast: + case msg := <- s.ch.broadcast: fmt.Printf(" + broadcast\n") for _, c := range s.clients{ - c.Ch.Send <- message + c.ch.send <- msg } } diff --git a/ws/server/types.go b/ws/server/types.go deleted file mode 100644 index 88a3bda..0000000 --- a/ws/server/types.go +++ /dev/null @@ -1,30 +0,0 @@ -package server - -import ( - "git.xdrm.io/gws/ws/message" - "git.xdrm.io/gws/ws/controller" - "git.xdrm.io/gws/ws/client" - "net" -) - -// Represents all channels that need a server -type ServerChannelSet struct{ - register chan *client.T - unregister chan *client.T - broadcast chan message.T -} - - -// Represents a websocket server -type T struct { - sock net.Listener // listen socket - addr []byte // server listening ip/host - port uint16 // server listening port - - clients map[net.Conn]*client.T // clients - - ctl controller.Set // controllers - - ch ServerChannelSet -} -