BIG UPDATE; brand new code structure (1 public package) + read by chunks if needed
This commit is contained in:
parent
2d51ba1863
commit
a31269118f
|
@ -1,61 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"git.xdrm.io/gws/upgrader"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
/* (1) Create listening socket
|
||||
---------------------------------------------------------*/
|
||||
/* (1) Create socket */
|
||||
lsock, err := net.Listen("tcp", ":4444")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
/* (2) Accept clients */
|
||||
for {
|
||||
|
||||
sock, err := lsock.Accept()
|
||||
if err != nil {
|
||||
os.Stderr.WriteString(fmt.Sprintf("Connection error: %s\n", err))
|
||||
return
|
||||
}
|
||||
|
||||
go manageClient(sock)
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func manageClient(sock net.Conn) {
|
||||
|
||||
startTime := time.Now().UnixNano()
|
||||
|
||||
defer sock.Close()
|
||||
|
||||
for {
|
||||
|
||||
fmt.Printf("+ new client\n")
|
||||
|
||||
fmt.Printf(" + upgrade\n")
|
||||
err := upgrader.Upgrade(sock)
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf(" + error: %s\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf(" + 2-way exchange\n")
|
||||
|
||||
break;
|
||||
|
||||
}
|
||||
|
||||
fmt.Printf("+ elapsed: %1.1f us\n", float32(time.Now().UnixNano()-startTime)/1e3)
|
||||
|
||||
}
|
|
@ -1,10 +1,8 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"git.xdrm.io/gws/ws/message"
|
||||
"git.xdrm.io/gws/ws/controller"
|
||||
"git.xdrm.io/gws/ws"
|
||||
"time"
|
||||
"git.xdrm.io/gws/ws/server"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
|
@ -14,48 +12,46 @@ func main(){
|
|||
startTime := time.Now().UnixNano()
|
||||
|
||||
/* (1) Bind WebSocket server */
|
||||
serv := server.Create("0.0.0.0", 4444)
|
||||
serv := ws.CreateServer("0.0.0.0", 4444)
|
||||
|
||||
/* (2) Bind default controller */
|
||||
serv.BindDefault(func(client *controller.ClientInterface, receiver <-chan message.T, sender chan<- message.T, closer <-chan func()){
|
||||
serv.BindDefault(func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message) chan struct{}{
|
||||
|
||||
for {
|
||||
var closer chan struct{} = make(chan struct{}, 1)
|
||||
|
||||
select {
|
||||
go func(){
|
||||
for msg := range receiver {
|
||||
|
||||
// if receive message -> send it back
|
||||
case msg := <- receiver:
|
||||
sender <- msg
|
||||
|
||||
// if received closer, close after doing stuff
|
||||
case closeCallback := <- closer:
|
||||
closeCallback()
|
||||
break
|
||||
sender <- &msg
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
||||
return closer
|
||||
|
||||
})
|
||||
|
||||
/* (3) Bind to URI */
|
||||
err := serv.Bind("/channel/./room/./", func(client *controller.ClientInterface, receiver <-chan message.T, sender chan<- message.T, closer <-chan func()){
|
||||
err := serv.Bind("/channel/./room/./", func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message) chan struct{}{
|
||||
|
||||
var closer chan struct{} = make(chan struct{}, 1)
|
||||
|
||||
go func(){
|
||||
fmt.Printf("[uri] connected\n")
|
||||
for {
|
||||
|
||||
select {
|
||||
case msg := <- receiver:
|
||||
for msg := range receiver{
|
||||
|
||||
fmt.Printf("[uri] received '%s'\n", msg.Data)
|
||||
sender <- msg
|
||||
case closeFunc := <- closer:
|
||||
fmt.Printf("[uri] client with protocol '%s' exited\n", client.Protocol)
|
||||
closeFunc()
|
||||
return
|
||||
}
|
||||
sender <- &msg
|
||||
|
||||
}
|
||||
|
||||
fmt.Printf("[uri] unexpectedly closed\n")
|
||||
}()
|
||||
|
||||
|
||||
return closer
|
||||
|
||||
})
|
||||
if err != nil { panic(err) }
|
||||
|
|
|
@ -1,26 +0,0 @@
|
|||
package reader
|
||||
|
||||
import (
|
||||
"io"
|
||||
"bufio"
|
||||
)
|
||||
|
||||
// Maximum chunk size
|
||||
const MaxChunkSize = 4096
|
||||
|
||||
|
||||
// Read reads a chunk of n bytes
|
||||
// err is io.EOF when done
|
||||
func ReadBytes(br *bufio.Reader, n int) ([]byte, error){
|
||||
|
||||
buf, err := br.Peek(n)
|
||||
|
||||
if err == io.EOF && len(buf) < n && n > 0 {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
br.Discard(len(buf))
|
||||
|
||||
return buf, err
|
||||
|
||||
}
|
|
@ -0,0 +1,235 @@
|
|||
package ws
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/binary"
|
||||
"git.xdrm.io/gws/upgrader"
|
||||
"net"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Represents a client socket utility (reader, writer, ..)
|
||||
type clientIO struct {
|
||||
sock net.Conn
|
||||
reader *bufio.Reader
|
||||
kill chan<- *client
|
||||
}
|
||||
|
||||
// Represents all channels that need a client
|
||||
type clientChannelSet struct{
|
||||
receive chan Message
|
||||
send chan *Message
|
||||
closer chan struct{}
|
||||
}
|
||||
|
||||
// Represents a websocket client
|
||||
type client struct {
|
||||
io clientIO
|
||||
iface *Client
|
||||
ch clientChannelSet
|
||||
status MessageError // close status ; 0 = nothing
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// Create creates a new client
|
||||
func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*client, error){
|
||||
|
||||
/* (1) Manage UPGRADE request
|
||||
---------------------------------------------------------*/
|
||||
upgrader, err := upgrader.Upgrade(s)
|
||||
if err != nil {
|
||||
s.Close()
|
||||
return nil, fmt.Errorf("Upgrade error: %s\n", err)
|
||||
}
|
||||
|
||||
if upgrader.Response.GetStatusCode() != 101 {
|
||||
s.Close()
|
||||
return nil, fmt.Errorf("Upgrade error (HTTP %d)\n", upgrader.Response.GetStatusCode())
|
||||
}
|
||||
|
||||
|
||||
/* (2) Initialise client
|
||||
---------------------------------------------------------*/
|
||||
/* (1) Get upgrade data */
|
||||
clientURI := upgrader.Request.GetURI()
|
||||
clientProtocol := upgrader.Response.GetProtocol()
|
||||
|
||||
/* (2) Initialise client */
|
||||
instance := &client{
|
||||
io: clientIO{
|
||||
sock: s,
|
||||
reader: bufio.NewReader(s),
|
||||
kill: make(chan<- *client, 1),
|
||||
},
|
||||
|
||||
iface: &Client{
|
||||
Protocol: string(clientProtocol),
|
||||
Arguments: [][]string{ []string{ clientURI } },
|
||||
},
|
||||
|
||||
ch: clientChannelSet{
|
||||
receive: make(chan Message, 1),
|
||||
send: make(chan *Message, 1),
|
||||
closer: make(chan struct{}, 1),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* (3) Find controller by URI
|
||||
---------------------------------------------------------*/
|
||||
/* (1) Try to find one */
|
||||
controller, arguments := ctl.Match(clientURI);
|
||||
|
||||
/* (2) If nothing found -> error */
|
||||
if controller == nil {
|
||||
return nil, fmt.Errorf("No controller found, no default controller set\n")
|
||||
}
|
||||
|
||||
/* (3) Copy arguments */
|
||||
instance.iface.Arguments = arguments
|
||||
|
||||
|
||||
|
||||
/* (4) Launch client routines
|
||||
---------------------------------------------------------*/
|
||||
/* (1) Launch client controller */
|
||||
instance.ch.closer = controller.Fun(
|
||||
instance.iface, // pass the client
|
||||
instance.ch.receive, // the receiver
|
||||
instance.ch.send, // the sender
|
||||
serverCh.broadcast, // broadcast sender
|
||||
)
|
||||
|
||||
/* (2) Launch close handler */
|
||||
go func(){
|
||||
for range instance.ch.closer {}
|
||||
instance.close(NORMAL)
|
||||
}()
|
||||
|
||||
|
||||
/* (3) Launch message reader */
|
||||
go instance.reader()
|
||||
|
||||
/* (4) Launc writer */
|
||||
go instance.writer()
|
||||
|
||||
return instance, nil
|
||||
|
||||
}
|
||||
|
||||
|
||||
// reader reads and parses messages from the buffer
|
||||
func (c *client) reader(){
|
||||
|
||||
for {
|
||||
|
||||
/* (1) Parse message */
|
||||
msg, err := readMessage(c.io.reader)
|
||||
if err != nil {
|
||||
// fmt.Printf(" [reader] %s\n", err)
|
||||
c.close(NORMAL)
|
||||
return
|
||||
}
|
||||
|
||||
/* (2) CLOSE */
|
||||
if msg.Type == CLOSE {
|
||||
c.close(NORMAL)
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
/* (3) PING size error */
|
||||
if msg.Type == PING && msg.Size > 125 {
|
||||
c.close(PROTOCOL_ERR)
|
||||
return
|
||||
}
|
||||
|
||||
/* (4) Send PONG */
|
||||
if msg.Type == PING {
|
||||
msg.Final = true
|
||||
msg.Type = PONG
|
||||
c.ch.send <- msg
|
||||
continue
|
||||
}
|
||||
|
||||
/* (5) Unknown opcode */
|
||||
if msg.Type != TEXT && msg.Type != BINARY {
|
||||
c.close(PROTOCOL_ERR)
|
||||
return
|
||||
}
|
||||
|
||||
/* (5) Dispatch to receiver */
|
||||
c.ch.receive <- *msg
|
||||
|
||||
}
|
||||
|
||||
/* (6) close channel */
|
||||
c.close(NORMAL)
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
// writer writes into websocket
|
||||
// and is triggered by client.ch.send channel
|
||||
func (c *client) writer(){
|
||||
|
||||
for msg := range c.ch.send {
|
||||
|
||||
err := msg.Send(c.io.sock)
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf(" [writer] %s\n", err)
|
||||
break
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
/* (3) proper close */
|
||||
c.close(NORMAL)
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// close writes the error message (if needed)
|
||||
// and it closes the socket
|
||||
func (c *client) close(status MessageError){
|
||||
|
||||
/* (1) If error status -> send close frame */
|
||||
if status != NONE {
|
||||
|
||||
/* Create message */
|
||||
msg := &Message{
|
||||
Final: true,
|
||||
Type: CLOSE,
|
||||
Data: make([]byte, 8),
|
||||
}
|
||||
binary.BigEndian.PutUint16(msg.Data, uint16(status))
|
||||
msg.Data = append(msg.Data, []byte("Closing")...)
|
||||
msg.Size = uint( len(msg.Data) )
|
||||
|
||||
/* Send message */
|
||||
msg.Send(c.io.sock)
|
||||
|
||||
}
|
||||
|
||||
/* (2) Close socket */
|
||||
c.io.sock.Close()
|
||||
|
||||
/* (3) Dereference data */
|
||||
c.ch.receive = nil
|
||||
c.ch.send = nil
|
||||
c.ch.closer = nil
|
||||
c.iface = nil
|
||||
c.io.reader = nil
|
||||
|
||||
/* (4) Unregister */
|
||||
c.io.kill <- c
|
||||
|
||||
}
|
|
@ -1,164 +0,0 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"git.xdrm.io/gws/ws/controller"
|
||||
"git.xdrm.io/gws/ws/message"
|
||||
"git.xdrm.io/gws/upgrader"
|
||||
"net"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
|
||||
|
||||
// Create creates a new client
|
||||
func Create(s net.Conn, ctl controller.Set, unregister chan <-*T) (*T, error){
|
||||
|
||||
/* (1) Manage UPGRADE request
|
||||
---------------------------------------------------------*/
|
||||
upgrader, err := upgrader.Upgrade(s)
|
||||
if err != nil {
|
||||
s.Close()
|
||||
return nil, fmt.Errorf("Upgrade error: %s\n", err)
|
||||
}
|
||||
|
||||
if upgrader.Response.GetStatusCode() != 101 {
|
||||
s.Close()
|
||||
return nil, fmt.Errorf("Upgrade error (HTTP %d)\n", upgrader.Response.GetStatusCode())
|
||||
}
|
||||
|
||||
|
||||
/* (2) Initialise client
|
||||
---------------------------------------------------------*/
|
||||
/* (1) Get upgrade data */
|
||||
clientURI := upgrader.Request.GetURI()
|
||||
clientProtocol := upgrader.Response.GetProtocol()
|
||||
|
||||
/* (2) Initialise client */
|
||||
instance := &T{
|
||||
IO: ClientIO{
|
||||
Sock: s,
|
||||
},
|
||||
|
||||
iface: &controller.ClientInterface{
|
||||
Protocol: string(clientProtocol),
|
||||
Arguments: [][]string{ []string{ clientURI } },
|
||||
},
|
||||
|
||||
Ch: ClientChannelSet{
|
||||
Receive: make(chan message.T, 1),
|
||||
Send: make(chan message.T, 1),
|
||||
Close: make(chan func(), 1),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* (3) Find controller by URI
|
||||
---------------------------------------------------------*/
|
||||
/* (1) Try to find one */
|
||||
controller, arguments := ctl.Match(clientURI);
|
||||
|
||||
/* (2) If nothing found -> error */
|
||||
if controller == nil {
|
||||
return nil, fmt.Errorf("No controller found, no default controller set\n")
|
||||
}
|
||||
|
||||
/* (3) Copy data */
|
||||
instance.controller = controller
|
||||
instance.iface.Arguments = arguments
|
||||
|
||||
|
||||
|
||||
/* (4) Launch client routines
|
||||
---------------------------------------------------------*/
|
||||
/* (1) Launch client controller */
|
||||
go instance.controller.Fun(
|
||||
instance.iface, // pass the client
|
||||
instance.Ch.Receive, // the receiver
|
||||
instance.Ch.Send, // the sender
|
||||
instance.Ch.Close, // the closer
|
||||
)
|
||||
|
||||
/* (2) Launch message reader */
|
||||
go instance.reader(unregister)
|
||||
|
||||
/* (3) Launc writer */
|
||||
go instance.writer(unregister)
|
||||
|
||||
|
||||
return instance, nil
|
||||
|
||||
}
|
||||
|
||||
|
||||
// reader reads and parses messages from the buffer
|
||||
func (c *T) reader(unregister chan <-*T){
|
||||
|
||||
for {
|
||||
|
||||
/* (1) Parse message */
|
||||
msg, err := message.Read(c.IO.Sock)
|
||||
if err != nil {
|
||||
fmt.Printf(" [reader] %s\n", err)
|
||||
break
|
||||
}
|
||||
|
||||
/* (2) If CLOSE */
|
||||
if msg.Type == message.CLOSE {
|
||||
|
||||
// fmt.Printf(" [reader] CLOSE\n")
|
||||
break
|
||||
|
||||
} else if msg.Type == message.PING {
|
||||
|
||||
// fmt.Printf(" [reader] PING\n")
|
||||
msg.Final = true
|
||||
msg.Type = message.PONG
|
||||
c.Ch.Send <- *msg
|
||||
// fmt.Printf(" [reader] sent PONG back\n")
|
||||
continue
|
||||
|
||||
} else if msg.Size == 0 && ( msg.Type == message.TEXT || msg.Type == message.BINARY ) {
|
||||
break
|
||||
}
|
||||
|
||||
/* (3) Dispatch to receiver */
|
||||
c.Ch.Receive <- *msg
|
||||
|
||||
}
|
||||
|
||||
// return closing callback
|
||||
c.Ch.Close <- func(){
|
||||
|
||||
unregister <- c
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
// writer writes into websocket
|
||||
// and is triggered by client.ch.send channel
|
||||
func (c *T) writer(unregister chan <-*T){
|
||||
|
||||
for message := range c.Ch.Send {
|
||||
|
||||
c.IO.wriMu.Lock()
|
||||
|
||||
err := message.Send(c.IO.Sock)
|
||||
|
||||
c.IO.wriMu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf(" [writer] %s\n", err)
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// return closing callback
|
||||
c.Ch.Close <- func(){
|
||||
unregister <- c
|
||||
}
|
||||
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"git.xdrm.io/gws/ws/controller"
|
||||
"git.xdrm.io/gws/ws/message"
|
||||
"sync"
|
||||
"net"
|
||||
)
|
||||
|
||||
// Represents a client socket utility (reader, writer, ..)
|
||||
type ClientIO struct {
|
||||
Sock net.Conn
|
||||
wriMu sync.Mutex
|
||||
}
|
||||
|
||||
// Represents all channels that need a client
|
||||
type ClientChannelSet struct{
|
||||
Receive chan message.T
|
||||
Send chan message.T
|
||||
Close chan func()
|
||||
}
|
||||
|
||||
// Represents a websocket client
|
||||
type T struct {
|
||||
IO ClientIO
|
||||
iface *controller.ClientInterface
|
||||
controller *controller.T // assigned controller
|
||||
Ch ClientChannelSet
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
package ws
|
||||
|
||||
import (
|
||||
"git.xdrm.io/gws/internal/uri/parser"
|
||||
)
|
||||
|
||||
// Represents available information about a client
|
||||
type Client struct {
|
||||
Protocol string // choosen protocol (Sec-WebSocket-Protocol)
|
||||
Arguments [][]string // URI parameters, index 0 is full URI, then matching groups
|
||||
Store struct{} // store (for client implementation-specific data)
|
||||
}
|
||||
|
||||
// Represents a websocket controller callback function
|
||||
type ControllerFunc func(*Client, <-chan Message, chan<- *Message, chan<- *Message) chan struct{}
|
||||
|
||||
// Represents a websocket controller
|
||||
type Controller struct {
|
||||
URI *parser.Scheme // uri scheme
|
||||
Fun ControllerFunc // controller function
|
||||
}
|
||||
|
||||
// Represents a controller set
|
||||
type ControllerSet struct {
|
||||
Def *Controller // default controller
|
||||
Uri []*Controller // uri controllers
|
||||
}
|
||||
|
||||
|
||||
// Match finds a controller for a given URI
|
||||
// also it returns the matching string patterns
|
||||
func (s *ControllerSet) Match(uri string) (*Controller, [][]string){
|
||||
|
||||
/* (1) Initialise argument list */
|
||||
arguments := [][]string{ []string{ uri } }
|
||||
|
||||
|
||||
/* (2) Try each controller */
|
||||
for _, c := range s.Uri {
|
||||
|
||||
/* 1. If matches */
|
||||
if c.URI.Match(uri) {
|
||||
|
||||
/* Extract matches */
|
||||
match := c.URI.GetAllMatch()
|
||||
|
||||
/* Add them to the 'arg' attribute */
|
||||
arguments = append(arguments, match...)
|
||||
|
||||
/* Mark that we have a controller */
|
||||
return c, arguments
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/* (3) If no controller found -> set default controller */
|
||||
if s.Def != nil {
|
||||
return s.Def, arguments
|
||||
}
|
||||
|
||||
/* (4) If default is NIL, return empty controller */
|
||||
return nil, arguments
|
||||
|
||||
}
|
|
@ -1,40 +0,0 @@
|
|||
package controller
|
||||
|
||||
|
||||
|
||||
// Match finds a controller for a given URI
|
||||
// also it returns the matching string patterns
|
||||
func (s *Set) Match(uri string) (*T, [][]string){
|
||||
|
||||
/* (1) Initialise argument list */
|
||||
arguments := [][]string{ []string{ uri } }
|
||||
|
||||
|
||||
/* (2) Try each controller */
|
||||
for _, c := range s.Uri {
|
||||
|
||||
/* 1. If matches */
|
||||
if c.URI.Match(uri) {
|
||||
|
||||
/* Extract matches */
|
||||
match := c.URI.GetAllMatch()
|
||||
|
||||
/* Add them to the 'arg' attribute */
|
||||
arguments = append(arguments, match...)
|
||||
|
||||
/* Mark that we have a controller */
|
||||
return c, arguments
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/* (3) If no controller found -> set default controller */
|
||||
if s.Def != nil {
|
||||
return s.Def, arguments
|
||||
}
|
||||
|
||||
/* (4) If default is NIL, return empty controller */
|
||||
return nil, arguments
|
||||
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
package controller
|
||||
|
||||
import (
|
||||
"git.xdrm.io/gws/ws/message"
|
||||
"git.xdrm.io/gws/internal/uri/parser"
|
||||
)
|
||||
|
||||
// Represents available information about a client
|
||||
type ClientInterface struct {
|
||||
Protocol string // choosen protocol (Sec-WebSocket-Protocol)
|
||||
Arguments [][]string // URI parameters, index 0 is full URI, then matching groups
|
||||
Store struct{} // store (for client implementation-specific data)
|
||||
}
|
||||
|
||||
// Represents a websocket controller callback function
|
||||
type Func func(*ClientInterface, <-chan message.T, chan<- message.T, <-chan func())
|
||||
|
||||
// Represents a websocket controller
|
||||
type T struct {
|
||||
URI *parser.Scheme // uri scheme
|
||||
Fun Func // controller function
|
||||
}
|
||||
|
||||
// Represents a controller set
|
||||
type Set struct {
|
||||
Def *T // default controller
|
||||
Uri []*T // uri controllers
|
||||
}
|
|
@ -0,0 +1,195 @@
|
|||
package ws
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"bufio"
|
||||
"io"
|
||||
"net"
|
||||
"encoding/binary"
|
||||
)
|
||||
|
||||
// Maximum Header Size = Final/OpCode + isMask/Length + Length + Mask
|
||||
const maximumHeaderSize = 1 + 1 + 8 + 4
|
||||
|
||||
// Lists websocket close status
|
||||
type MessageError uint16
|
||||
|
||||
const (
|
||||
NONE MessageError = 0
|
||||
NORMAL MessageError = 1000
|
||||
GOING_AWAY MessageError = 1001
|
||||
PROTOCOL_ERR MessageError = 1002
|
||||
UNACCEPTABLE_OPCODE MessageError = 1003
|
||||
INVALID_PAYLOAD MessageError = 1007 // utf8
|
||||
MESSAGE_TOO_LARGE MessageError = 1009
|
||||
)
|
||||
|
||||
// Lists websocket message types
|
||||
type MessageType byte
|
||||
|
||||
const (
|
||||
CONTINUATION MessageType = 0x0
|
||||
TEXT MessageType = 0x1
|
||||
BINARY MessageType = 0x2
|
||||
CLOSE MessageType = 0x8
|
||||
PING MessageType = 0x9
|
||||
PONG MessageType = 0xa
|
||||
);
|
||||
|
||||
|
||||
// Represents a websocket message
|
||||
type Message struct {
|
||||
Type MessageType
|
||||
Data []byte
|
||||
Size uint
|
||||
Final bool
|
||||
}
|
||||
|
||||
|
||||
// receive reads a message form reader
|
||||
func readMessage(reader *bufio.Reader) (*Message, error){
|
||||
|
||||
var err error
|
||||
var tmpBuf []byte
|
||||
var mask []byte
|
||||
var cursor int
|
||||
|
||||
m := new(Message)
|
||||
|
||||
/* (2) Byte 1: FIN and OpCode */
|
||||
tmpBuf = make([]byte, 1)
|
||||
_, err = reader.Read(tmpBuf)
|
||||
if err != nil { return nil, err }
|
||||
|
||||
|
||||
m.Final = bool( tmpBuf[0] & 0x80 == 0x80 )
|
||||
m.Type = MessageType( tmpBuf[0] & 0x0f )
|
||||
|
||||
/* (3) Byte 2: Mask and Length[0] */
|
||||
tmpBuf = make([]byte, 1)
|
||||
_, err = reader.Read(tmpBuf)
|
||||
if err != nil { return nil, err }
|
||||
|
||||
// if mask, byte array not nil
|
||||
if tmpBuf[0] & 0x80 == 0x80 {
|
||||
mask = make([]byte, 0)
|
||||
}
|
||||
|
||||
// payload length
|
||||
m.Size = uint( tmpBuf[0] & 0x7f )
|
||||
|
||||
/* (4) Extended payload */
|
||||
if m.Size == 127 {
|
||||
|
||||
tmpBuf = make([]byte, 8)
|
||||
_, err := reader.Read(tmpBuf)
|
||||
if err != nil { return nil, err }
|
||||
|
||||
m.Size = uint( binary.BigEndian.Uint64(tmpBuf) )
|
||||
|
||||
} else if m.Size == 126 {
|
||||
|
||||
tmpBuf = make([]byte, 2)
|
||||
_, err := reader.Read(tmpBuf)
|
||||
if err != nil { return nil, err }
|
||||
|
||||
m.Size = uint( binary.BigEndian.Uint16(tmpBuf) )
|
||||
|
||||
}
|
||||
|
||||
/* (5) Masking key */
|
||||
if mask != nil {
|
||||
|
||||
tmpBuf = make([]byte, 4)
|
||||
_, err := reader.Read(tmpBuf)
|
||||
if err != nil { return nil, err }
|
||||
|
||||
mask = make([]byte, 4)
|
||||
copy(mask, tmpBuf)
|
||||
|
||||
}
|
||||
|
||||
/* (6) Read payload by chunks */
|
||||
m.Data = make([]byte, int(m.Size))
|
||||
|
||||
// If empty payload
|
||||
if m.Size <= 0 {
|
||||
return m, nil
|
||||
}
|
||||
|
||||
cursor = 0
|
||||
|
||||
// {1} While we have data to read //
|
||||
for uint(cursor) < m.Size {
|
||||
|
||||
// {2} Try to read (at least 1 byte) //
|
||||
nbread, err := io.ReadAtLeast(reader, m.Data[cursor:m.Size], 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// {3} Unmask data //
|
||||
if mask != nil {
|
||||
for i, l := cursor, cursor+nbread ; i < l ; i++ {
|
||||
|
||||
mi := i % 4 // mask index
|
||||
m.Data[i] = m.Data[i] ^ mask[mi]
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// {4} Update cursor //
|
||||
cursor += nbread
|
||||
|
||||
}
|
||||
|
||||
return m, nil
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// Send sends a frame over a socket
|
||||
func (m Message) Send(socket net.Conn) error {
|
||||
|
||||
header := make([]byte, 0, maximumHeaderSize)
|
||||
|
||||
/* (1) Byte 0 : FIN + opcode */
|
||||
header = append(header, 0x80 | byte(TEXT) )
|
||||
|
||||
/* (2) Get payload length */
|
||||
if m.Size < 126 { // simple
|
||||
|
||||
header = append(header, byte(m.Size) )
|
||||
|
||||
} else if m.Size < 0xffff { // extended: 16 bits
|
||||
|
||||
header = append(header, 126)
|
||||
|
||||
buf := make([]byte, 2)
|
||||
binary.BigEndian.PutUint16(buf, uint16(m.Size))
|
||||
header = append(header, buf...)
|
||||
|
||||
} else if m.Size < 0xffffffffffffffff { // extended: 64 bits
|
||||
|
||||
header = append(header, 127)
|
||||
|
||||
buf := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(buf, uint64(m.Size))
|
||||
header = append(header, buf...)
|
||||
|
||||
}
|
||||
|
||||
buffer := bytes.NewBuffer(header)
|
||||
|
||||
/* (3) Add payload */
|
||||
_, err := buffer.Write(m.Data)
|
||||
if err != nil { return err }
|
||||
|
||||
/* (4) Send over socket */
|
||||
_, err = io.Copy(socket, buffer)
|
||||
if err != nil { return err }
|
||||
|
||||
return nil
|
||||
}
|
|
@ -1,139 +0,0 @@
|
|||
package message
|
||||
|
||||
import (
|
||||
"net"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
// Read reads a message form reader
|
||||
func Read(socket net.Conn) (*T, error){
|
||||
|
||||
m := new(T)
|
||||
|
||||
var buffer []byte;
|
||||
|
||||
/* (2) Byte 1: FIN and OpCode */
|
||||
buffer = make([]byte, 1)
|
||||
_, err := socket.Read(buffer)
|
||||
if err != nil { return nil, err }
|
||||
|
||||
|
||||
m.Final = bool( buffer[0] & 0x80 == 0x80 )
|
||||
m.Type = Type( buffer[0] & 0x0f )
|
||||
|
||||
/* (3) Byte 2: Mask and Length[0] */
|
||||
buffer = make([]byte, 1)
|
||||
_, err = socket.Read(buffer)
|
||||
if err != nil { return nil, err }
|
||||
|
||||
// if mask, byte array not nil
|
||||
var mask []byte = nil;
|
||||
if buffer[0] & 0x80 == 0x80 {
|
||||
mask = make([]byte, 0)
|
||||
}
|
||||
|
||||
// payload length
|
||||
m.Size = uint( buffer[0] & 0x7f )
|
||||
|
||||
/* (4) Extended payload */
|
||||
if m.Size == 127 {
|
||||
|
||||
buffer := make([]byte, 8)
|
||||
_, err := socket.Read(buffer)
|
||||
if err != nil { return nil, err }
|
||||
|
||||
m.Size = uint( binary.BigEndian.Uint64(buffer) )
|
||||
|
||||
} else if m.Size == 126 {
|
||||
|
||||
buffer := make([]byte, 2)
|
||||
_, err := socket.Read(buffer)
|
||||
if err != nil { return nil, err }
|
||||
|
||||
m.Size = uint( binary.BigEndian.Uint16(buffer) )
|
||||
|
||||
}
|
||||
|
||||
/* (5) Masking key */
|
||||
if mask != nil {
|
||||
|
||||
buffer := make([]byte, 4)
|
||||
_, err := socket.Read(buffer)
|
||||
if err != nil { return nil, err }
|
||||
|
||||
mask = make([]byte, 4)
|
||||
copy(mask, buffer)
|
||||
|
||||
}
|
||||
|
||||
/* (6) Read payload */
|
||||
buffer = make([]byte, int(m.Size))
|
||||
_, err = socket.Read(buffer)
|
||||
if err != nil { return nil, fmt.Errorf("Cannot read payload (%s)", err) }
|
||||
|
||||
m.Data = make([]byte, 0, m.Size)
|
||||
|
||||
/* (7) Unmask payload */
|
||||
if len(mask) == 4 {
|
||||
|
||||
for i, b := range buffer{
|
||||
|
||||
mi := i % 4 // mask index
|
||||
|
||||
m.Data = append(m.Data, b ^ mask[mi])
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return m, nil
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// Send sends a frame over a socket
|
||||
func (m T) Send(socket net.Conn) error {
|
||||
|
||||
buffer := make([]byte, 0)
|
||||
|
||||
/* (1) Byte 0 : FIN + opcode */
|
||||
buffer = append(buffer, 0x80 | byte(TEXT) )
|
||||
|
||||
/* (2) Get payload length */
|
||||
if m.Size < 126 { // simple
|
||||
|
||||
buffer = append(buffer, byte(m.Size) )
|
||||
|
||||
} else if m.Size < 0xffff { // extended: 16 bits
|
||||
|
||||
buffer = append(buffer, 126)
|
||||
|
||||
buf := make([]byte, 2)
|
||||
binary.BigEndian.PutUint16(buf, uint16(m.Size))
|
||||
buffer = append(buffer, buf...)
|
||||
|
||||
} else if m.Size < 0xffffffffffffffff { // extended: 64 bits
|
||||
|
||||
buffer = append(buffer, 127)
|
||||
|
||||
buf := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(buf, uint64(m.Size))
|
||||
buffer = append(buffer, buf...)
|
||||
|
||||
}
|
||||
|
||||
/* (3) Add payload */
|
||||
buffer = append(buffer, m.Data...)
|
||||
|
||||
/* (4) Send over socket */
|
||||
_, err := socket.Write(buffer)
|
||||
if err != nil { return err }
|
||||
|
||||
return nil
|
||||
}
|
|
@ -1,23 +0,0 @@
|
|||
package message
|
||||
|
||||
|
||||
// Lists websocket message types
|
||||
type Type byte
|
||||
|
||||
const (
|
||||
CONTINUATION Type = 0x0
|
||||
TEXT Type = 0x1
|
||||
BINARY Type = 0x2
|
||||
CLOSE Type = 0x8
|
||||
PING Type = 0x9
|
||||
PONG Type = 0xa
|
||||
);
|
||||
|
||||
|
||||
// Represents a websocket message
|
||||
type T struct {
|
||||
Type Type
|
||||
Data []byte
|
||||
Size uint
|
||||
Final bool
|
||||
}
|
|
@ -1,32 +1,53 @@
|
|||
package server
|
||||
package ws
|
||||
|
||||
import (
|
||||
"git.xdrm.io/gws/ws/message"
|
||||
"git.xdrm.io/gws/ws/controller"
|
||||
"git.xdrm.io/gws/ws/client"
|
||||
"net"
|
||||
"fmt"
|
||||
"git.xdrm.io/gws/internal/uri/parser"
|
||||
)
|
||||
|
||||
// CreateServer creates a server for a specific HOST and PORT
|
||||
func Create(host string, port uint16) *T{
|
||||
// Represents all channels that need a server
|
||||
type serverChannelSet struct{
|
||||
register chan *client
|
||||
unregister chan *client
|
||||
broadcast chan *Message
|
||||
}
|
||||
|
||||
return &T{
|
||||
|
||||
// Represents a websocket server
|
||||
type Server struct {
|
||||
sock net.Listener // listen socket
|
||||
addr []byte // server listening ip/host
|
||||
port uint16 // server listening port
|
||||
|
||||
clients map[net.Conn]*client
|
||||
|
||||
ctl ControllerSet // controllers
|
||||
|
||||
ch serverChannelSet
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// CreateServer creates a server for a specific HOST and PORT
|
||||
func CreateServer(host string, port uint16) *Server{
|
||||
|
||||
return &Server{
|
||||
addr: []byte(host),
|
||||
port: port,
|
||||
|
||||
clients: make(map[net.Conn]*client.T, 0),
|
||||
clients: make(map[net.Conn]*client, 0),
|
||||
|
||||
ctl: controller.Set{
|
||||
ctl: ControllerSet{
|
||||
Def: nil,
|
||||
Uri: make([]*controller.T, 0),
|
||||
Uri: make([]*Controller, 0),
|
||||
},
|
||||
|
||||
ch: ServerChannelSet{
|
||||
register: make(chan *client.T, 1),
|
||||
unregister: make(chan *client.T, 1),
|
||||
broadcast: make(chan message.T, 1),
|
||||
ch: serverChannelSet{
|
||||
register: make(chan *client, 1),
|
||||
unregister: make(chan *client, 1),
|
||||
broadcast: make(chan *Message, 1),
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -36,9 +57,9 @@ func Create(host string, port uint16) *T{
|
|||
// BindDefault binds a default controller
|
||||
// it will be called if the URI does not
|
||||
// match another controller
|
||||
func (s *T) BindDefault(f controller.Func){
|
||||
func (s *Server) BindDefault(f ControllerFunc){
|
||||
|
||||
s.ctl.Def = &controller.T{
|
||||
s.ctl.Def = &Controller{
|
||||
URI: nil,
|
||||
Fun: f,
|
||||
}
|
||||
|
@ -47,14 +68,14 @@ func (s *T) BindDefault(f controller.Func){
|
|||
|
||||
|
||||
// Bind binds a controller to an URI scheme
|
||||
func (s *T) Bind(uri string, f controller.Func) error {
|
||||
func (s *Server) Bind(uri string, f ControllerFunc) error {
|
||||
|
||||
/* (1) Build URI parser */
|
||||
uriScheme, err := parser.Build(uri)
|
||||
if err != nil { return fmt.Errorf("Cannot build URI: %s", err) }
|
||||
|
||||
/* (2) Create controller */
|
||||
s.ctl.Uri = append(s.ctl.Uri, &controller.T{
|
||||
s.ctl.Uri = append(s.ctl.Uri, &Controller{
|
||||
URI: uriScheme,
|
||||
Fun: f,
|
||||
} )
|
||||
|
@ -65,7 +86,7 @@ func (s *T) Bind(uri string, f controller.Func) error {
|
|||
|
||||
|
||||
// Launch launches the websocket server
|
||||
func (s *T) Launch() error {
|
||||
func (s *Server) Launch() error {
|
||||
|
||||
var err error
|
||||
|
||||
|
@ -99,16 +120,20 @@ func (s *T) Launch() error {
|
|||
break
|
||||
}
|
||||
|
||||
go func(){
|
||||
|
||||
/* (2) Try to create client */
|
||||
cli, err := client.Create(sock, s.ctl, s.ch.unregister)
|
||||
cli, err := buildClient(sock, s.ctl, s.ch)
|
||||
if err != nil {
|
||||
fmt.Printf(" - %s\n", err)
|
||||
continue
|
||||
return
|
||||
}
|
||||
|
||||
/* (3) Register client */
|
||||
s.ch.register <- cli
|
||||
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -117,7 +142,7 @@ func (s *T) Launch() error {
|
|||
|
||||
|
||||
// Scheduler schedules clients registration and broadcast
|
||||
func (s *T) scheduler(){
|
||||
func (s *Server) scheduler(){
|
||||
|
||||
for {
|
||||
|
||||
|
@ -127,22 +152,22 @@ func (s *T) scheduler(){
|
|||
case client := <- s.ch.register:
|
||||
|
||||
// fmt.Printf(" + client\n")
|
||||
s.clients[client.IO.Sock] = client
|
||||
s.clients[client.io.sock] = client
|
||||
|
||||
/* (2) New client */
|
||||
case client := <- s.ch.unregister:
|
||||
|
||||
// fmt.Printf(" - client\n")
|
||||
delete(s.clients, client.IO.Sock)
|
||||
client.IO.Sock.Close()
|
||||
s.clients[client.io.sock] = nil
|
||||
delete(s.clients, client.io.sock)
|
||||
|
||||
/* (3) Broadcast */
|
||||
case message := <- s.ch.broadcast:
|
||||
case msg := <- s.ch.broadcast:
|
||||
|
||||
fmt.Printf(" + broadcast\n")
|
||||
|
||||
for _, c := range s.clients{
|
||||
c.Ch.Send <- message
|
||||
c.ch.send <- msg
|
||||
}
|
||||
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"git.xdrm.io/gws/ws/message"
|
||||
"git.xdrm.io/gws/ws/controller"
|
||||
"git.xdrm.io/gws/ws/client"
|
||||
"net"
|
||||
)
|
||||
|
||||
// Represents all channels that need a server
|
||||
type ServerChannelSet struct{
|
||||
register chan *client.T
|
||||
unregister chan *client.T
|
||||
broadcast chan message.T
|
||||
}
|
||||
|
||||
|
||||
// Represents a websocket server
|
||||
type T struct {
|
||||
sock net.Listener // listen socket
|
||||
addr []byte // server listening ip/host
|
||||
port uint16 // server listening port
|
||||
|
||||
clients map[net.Conn]*client.T // clients
|
||||
|
||||
ctl controller.Set // controllers
|
||||
|
||||
ch ServerChannelSet
|
||||
}
|
||||
|
Loading…
Reference in New Issue