服务器端使用字典来保存连接信息和配对表。当两个用户在配对表中互相指向对方时,配对成功,服务器会通知双方连接成功。
视频功能只能在两台不同的电脑上运行,否则会出现视频设备(摄像头)被占用的问题!
接收文件、消息和视频内容分别在独立的线程中进行,避免相互干扰。
连接成功后,服务器会将连接信息存入字典,代码如下:
'''
连接成功后将连接信息添加到各个字典中
'''
self.selfID = selfID
self.SessionManege.messageConnTable[selfID] = self.messageConn
self.SessionManege.fileConnTable[selfID] = self.fileConn
self.SessionManege.videoConnTable[selfID] = self.videoConn
self.SessionManege.pairTable[selfID] = targetID
cv2 获取的图片要用下面的代码编码成二进制流;不能直接用 "tobytes()" 转成二进制流,否则接收端无法用 imdecode 解码。
byte_stream = cv2.imencode('.jpg', img)[1]
接收端使用下面的代码
img = cv2.imdecode(np.fromstring(byte_stream,np.uint8),cv2.IMREAD_COLOR)
对二进制流进行解码。
下面直接给出完整代码。
服务器端:
import socket
import os
import hashlib
import threading,time
import json
class SessionManege(threading.Thread):
    """Server-side registry of connected users and the partner each one asked for.

    Every table is keyed by user ID. The tables are class attributes, so all
    instances share them; the server creates a single instance.
    """

    messageConnTable = {}  # user ID -> message socket
    fileConnTable = {}     # user ID -> file-transfer socket
    videoConnTable = {}    # user ID -> video socket
    pairTable = {}         # user ID -> ID of the user it wants to talk to

    def newSession(self, messageConn, fileConn, videoConn):
        """Start a MessageSession thread that serves one client's three sockets."""
        chat_Session = MessageSession(messageConn, fileConn, videoConn, self)
        chat_Session.start()

    def close_messageConn(self, userID):
        """Remove every table entry that belongs to ``userID``.

        Entries that do not exist are ignored, so this is safe to call for a
        client that disconnected before it ever registered (previously that
        raised ``KeyError`` from inside the session's cleanup code).
        """
        for table in (self.messageConnTable, self.fileConnTable,
                      self.videoConnTable, self.pairTable):
            table.pop(userID, None)
        print(userID, "连接关闭")
