websocket frame parsing works + controller dispatch but have to be improved
This commit is contained in:
parent
b1d0a71dd5
commit
dcfdb3411a
|
@ -16,13 +16,19 @@ func main(){
|
||||||
|
|
||||||
/* (2) Bind default controller */
|
/* (2) Bind default controller */
|
||||||
err := serv.BindDefault(func(c *ws.Client, f *ws.Frame){
|
err := serv.BindDefault(func(c *ws.Client, f *ws.Frame){
|
||||||
fmt.Printf("Default controller")
|
fmt.Printf("Default controller\n")
|
||||||
|
if f != nil {
|
||||||
|
fmt.Printf("Received: '%s'\n", f.Buf)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
if err != nil { panic(err) }
|
if err != nil { panic(err) }
|
||||||
|
|
||||||
/* (3) Bind to URI */
|
/* (3) Bind to URI */
|
||||||
err = serv.Bind("/channel/./room/./", func(c *ws.Client, f *ws.Frame){
|
err = serv.Bind("/channel/./room/./", func(c *ws.Client, f *ws.Frame){
|
||||||
fmt.Printf("URI controller")
|
fmt.Printf("URI controller\n")
|
||||||
|
if f != nil {
|
||||||
|
fmt.Printf("Received: '%s'\n", f.Buf)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
if err != nil { panic(err) }
|
if err != nil { panic(err) }
|
||||||
|
|
||||||
|
|
|
@ -1,52 +1,37 @@
|
||||||
package reader
|
package reader
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"bufio"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Chunk reader
|
// Maximum chunk size
|
||||||
type chunkReader struct {
|
const MaxChunkSize = 4096
|
||||||
reader *bufio.Reader // the reader
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// New creates a new reader
|
|
||||||
func NewReader(r io.Reader) (reader *chunkReader) {
|
|
||||||
|
|
||||||
br, ok := r.(*bufio.Reader)
|
|
||||||
if !ok {
|
|
||||||
br = bufio.NewReader(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &chunkReader{reader: br}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Read reads a chunk of n bytes
|
// Read reads a chunk of n bytes
|
||||||
// err is io.EOF when done
|
// err is io.EOF when done
|
||||||
func (r *chunkReader) Read(n uint) ([]byte, error){
|
func ReadBytes(r io.Reader, n uint) ([]byte, error){
|
||||||
|
|
||||||
/* (1) Read line */
|
res := make([]byte, 0, MaxChunkSize)
|
||||||
chunk := make([]byte, n)
|
|
||||||
read, err := r.reader.Read(chunk)
|
totalRead := uint(0)
|
||||||
|
|
||||||
|
// socket -> tmp( buffer)
|
||||||
|
for totalRead < n {
|
||||||
|
|
||||||
|
tmp := make([]byte, 1)
|
||||||
|
|
||||||
|
read, err := r.Read(tmp)
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
if read == 0 { return nil, fmt.Errorf("Cannot read") }
|
||||||
|
|
||||||
|
totalRead += uint(1)
|
||||||
|
|
||||||
|
res = append(res, tmp[0])
|
||||||
|
|
||||||
/* (2) manage errors */
|
|
||||||
if err == io.EOF {
|
|
||||||
err = io.EOF
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
return res, nil
|
||||||
return chunk, err
|
|
||||||
}
|
|
||||||
|
|
||||||
/* (1) Manage ending chunk */
|
|
||||||
if uint(read) == 0 || len(chunk) == 0 {
|
|
||||||
return chunk, io.EOF
|
|
||||||
}
|
|
||||||
|
|
||||||
return chunk, nil
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,7 +1,7 @@
|
||||||
package opcode
|
package opcode
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"git.xdrm.io/gws/internal/ws/frame"
|
"git.xdrm.io/gws/ws/frame"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Represents OpCode Values
|
// Represents OpCode Values
|
||||||
|
|
|
@ -1,13 +1,19 @@
|
||||||
package frame
|
package frame
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Represenst Frame errors
|
||||||
|
var ErrMalFormed = fmt.Errorf("Malformed Frame")
|
||||||
|
|
||||||
|
|
||||||
// Represents an OpCode
|
// Represents an OpCode
|
||||||
type OpCode byte
|
type OpCode byte
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Represents a frame metadata
|
// Represents a frame metadata
|
||||||
type Header struct {
|
type Header struct {
|
||||||
fin bool
|
Fin bool
|
||||||
opc OpCode
|
Opc OpCode
|
||||||
msk []byte // len: 4 if set, else empty
|
Msk []byte // len: 4 if set, else empty
|
||||||
}
|
}
|
150
ws/private.go
150
ws/private.go
|
@ -1,11 +1,16 @@
|
||||||
package ws
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"time"
|
||||||
|
"encoding/binary"
|
||||||
|
"git.xdrm.io/gws/ws/frame"
|
||||||
|
"fmt"
|
||||||
|
"git.xdrm.io/gws/internal/ws/reader"
|
||||||
"git.xdrm.io/gws/upgrader"
|
"git.xdrm.io/gws/upgrader"
|
||||||
"net"
|
"net"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
// dispatch finds a controller for a client
|
// dispatch finds a controller for a client
|
||||||
func (s *Server) dispatch(sock net.Conn, u *upgrader.T){
|
func (s *Server) dispatch(sock net.Conn, u *upgrader.T){
|
||||||
|
|
||||||
|
@ -70,9 +75,150 @@ func (s *Server) dispatch(sock net.Conn, u *upgrader.T){
|
||||||
/* (2) Add controller to client */
|
/* (2) Add controller to client */
|
||||||
client.ctl = controller
|
client.ctl = controller
|
||||||
|
|
||||||
/* (3) Launch controller routine */
|
/* (3) Launch controller routine (for new connection) */
|
||||||
go controller.fun(client, nil)
|
go controller.fun(client, nil)
|
||||||
|
|
||||||
|
/* (4) Launch continuous frame reader */
|
||||||
|
go client.awaitFrame()
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// awaitFrame reads a websocket frame chunk by chunk
|
||||||
|
// for a given client
|
||||||
|
func (c *Client) awaitFrame() {
|
||||||
|
|
||||||
|
// Get buffer
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
|
||||||
|
// Get buffer frame
|
||||||
|
frame := new(Frame)
|
||||||
|
|
||||||
|
|
||||||
|
for {
|
||||||
|
|
||||||
|
var startTime int64 = time.Now().UnixNano()
|
||||||
|
|
||||||
|
// Try to read frame header
|
||||||
|
err := frame.ReadHeader(buf, c.soc)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf(" - Header reading error: %s\n", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Try to read frame payload
|
||||||
|
err = frame.ReadPayload(buf, c.soc)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf(" - Payload reading error: %s\n", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("+ elapsed: %.3f us\n", float32(time.Now().UnixNano()-startTime)/1e3)
|
||||||
|
|
||||||
|
// Call controller
|
||||||
|
c.ctl.fun(c, frame)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf(" - error\n")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// ReadHeader reads the frame header
|
||||||
|
func (f *Frame) ReadHeader(buf *bytes.Buffer, s net.Conn) error{
|
||||||
|
|
||||||
|
/* (2) Byte 1: FIN and OpCode */
|
||||||
|
b, err := reader.ReadBytes(s, 1)
|
||||||
|
if err != nil { return fmt.Errorf("Cannot read byte Fin nor OpCode (%s)", err) }
|
||||||
|
|
||||||
|
f.H.Fin = b[0] & 0x80 == 0x80
|
||||||
|
f.H.Opc = frame.OpCode( b[0] & 0x0f )
|
||||||
|
|
||||||
|
/* (3) Byte 2: Mask and Length[0] */
|
||||||
|
b, err = reader.ReadBytes(s, 1)
|
||||||
|
if err != nil { return fmt.Errorf("Cannot read byte if has Mask nor Length (%s)", err) }
|
||||||
|
|
||||||
|
// if mask, byte array not nil
|
||||||
|
if b[0] & 0x80 == 0x80 {
|
||||||
|
f.H.Msk = []byte{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// payload length
|
||||||
|
f.Len = uint64( b[0] & 0x7f )
|
||||||
|
|
||||||
|
/* (4) Extended payload */
|
||||||
|
if f.Len == 127 {
|
||||||
|
|
||||||
|
bx, err := reader.ReadBytes(s, 8)
|
||||||
|
if err != nil { return fmt.Errorf("Cannot read payload extended length of 64 bytes (%s)", err) }
|
||||||
|
|
||||||
|
f.Len = binary.BigEndian.Uint64(bx)
|
||||||
|
|
||||||
|
} else if f.Len == 126 {
|
||||||
|
|
||||||
|
bx, err := reader.ReadBytes(s, 2)
|
||||||
|
if err != nil { return fmt.Errorf("Cannot read payload extended length of 16 bytes (%s)", err) }
|
||||||
|
|
||||||
|
f.Len = uint64( binary.BigEndian.Uint16(bx) )
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/* (5) Masking key */
|
||||||
|
if f.H.Msk != nil {
|
||||||
|
|
||||||
|
bx, err := reader.ReadBytes(s, 4)
|
||||||
|
if err != nil { return fmt.Errorf("Cannot read mask or 32 bytes (%s)", err) }
|
||||||
|
|
||||||
|
f.H.Msk = make([]byte, 4)
|
||||||
|
copy(f.H.Msk, bx)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// fmt.Printf(" + Header\n")
|
||||||
|
// fmt.Printf(" + FIN: %t\n", f.H.Fin)
|
||||||
|
// fmt.Printf(" + MASK?: %t\n", f.H.Msk != nil)
|
||||||
|
// if f.H.Msk != nil {
|
||||||
|
// fmt.Printf(" + MASK: %x\n", f.H.Msk)
|
||||||
|
// }
|
||||||
|
// fmt.Printf(" + LEN: %d\n", f.Len)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// ReadPayload reads the frame payload
|
||||||
|
func (f *Frame) ReadPayload(b *bytes.Buffer, s net.Conn) error{
|
||||||
|
|
||||||
|
/* (1) Read payload */
|
||||||
|
buf, err := reader.ReadBytes(s, uint(f.Len) )
|
||||||
|
if err != nil { return fmt.Errorf("Cannot read payload (%s)", err) }
|
||||||
|
|
||||||
|
f.Buf = make([]byte, 0, f.Len)
|
||||||
|
|
||||||
|
/* (2) Unmask payload */
|
||||||
|
if len(f.H.Msk) == 4 {
|
||||||
|
|
||||||
|
|
||||||
|
for i, b := range buf{
|
||||||
|
|
||||||
|
mi := i % 4 // mask index
|
||||||
|
|
||||||
|
f.Buf = append(f.Buf, b ^ f.H.Msk[mi])
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// fmt.Printf(" + Payload\n")
|
||||||
|
// fmt.Printf(" + Read: %d\n", len(f.Buf))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,8 @@ import (
|
||||||
"git.xdrm.io/gws/ws/frame"
|
"git.xdrm.io/gws/ws/frame"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const maxBufferLength = 4096
|
||||||
|
|
||||||
// Represents a websocket controller callback function
|
// Represents a websocket controller callback function
|
||||||
type ControllerFunc func(*Client, *Frame)
|
type ControllerFunc func(*Client, *Frame)
|
||||||
|
|
||||||
|
@ -41,7 +43,7 @@ type Server struct {
|
||||||
|
|
||||||
// Represents a websocket frame
|
// Represents a websocket frame
|
||||||
type Frame struct {
|
type Frame struct {
|
||||||
met frame.Header
|
H frame.Header
|
||||||
buf []byte
|
Buf []byte
|
||||||
len uint64
|
Len uint64
|
||||||
}
|
}
|
Loading…
Reference in New Issue