Score
0
Watch 5 Star 35 Fork 16

ryanduan / wsPoolGoMIT

Create your Gitee Account
Explore and code with more than 6 million developers,Free private repositories !:)
Sign up
Clone or download
client.go 9.28 KB
Copy Edit Web IDE Raw Blame History
ryanduan authored 2020-12-21 14:48 . 支持并发处理消息
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package wsPool
import (
"errors"
"fmt"
"gitee.com/rczweb/wsPool/util/grpool"
"github.com/gorilla/websocket"
"net/http"
"sync"
"time"
)
const (
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10 //1 * time.Second//(pongWait * 9) / 10
//var pingPeriod =27* time.Second //1 * time.Second//(pongWait * 9) / 10
// Time allowed to write a message to the peer.
writeWait = 30 * time.Second
// Maximum message size allowed from peer.
maxMessageSize = 1024 * 1024 * 20
)
var upgrader = websocket.Upgrader{
//ReadBufferSize: 1024 * 1024,
//WriteBufferSize: 1024 * 1024,
// 默认允许WebSocket请求跨域,权限控制可以由业务层自己负责,灵活度更高
CheckOrigin: func(r *http.Request) bool {
return true
},
}
/*连接参数结构体*/
type Config struct {
Id string //标识连接的名称
Type string //连接类型或path
Channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
Goroutine int //每个连接开启的go程数里 默认为10
}
type RuntimeInfo struct {
Id string //标识连接的名称
Type string //连接类型或path
Ip string
Channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
OpenTime time.Time //连接打开时间
LastReceiveTime time.Time //最后一次接收到数据的时间
LastSendTime time.Time //最后一次发送数据的时间
}
// Client is a middleman between the websocket connection and the hub.
type Client struct {
hub *hub
// The websocket connection.
conn *websocket.Conn
types string //连接类型或path
openTime time.Time //连接打开时间
CloseTime time.Time //连接断开的时间
lastReceiveTime time.Time //最后一次接收到数据的时间
lastSendTime time.Time //最后一次发送数据的时间
Id string //标识连接的名称
mux *sync.Mutex
IsClose chan bool //连接的状态。true为关闭
channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
// Buffered channel of outbound messages.
grpool *grpool.Pool
sendCh chan *SendMsg //发送消息的缓冲管首
recvCh chan *SendMsg //接收消息的缓冲管首
recvPing chan int //收到ping的存储管道,方便回复pong处理
sendPing chan int //发送ping的存储管道,方便收到pong处理下次发ping
//ticker *time.Ticker //定时发送ping的定时器
onError func(error)
onOpen func() //连接成功的回调
onPing func() //收到ping
onPong func() //收到pong
onMessage func(*SendMsg)
onClose func()
pingPeriodTicker *time.Timer
}
// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) readPump() {
defer func() {
fmt.Println("连接己关闭或者断开,正在清理对像")
c.conn.Close()
//触发连接关闭的事件回调
c.onClose() //先执行完关闭回调,再请空所有的回调
c.OnError(nil)
c.OnOpen(nil)
c.OnMessage(nil)
c.OnClose(nil)
c.OnPong(nil)
c.OnPing(nil)
close(c.recvPing)
close(c.sendPing)
c.grpool.Close()
c.hub.RemoveClient(c)
dump()
}()
Loop:
for {
select {
case <-c.IsClose:
return
default:
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
websocket.CloseProtocolError,
websocket.CloseUnsupportedData,
websocket.CloseNoStatusReceived,
websocket.CloseAbnormalClosure,
websocket.CloseInvalidFramePayloadData,
websocket.ClosePolicyViolation,
websocket.CloseMessageTooBig,
websocket.CloseMandatoryExtension,
websocket.CloseInternalServerErr,
websocket.CloseServiceRestart,
websocket.CloseTryAgainLater,
websocket.CloseTLSHandshake) {
c.onError(errors.New("连接ID:" + c.Id + "ReadMessage Is Unexpected Close Error:" + err.Error()))
//c.closeChan<-true;
goto Loop1
}
c.onError(errors.New("连接ID:" + c.Id + "ReadMessage other error:" + err.Error()))
//c.closeChan<-true;
goto Loop1
}
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.pingPeriodTicker.Reset(pingPeriod)
c.lastReceiveTime = time.Now()
msg, err := unMarshal(message)
if err != nil {
c.onError(errors.New("接收数据ProtoBuf解析失败!!连接ID:" + c.Id + "原因:" + err.Error()))
break Loop
}
c.readMessage(msg)
}
}
Loop1:
c.close()
}
// 读取消息写管道缓冲区
func (c *Client) readMessage(msg *SendMsg) {
timeout := time.NewTimer(time.Millisecond * 800)
defer timeout.Stop()
select {
case <-c.IsClose:
c.onError(errors.New("readMessage连接" + c.Id + ",连接己在关闭,不进行消息接收"))
return
case c.recvCh <- msg:
return
case <-timeout.C:
c.onError(errors.New("recvCh 消息管道blocked,写入消息超时,管道长度:" + string(len(c.recvCh))))
return
}
}
// 单个连接接收消息
func (c *Client) recvMessage() {
defer func() {
dump()
}()
loop:
for {
select {
case <-c.IsClose:
return
case data, ok := <-c.recvCh:
if !ok {
break loop
}
/* //ToClientId与Channel不能同时存在!!!注意!!!!
if message.ToClientId!="" {
Send(message)
}
//ToClientId与Channel不能同时存在!!!注意!!!!
if message.Channel!="" {
Broadcast(message)
}*/
//收到消息触发回调
//c.onMessage(data)
c.grpool.Add(func() {
c.onMessage(data)
})
}
}
}
// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
defer func() {
dump()
}()
Loop:
for {
select {
case <-c.IsClose:
return
case d, ok := <-c.sendCh:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
//说明管道己经关闭
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
//glog.Error("连接ID:"+c.Id,"wsServer发送消息失败,一般是连接channel已经被关闭:(此处服务端会断开连接,使客户端能够感知进行重连)")
goto Loop1
}
message, err := marshal(d)
if err != nil {
c.onError(errors.New("接收数据ProtoBuf编码失败!!连接ID:" + c.Id + "原因:" + err.Error()))
break Loop
}
w, err := c.conn.NextWriter(websocket.BinaryMessage)
if err != nil {
goto Loop1
}
c.lastSendTime = time.Now()
_, err = w.Write(message)
if err != nil {
c.onError(errors.New("连接ID:" + c.Id + "写消息进写入IO错误!连接中断" + err.Error()))
goto Loop1
}
/*// Add queued chat messages to the current websocket message.
n := len(c.sendCh)
if n > 0 {
for i := 0; i < n; i++ {
_, err = w.Write(<-c.sendCh)
if err != nil {
c.onError(errors.New("连接ID:" + c.Id + "写上次连接未发送的消息消息进写入IO错误!连接中断" + err.Error()))
return
}
}
}
*/
//关闭写入io对象
if err := w.Close(); err != nil {
c.onError(errors.New("连接ID:" + c.Id + "关闭写入IO对象出错,连接中断" + err.Error()))
goto Loop1
}
case p, ok := <-c.sendPing: //定时发送ping
if !ok {
break Loop
}
if p == 1 {
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.onError(errors.New("连接ID:" + c.Id + "关闭写入IO对象出错,连接中断" + err.Error()))
goto Loop1
}
}
case p, ok := <-c.recvPing:
if !ok {
break Loop
}
if p == 1 {
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
c.onError(errors.New("回复客户端PongMessage出现异常:" + err.Error()))
goto Loop1
}
}
}
}
Loop1:
c.close()
}
func (c *Client) send(msg *SendMsg) {
timeout := time.NewTimer(time.Millisecond * 800)
defer timeout.Stop()
select {
case c.sendCh <- msg:
return
case <-timeout.C:
c.onError(errors.New("sendCh消息管道blocked,写入消息超时,管道长度:" + string(len(c.sendCh))))
return
}
//c.sendCh<-msg
}
/*定时发送ping*/
func (c *Client) Tickers() {
for {
select {
case <-c.IsClose:
return
case <-c.pingPeriodTicker.C:
c.sendPing <- 1
}
}
}
func (c *Client) close() {
c.mux.Lock()
defer c.mux.Unlock()
select {
case <-c.IsClose:
return
default:
close(c.IsClose)
}
}

Comment ( 0 )

Sign in for post a comment