class MessageSession(threading.Thread):
    """Server thread that serves the message socket of one connected client.

    It registers the client on a ``PairTest`` request, relays chat messages
    and transfer requests to the client's partner, and unregisters the client
    when the message socket closes.
    """

    def __init__(self, messageConn, fileConn, videoConn, SessionManege):
        super(MessageSession, self).__init__()
        self.messageConn = messageConn
        self.fileConn = fileConn
        self.videoConn = videoConn
        self.SessionManege = SessionManege
        # Stays None until pairTest() registers this client. run() relies on
        # that to know whether there is anything to unregister.
        self.selfID = None

    def _send(self, conn, payload):
        """Serialise ``payload`` as JSON and send it on ``conn``."""
        conn.send(json.dumps(payload).encode("utf-8"))

    def pairTest(self, selfID, targetID):
        """Register this client as ``selfID`` and try to pair it with ``targetID``.

        Pairing succeeds only when the target is registered AND has itself
        asked for ``selfID``; both sides are then told so. Otherwise only this
        client is told that pairing is still pending.

        Returns:
            True when both sides are paired, False otherwise (including the
            case where ``selfID`` is already taken, in which nothing is
            registered).
        """
        manager = self.SessionManege
        if selfID in manager.messageConnTable:
            self._send(self.messageConn, {
                'type': 'systemMassage',
                'info': "用户名重复!",
            })
            return False

        # Record this client's sockets and the partner it asked for.
        self.selfID = selfID
        manager.messageConnTable[selfID] = self.messageConn
        manager.fileConnTable[selfID] = self.fileConn
        manager.videoConnTable[selfID] = self.videoConn
        manager.pairTable[selfID] = targetID

        targetConn = manager.messageConnTable.get(targetID)
        # The target being online is not enough: it may be waiting for a
        # different user, so require the pair-table entries to point at each
        # other.
        if targetConn is not None and manager.pairTable.get(targetID) == selfID:
            self.targetmessageConn = targetConn
            for conn in (self.messageConn, targetConn):
                self._send(conn, {'type': 'pairTestResult', 'success': True})
            return True

        self._send(self.messageConn, {'type': 'pairTestResult', 'success': False})
        return False

    def msgHandle(self, msg):
        """Decode one JSON request from the client and act on it.

        Raises:
            ValueError: ``msg`` is not valid JSON.
            KeyError: a field required by the request type is missing.
        """
        data = json.loads(msg)
        kind = data['type']
        if kind == "PairTest":
            self.pairTest(data['selfID'], data['targetID'])
            return
        if kind not in ("Message", "FileTransfer", "videoChat"):
            return

        manager = self.SessionManege
        targetmessageConn = manager.messageConnTable.get(data['targetID'])
        if targetmessageConn is None:
            # The partner is not (or no longer) registered; tell the sender
            # instead of failing on a None socket.
            self._send(self.messageConn, {
                'type': 'systemMassage',
                'info': "对方不在线!",
            })
            return

        if kind == "Message":
            self._send(targetmessageConn, {
                'type': 'Message',
                'content': data['content'],
            })
            return

        # File and video relays look up the sender's own sockets by ID, so the
        # sender has to be registered as well. Check before notifying the
        # partner so that it is not left waiting for data that never comes.
        if data['selfID'] not in manager.messageConnTable:
            self._send(self.messageConn, {
                'type': 'systemMassage',
                'info': "请先完成连通测试!",
            })
            return

        if kind == "FileTransfer":
            self._send(targetmessageConn, {'type': 'FileTransfer'})
            fileTransfer = FileTransfer(data['selfID'], data['targetID'], manager)
            fileTransfer.start()
        else:  # "videoChat"
            self._send(targetmessageConn, {'type': 'videoChat'})
            # VideoTransfer starts its relay threads from its constructor.
            VideoTransfer(data['selfID'], data['targetID'], manager)

    def run(self):
        """Serve requests until the client disconnects, then unregister it."""
        try:
            while True:
                raw = self.messageConn.recv(1024)
                if not raw:
                    break
                try:
                    self.msgHandle(raw.decode('utf-8'))
                except (ValueError, KeyError) as e:
                    # One malformed request must not end the whole session.
                    print(e.args)
        except socket.error as e:
            print(e.args)
        finally:
            self._cleanup()

    def _cleanup(self):
        """Unregister this client, notify its partner and close the message socket."""
        manager = self.SessionManege
        try:
            if self.selfID is None:
                # Never registered (no PairTest, or the ID was a duplicate):
                # nothing in the tables belongs to this session.
                return
            targetID = manager.pairTable.get(self.selfID)
            targetmessageConn = manager.messageConnTable.get(targetID)
            paired = manager.pairTable.get(targetID) == self.selfID
            manager.close_messageConn(self.selfID)
            if targetmessageConn is not None and paired:
                try:
                    self._send(targetmessageConn, {
                        'type': 'pairInterruption',
                        'Info': "对方中断退出会话",
                    })
                except socket.error as e:
                    print(e.args)
        finally:
            self.messageConn.close()
class FileTransfer(threading.Thread):
    """Server thread that relays one file from a sender's file socket to a receiver's.

    Protocol, as implemented here: the sender sends a ``name|size`` header,
    which is forwarded to the receiver; the receiver answers with a ready
    signal; the sender is then told ``ready`` and ``size`` bytes are relayed.
    """

    def __init__(self, sendID, receiveID, SessionManege):
        super(FileTransfer, self).__init__()
        self.sendConn = SessionManege.fileConnTable[sendID]
        self.receiveConn = SessionManege.fileConnTable[receiveID]
        self.SessionManege = SessionManege

    def receiveFile(self):
        """Relay exactly one file, then return.

        A new FileTransfer is started for every transfer request, so looping
        here for further files would leave one extra thread per request
        reading from the same socket. The method also returns when either
        side closes its socket instead of spinning on empty reads.
        """
        try:
            fileInfo = self.sendConn.recv(1024)
            if not fileInfo:
                return  # sender closed the connection before sending a header
            self.receiveConn.sendall(fileInfo)  # forward the header to the receiver
            try:
                # The size is the text after the LAST '|', so a file name that
                # itself contains '|' does not break the parse.
                file_size = int(fileInfo.decode("utf-8").rpartition("|")[2])
            except ValueError as e:
                print(e.args)
                return

            if not self.receiveConn.recv(1024):  # wait for the receiver's ready signal
                return
            self.sendConn.sendall("ready".encode("utf-8"))  # let the sender start

            received_size = 0
            while received_size < file_size:
                # Never ask for more than what is left of this file.
                data = self.sendConn.recv(min(1024, file_size - received_size))
                if not data:
                    break  # sender went away mid-transfer
                received_size += len(data)
                # sendall: a plain send() may transmit only part of the chunk.
                self.receiveConn.sendall(data)
        except socket.error as e:
            print(e.args)

    def run(self):
        """Thread entry point: relay one file."""
        self.receiveFile()
