non-blocking reads (+ added peek-er to check connection every 10us)

This commit is contained in:
xdrm-brackets 2018-05-03 21:10:02 +02:00
parent 52673ba7e5
commit ab351de382
3 changed files with 59 additions and 55 deletions

View File

@ -15,43 +15,31 @@ func main(){
serv := ws.CreateServer("0.0.0.0", 4444) serv := ws.CreateServer("0.0.0.0", 4444)
/* (2) Bind default controller */ /* (2) Bind default controller */
serv.BindDefault(func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message) chan struct{}{ serv.BindDefault(func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message){
var closer chan struct{} = make(chan struct{}, 1) for msg := range receiver {
go func(){ // if receive message -> send it back
for msg := range receiver { sender <- &msg
// sender <- nil
// if receive message -> send it back }
sender <- &msg
}
}()
return closer
}) })
/* (3) Bind to URI */ /* (3) Bind to URI */
err := serv.Bind("/channel/./room/./", func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message) chan struct{}{ err := serv.Bind("/channel/./room/./", func(cli *ws.Client, receiver <-chan ws.Message, sender chan<- *ws.Message, bc chan<- *ws.Message){
var closer chan struct{} = make(chan struct{}, 1) fmt.Printf("[uri] connected\n")
go func(){ for msg := range receiver{
fmt.Printf("[uri] connected\n")
for msg := range receiver{ fmt.Printf("[uri] received '%s'\n", msg.Data)
sender <- &msg
fmt.Printf("[uri] received '%s'\n", msg.Data) }
sender <- &msg
} fmt.Printf("[uri] unexpectedly closed\n")
fmt.Printf("[uri] unexpectedly closed\n")
}()
return closer
}) })
if err != nil { panic(err) } if err != nil { panic(err) }

View File

