使用go开发分布式websocket

架构图

在这里插入图片描述

语言与组件

go语言

使用到的核心包

  • github.com/graarh/golang-socketio
  • github.com/dgrijalva/jwt-go
  • github.com/joho/godotenv
  • github.com/patrickmn/go-cache
  • github.com/nsqio/go-nsq
  • github.com/go-redis/redis
  • gopkg.in/olivere/elastic.v7

核心包的作用

  1. github.com/graarh/golang-socketio
    socket包,用来做网络连接,socketio 在客户端和服务端之间建立的双向通信数据交换技术,底层使用EngineIO。SocketIO的的客户端使用Engine.IO-Client,服务端使用Engine.IO实现。
  2. github.com/dgrijalva/jwt-go
    来解析用户token,得到用户信息
  3. github.com/joho/godotenv
    配置信息存储到.env,该包用来获取配置信息,例如redis连接配置,es连接配置
  4. github.com/patrickmn/go-cache
    go 缓存包,存储到内存中,用来存储用户 socket的连接信息
  5. github.com/nsqio/go-nsq
    消息队列nsq包。使用nsq消息队列的原因是为了异步解耦,例如发送消息,消息发送成功后存储消息到es中。
  6. github.com/go-redis/redis
    redis连接包。
  7. gopkg.in/olivere/elastic.v7
    Elasticsearch 连接包

其他组件

  1. redis
    用来存储用户连接的socket 服务端信息,以及用户信息。
  2. nsq 消息队列
    用来做业务逻辑的解耦,分布式socket实现的核心组件就是消息队列。
  3. Elasticsearch
    存储消息数据,在用户端与管理后台展示消息,以及消息的搜索
  4. nginx
    使用nginx来做反向代理,对客户端来说socket的连接地址统一到nginx,再由nginx来做分发。

实现逻辑

  • 当socket服务端启动时,同步会启动nsq消息队列消费,该消费是用来监听消息下发。
  • 客户端发起链接时候会先到Nginx,然后在由Nginx 负载到 socket服务端 ,同时把消息体也一并转发到socket服务上,客户端建立链接时会携带token信息,首先对token进行解密(此处token是用jwt实现)从而得到用户的身份ID,其次获取到当前socket的服务器IP与socket的端口,存储到redis中,用来做下发消息的topic。
  • 客户端发送消息时,首先解析消息,获取到接收方用户身份ID,从redis获取到当前用户所在socket服务端的topic,把消息推送到该用户的所在的服务器topic上。
  • 消息队列消费该topic,获取到消息后解析消息,获取到接收方用户ID,在go-cache缓存中找到该用户的socket连接信息,从而进行消息下发。
  • 消息下发成功后,发送一个消息队列,此消息队列是用来存储消息数据到Elasticsearch,在另一个go协程中,消费此topic 解析消息数据存储数据到Elasticsearch中。

Nginx配置

upstream websocket {
    server 192.168.10.56:9500;
    server 192.168.10.55:9500;
    server 192.168.10.57:9500;
    server 192.168.10.58:9500;
}

server {
    listen 80; # managed by Certbot
    server_name im.tickeup.com;

    #root /home/wwwroot;
    index index.html index.htm;

    access_log /var/log/nginx/im.tickeup.com.access.log;

    location / {
        proxy_pass http://websocket;
        proxy_read_timeout 300s;

        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }

}

结束语

至此分布式socket代码实现完成,思路就是这样,通过nginx、redis、nsq消息队列我们就可以实现websocket的分布式了,而且当在线用户数量多的时候,我们可以随时进行扩容,只需要在nginx处增加新的socket IP地址就可以了。

实现逻辑思路其实不难, 但是在具体实现上业务的逻辑比较多,此项目是我在2020年的时候在公司做的社交聊天的socket服务,具体在开发过程中还是要注意很多事项,在此不一一陈述,只讲核心的实现逻辑。


版权声明:本文为zhanglu0302原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。