class VideoTransfer(threading.Thread):
    """Relay video bytes in both directions between two users' video sockets.

    The two relay threads are started from the constructor; the object itself
    does not need to be started.
    """

    def __init__(self, sendID, receiveID, SessionManege):
        super(VideoTransfer, self).__init__()
        callerConn = SessionManege.videoConnTable[sendID]
        calleeConn = SessionManege.videoConnTable[receiveID]
        # One relay per direction.
        threading.Thread(target=self.receiveVideo, args=(callerConn, calleeConn)).start()
        threading.Thread(target=self.receiveVideo, args=(calleeConn, callerConn)).start()
        # Same final attribute values the original constructor left behind.
        self.receiveConn = callerConn
        self.sendConn = calleeConn

    def receiveVideo(self, sendConn, receiveConn):
        """Forward everything read from ``sendConn`` to ``receiveConn``.

        Returns when ``sendConn`` is closed by the peer or when either socket
        fails. Without that, a finished call left the loop spinning forever on
        empty reads or on a repeating socket error.
        """
        while True:
            try:
                image = sendConn.recv(1300000)
                if not image:
                    break  # peer closed the connection
                receiveConn.sendall(image)
            except socket.error as e:
                print(e.args)
                break
if __name__ == '__main__':
    # Server entry point: listen on three ports (messages, files, video) and
    # hand every accepted triple of connections to the session manager.
    ip_port = "127.0.0.1"
    massage_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    massage_server.bind((ip_port, 8000))
    massage_server.listen(20)
    file_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    file_server.bind((ip_port, 8001))
    file_server.listen(20)
    video_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    video_server.bind((ip_port, 8002))
    video_server.listen(20)
    sessionManege = SessionManege()
    print("监听开始..")
    try:
        while True:
            # A client connects to the three ports in this order, so the
            # three accepts together make up one client.
            messageConn, addr = massage_server.accept()
            fileConn, addr = file_server.accept()
            videoConn, addr = video_server.accept()
            # The manager starts the thread that lets sessions reach each
            # other's sockets.
            sessionManege.newSession(messageConn, fileConn, videoConn)
    except socket.error as e:
        # Report why the accept loop stopped instead of exiting silently.
        print(e.args)
    finally:
        # Release the listening ports however the loop ended (error, Ctrl+C).
        for server in (massage_server, file_server, video_server):
            server.close()
