Score
0
Watch 5 Star 35 Fork 16

ryanduan / wsPoolGoMIT

Create your Gitee Account
Explore and code with more than 6 million developers,Free private repositories !:)
Sign up
Clone or download
hub.go 4.62 KB
Copy Edit Web IDE Raw Blame History
ryanduan authored 2020-11-24 16:39 . 修复bug
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package wsPool
import (
"errors"
"gitee.com/rczweb/wsPool/util/queue"
"github.com/gogf/gf/container/gmap"
"github.com/gogf/gf/os/gtimer"
"log"
"time"
)
var (
//最大连接池缓冲处理连接对像管道长度
Max_client_channel_len = 10240
//最大全局广播缓冲处理管道长度
Max_broadcastQueue_len = 4096
//最大频道广播缓冲处理管道长度
Max_chanBroadcastQueue_len = 4096
//最大接收消息缓冲处理管道长度
Max_recvCh_len = 10240
//最大发送消息缓冲处理管道长度
Max_sendCh_len = 10240
)
// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type hub struct {
// Registered clients.
clients *gmap.StrAnyMap //map[string]*Client// //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了
oldClients *gmap.StrAnyMap //缓存断开的连接消息队列
// Inbound messages from the clients.
//可以用于广播所有连接对象
broadcastQueue chan *SendMsg
//广播指定频道的管道
chanBroadcastQueue chan *SendMsg
// Register requests from the clients.
register chan *Client
// Unregister requests from clients.
unregister chan string
}
//重新连接需要处理的消息(缓存上次未来得能处理发送消息channel中的消息,60秒后原ws未连接消息失效)
type oldMsg struct {
list *queue.PriorityQueue
Expiration time.Time //过期时间
}
func newHub() *hub {
return &hub{
register: make(chan *Client, Max_client_channel_len),
unregister: make(chan string, Max_client_channel_len),
clients: gmap.NewStrAnyMap(true), //make(map[string]*Client),//
oldClients: gmap.NewStrAnyMap(true), //make(map[string]*Client),//
broadcastQueue: make(chan *SendMsg, Max_broadcastQueue_len),
chanBroadcastQueue: make(chan *SendMsg, Max_chanBroadcastQueue_len),
}
}
func (h *hub) run() {
loop:
for {
select {
case id, ok := <-h.unregister:
if !ok {
break loop
}
c := h.clients.Get(id)
if c != nil {
h.clients.Remove(id)
}
log.Println("取消注册ws连接对象:", id, "连接总数:", h.clients.Size())
case client, ok := <-h.register:
if !ok {
break loop
}
log.Println("注册ws连接对象:", client.Id, "连接总数:", h.clients.Size())
h.clients.Set(client.Id, client)
case broadcastMsg, ok := <-h.broadcastQueue:
if !ok {
break loop
}
h.clients.Iterator(func(id string, v interface{}) bool {
if v != nil {
client := v.(*Client)
broadcastMsg.ToClientId = id
client.send(broadcastMsg)
}
return true
})
case chanBroadcastMsg, ok := <-h.chanBroadcastQueue:
if !ok {
break loop
}
//广播指定频道的消息处理
h.clients.Iterator(func(id string, v interface{}) bool {
if v != nil {
client := v.(*Client)
for _, ch := range chanBroadcastMsg.Channel {
if searchStrArray(client.channel, ch) {
chanBroadcastMsg.ToClientId = id
client.send(chanBroadcastMsg)
}
}
}
return true
})
}
}
}
func (h *hub) ticker() {
//定时清理清理缓存的旧的连接对像
gtimer.AddSingleton(30*time.Second, func() {
if h.oldClients.Size() > 0 {
h.oldClients.Iterator(func(k string, v interface{}) bool {
if v != nil {
client := v.(*Client)
if time.Now().Add(-180 * time.Second).After(client.CloseTime) {
//3分钟后清理组存中的旧连接对像
h.clearOldClient(client)
/// h.clearOldClient <- client
}
}
return true
})
}
})
}
func (h *hub) AddClient(client *Client) error {
timeout := time.NewTimer(time.Second * 3)
defer timeout.Stop()
select {
case h.register <- client:
return nil
case <-timeout.C:
return errors.New("AddClient register消息管道blocked,写入消息超时")
}
}
func (h *hub) clearOldClient(client *Client) {
close(client.recvCh)
close(client.sendCh)
h.oldClients.Remove(client.Id)
log.Println("己断开的ws连接缓存对象:", client.Id, "旧连接总数:", h.oldClients.Size())
}
func (h *hub) RemoveClient(client *Client) error {
//把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像
/*client.CloseTime = time.Now()
h.oldClients.Set(client.Id, client)*/
timeout := time.NewTimer(time.Second * 1)
defer timeout.Stop()
select {
case h.unregister <- client.Id:
return nil
case <-timeout.C:
return errors.New(" RemoveClient unregister消息管道blocked,写入消息超时")
}
}

Comment ( 0 )

Sign in for post a comment