WebSocket - Tornado - Ping & Pong

目录

一、Tornado 内置 ping 设置

1-0 ping 帧

1-1 ping 帧 发送

1-2 Tornado setting 相关项

1-3 client & server 端处理 ping 帧

1-3-1 server 端

1-3-2 client 端 - 基于 ws-client 库

二、业务逻辑 ping & pong

2-0 业务逻辑分析

2-1 gzip 压缩

2-1-1 内置 gzip 相关 settings

2-1-2 手动 gzip 压缩 - 及 binary 注意点

2-2 Ping & Pong

2-2-1 server 端

2-2-2 client 端 - ws-client库


一、Tornado 内置 ping 设置

1-0 ping 帧

# bytes to int
int.from_bytes(data[0], 'big')
​
# int to bytes
int.to_bytes('转义位数','单个int位数左大右小')
int.to_bytes(13,'big')

1-1 ping 帧 发送

# server 接收ping值的返回(client 自动返回)
class EchoWebSocket(WebSocketHandler):
    # Invoked when the response to a ping frame is received.
    def on_pong(self, data):
        print('pong:', data)
        # 默认第一个发送的ping帧为空
        if not data:
            # 主动发送13位时间戳
            byte_ping = round(time.time()*1000).to_bytes(13,'big')
            self.ping(byte_ping)

1-2 Tornado setting 相关项

官方 setting 配置文档

if __name__ == "__main__":
    application = Application([
        (r"/", EchoWebSocket),
        (r"/oplog", OplogHandler),
​
    ],
        # 间隔5秒发送一次ping帧,第一次发送为触发的5s后
        websocket_ping_interval=5,
        # 每次 ping 操作重置时间超时时间,若超时则断开连接,默认3次 ping 或 30s 断开
        websocket_ping_timeout=10
    )
    application.listen(8080)
    ioloop.IOLoop.current().start()

1-3 client & server 端处理 ping 帧

1-3-1 server 端

class EchoWebSocket(WebSocketHandler):
​
    def open(self):
        print("WebSocket opened")
        users.add(self)
​
    # Invoked when the a ping frame is received.
    def on_ping(self, data):
        print('ping:', data)
​
    # Invoked when the response to a ping frame is received.
    def on_pong(self, data):
        print('pong:', data)
        if not data:
            byte_ping = round(time.time()*1000).to_bytes(13,'big')
            self.ping(byte_ping)
​
​
    # 处理client发送的数据
    def on_message(self, message):
        print(message)
        # 将数据发送给当前连接的client
        # self.write_message(u"server said: " + message)
        # 将数据发送给所有连接的client
        for user in users:
            user.write_message(u"server said: " + message)
​
    def on_close(self):
        print("WebSocket closed")
        users.discard(self)
​
    # 允许所有跨域通讯,解决403问题
    def check_origin(self, origin):
        return True

1-3-2 client 端 - 基于 ws-client 库

import websocket
​
​
def on_message(ws, message):
    print(message)
​
​
def on_error(ws, error):
    print(error)
​
​
def on_close(ws):
    print("### closed ###")
​
​
def on_open(ws):
    print('opened')
​
# 接受server接收client的ping帧后的response 
def on_pong(ws, *data):
    print('pong', data)
​
# 接收到server发送的ping帧触发
def on_ping(ws, *data):
    print('ping', data)
    if data[0]:
        print(int.from_bytes(data[0], 'big'))
​
​
if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://localhost:8080/",
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close,
                                on_ping=on_ping,
                                on_pong=on_pong, )
    ws.on_open = on_open
    ws.run_forever()
​

 

二、业务逻辑 ping & pong

2-0 业务逻辑分析

  • 数据进行 gzip 压缩,需要 client 端的解压
  • server 定期发送13位时间戳 ping 值字典 - {"ping":1492420473027}
  • client 一旦接受 ping 值字典,必须返回 pong 值字典,且值必须相同 - {"pong":1492420473027}
  • 若 server 端在2次 send ping 后未接收到一次 pong response,则 server 主动断开连接

2-1 gzip 压缩

2-1-1 内置 gzip 相关 settings