客户端:
import os,io,socket,json,cv2,threading,time
import numpy as np
import tkinter as tk
import tkinter.scrolledtext as tst
from tkinter.messagebox import *
from tkinter.filedialog import *
from tkinter.simpledialog import *
from PIL import Image,ImageTk
class Application(tk.Frame):
    """Main chat window: pairing controls, chat history, message box and buttons.

    The widgets are placed directly on the parent window passed as ``master``.
    """

    def __init__(self, master, sessionManege):
        """Build the widgets and start the thread that receives server messages.

        Args:
            master: parent window the widgets are placed on.
            sessionManege: client session object holding the three sockets.
        """
        tk.Frame.__init__(self, master)
        self.root = master
        self.createWidgets()
        self.sessionManege = sessionManege
        self.clientReceiveThread = ClientReceiveThread(self.sessionManege.message_conn, self)
        self.clientReceiveThread.start()

    def createWidgets(self):
        """Create and place every widget on ``self.root``."""
        tk.Label(self.root, text="本人ID:", width=6).place(x=1, y=1)
        self.selfID = tk.Entry(self.root, width=20)
        self.selfID.place(x=80, y=1)
        tk.Label(self.root, text="对方ID:", width=6).place(x=240, y=1)
        self.targetID = tk.Entry(self.root, width=20)
        self.targetID.place(x=300, y=1)
        # place() returns None, so it must not be chained onto the constructor
        # when the widget reference is kept.
        self.IDSubmit = tk.Button(self.root, text='连通测试', width=8, command=self.pairTest)
        self.IDSubmit.place(x=450, y=1)
        self.chatRecord = tst.ScrolledText(self.root, width=80, height=20)  # chat history
        self.chatRecord.place(x=1, y=30)
        # The buttons are parented to self.root rather than a module-level
        # `root`, so the class works with whatever window it was given.
        self.fileBtn = tk.Button(self.root, text="发送文件", width=8, command=self.sendFile)
        self.fileBtn.place(x=1, y=300)
        self.videoBtn = tk.Button(self.root, text="视频聊天", width=8, command=self.videoCharApply)
        self.videoBtn.place(x=100, y=300)
        self.chatMessage = tst.ScrolledText(self.root, width=80, height=10)  # message input
        self.chatMessage.place(x=1, y=340)
        self.sendBtn = tk.Button(self.root, text="发送", width=8, command=self.sendMessage)
        self.sendBtn.place(x=500, y=480)

    def _appendRecord(self, text):
        """Append ``text`` to the chat history and scroll to the end."""
        self.chatRecord.insert("insert", text)
        self.chatRecord.see(tk.END)

    def _sendRequest(self, requestType):
        """Send a ``requestType`` request carrying both IDs on the message socket."""
        jsDict = {
            'type': requestType,
            'selfID': self.selfID.get(),
            'targetID': self.targetID.get(),
        }
        self.sessionManege.message_conn.sendall(bytes(json.dumps(jsDict), 'utf-8'))

    def pairTest(self):
        """Ask the server to pair the two IDs typed into the entry fields."""
        self.sessionManege.testPair(self.selfID.get(), self.targetID.get())

    def pairCheck(self):
        """Warn the user when no pairing is established yet.

        Returns:
            True when the session is NOT paired (callers abort on True),
            False when it is.
        """
        if not self.sessionManege.piarState:
            tk.messagebox.showwarning("警告:", '您还未建立可靠连接!')
            return True
        return False

    def sendMessage(self):
        """Send the text in the input box and echo it in the chat history."""
        if self.pairCheck():
            return
        content = self.chatMessage.get(1.0, tk.END).strip()
        if content == "":
            return
        self.sessionManege.sendMessage(content, self.selfID.get(), self.targetID.get())
        self.chatMessage.delete(1.0, tk.END)
        self._appendRecord("己方:\n" + content + "\n\n")

    def sendFile(self):
        """Let the user pick a file and send it to the partner in a worker thread."""
        if self.pairCheck():
            return
        filePath = tk.filedialog.askopenfilename(filetypes=[('文本文件', '.txt'), ('所有文件', '.*')])
        if not filePath:
            # Dialog cancelled: announcing a transfer now would leave the
            # server and the partner waiting for a file that never arrives.
            return
        self._appendRecord("己方:\n 发送文件" + filePath + "\n\n")
        self._sendRequest('FileTransfer')
        threading.Thread(target=self.sendFileThread, args=(filePath,)).start()

    def sendFileThread(self, filePath):
        """Worker: stream ``filePath`` over the file socket."""
        self.sessionManege.sendFile(filePath)

    def receiveFile(self):
        """Start a worker thread that receives one incoming file."""
        threading.Thread(target=self.receiveFileThread, args=()).start()

    def receiveFileThread(self):
        """Worker: receive one file and report it in the chat history."""
        filePath = self.sessionManege.receiveFile()
        fileName = os.path.basename(filePath)
        # The backslash is escaped; "\R" is not a valid escape sequence.
        self._appendRecord("通知:\n 已成功接受文件" + fileName + ",存放在D:\\Receice Files\n\n")

    def videoCharApply(self):
        """Ask the partner for a video chat and open the local video window."""
        if self.pairCheck():
            return
        self._appendRecord("己方:\n 发送视频通话请求\n\n")
        self._sendRequest('videoChat')
        self.videoChat()

    def videoChat(self):
        """Open the video chat window."""
        top = tk.Toplevel()
        top.title("视频聊天:")
        self.videoWindow = videoFrame(top, self.sessionManege)
