架构图
语言与组件
go语言
使用到的核心包
- github.com/graarh/golang-socketio
- github.com/dgrijalva/jwt-go
- github.com/joho/godotenv
- github.com/patrickmn/go-cache
- github.com/nsqio/go-nsq
- github.com/go-redis/redis
- gopkg.in/olivere/elastic.v7
核心包的作用
- github.com/graarh/golang-socketio
socket包,用来做网络连接,socketio 在客户端和服务端之间建立的双向通信数据交换技术,底层使用EngineIO。SocketIO的的客户端使用Engine.IO-Client,服务端使用Engine.IO实现。 - github.com/dgrijalva/jwt-go
来解析用户token,得到用户信息 - github.com/joho/godotenv
配置信息存储到.env,该包用来获取配置信息,例如redis连接配置,es连接配置 - github.com/patrickmn/go-cache
go 缓存包,存储到内存中,用来存储用户 socket的连接信息 - github.com/nsqio/go-nsq
消息队列nsq包。使用nsq消息队列的原因是为了异步解耦,例如发送消息,消息发送成功后存储消息到es中。 - github.com/go-redis/redis
redis连接包。 - gopkg.in/olivere/elastic.v7
Elasticsearch 连接包
其他组件
- redis
用来存储用户连接的socket 服务端信息,以及用户信息。 - nsq 消息队列
用来做业务逻辑的解耦,分布式socket实现的核心组件就是消息队列。 - Elasticsearch
存储消息数据,在用户端与管理后台展示消息,以及消息的搜索 - nginx
使用nginx来做反向代理,对客户端来说socket的连接地址统一到nginx,再由nginx来做分发。
实现逻辑
- 当socket服务端启动时,同步会启动nsq消息队列消费,该消费是用来监听消息下发。
- 客户端发起链接时候会先到Nginx,然后在由Nginx 负载到 socket服务端 ,同时把消息体也一并转发到socket服务上,客户端建立链接时会携带token信息,首先对token进行解密(此处token是用jwt实现)从而得到用户的身份ID,其次获取到当前socket的服务器IP与socket的端口,存储到redis中,用来做下发消息的topic。
- 客户端发送消息时,首先解析消息,获取到接收方用户身份ID,从redis获取到当前用户所在socket服务端的topic,把消息推送到该用户的所在的服务器topic上。
- 消息队列消费该topic,获取到消息后解析消息,获取到接收方用户ID,在go-cache缓存中找到该用户的socket连接信息,从而进行消息下发。
- 消息下发成功后,发送一个消息队列,此消息队列是用来存储消息数据到Elasticsearch,在另一个go协程中,消费此topic 解析消息数据存储数据到Elasticsearch中。
Nginx配置
upstream websocket {
server 192.168.10.56:9500;
server 192.168.10.55:9500;
server 192.168.10.57:9500;
server 192.168.10.58:9500;
}
server {
listen 80; # managed by Certbot
server_name im.tickeup.com;
#root /home/wwwroot;
index index.html index.htm;
access_log /var/log/nginx/im.tickeup.com.access.log;
location / {
proxy_pass http://websocket;
proxy_read_timeout 300s;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
结束语
至此分布式socket代码实现完成,思路就是这样,通过nginx、redis、nsq消息队列我们就可以实现websocket的分布式了,而且当在线用户数量多的时候,我们可以随时进行扩容,只需要在nginx处增加新的socket IP地址就可以了。
实现逻辑思路其实不难, 但是在具体实现上业务的逻辑比较多,此项目是我在2020年的时候在公司做的社交聊天的socket服务,具体在开发过程中还是要注意很多事项,在此不一一陈述,只讲核心的实现逻辑。
版权声明:本文为zhanglu0302原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。