BIGUPDATE: new execution structure + package structure (debug to remove later) + Write does not work... yet

This commit is contained in:
xdrm-brackets 2018-05-02 22:24:50 +02:00
parent 4ec4dfd1d5
commit 9abf760e37
15 changed files with 673 additions and 565 deletions

View File

@ -1,8 +1,10 @@
package main package main
import ( import (
"git.xdrm.io/gws/ws/message"
"git.xdrm.io/gws/ws/controller"
"time" "time"
"git.xdrm.io/gws/ws" "git.xdrm.io/gws/ws/server"
"fmt" "fmt"
) )
@ -12,19 +14,18 @@ func main(){
startTime := time.Now().UnixNano() startTime := time.Now().UnixNano()
/* (1) Bind WebSocket server */ /* (1) Bind WebSocket server */
serv := ws.CreateServer("0.0.0.0", 4444) serv := server.Create("0.0.0.0", 4444)
/* (2) Bind default controller */ /* (2) Bind default controller */
serv.BindDefault(func(client *ws.Client, receiver <-chan ws.Frame, sender chan<- []byte, closer <-chan func()){ serv.BindDefault(func(client *controller.ClientInterface, receiver <-chan message.T, sender chan<- message.T, closer <-chan func()){
fmt.Printf("[default] connected\n") fmt.Printf("[default] connected\n")
for { for {
select { select {
case receivedFrame := <- receiver: case msg := <- receiver:
fmt.Printf("[default] received '%s'\n", receivedFrame.Payload.Buffer) fmt.Printf("[default] received '%s'\n", msg.Data)
sender <- receivedFrame.Payload.Buffer sender <- msg
fmt.Printf("[default] sent\n")
case closeFunc := <- closer: case closeFunc := <- closer:
fmt.Printf("[default] client with protocol '%s' exited\n", client.Protocol) fmt.Printf("[default] client with protocol '%s' exited\n", client.Protocol)
closeFunc() closeFunc()
@ -37,14 +38,14 @@ func main(){
}) })
/* (3) Bind to URI */ /* (3) Bind to URI */
err := serv.Bind("/channel/./room/./", func(client *ws.Client, receiver <-chan ws.Frame, sender chan<- []byte, closer <-chan func()){ err := serv.Bind("/channel/./room/./", func(client *controller.ClientInterface, receiver <-chan message.T, sender chan<- message.T, closer <-chan func()){
fmt.Printf("[uri] connected\n") fmt.Printf("[uri] connected\n")
for { for {
select { select {
case receivedFrame := <- receiver: case msg := <- receiver:
fmt.Printf("[uri] received '%s'\n", receivedFrame.Payload.Buffer) fmt.Printf("[uri] received '%s'\n", msg.Data)
case closeFunc := <- closer: case closeFunc := <- closer:
fmt.Printf("[uri] client with protocol '%s' exited\n", client.Protocol) fmt.Printf("[uri] client with protocol '%s' exited\n", client.Protocol)
closeFunc() closeFunc()

View File

@ -1,77 +0,0 @@
package ws
import (
"fmt"
"time"
)
// asyncReader reads a websocket frame chunk by chunk
// for a given client
func (c *Client) asyncReader(s *Server) {
for {
var startTime int64 = time.Now().UnixNano()
// Try to read frame header
frame, err := ReadFrame(c)
if err != nil {
fmt.Printf("[read.err] %s\n", err)
break
}
var elapsed = float32(time.Now().UnixNano()-startTime)/1e3
fmt.Printf("+ elapsed: %.3f us\n", elapsed)
// Trigger data channel
c.recvc <- *frame
}
fmt.Printf(" - remove client\n")
// return closing callback
c.closec <- func(){
// Remove client from server
s.clientsMutex.Lock()
delete(s.clients, c.conn.sock)
s.clientsMutex.Unlock()
// Close socket
c.conn.sock.Close()
}
}
// asyncWriter writes into websocket
// and is triggered by client.sendc channel
func (c *Client) asyncWriter(s *Server){
for payload := range c.sendc {
// Build Frame
f := buildFrame(payload)
// Send over socket
senderr := f.Send(&c.conn)
if senderr != nil {
fmt.Printf("Writing error: %s\n", senderr)
}
}
}

173
ws/client/public.go Normal file
View File

@ -0,0 +1,173 @@
package client
import (
"git.xdrm.io/gws/ws/controller"
"git.xdrm.io/gws/ws/message"
"git.xdrm.io/gws/upgrader"
"net"
"fmt"
)
// Create creates a new client
func Create(s net.Conn, ctl controller.Set, unregister chan <-*T) (*T, error){
/* (1) Manage UPGRADE request
---------------------------------------------------------*/
upgrader, err := upgrader.Upgrade(s)
if err != nil {
s.Close()
return nil, fmt.Errorf("Upgrade error: %s\n", err)
}
if upgrader.Response.GetStatusCode() != 101 {
s.Close()
return nil, fmt.Errorf("Upgrade error (HTTP %d)\n", upgrader.Response.GetStatusCode())
}
/* (2) Initialise client
---------------------------------------------------------*/
/* (1) Get upgrade data */
clientURI := upgrader.Request.GetURI()
clientProtocol := upgrader.Response.GetProtocol()
/* (2) Initialise client */
instance := &T{
IO: ClientIO{
Sock: s,
},
iface: &controller.ClientInterface{
Protocol: string(clientProtocol),
Arguments: [][]string{ []string{ clientURI } },
},
Ch: ClientChannelSet{
Receive: make(chan message.T, 1),
Send: make(chan message.T, 1),
Close: make(chan func(), 1),
},
}
/* (3) Find controller by URI
---------------------------------------------------------*/
/* (1) Try to find one */
controller, arguments := ctl.Match(clientURI);
/* (2) If nothing found -> error */
if controller == nil {
return nil, fmt.Errorf("No controller found, no default controller set\n")
}
/* (3) Copy data */
instance.controller = controller
instance.iface.Arguments = arguments
/* (4) Launch client routines
---------------------------------------------------------*/
/* (1) Launch client controller */
go instance.controller.Fun(
instance.iface, // pass the client
instance.Ch.Receive, // the receiver
instance.Ch.Send, // the sender
instance.Ch.Close, // the closer
)
/* (2) Launch message reader */
go instance.reader(unregister)
/* (3) Launc writer */
go instance.writer(unregister)
return instance, nil
}
// reader reads and parses messages from the buffer
func (c *T) reader(unregister chan <-*T){
fmt.Printf("[reader] start\n");
for {
/* (1) Parse message */
msg, err := message.Read(c.IO.Sock)
if err != nil {
fmt.Printf(" [reader] %s\n", err)
break
}
fmt.Printf(" [reader] ok\n");
/* (2) If CLOSE */
if msg.Type == message.CLOSE {
fmt.Printf(" [reader] CLOSE\n")
break
} else if msg.Type == message.PING {
fmt.Printf(" [reader] PING\n")
msg.Final = true
msg.Type = message.PONG
c.Ch.Send <- *msg
fmt.Printf(" [reader] sent PONG back\n")
continue
}
/* (3) Dispatch to receiver */
c.Ch.Receive <- *msg
}
fmt.Printf("[reader] end\n")
// return closing callback
c.Ch.Close <- func(){
unregister <- c
}
}
// writer writes into websocket
// and is triggered by client.ch.send channel
func (c *T) writer(unregister chan <-*T){
fmt.Printf("[writer] start\n");
for message := range c.Ch.Send {
c.IO.wriMu.Lock()
err := message.Send(c.IO.Sock)
c.IO.wriMu.Unlock()
if err != nil {
fmt.Printf(" [writer] %s\n", err)
break;
}
fmt.Printf(" [writer] ok\n")
}
fmt.Printf("[writer] end\n")
// return closing callback
c.Ch.Close <- func(){
unregister <- c
}
}

29
ws/client/types.go Normal file
View File

@ -0,0 +1,29 @@
package client
import (
"git.xdrm.io/gws/ws/controller"
"git.xdrm.io/gws/ws/message"
"sync"
"net"
)
// Represents a client socket utility (reader, writer, ..)
type ClientIO struct {
Sock net.Conn
wriMu sync.Mutex
}
// Represents all channels that need a client
type ClientChannelSet struct{
Receive chan message.T
Send chan message.T
Close chan func()
}
// Represents a websocket client
type T struct {
IO ClientIO
iface *controller.ClientInterface
controller *controller.T // assigned controller
Ch ClientChannelSet
}

40
ws/controller/public.go Normal file
View File

@ -0,0 +1,40 @@
package controller
// Match finds a controller for a given URI
// also it returns the matching string patterns
func (s *Set) Match(uri string) (*T, [][]string){
/* (1) Initialise argument list */
arguments := [][]string{ []string{ uri } }
/* (2) Try each controller */
for _, c := range s.Uri {
/* 1. If matches */
if c.URI.Match(uri) {
/* Extract matches */
match := c.URI.GetAllMatch()
/* Add them to the 'arg' attribute */
arguments = append(arguments, match...)
/* Mark that we have a controller */
return c, arguments
}
}
/* (3) If no controller found -> set default controller */
if s.Def != nil {
return s.Def, arguments
}
/* (4) If default is NIL, return empty controller */
return nil, arguments
}

28
ws/controller/types.go Normal file
View File

@ -0,0 +1,28 @@
package controller
import (
"git.xdrm.io/gws/ws/message"
"git.xdrm.io/gws/internal/uri/parser"
)
// Represents available information about a client
type ClientInterface struct {
Protocol string // choosen protocol (Sec-WebSocket-Protocol)
Arguments [][]string // URI parameters, index 0 is full URI, then matching groups
Store struct{} // store (for client implementation-specific data)
}
// Represents a websocket controller callback function
type Func func(*ClientInterface, <-chan message.T, chan<- message.T, <-chan func())
// Represents a websocket controller
type T struct {
URI *parser.Scheme // uri scheme
Fun Func // controller function
}
// Represents a controller set
type Set struct {
Def *T // default controller
Uri []*T // uri controllers
}

View File

@ -1,278 +0,0 @@
package ws
import (
"bufio"
"git.xdrm.io/gws/ws/frame/opcode"
"encoding/binary"
"git.xdrm.io/gws/ws/frame"
"fmt"
"git.xdrm.io/gws/internal/ws/reader"
"net"
)
// ReadFrame reads the frame from a socket
func ReadFrame(c *Client) (*Frame, error) {
f := new(Frame)
/* (1) Read header
---------------------------------------------------------*/
err := f.readHeader(c.conn.br)
if err != nil {
return nil, err
}
/* (2) Read payload
---------------------------------------------------------*/
err = f.readPayload(c.conn.br)
if err != nil {
return nil, err
}
/* (3) Manage OpCode
---------------------------------------------------------*/
switch( f.Header.Opc ){
case opcode.CLOSE:
fmt.Printf("Opcode: CLOSE\n")
return nil, fmt.Errorf("Closed by client\n")
case opcode.TEXT:
fmt.Printf("Opcode: TEXT\n")
case opcode.BINARY:
fmt.Printf("Opcode: BINARY\n")
case opcode.PING:
fmt.Printf("Opcode: PING\n")
err = buildPong().Send(&c.conn)
if err != nil {
return nil, fmt.Errorf("Pong frame: %s\n", err)
}
default:
fmt.Printf("Opcode: CLOSE\n")
buildClose().Send(&c.conn)
return nil, fmt.Errorf("Unknown Opcode %x\n", f.Header.Opc)
}
return f, nil
}
// readHeader reads the frame header
func (f *Frame) readHeader(br *bufio.Reader) error{
var err error
/* (2) Byte 1: FIN and OpCode */
b, err := reader.ReadBytes(br, 1)
if err != nil { return err }
f.Header.Fin = b[0] & 0x80 == 0x80
f.Header.Opc = frame.OpCode( b[0] & 0x0f )
/* (3) Byte 2: Mask and Length[0] */
b, err = reader.ReadBytes(br, 1)
if err != nil { return err }
// if mask, byte array not nil
if b[0] & 0x80 == 0x80 {
f.Header.Msk = []byte{}
}
// payload length
f.Payload.Length = uint64( b[0] & 0x7f )
/* (4) Extended payload */
if f.Payload.Length == 127 {
bx, err := reader.ReadBytes(br, 8)
if err != nil { return err }
f.Payload.Length = binary.BigEndian.Uint64(bx)
} else if f.Payload.Length == 126 {
bx, err := reader.ReadBytes(br, 2)
if err != nil { return err }
f.Payload.Length = uint64( binary.BigEndian.Uint16(bx) )
}
/* (5) Masking key */
if f.Header.Msk != nil {
bx, err := reader.ReadBytes(br, 4)
if err != nil { return err }
f.Header.Msk = make([]byte, 4)
copy(f.Header.Msk, bx)
}
// fmt.Printf(" + Header\n")
// fmt.Printf(" + FIN: %t\n", f.Header.Fin)
// fmt.Printf(" + OPC: %x\n", f.Header.Opc)
// fmt.Printf(" + MASK?: %t\n", f.Header.Msk != nil)
// if f.Header.Msk != nil {
// fmt.Printf(" + MASK: %x\n", f.Header.Msk)
// }
// fmt.Printf(" + LEN: %d\n", f.Payload.Length)
return nil
}
// readPayload reads the frame payload
func (f *Frame) readPayload(br *bufio.Reader) error{
/* (1) Read payload */
b, err := reader.ReadBytes(br, int(f.Payload.Length) )
if err != nil { return fmt.Errorf("Cannot read payload (%s)", err) }
f.Payload.Buffer = make([]byte, 0, f.Payload.Length)
/* (2) Unmask payload */
if len(f.Header.Msk) == 4 {
for i, b := range b{
mi := i % 4 // mask index
f.Payload.Buffer = append(f.Payload.Buffer, b ^ f.Header.Msk[mi])
}
}
// fmt.Printf(" + Payload\n")
// fmt.Printf(" + Read: %d\n", len(f.Payload.Buffer))
return nil
}
// buildFrame builds a frame from a payload buffer
// (as []byte)
func buildFrame(b []byte) *Frame {
return &Frame{
Header: frame.Header{
Fin: true,
Opc: opcode.TEXT,
Msk: nil,
},
Payload: frame.Payload{
Buffer: b,
Length: uint64(len(b)),
},
}
}
// buildClose builds a closing frame
func buildClose() *Frame{
return &Frame{
Header: frame.Header{
Fin: true,
Opc: opcode.CLOSE,
Msk: nil,
},
Payload: frame.Payload{
Buffer: []byte{},
Length: 0,
},
}
}
// buildPong builds a PONG frame (responding to PING frame)
func buildPong() *Frame{
return &Frame{
Header: frame.Header{
Fin: true,
Opc: opcode.PONG,
Msk: nil,
},
Payload: frame.Payload{
Buffer: []byte{},
Length: 0,
},
}
}
// Send sends a frame over a socket
func (f Frame) Send(c *Conn) error {
frameHeader := make([]byte, 0, maxHeaderLength)
/* (1) Byte 0 : FIN + opcode */
frameHeader = append(frameHeader, 0x80 | byte(opcode.TEXT) )
/* (2) Get payload length */
if f.Payload.Length < 126 { // simple
frameHeader = append(frameHeader, byte(f.Payload.Length) )
} else if f.Payload.Length < 0xffff { // extended: 16 bits
frameHeader = append(frameHeader, 126)
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, uint16(f.Payload.Length))
frameHeader = append(frameHeader, buf...)
} else if f.Payload.Length < 0xffffffffffffffff { // extended: 64 bits
frameHeader = append(frameHeader, 127)
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, f.Payload.Length)
frameHeader = append(frameHeader, buf...)
}
/* (3) Add payload */
writeBuffer := net.Buffers{frameHeader, f.Payload.Buffer[:f.Payload.Length]}
fmt.Printf("[send]\n")
_, err := writeBuffer.WriteTo(c.sock)
fmt.Printf("[/send] ")
if err != nil { return err }
// fmt.Printf(" + Header\n")
// fmt.Printf(" + FIN: %t\n", f.Header.Fin)
// fmt.Printf(" + OPC: %x\n", f.Header.Opc)
// fmt.Printf(" + MASK?: %t\n", f.Header.Msk != nil)
// fmt.Printf(" + LEN: %d\n", f.Payload.Length)
return nil
}

183
ws/message/public.go Normal file
View File

@ -0,0 +1,183 @@
package message
import (
"net"
"encoding/binary"
"fmt"
)
// Read reads a message form reader
func Read(socket net.Conn) (*T, error){
m := new(T)
var buffer []byte;
/* (2) Byte 1: FIN and OpCode */
buffer = make([]byte, 1)
_, err := socket.Read(buffer)
if err != nil { return nil, err }
m.Final = bool( buffer[0] & 0x80 == 0x80 )
m.Type = byte( buffer[0] & 0x0f )
fmt.Printf(" + read\n")
fmt.Printf(" - final: %t\n", m.Final)
fmt.Printf(" - Type: %x\n", m.Type)
/* (3) Byte 2: Mask and Length[0] */
buffer = make([]byte, 1)
_, err = socket.Read(buffer)
if err != nil { return nil, err }
// if mask, byte array not nil
var mask []byte = nil;
if buffer[0] & 0x80 == 0x80 {
mask = make([]byte, 0)
}
fmt.Printf(" - has mask ? %t\n", mask != nil)
// payload length
m.Size = uint( buffer[0] & 0x7f )
fmt.Printf(" - size/flag: %d\n", m.Size)
/* (4) Extended payload */
if m.Size == 127 {
buffer := make([]byte, 8)
_, err := socket.Read(buffer)
if err != nil { return nil, err }
m.Size = uint( binary.BigEndian.Uint64(buffer) )
} else if m.Size == 126 {
buffer := make([]byte, 2)
_, err := socket.Read(buffer)
if err != nil { return nil, err }
m.Size = uint( binary.BigEndian.Uint16(buffer) )
}
fmt.Printf(" - final size: %d\n", m.Size)
/* (5) Masking key */
if mask != nil {
buffer := make([]byte, 4)
_, err := socket.Read(buffer)
if err != nil { return nil, err }
mask = make([]byte, 4)
copy(mask, buffer)
}
fmt.Printf(" - mask: %x\n", mask)
/* (6) Read payload */
buffer = make([]byte, int(m.Size))
_, err = socket.Read(buffer)
if err != nil { return nil, fmt.Errorf("Cannot read payload (%s)", err) }
m.Data = make([]byte, 0, m.Size)
fmt.Printf(" - raw data: '%s'\n", buffer)
/* (7) Unmask payload */
if len(mask) == 4 {
for i, b := range buffer{
mi := i % 4 // mask index
m.Data = append(m.Data, b ^ mask[mi])
}
}
fmt.Printf(" - unmasked data: '%s'\n", m.Data)
return m, nil
}
// Send sends a frame over a socket
func (m T) Send(socket net.Conn) error {
var bi uint = 0;
buffer := make([]byte, 0)
fmt.Printf(" + write\n")
fmt.Printf(" - final: %t\n", m.Final)
fmt.Printf(" - type: %x\n", m.Type)
/* (1) Byte 0 : FIN + opcode */
buffer = append(buffer, 0x80 | TEXT )
fmt.Printf(" > byte[%d]: '%x'\n", bi, 0x80 | TEXT); bi++
fmt.Printf(" ? '%x'\n", buffer[bi-1]);
/* (2) Get payload length */
if m.Size < 126 { // simple
buffer = append(buffer, byte(m.Size) )
fmt.Printf(" - size: %d\n", m.Size)
fmt.Printf(" > byte[%d]: '%.2x'\n", bi, m.Size); bi++
fmt.Printf(" ? '%.2x'\n", buffer[bi-1]);
} else if m.Size < 0xffff { // extended: 16 bits
buffer = append(buffer, 126)
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, uint16(m.Size))
buffer = append(buffer, buf...)
fmt.Printf(" - size flag: %d\n", 126)
fmt.Printf(" > byte[%d]: '%.2x'\n", bi, 126); bi++
fmt.Printf(" ? '%x'\n", buffer[bi-1]);
fmt.Printf(" - size: %d\n", m.Size)
fmt.Printf(" > byte[%d.%d]: '%4.x'\n", bi, bi+2-1, m.Size); bi+=2
fmt.Printf(" ? '%.4x'\n", buffer[bi-2:bi]);
} else if m.Size < 0xffffffffffffffff { // extended: 64 bits
buffer = append(buffer, 127)
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(m.Size))
buffer = append(buffer, buf...)
fmt.Printf(" - size flag: %d\n", 127)
fmt.Printf(" > byte[%d]: '%.2x'\n", bi, 127); bi++
fmt.Printf(" ? '%x'\n", buffer[bi-1]);
fmt.Printf(" - size: %d\n", m.Size)
fmt.Printf(" > byte[%d.%d]: '%16.x'\n", bi, bi+8-1, m.Size); bi+=8
fmt.Printf(" ? '%.16x'\n", buffer[bi-8:bi]);
}
/* (3) Write header */
// fmt.Printf(" - header: %x\n", buffer)
// _, err := socket.Write(buffer)
// if err != nil { return err }
/* (4) Add payload */
buffer = append(buffer, m.Data...)
fmt.Printf(" - data: '%s'\n", m.Data)
fmt.Printf(" - data: %x\n", m.Data)
fmt.Printf(" > byte[%d.%d]: '%x'\n", bi, bi+m.Size-1, buffer[bi:]); bi+=m.Size
/* (4) Send message */
nbw, err := socket.Write(buffer)
fmt.Printf(" - buffer: '%x'\n", buffer)
fmt.Printf(" - written: %d / %d\n", nbw, bi)
if err != nil { return err }
return nil
}

20
ws/message/types.go Normal file
View File

@ -0,0 +1,20 @@
package message
// Lists websocket message types
const (
TEXT = 0x1
BINARY = 0x2
CLOSE = 0x8
PING = 0x9
PONG = 0xa
);
// Represents a websocket message
type T struct {
Type byte
Data []byte
Size uint
Final bool
}

View File

@ -1,90 +0,0 @@
package ws
import (
"bufio"
"git.xdrm.io/gws/upgrader"
"net"
)
// dispatch finds a controller for a client
func (s *Server) dispatch(sock net.Conn, u *upgrader.T){
/* (1) Build client
---------------------------------------------------------*/
/* (1) Get request URI */
uri := u.Request.GetURI()
client := &Client{
conn: Conn{
sock: sock,
br: bufio.NewReader(sock),
},
Arguments: [][]string{ []string{ uri } },
Protocol: string(u.Response.GetProtocol()),
recvc: make(chan Frame, maxChannelBufferLength),
sendc: make(chan []byte, maxChannelBufferLength),
closec: make(chan func(), maxChannelBufferLength),
}
/* (2) Search for a controller (from URI)
---------------------------------------------------------*/
var controller *Controller = nil
/* (1) Iterate over URI controllers */
for _, c := range s.controllers {
/* (2) If matches */
if c.uri.Match(uri) {
/* (3) Extract matches */
match := c.uri.GetAllMatch()
/* (4) Add them to the 'arg' attribute */
client.Arguments = append(client.Arguments, match...)
/* (5) Mark that we have a controller */
controller = c
break
}
}
/* (6) If no controller found -> try default */
if controller == nil && s.defaultController != nil {
controller = s.defaultController
}
/* (7) If no controller -> close socket */
if controller == nil {
sock.Close()
return
}
/* (3) Manage client+controller
---------------------------------------------------------*/
/* (1) Add controller to client */
client.Controller = controller
/* (2) Add client to server */
s.clientsMutex.Lock()
s.clients[sock] = client
s.clientsMutex.Unlock()
/* (3) Bind controller */
go controller.fun(client, client.recvc, client.sendc, client.closec)
/* (4) Launch asynchronous frame writer */
go client.asyncWriter(s)
/* (5) Run asynchronous frame reader */
go client.asyncReader(s)
}

View File

@ -1,19 +0,0 @@
package ws
import ()
import "net"
// CreateServer creates a server for a specific HOST and PORT
func CreateServer(host string, port uint16) *Server{
return &Server{
addr: []byte(host),
port: port,
clients: make(map[net.Conn]*Client, 0),
defaultController: nil,
controllers: make([]*Controller, 0),
}
}

View File

@ -1,91 +0,0 @@
package ws
import (
"git.xdrm.io/gws/upgrader"
"net"
"fmt"
"git.xdrm.io/gws/internal/uri/parser"
)
// BindDefault binds a default controller
// it will be called if the URI does not
// match another controller
func (s *Server) BindDefault(c ControllerFunc){
s.defaultController = &Controller{
uri: nil,
fun: c,
}
}
// Bind binds a controller to an URI scheme
func (s *Server) Bind(uri string, c ControllerFunc) error {
/* (1) Build URI parser */
uriScheme, err := parser.Build(uri)
if err != nil { return fmt.Errorf("Cannot build URI: %s", err) }
/* (2) Create controller */
s.controllers = append(s.controllers, &Controller{
uri: uriScheme,
fun: c,
} )
return nil
}
// Launch launches the websocket server
func (s *Server) Launch() error {
var err error
/* (1) Listen socket
---------------------------------------------------------*/
/* (1) Build full url */
url := fmt.Sprintf("%s:%d", s.addr, s.port)
/* (3) Bind listen socket */
s.sock, err = net.Listen("tcp", url)
if err != nil {
return fmt.Errorf("Listen socket: %s", err)
}
defer s.sock.Close()
fmt.Printf("+ listening on %s\n", url)
/* (2) For each incoming connection (client)
---------------------------------------------------------*/
for {
// {1} Wait for connection //
sock, err := s.sock.Accept()
fmt.Printf(" + new client\n")
if err != nil {
fmt.Printf(" - error: %s\n", err)
continue
}
// {2} Upgrade request //
upgrader, err := upgrader.Upgrade(sock)
if err != nil {
fmt.Printf(" - upgrade error: %s\n", err)
sock.Close()
continue
}
if upgrader.Response.GetStatusCode() != 101 {
fmt.Printf(" - upgrade bad request (status code %d)\n", upgrader.Response.GetStatusCode())
sock.Close()
continue
}
// {3} Dispatch to controllers //
go s.dispatch(sock, upgrader)
}
}

154
ws/server/public.go Normal file
View File

@ -0,0 +1,154 @@
package server
import (
"git.xdrm.io/gws/ws/message"
"git.xdrm.io/gws/ws/controller"
"git.xdrm.io/gws/ws/client"
"net"
"fmt"
"git.xdrm.io/gws/internal/uri/parser"
)
// CreateServer creates a server for a specific HOST and PORT
func Create(host string, port uint16) *T{
return &T{
addr: []byte(host),
port: port,
clients: make(map[net.Conn]*client.T, 0),
ctl: controller.Set{
Def: nil,
Uri: make([]*controller.T, 0),
},
ch: ServerChannelSet{
register: make(chan *client.T, 1),
unregister: make(chan *client.T, 1),
broadcast: make(chan message.T, 1),
},
}
}
// BindDefault binds a default controller
// it will be called if the URI does not
// match another controller
func (s *T) BindDefault(f controller.Func){
s.ctl.Def = &controller.T{
URI: nil,
Fun: f,
}
}
// Bind binds a controller to an URI scheme
func (s *T) Bind(uri string, f controller.Func) error {
/* (1) Build URI parser */
uriScheme, err := parser.Build(uri)
if err != nil { return fmt.Errorf("Cannot build URI: %s", err) }
/* (2) Create controller */
s.ctl.Uri = append(s.ctl.Uri, &controller.T{
URI: uriScheme,
Fun: f,
} )
return nil
}
// Launch launches the websocket server
func (s *T) Launch() error {
var err error
/* (1) Listen socket
---------------------------------------------------------*/
/* (1) Build full url */
url := fmt.Sprintf("%s:%d", s.addr, s.port)
/* (2) Bind socket to listen */
s.sock, err = net.Listen("tcp", url)
if err != nil {
return fmt.Errorf("Listen socket: %s", err)
}
defer s.sock.Close()
fmt.Printf("+ listening on %s\n", url)
/* (3) Launch scheduler */
go s.scheduler()
/* (2) For each incoming connection (client)
---------------------------------------------------------*/
for {
/* (1) Wait for client */
sock, err := s.sock.Accept()
if err != nil {
break
}
/* (2) Try to create client */
cli, err := client.Create(sock, s.ctl, s.ch.unregister)
if err != nil {
fmt.Printf(" - %s\n", err)
continue
}
/* (3) Register client */
s.ch.register <- cli
}
return nil
}
// Scheduler schedules clients registration and broadcast
func (s *T) scheduler(){
for {
select {
/* (1) New client */
case client := <- s.ch.register:
fmt.Printf(" + client (sock: %p)\n", client.IO.Sock)
s.clients[client.IO.Sock] = client
/* (2) New client */
case client := <- s.ch.unregister:
fmt.Printf(" - client (sock: %p)\n", client.IO.Sock)
delete(s.clients, client.IO.Sock)
client.IO.Sock.Close()
/* (3) Broadcast */
case message := <- s.ch.broadcast:
fmt.Printf(" + broadcast\n")
for _, c := range s.clients{
c.Ch.Send <- message
}
}
}
fmt.Printf("+ server stopped\n")
}

30
ws/server/types.go Normal file
View File

@ -0,0 +1,30 @@
package server
import (
"git.xdrm.io/gws/ws/message"
"git.xdrm.io/gws/ws/controller"
"git.xdrm.io/gws/ws/client"
"net"
)
// Represents all channels that need a server
type ServerChannelSet struct{
register chan *client.T
unregister chan *client.T
broadcast chan message.T
}
// Represents a websocket server
type T struct {
sock net.Listener // listen socket
addr []byte // server listening ip/host
port uint16 // server listening port
clients map[net.Conn]*client.T // clients
ctl controller.Set // controllers
ch ServerChannelSet
}

5
ws/spec/const.go Normal file
View File

@ -0,0 +1,5 @@
package spec
const maxBufferLength = 4096
const maxHeaderLength = 2 + 8 + 4
const maxChannelBufferLength = 1