class videoFrame():
    """Video chat window: sends camera frames and displays received frames.

    NOTE(review): frames are written to the TCP stream without any length
    prefix, so one recv() may hold part of a frame or several frames. Frames
    that cannot be decoded are skipped; proper framing would need a protocol
    change on both ends.
    """

    def __init__(self, toplevel, sessionManege):
        """Open camera 0 and start the sender and receiver threads.

        Args:
            toplevel: the window the video panel is placed in.
            sessionManege: client session object providing ``video_conn``.
        """
        self.top = toplevel
        self.createWidgets()
        self.video_conn = sessionManege.video_conn
        self.camera = cv2.VideoCapture(0)
        # Set when the window is closed so that both worker loops stop.
        self._closed = threading.Event()
        threading.Thread(target=self.sendVideo, args=(self.video_conn,)).start()
        threading.Thread(target=self.receiveVideo, args=(self.video_conn,)).start()
        self.top.protocol("WM_DELETE_WINDOW", self.s_destroy)

    def createWidgets(self):
        """Create the label that displays the incoming video."""
        self.panel = tk.Label(self.top)
        self.panel.pack(padx=10, pady=10)
        self.top.config(cursor="arrow")

    def s_destroy(self):
        """Window-close handler: stop the workers, release the camera, close the window."""
        self._closed.set()
        self.camera.release()
        cv2.destroyAllWindows()
        self.top.destroy()

    def receiveVideo(self, video_conn):
        """Receive JPEG data from ``video_conn`` and show it in the panel.

        Runs until the window is closed, the connection is closed by the
        peer, or the socket fails.
        """
        while not self._closed.is_set():
            try:
                byte_stream = video_conn.recv(1300000)
            except socket.error as e:
                print(e.args)
                break
            if not byte_stream:
                break  # connection closed; further reads would return b'' forever
            if self._closed.is_set():
                break  # window closed while this thread was waiting for data
            try:
                # frombuffer replaces the deprecated binary mode of fromstring.
                img = cv2.imdecode(np.frombuffer(byte_stream, np.uint8), cv2.IMREAD_COLOR)
                if img is None:
                    continue  # incomplete or corrupt frame
                cv2image = cv2.cvtColor(img, cv2.COLOR_BGR2RGBA)  # BGR -> RGBA
                current_image = Image.fromarray(cv2image)
                imgtk = ImageTk.PhotoImage(image=current_image)
                # Keep a reference on the widget so the image is not
                # garbage-collected while it is displayed.
                self.panel.imgtk = imgtk
                self.panel.config(image=imgtk)
            except cv2.error as e:
                print(e.args)
                continue
            except tk.TclError as e:
                # The panel was destroyed between the check above and the update.
                print(e.args)
                break

    def sendVideo(self, video_conn):
        """Capture camera frames, JPEG-encode them and send them on ``video_conn``.

        Runs until the window is closed or the socket fails.
        """
        while not self._closed.is_set():
            success, img = self.camera.read()
            # Short pause between frames; it also ends early on window close.
            self._closed.wait(0.01)
            if not success or img is None:
                continue  # no frame available (camera busy, missing or released)
            try:
                byte_stream = cv2.imencode('.jpg', img)[1]
                video_conn.sendall(byte_stream)
            except cv2.error as e:
                print(e.args)
                continue
            except socket.error as e:
                # A broken connection will not recover; retrying would spin.
                print(e.args)
                break
class SessionManege:
    """Client-side session: owns the message, file and video sockets to the server."""

    # Placeholders; the real sockets are created per instance in __init__
    # instead of once as a side effect of defining the class.
    message_conn = None
    file_conn = None
    video_conn = None
    # Pairing flag. The spelling is kept because other classes read and write
    # this exact attribute name.
    piarState = False

    def __init__(self):
        """Create the three sockets and connect them to the server."""
        ip_port = "127.0.0.1"
        self.message_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.file_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.video_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # The server accepts the three connections in this order.
        self.message_conn.connect((ip_port, 8000))
        self.file_conn.connect((ip_port, 8001))
        self.video_conn.connect((ip_port, 8002))

    def _sendJson(self, jsDict):
        """Send ``jsDict`` as UTF-8 JSON on the message socket."""
        self.message_conn.sendall(json.dumps(jsDict).encode('utf-8'))

    def testPair(self, selfID, targetID):
        """Ask the server to pair ``selfID`` with ``targetID``."""
        self._sendJson({'type': 'PairTest', 'selfID': selfID, 'targetID': targetID})

    def sendMessage(self, content, selfID, targetID):
        """Send a chat message addressed to ``targetID``."""
        self._sendJson({'type': 'Message', 'content': content,
                        'selfID': selfID, 'targetID': targetID})

    def sendFile(self, filePath):
        """Send the file at ``filePath`` over the file socket.

        Sends a ``name|size`` header, waits for the ready signal, then streams
        at most ``size`` bytes of the file.
        """
        size = os.stat(filePath).st_size
        fileName = os.path.basename(filePath)
        self.file_conn.sendall((fileName + "|" + str(size)).encode("utf-8"))
        if not self.file_conn.recv(1024):  # ready signal; empty means disconnected
            return
        with open(filePath, "rb") as f:
            has_sent = 0
            while has_sent < size:
                # Never send more than the announced size, even if the file grew.
                data = f.read(min(1024, size - has_sent))
                if not data:
                    break  # file shrank after stat(); stop instead of looping forever
                self.file_conn.sendall(data)
                has_sent += len(data)

    def receiveFile(self, saveDir="D:\\Receice Files"):
        """Receive one file from the file socket and store it in ``saveDir``.

        Args:
            saveDir: directory the file is written to; created when missing.

        Returns:
            The name of the stored file, without directory.

        Raises:
            ValueError: the header does not end in ``|<integer size>``.
        """
        server_response = self.file_conn.recv(1024)
        rawName, _, sizeText = server_response.decode("utf-8").rpartition("|")
        file_size = int(sizeText)
        # The name comes from the network: drop any directory part so it
        # cannot point outside saveDir.
        filename = os.path.basename(rawName.replace("\\", "/"))
        self.file_conn.send("ready".encode("utf-8"))  # ready signal
        os.makedirs(saveDir, exist_ok=True)
        filePath = os.path.join(saveDir, filename)
        with open(filePath, "wb") as f:
            received_size = 0
            while received_size < file_size:
                data = self.file_conn.recv(min(1024, file_size - received_size))
                if not data:
                    break  # connection closed before the whole file arrived
                received_size += len(data)
                f.write(data)
        return filename
