目录
1-3 client & server 端处理 ping 帧
1-3-2 client 端 - 基于 ws-client 库
2-1-2 手动 gzip 压缩 - 及 binary 注意点
一、Tornado 内置 ping 设置
1-0 ping 帧
# bytes to int int.from_bytes(data[0], 'big') # int to bytes int.to_bytes('转义位数','单个int位数左大右小') int.to_bytes(13,'big')1-1 ping 帧 发送
# server 接收ping值的返回(client 自动返回) class EchoWebSocket(WebSocketHandler): # Invoked when the response to a ping frame is received. def on_pong(self, data): print('pong:', data) # 默认第一个发送的ping帧为空 if not data: # 主动发送13位时间戳 byte_ping = round(time.time()*1000).to_bytes(13,'big') self.ping(byte_ping)1-2 Tornado setting 相关项
if __name__ == "__main__": application = Application([ (r"/", EchoWebSocket), (r"/oplog", OplogHandler), ], # 间隔5秒发送一次ping帧,第一次发送为触发的5s后 websocket_ping_interval=5, # 每次 ping 操作重置时间超时时间,若超时则断开连接,默认3次 ping 或 30s 断开 websocket_ping_timeout=10 ) application.listen(8080) ioloop.IOLoop.current().start()1-3 client & server 端处理 ping 帧
1-3-1 server 端
class EchoWebSocket(WebSocketHandler): def open(self): print("WebSocket opened") users.add(self) # Invoked when the a ping frame is received. def on_ping(self, data): print('ping:', data) # Invoked when the response to a ping frame is received. def on_pong(self, data): print('pong:', data) if not data: byte_ping = round(time.time()*1000).to_bytes(13,'big') self.ping(byte_ping) # 处理client发送的数据 def on_message(self, message): print(message) # 将数据发送给当前连接的client # self.write_message(u"server said: " + message) # 将数据发送给所有连接的client for user in users: user.write_message(u"server said: " + message) def on_close(self): print("WebSocket closed") users.discard(self) # 允许所有跨域通讯,解决403问题 def check_origin(self, origin): return True1-3-2 client 端 - 基于 ws-client 库
import websocket def on_message(ws, message): print(message) def on_error(ws, error): print(error) def on_close(ws): print("### closed ###") def on_open(ws): print('opened') # 接受server接收client的ping帧后的response def on_pong(ws, *data): print('pong', data) # 接收到server发送的ping帧触发 def on_ping(ws, *data): print('ping', data) if data[0]: print(int.from_bytes(data[0], 'big')) if __name__ == "__main__": websocket.enableTrace(True) ws = websocket.WebSocketApp("ws://localhost:8080/", on_message=on_message, on_error=on_error, on_close=on_close, on_ping=on_ping, on_pong=on_pong, ) ws.on_open = on_open ws.run_forever()
二、业务逻辑 ping & pong
2-0 业务逻辑分析
- 数据进行 gzip 压缩,需要 client 端的解压
- server 定期发送13位时间戳 ping 值字典 - {"ping":1492420473027}
- client 一旦接受 ping 值字典,必须返回 pong 值字典,且值必须相同 - {"pong":1492420473027}
- 若 server 端在2次 send ping 后未接收到一次 pong response,则 server 主动断开连接
2-1 gzip 压缩
2-1-1 内置 gzip 相关 settings
如果请求的报文头部里面有要求 'Accept-Encoding': 'gzip', 那么tornado会对回复的报文进行压缩操作, 请求方收到回复时需要自己手动对这个进行解压操作。(未验证)
2-1-2 手动 gzip 压缩 - 及 binary 注意点
def beat_ping(): print('beatping') # 向所有用户推送ping dict,若没有 pong 返回则断开连接 ping_dic = {'ping': round(time.time() * 1000)} ping_json = json.dumps(ping_dic) ping_byte = bytes(ping_json, encoding='utf-8') ping_gzip = gzip.compress(ping_byte) for user in users: # !!注意!!推送消息时,必须 binary 为 True,用于发送二进制数据 user.write_message(ping_gzip,binary=True) # 若发送 dict 字典会自动转换为 json 格式 # user.write_message(ping_dic) if user_times[user] == 2: user.close() user_times.pop(user) else: user_times[user] += 1
2-2 Ping & Pong
2-2-1 server 端
import gzip import json from tornado.web import Application from tornado.websocket import WebSocketHandler import time from tornado import ioloop users = set() # users_times = {'user':{'count':0,'ping':None}} user_times = {} class EchoWebSocket(WebSocketHandler): def open(self): print("WebSocket opened") users.add(self) user_times[self] = dict( count=0, ping=None, ) # 处理client发送的数据 def on_message(self, message): msg = json.loads(message) user_dic = user_times.get(self) print(user_dic) # 若 pong 存在,则表示该请求用于测试连接 if 'pong' in msg: # 满足条件,即重置对应user_times if user_dic and user_dic['ping'] == msg['pong']: user_times[self]['count'] = 0 # 将数据发送给连接的client - 视业务需要 # data = {'data':'message from server'} # data = json.dumps(data).encode('utf-8') # self.write_message(gzip.compress(data), binary=True) def on_close(self): print("WebSocket closed") users.discard(self) if user_times.get(self): # 若非自然断开连接,则删除字典内信息 user_times.pop(self) # 允许所有跨域通讯,解决403问题 def check_origin(self, origin): return True def beatping(): print('beatping') # 向所有用户推送ping dict,若没有 pong 返回则断开连接 ping_dic = {'ping': round(time.time() * 1000)} ping_json = json.dumps(ping_dic) ping_byte = bytes(ping_json, encoding='utf-8') ping_gzip = gzip.compress(ping_byte) for user in users: user.write_message(ping_gzip, binary=True) user_times[user]['ping'] = ping_dic['ping'] if user_times[user]['count'] == 2: user.close() user_times.pop(user) else: user_times[user]['count'] += 1 if __name__ == "__main__": application = Application([ (r"/", EchoWebSocket), ], # compress_response=True, ) application.listen(8888) ioloop.PeriodicCallback(beatping, 5000).start() # start scheduler 每隔5s执行一次 ioloop.IOLoop.current().start() 2-2-2 client 端 - ws-client库
import gzip import json import websocket def on_message(ws, message): msg = gzip.decompress(message).decode("utf-8") msg = json.loads(msg) print(msg) # 若相应数据内带有 ping 则表示为连接测试,仅返回对应 pong 值 if 'ping' in msg: pong = {'pong': msg['ping']} ws.send(json.dumps(pong)) def on_error(ws, error): print('!!! error !!!',error) def on_close(ws): print("### closed ###") def on_open(ws): print('opened') if __name__ == "__main__": websocket.enableTrace(True) ws = websocket.WebSocketApp("ws://localhost:8888/", on_message=on_message, on_open=on_open, on_error=on_error, on_close=on_close) # ws.on_open = on_open ws.run_forever()
版权声明:本文为qq_33961117原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。