@ -1,6 +1,7 @@
package ws package ws
import ( import (
"time"
"bufio" "bufio"
"encoding/binary" "encoding/binary"
"git.xdrm.io/gws/internal/http/upgrade/request" "git.xdrm.io/gws/internal/http/upgrade/request"
@ -12,14 +13,13 @@ import (
type clientIO struct { type clientIO struct {
sock net.Conn sock net.Conn
reader *bufio.Reader reader *bufio.Reader
kill chan<- *client kill chan<- *client // unregisters client
} }
// Represents all channels that need a client // Represents all channels that need a client
type clientChannelSet struct{ type clientChannelSet struct{
receive chan Message receive chan Message
send chan *Message send chan *Message
closer chan struct{}
} }
// Represents a websocket client // Represents a websocket client
@ -27,7 +27,7 @@ type client struct {
io clientIO io clientIO
iface *Client iface *Client
ch clientChannelSet ch clientChannelSet
status MessageError // close status ; 0 = nothing status MessageError // close status ; 0 = nothing ; else -> must close
} }
@ -78,7 +78,6 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli
ch: clientChannelSet{ ch: clientChannelSet{
receive: make(chan Message, 1), receive: make(chan Message, 1),
send: make(chan *Message, 1), send: make(chan *Message, 1),
closer: make(chan struct{}, 1),
}, },
} }
@ -102,24 +101,17 @@ func buildClient(s net.Conn, ctl ControllerSet, serverCh serverChannelSet) (*cli
/* (4) Launch client routines /* (4) Launch client routines
---------------------------------------------------------*/ ---------------------------------------------------------*/
/* (1) Launch client controller */ /* (1) Launch client controller */
instance.ch.closer = controller.Fun( go controller.Fun(
instance.iface, // pass the client instance.iface, // pass the client
instance.ch.receive, // the receiver instance.ch.receive, // the receiver
instance.ch.send, // the sender instance.ch.send, // the sender
serverCh.broadcast, // broadcast sender serverCh.broadcast, // broadcast sender
) )
/* (2) Launch close handler */ /* (2) Launch message reader */
go func(){
for range instance.ch.closer {}
instance.close(NORMAL)
}()
/* (3) Launch message reader */
go instance.reader() go instance.reader()
/* (4) Launc writer */ /* (3) Launc writer */
go instance.writer() go instance.writer()
return instance, nil return instance, nil
@ -132,7 +124,30 @@ func (c *client) reader(){
for { for {
/* (1) Parse message */ /* (1) If error code -> close */
if c.status != NONE {
c.close(c.status)
break
}
/* (2) Wait for available data */
c.io.sock.SetReadDeadline(time.Now().Add(time.Microsecond*10))
_, err := c.io.reader.Peek(1)
// timeout -> continune checking
if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
continue
}
// another error -> stop reading
if err != nil {
break
}
c.io.sock.SetReadDeadline(time.Time{}) // remove timeout
/* (3) Parse message */
msg, err := readMessage(c.io.reader) msg, err := readMessage(c.io.reader)
if err != nil { if err != nil {
// fmt.Printf(" [reader] %s\n", err) // fmt.Printf(" [reader] %s\n", err)
@ -140,20 +155,20 @@ func (c *client) reader(){
return return
} }
/* (2) CLOSE */ /* (4) CLOSE */
if msg.Type == CLOSE { if msg.Type == CLOSE {
c.close(NORMAL) c.close(NORMAL)
return return
} }
/* (3) PING size error */ /* (5) PING size error */
if msg.Type == PING && msg.Size > 125 { if msg.Type == PING && msg.Size > 125 {
c.close(PROTOCOL_ERR) c.close(PROTOCOL_ERR)
return return
} }
/* (4) Send PONG */ /* (6) Send PONG */
if msg.Type == PING { if msg.Type == PING {
msg.Final = true msg.Final = true
msg.Type = PONG msg.Type = PONG
@ -161,18 +176,18 @@ func (c *client) reader(){
continue continue
} }
/* (5) Unknown opcode */ /* (7) Unknown opcode */
if msg.Type != TEXT && msg.Type != BINARY { if msg.Type != TEXT && msg.Type != BINARY {
c.close(PROTOCOL_ERR) c.close(PROTOCOL_ERR)
return return
} }
/* (5) Dispatch to receiver */ /* (7) Dispatch to receiver */
c.ch.receive <- *msg c.ch.receive <- *msg
} }
/* (6) close channel */ /* (8) close channel */
c.close(NORMAL) c.close(NORMAL)
} }
@ -185,17 +200,23 @@ func (c *client) writer(){
for msg := range c.ch.send { for msg := range c.ch.send {
/* (1) If empty message -> close properly */
if msg == nil {
break
}
/* (2) Send message */
err := msg.Send(c.io.sock) err := msg.Send(c.io.sock)
/* (3) Fail on error */
if err != nil { if err != nil {
fmt.Printf(" [writer] %s\n", err) fmt.Printf(" [writer] %s\n", err)
break break
} }
} }
/* (3) proper close */ /* (4) proper close */
c.close(NORMAL) c.close(NORMAL)
} }
@ -226,14 +247,9 @@ func (c *client) close(status MessageError){
} }
/* (2) Close socket */ /* (2) Close socket */
time.Sleep(time.Second * 3)
c.io.sock.Close() c.io.sock.Close()
/* (3) Dereference data */
c.ch.receive = nil
c.ch.send = nil
c.ch.closer = nil
c.iface = nil
c.io.reader = nil
/* (4) Unregister */ /* (4) Unregister */
c.io.kill <- c c.io.kill <- c

View File

@ -12,7 +12,7 @@ type Client struct {
} }
// Represents a websocket controller callback function // Represents a websocket controller callback function
type ControllerFunc func(*Client, <-chan Message, chan<- *Message, chan<- *Message) chan struct{} type ControllerFunc func(*Client, <-chan Message, chan<- *Message, chan<- *Message)
// Represents a websocket controller // Represents a websocket controller
type Controller struct { type Controller struct {