update + opcode management (SOME THINGS TO FIX...)

This commit is contained in:
xdrm-brackets 2018-04-30 15:35:30 +02:00
parent 1df592cb81
commit e89166a3bf
6 changed files with 204 additions and 48 deletions

View File

@ -15,7 +15,7 @@ func main(){
serv := ws.CreateServer("0.0.0.0", 4444) serv := ws.CreateServer("0.0.0.0", 4444)
/* (2) Bind default controller */ /* (2) Bind default controller */
serv.BindDefault(func(client *ws.Client, receiver <-chan *ws.Frame, sender chan<- []byte, closer <-chan func()){ serv.BindDefault(func(client *ws.Client, receiver <-chan ws.Frame, sender chan<- []byte, closer <-chan func()){
fmt.Printf("[default] connected\n") fmt.Printf("[default] connected\n")
for { for {
@ -23,9 +23,7 @@ func main(){
select { select {
case receivedFrame := <- receiver: case receivedFrame := <- receiver:
fmt.Printf("[default] received '%s'\n", receivedFrame.Payload.Buffer) fmt.Printf("[default] received '%s'\n", receivedFrame.Payload.Buffer)
if receivedFrame.Payload.Length > 0 { sender <- receivedFrame.Payload.Buffer
sender <- receivedFrame.Payload.Buffer[1:]
}
case closeFunc := <- closer: case closeFunc := <- closer:
fmt.Printf("[default] client with protocol '%s' exited\n", client.Protocol) fmt.Printf("[default] client with protocol '%s' exited\n", client.Protocol)
closeFunc() closeFunc()
@ -38,7 +36,7 @@ func main(){
}) })
/* (3) Bind to URI */ /* (3) Bind to URI */
err := serv.Bind("/channel/./room/./", func(client *ws.Client, receiver <-chan *ws.Frame, sender chan<- []byte, closer <-chan func()){ err := serv.Bind("/channel/./room/./", func(client *ws.Client, receiver <-chan ws.Frame, sender chan<- []byte, closer <-chan func()){
fmt.Printf("[uri] connected\n") fmt.Printf("[uri] connected\n")
for { for {
@ -52,6 +50,7 @@ func main(){
return return
} }
} }
fmt.Printf("[uri] unexpectedly closed\n") fmt.Printf("[uri] unexpectedly closed\n")
}) })

View File

@ -18,8 +18,6 @@ func (c *Client) asyncReader(s *Server) {
// Get buffer // Get buffer
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
// Get buffer frame
frame := new(Frame)
for { for {
@ -27,48 +25,32 @@ func (c *Client) asyncReader(s *Server) {
var startTime int64 = time.Now().UnixNano() var startTime int64 = time.Now().UnixNano()
// Try to read frame header // Try to read frame header
err := frame.ReadHeader(buf, c.sock) frame, err := ReadFrame(buf, c.sock)
if err != nil { if err != nil {
fmt.Printf(" - Header reading error: %s\n", err) fmt.Printf("%s\n", err)
break break
} }
var elapsed = float32(time.Now().UnixNano()-startTime)/1e3
// Try to read frame payload fmt.Printf("+ elapsed: %.3f us\n", elapsed)
err = frame.ReadPayload(buf, c.sock)
if err != nil {
fmt.Printf(" - Payload reading error: %s\n", err)
break
}
fmt.Printf("+ elapsed: %.3f us\n", float32(time.Now().UnixNano()-startTime)/1e3)
// Trigger data channel // Trigger data channel
c.recvc <- frame c.recvc <- *frame
} }
fmt.Printf(" - remove client\n") fmt.Printf(" - remove client\n")
// close sending channel
close(c.sendc)
// return closing callback // return closing callback
c.closec <- func(){ c.closec <- func(){
// Remove client from server // Remove client from server
delete(s.clients, c.sock) delete(s.clients, c.sock)
// close channels
close(c.recvc)
// Close socket // Close socket
c.sock.Close() c.sock.Close()
// close this closing channel
close(c.closec)
} }
} }
@ -85,13 +67,15 @@ func (c *Client) asyncWriter(s *Server){
fmt.Printf("Writing '%s'\n", payload) fmt.Printf("Writing '%s'\n", payload)
// Build Frame // Build Frame
f := BuildFromPayload(payload) f := buildFrame(payload)
// Send over socket // Send over socket
c.sock.Write( f.Bytes() ) senderr := f.Send(c.sock)
if senderr != nil {
fmt.Printf("Writing error: %s\n", senderr)
}
} }
} }

View File

@ -1,6 +1,7 @@
package ws package ws
import ( import (
"git.xdrm.io/gws/ws/frame/opcode"
"encoding/binary" "encoding/binary"
"git.xdrm.io/gws/ws/frame" "git.xdrm.io/gws/ws/frame"
"fmt" "fmt"
@ -10,8 +11,66 @@ import (
) )
// ReadHeader reads the frame header
func (f *Frame) ReadHeader(buf *bytes.Buffer, s net.Conn) error{
// ReadFrame reads the frame from a socket
func ReadFrame(b *bytes.Buffer, s net.Conn) (*Frame, error) {
f := new(Frame)
/* (1) Read header
---------------------------------------------------------*/
err := f.readHeader(b, s)
if err != nil {
return nil, fmt.Errorf("Header read: %s\n", err)
}
/* (2) Read payload
---------------------------------------------------------*/
err = f.readPayload(b, s)
if err != nil {
return nil, fmt.Errorf("Payload read: %s\n", err)
}
/* (3) Manage OpCode
---------------------------------------------------------*/
switch( f.Header.Opc ){
case opcode.CLOSE:
fmt.Printf("Opcode: CLOSE\n")
return nil, fmt.Errorf("Closed by client\n")
case opcode.TEXT:
fmt.Printf("Opcode: TEXT\n")
case opcode.BINARY:
fmt.Printf("Opcode: BINARY\n")
case opcode.PING:
fmt.Printf("Opcode: PING\n")
err = buildPong().Send(s)
if err != nil {
return nil, fmt.Errorf("Pong frame: %s\n", err)
}
default:
fmt.Printf("Opcode: CLOSE\n")
buildClose().Send(s)
return nil, fmt.Errorf("Unknown Opcode %x\n", f.Header.Opc)
}
return f, nil
}
// readHeader reads the frame header
func (f *Frame) readHeader(buf *bytes.Buffer, s net.Conn) error{
var err error
/* (2) Byte 1: FIN and OpCode */ /* (2) Byte 1: FIN and OpCode */
b, err := reader.ReadBytes(s, 1) b, err := reader.ReadBytes(s, 1)
@ -62,6 +121,7 @@ func (f *Frame) ReadHeader(buf *bytes.Buffer, s net.Conn) error{
// fmt.Printf(" + Header\n") // fmt.Printf(" + Header\n")
// fmt.Printf(" + FIN: %t\n", f.Header.Fin) // fmt.Printf(" + FIN: %t\n", f.Header.Fin)
// fmt.Printf(" + OPC: %x\n", f.Header.Opc)
// fmt.Printf(" + MASK?: %t\n", f.Header.Msk != nil) // fmt.Printf(" + MASK?: %t\n", f.Header.Msk != nil)
// if f.Header.Msk != nil { // if f.Header.Msk != nil {
// fmt.Printf(" + MASK: %x\n", f.Header.Msk) // fmt.Printf(" + MASK: %x\n", f.Header.Msk)
@ -73,11 +133,11 @@ func (f *Frame) ReadHeader(buf *bytes.Buffer, s net.Conn) error{
} }
// ReadPayload reads the frame payload // readPayload reads the frame payload
func (f *Frame) ReadPayload(b *bytes.Buffer, s net.Conn) error{ func (f *Frame) readPayload(buf *bytes.Buffer, s net.Conn) error{
/* (1) Read payload */ /* (1) Read payload */
buf, err := reader.ReadBytes(s, uint(f.Payload.Length) ) b, err := reader.ReadBytes(s, uint(f.Payload.Length) )
if err != nil { return fmt.Errorf("Cannot read payload (%s)", err) } if err != nil { return fmt.Errorf("Cannot read payload (%s)", err) }
f.Payload.Buffer = make([]byte, 0, f.Payload.Length) f.Payload.Buffer = make([]byte, 0, f.Payload.Length)
@ -86,7 +146,7 @@ func (f *Frame) ReadPayload(b *bytes.Buffer, s net.Conn) error{
if len(f.Header.Msk) == 4 { if len(f.Header.Msk) == 4 {
for i, b := range buf{ for i, b := range b{
mi := i % 4 // mask index mi := i % 4 // mask index
@ -107,15 +167,128 @@ func (f *Frame) ReadPayload(b *bytes.Buffer, s net.Conn) error{
// BuildFromPayload builds a frame from only a payload // buildFrame builds a frame from a payload buffer
// (as []byte) // (as []byte)
func BuildFromPayload(payload []byte) *Frame { func buildFrame(b []byte) *Frame {
return nil
return &Frame{
Header: frame.Header{
Fin: true,
Opc: opcode.TEXT,
Msk: nil,
},
Payload: frame.Payload{
Buffer: b,
Length: uint64(len(b)),
},
}
} }
// Bytes returns the BYTE representation of the frame // buildClose builds a closing frame
// for sending over the network (into socket) func buildClose() *Frame{
func (f *Frame) Bytes() []byte { return &Frame{
Header: frame.Header{
Fin: true,
Opc: opcode.CLOSE,
Msk: nil,
},
Payload: frame.Payload{
Buffer: []byte{},
Length: 0,
},
}
}
// buildPong builds a PONG frame (responding to PING frame)
func buildPong() *Frame{
return &Frame{
Header: frame.Header{
Fin: true,
Opc: opcode.PONG,
Msk: nil,
},
Payload: frame.Payload{
Buffer: []byte{},
Length: 0,
},
}
}
// Send sends a frame over a socket
func (f Frame) Send(s net.Conn) error {
written := 0
/* (1) Byte 0 : FIN + opcode */
buf := []byte{ 0x80 + byte(opcode.TEXT) }
w, err := s.Write(buf)
if err != nil { return err }
written += w
/* (2) Get payload length */
if f.Payload.Length < 126 { // simple
buf := []byte{ byte(f.Payload.Length) }
w, err = s.Write(buf)
if err != nil { return err }
written += w
} else if f.Payload.Length < 0xffff { // extended: 16 bits
w, err = s.Write( []byte{126} )
if err != nil { return err }
written += w
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, uint16(f.Payload.Length))
w, err = s.Write(buf)
if err != nil { return err }
written += w
} else if f.Payload.Length < 0xffffffffffffffff { // extended: 64 bits
w, err = s.Write( []byte{127} )
if err != nil { return err }
written += w
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, f.Payload.Length)
w, err = s.Write(buf)
if err != nil { return err }
written += w
}
/* (3) Payload */
w, err = s.Write(f.Payload.Buffer)
if err != nil { return err }
fmt.Printf("[2] written %d bytes\n", written)
fmt.Printf(" + Header\n")
fmt.Printf(" + FIN: %t\n", f.Header.Fin)
fmt.Printf(" + OPC: %x\n", f.Header.Opc)
fmt.Printf(" + MASK?: %t\n", f.Header.Msk != nil)
if f.Header.Msk != nil {
fmt.Printf(" + MASK: %x\n", f.Header.Msk)
}
fmt.Printf(" + LEN: %d\n", f.Payload.Length)
fmt.Printf("Total written: %d bytes (%d + %d)\n", written+w, written, w)
return nil return nil
} }

View File

@ -15,7 +15,7 @@ type OpCode byte
type Header struct { type Header struct {
Fin bool Fin bool
Opc OpCode Opc OpCode
Msk []byte // len() = 4 if set, else empty Msk []byte // len() = 4 if set, else nil
} }
// Represents a frame message // Represents a frame message

View File

@ -18,7 +18,7 @@ func (s *Server) dispatch(sock net.Conn, u *upgrader.T){
sock: sock, sock: sock,
Arguments: [][]string{ []string{ uri } }, Arguments: [][]string{ []string{ uri } },
Protocol: string(u.Response.GetProtocol()), Protocol: string(u.Response.GetProtocol()),
recvc: make(chan *Frame, maxChannelBufferLength), recvc: make(chan Frame, maxChannelBufferLength),
sendc: make(chan []byte, maxChannelBufferLength), sendc: make(chan []byte, maxChannelBufferLength),
closec: make(chan func(), maxChannelBufferLength), closec: make(chan func(), maxChannelBufferLength),
} }

View File

@ -10,7 +10,7 @@ const maxBufferLength = 4096
const maxChannelBufferLength = 1 const maxChannelBufferLength = 1
// Represents a websocket controller callback function // Represents a websocket controller callback function
type ControllerFunc func(*Client, <-chan *Frame, chan<- []byte, <-chan func()) type ControllerFunc func(*Client, <-chan Frame, chan<- []byte, <-chan func())
// Represents a websocket controller // Represents a websocket controller
type Controller struct { type Controller struct {
@ -28,7 +28,7 @@ type Client struct {
Controller *Controller // assigned controller Controller *Controller // assigned controller
Store struct{} // store (for client implementation-specific data) Store struct{} // store (for client implementation-specific data)
recvc chan *Frame // Receive channel recvc chan Frame // Receive channel
sendc chan []byte // sending channel sendc chan []byte // sending channel
closec chan func() // closing channel closec chan func() // closing channel
} }