使用 beego 实现 WebSocket

package controllers
import (
	"container/list"
	"encoding/json"
	"github.com/astaxie/beego"
	"github.com/gorilla/websocket"
	"net/http"
	"time"
)

// WebSocket is the controller whose Join method upgrades an HTTP request to a
// WebSocket connection. It embeds beego.Controller to get the request context
// and parameter helpers.
type WebSocket struct {
	beego.Controller
}

// init launches the chatroom event loop in its own goroutine when the package
// is initialised, so the channels below always have a consumer.
func init() {
	go chatroom()
}

// Package-level state shared between the connection handlers and the chatroom
// loop. Each channel is buffered with room for 10 pending items.
var (
	// subscribe carries newly connected users to the chatroom loop.
	subscribe = make(chan Subscriber, 10)
	// unsubscribe carries the names of users whose connection should be closed.
	unsubscribe = make(chan string, 10)
	// publish carries messages waiting to be broadcast to every subscriber.
	publish = make(chan Message, 10)
	// subscribers holds the currently registered Subscriber values.
	subscribers = list.New()
)

// Message is the JSON payload broadcast to every connected client.
type Message struct {
	// Type is the event kind; this file uses 0 for a join, 1 for a leave and
	// 2 for a chat message received from a client.
	Type      int `json:"type"`
	// User is the name of the user the event refers to.
	User      string `json:"user"`
	// Timestamp is the creation time in Unix seconds.
	Timestamp int `json:"timestamp"`
	// Content is the text of the event.
	Content   string `json:"content"`
}

// Subscriber pairs a user name with the WebSocket connection used to reach
// that user.
type Subscriber struct {
	// Name identifies the subscriber; lookups and removals match on it.
	Name string
	// Conn is the user's WebSocket connection; code in this file checks it
	// for nil before writing to or closing it.
	Conn *websocket.Conn
}

// SendMessage builds a Message of the given type for user, carrying msg as its
// content and stamped with the current time in Unix seconds. Despite the name
// it does not transmit anything; the caller decides where the value goes.
func SendMessage(types int, user string, msg string) Message {
	now := int(time.Now().Unix())
	return Message{
		Type:      types,
		User:      user,
		Timestamp: now,
		Content:   msg,
	}
}

// chatroom is the event loop of the chat room. It runs forever and handles
// three kinds of events:
//
//   - subscribe: registers the subscriber unless the name is already taken,
//     then announces the join (message type 0);
//   - publish: broadcasts the message to every subscriber;
//   - unsubscribe: removes the first subscriber with the given name, closes
//     its connection and announces the leave (message type 1).
//
// Join and leave announcements are broadcast directly rather than queued on
// publish: this loop is the receiver of publish, so sending to it from here
// would block forever as soon as the channel's buffer was full.
func chatroom() {
	for {
		select {
		case sub := <-subscribe:
			// NOTE(review): a subscriber whose name is already registered is
			// dropped silently here; its connection is neither stored nor
			// closed by this loop.
			if !isUserExist(subscribers, sub.Name) {
				subscribers.PushBack(sub)
				broadcastWebSocket(SendMessage(0, sub.Name, "连接成功"))
			}

		case message := <-publish:
			broadcastWebSocket(message)

		case unsub := <-unsubscribe:
			for elem := subscribers.Front(); elem != nil; elem = elem.Next() {
				member := elem.Value.(Subscriber)
				if member.Name != unsub {
					continue
				}
				subscribers.Remove(elem)
				if member.Conn != nil {
					// Best effort: the peer may already have gone away, so a
					// close error carries no useful information here.
					member.Conn.Close()
				}
				broadcastWebSocket(SendMessage(1, unsub, "关闭连接"))
				// Only the first match is removed; this leaves the for loop,
				// not the select.
				break
			}
		}
	}
}

// Join upgrades the current HTTP request to a WebSocket connection, registers
// the caller in the chat room under the "username" request parameter and then
// relays every message read from the socket to the publish channel (message
// type 2) until a read fails.
//
// If the upgrade fails, it replies with 400 for a handshake error and 500 for
// any other error, and returns without registering anything.
func (this *WebSocket) Join() {
	username := this.GetString("username")
	ws, err := websocket.Upgrade(this.Ctx.ResponseWriter, this.Ctx.Request, nil, 1024, 1024)
	if err != nil {
		if _, ok := err.(websocket.HandshakeError); ok {
			http.Error(this.Ctx.ResponseWriter, "Not a websocket handshake", http.StatusBadRequest)
		} else {
			http.Error(this.Ctx.ResponseWriter, "Cannot setup WebSocket connection", http.StatusInternalServerError)
		}
		return
	}
	// Always release the socket when this handler exits. The chat room only
	// closes connections it has registered, so a connection it did not accept
	// (for example because the name was already taken) would otherwise stay
	// open forever. Closing an already closed connection is harmless.
	defer ws.Close()

	subscribe <- Subscriber{Name: username, Conn: ws}

	// On any exit path, ask the chat room to drop this user.
	defer func() {
		unsubscribe <- username
	}()

	for {
		// Block until the client sends a message; any read error (including
		// the peer disconnecting) ends the handler.
		_, payload, err := ws.ReadMessage()
		if err != nil {
			return
		}
		publish <- SendMessage(2, username, string(payload))
	}
}

// broadcastWebSocket serialises msg to JSON and writes it as a text frame to
// every subscriber that has a connection. If the message cannot be marshalled,
// the error is logged and nothing is sent.
//
// A subscriber whose write fails is reported on the unsubscribe channel so it
// gets removed. The report is a non-blocking send: chatroom calls this
// function and is also the receiver of unsubscribe, so a blocking send with a
// full buffer would never complete. A dropped report is not lost for good;
// the next broadcast fails on the same connection and reports it again.
func broadcastWebSocket(msg Message) {
	data, err := json.Marshal(msg)
	if err != nil {
		beego.Error("Fail to marshal event:", err)
		return
	}

	for elem := subscribers.Front(); elem != nil; elem = elem.Next() {
		member := elem.Value.(Subscriber)
		if member.Conn == nil {
			continue
		}
		if err := member.Conn.WriteMessage(websocket.TextMessage, data); err != nil {
			select {
			case unsubscribe <- member.Name:
			default:
				beego.Error("unsubscribe queue is full, will retry removing:", member.Name)
			}
		}
	}
}

// isUserExist reports whether the list already holds a Subscriber whose Name
// equals user. Every element of the list must be a Subscriber value.
func isUserExist(subscribers *list.List, user string) bool {
	elem := subscribers.Front()
	for elem != nil {
		if member := elem.Value.(Subscriber); member.Name == user {
			return true
		}
		elem = elem.Next()
	}
	return false
}

前端的 WebSocket 连接需要路由到 Join 方法:

在这里插入图片描述


版权声明:本文为weixin_46105038原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。