如果请求的报文头部里面有要求 'Accept-Encoding': 'gzip', 那么tornado会对回复的报文进行压缩操作, 请求方收到回复时需要自己手动对这个进行解压操作。(未验证)

2-1-2 手动 gzip 压缩 - 及 binary 注意点

def beat_ping():
    print('beatping')
    
    # 向所有用户推送ping dict,若没有 pong 返回则断开连接
    ping_dic = {'ping': round(time.time() * 1000)}
    ping_json = json.dumps(ping_dic)
    ping_byte = bytes(ping_json, encoding='utf-8')
    ping_gzip = gzip.compress(ping_byte)
    
    for user in users:
        # !!注意!!推送消息时,必须 binary 为 True,用于发送二进制数据
        user.write_message(ping_gzip,binary=True)
        # 若发送 dict 字典会自动转换为 json 格式
        # user.write_message(ping_dic)
        if user_times[user] == 2:
            user.close()
            user_times.pop(user)
        else:
            user_times[user] += 1

2-2 Ping & Pong

2-2-1 server 端

import gzip
import json
​
from tornado.web import Application
from tornado.websocket import WebSocketHandler
import time
from tornado import ioloop
​
users = set()
# users_times = {'user':{'count':0,'ping':None}}
user_times = {}
​
​
class EchoWebSocket(WebSocketHandler):
​
    def open(self):
        print("WebSocket opened")
        users.add(self)
        user_times[self] = dict(
            count=0,
            ping=None,
        )
​
    # 处理client发送的数据
    def on_message(self, message):
        msg = json.loads(message)
        user_dic = user_times.get(self)
        print(user_dic)
        # 若 pong 存在,则表示该请求用于测试连接
        if 'pong' in msg:
            # 满足条件,即重置对应user_times
            if user_dic and user_dic['ping'] == msg['pong']:
                user_times[self]['count'] = 0
                # 将数据发送给连接的client - 视业务需要
                # data = {'data':'message from server'}
                # data = json.dumps(data).encode('utf-8')
                # self.write_message(gzip.compress(data), binary=True)
​
    def on_close(self):
        print("WebSocket closed")
        users.discard(self)
        if user_times.get(self):
            # 若非自然断开连接,则删除字典内信息
            user_times.pop(self)
​
    # 允许所有跨域通讯,解决403问题
    def check_origin(self, origin):
        return True
​
​
def beatping():
    print('beatping')
    # 向所有用户推送ping dict,若没有 pong 返回则断开连接
    ping_dic = {'ping': round(time.time() * 1000)}
    ping_json = json.dumps(ping_dic)
    ping_byte = bytes(ping_json, encoding='utf-8')
    ping_gzip = gzip.compress(ping_byte)
    for user in users:
        user.write_message(ping_gzip, binary=True)
        user_times[user]['ping'] = ping_dic['ping']
        if user_times[user]['count'] == 2:
            user.close()
            user_times.pop(user)
        else:
            user_times[user]['count'] += 1
​
​
if __name__ == "__main__":
    application = Application([
        (r"/", EchoWebSocket),
    ],
        # compress_response=True,
    )
    application.listen(8888)
    ioloop.PeriodicCallback(beatping, 5000).start()  # start scheduler 每隔5s执行一次
    ioloop.IOLoop.current().start()
​

2-2-2 client 端 - ws-client库

import gzip
import json
​
import websocket
​
​
def on_message(ws, message):
    msg = gzip.decompress(message).decode("utf-8")
    msg = json.loads(msg)
    print(msg)
    # 若相应数据内带有 ping 则表示为连接测试,仅返回对应 pong 值
    if 'ping' in msg:
        pong = {'pong': msg['ping']}
        ws.send(json.dumps(pong))
​
def on_error(ws, error):
    print('!!! error !!!',error)
​
​
def on_close(ws):
    print("### closed ###")
​
​
def on_open(ws):
    print('opened')
​
​
if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://localhost:8888/",
                                on_message=on_message,
                                on_open=on_open,
                                on_error=on_error,
                                on_close=on_close)
    # ws.on_open = on_open
    ws.run_forever()
​


 


版权声明:本文为qq_33961117原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。