class ClientReceiveThread(threading.Thread):
    """Client thread that reads server messages and applies them to the GUI."""

    def __init__(self, message_conn, application):
        super(ClientReceiveThread, self).__init__()
        self.message_conn = message_conn
        self.application = application

    def run(self):
        """Thread entry point."""
        self.receive_msg()

    def receive_msg(self):
        """Read JSON messages until the connection closes or fails."""
        while True:
            try:
                raw = self.message_conn.recv(1024)
            except socket.error as e:
                print(e.args)
                break
            if not raw:
                break
            try:
                js = json.loads(raw.decode('utf-8'))
            except ValueError as e:
                # Undecodable data: skip it rather than end the thread.
                print(e.args)
                continue
            if isinstance(js, dict):
                self.msgHandle(js)

    def msgHandle(self, js):
        """Apply one decoded server message to the application window."""
        app = self.application
        kind = js.get("type")
        if kind == "Message":
            app.chatRecord.insert("insert", "对方:\n" + js["content"] + "\n\n")
        elif kind == "FileTransfer":
            app.receiveFile()
        elif kind == "videoChat":
            app.chatRecord.insert("insert", "对方打开视频通讯:\n")
            app.videoChat()
        elif kind == "pairTestResult":
            if js["success"]:
                app.sessionManege.piarState = True
                app.chatRecord.insert("insert", "系统消息:连接成功,可以开始聊天!\n")
            else:
                app.chatRecord.insert("insert", "系统消息:正在等待对方上线......\n")
        elif kind == "pairInterruption":
            # Must be the same attribute that is set on success and checked
            # before sending (`piarState`); writing `pairState` here left the
            # session marked as paired after the partner had gone.
            app.sessionManege.piarState = False
            app.chatRecord.insert("insert", "系统消息:对方已中断连接.....\n")
        elif kind == "systemMassage":
            app.chatRecord.insert("insert", "系统消息:" + js["info"] + "\n")
        # "end" is the index literal for the end of the text, used the same
        # way as the "insert" literal above.
        app.chatRecord.see("end")
if __name__ == "__main__":
    # Client entry point: centre a 600x510 window on the screen, connect to
    # the server and run the GUI event loop.
    root = tk.Tk()
    root.title('实时通信')
    sw = root.winfo_screenwidth()
    sh = root.winfo_screenheight()
    ww = 600
    wh = 510
    x = (sw - ww) / 2
    y = (sh - wh) / 2
    root.geometry("%dx%d+%d+%d" % (ww, wh, x, y))
    sessionManege = SessionManege()
    app = Application(root, sessionManege)
    try:
        app.mainloop()
    finally:
        # The receiver thread is not a daemon and blocks in recv(); shutting
        # the sockets down wakes it up so the process can exit once the
        # window is closed.
        for conn in (sessionManege.message_conn,
                     sessionManege.file_conn,
                     sessionManege.video_conn):
            try:
                conn.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # Best effort: the socket may already be disconnected.
                pass
            conn.close()
版权声明:本文为yxw908186797原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。