使用python开发多人一对一聊天系统(文字聊天,视频通讯,文件传输)

在服务器端使用了字典来储存连接信息配对表。当两个人的配对表互相相同时配对成功,并通知连接成功。
视频功能只能运行在两台不同的电脑上,不然会出现视频端口被占用的问题!
接收文件、消息、视频内容均开了多线程,避免了干扰。


连接成功后,就将连接信息存入字典。

	
		'''
        连接成功后将连接信息添加到各个字典中
        '''
        self.selfID = selfID
        self.SessionManege.messageConnTable[selfID] = self.messageConn
        self.SessionManege.fileConnTable[selfID] = self.fileConn
        self.SessionManege.videoConnTable[selfID] = self.videoConn
        self.SessionManege.pairTable[selfID] = targetID

cv2获取的图片使用下面代码转二进制流,不能 "tobytes()" 直接转二进制流,不然无法编码。
                byte_stream = cv2.imencode('.jpg', img)[1]

使用

                img =  cv2.imdecode(np.fromstring(byte_stream,np.uint8),cv2.IMREAD_COLOR) 

再解码

直接上代码。
服务器端

import socket
import os
import hashlib
import threading,time
import json

class SessionManege(threading.Thread):
    messageConnTable = {}
    fileConnTable = {}
    videoConnTable = {}
    pairTable = {}
    
    def newSession(self, messageConn, fileConn,videoConn):
        chat_Session = MessageSession(messageConn, fileConn,videoConn,self)
        chat_Session.start()


    def close_messageConn(self,userID):
        self.messageConnTable.pop(userID)
        self.fileConnTable.pop(userID)
        self.videoConnTable.pop(userID)
        self.pairTable.pop(userID)
        print(userID,"连接关闭")
    
class MessageSession(threading.Thread):

    def __init__(self, messageConn, fileConn, videoConn, SessionManege):
        super(MessageSession, self).__init__()
        self.messageConn = messageConn
        self.fileConn = fileConn
        self.videoConn = videoConn
        self.SessionManege = SessionManege

    def pairTest(self,selfID,targetID):
        if selfID in self.SessionManege.messageConnTable:
            self.messageConn.send(json.dumps({
                        'type': 'systemMassage',
                        'info': "用户名重复!"
                    }).encode("utf-8"))
            return
        '''
        连接成功后将连接信息添加到各个字典中
        '''
        self.selfID = selfID
        self.SessionManege.messageConnTable[selfID] = self.messageConn
        self.SessionManege.fileConnTable[selfID] = self.fileConn
        self.SessionManege.videoConnTable[selfID] = self.videoConn
        self.SessionManege.pairTable[selfID] = targetID
        '''
        返回信息
        '''
        if(self.SessionManege.messageConnTable.get(targetID)):
            self.targetmessageConn = self.SessionManege.messageConnTable.get(targetID)
            self.messageConn.send(json.dumps({
                        'type': 'pairTestResult',
                        'success': True,
                    }).encode("utf-8"))
            self.targetmessageConn.send(json.dumps({
                        'type': 'pairTestResult',
                        'success': True,
                    }).encode("utf-8"))
            return True
        else :
            self.messageConn.send(json.dumps({
                        'type': 'pairTestResult',
                        'success': False,
                    }).encode("utf-8"))
            return False

    def msgHandle(self,msg):
        data = json.loads(msg)
        if data['type'] == "Message":  #连接成功后从messageConnTable中获取targetID的messageConn        
            targetmessageConn = self.SessionManege.messageConnTable.get(data['targetID'])
            targetmessageConn.send(json.dumps({
                        'type': 'Message',
                        'content': data['content'],
                    }).encode("utf-8") )

        if data['type'] == "FileTransfer": #文件传输
            targetmessageConn = self.SessionManege.messageConnTable.get(data['targetID'])
            targetmessageConn.send(json.dumps({
                        'type': 'FileTransfer',
                    }).encode("utf-8") )
            fileTransfer = FileTransfer(data['selfID'],data['targetID'],self.SessionManege)
            fileTransfer.start()

        if data['type'] == "videoChat": #视频聊天
            targetmessageConn = self.SessionManege.messageConnTable.get(data['targetID'])
            targetmessageConn.send(json.dumps({
                        'type': 'videoChat',
                    }).encode("utf-8") )
            videoTransfer = VideoTransfer(data['selfID'],data['targetID'],self.SessionManege)

        if data['type'] == "PairTest":
            self.pairTest(data['selfID'],data['targetID'])
            
    def run(self):
        try:
             while True:
                 msg = self.messageConn.recv(1024).decode('utf-8')
                 if not msg:
                     break   
                 else :
                    self.msgHandle(msg)
        except socket.error as e:
            print(e.args)
            pass
        finally:
            targetID = self.SessionManege.pairTable.get(self.selfID)
            targetmessageConn = self.SessionManege.messageConnTable.get(targetID)
            self.SessionManege.close_messageConn(self.selfID)
            targetmessageConn.send(json.dumps({
                        'type': 'pairInterruption',
                        'Info': "对方中断退出会话",
                    }).encode("utf-8"))
            self.messageConn.close()


class FileTransfer(threading.Thread):
    def __init__(self, sendID, receiveID, SessionManege):
        super(FileTransfer, self).__init__()
        self.sendConn = SessionManege.fileConnTable[sendID]
        self.receiveConn = SessionManege.fileConnTable[receiveID]
        self.SessionManege = SessionManege

    def receiveFile(self):
        while True:       
            try:
                fileInfo = self.sendConn.recv(1024)
                self.receiveConn.send(fileInfo)  # 向接收方发文件信息

                fileInfo = fileInfo.decode("utf-8").split("|")
                filename = fileInfo[0]
                file_size = (int)(fileInfo[1])

                # 2.接收文件内容
                self.receiveConn.recv(1024)
                self.sendConn.send("ready".encode("utf-8"))  # 向发送方回复就绪信号
                received_size = 0
                while received_size < file_size:
                    size = 0  

                    if file_size - received_size > 1024: # 每次只接收 1024
                        size = 1024
                    else:  # 最后一次接收
                        size = file_size - received_size

                    data = self.sendConn.recv(size)  #多次接收内容
                    data_len = len(data)
                    received_size += data_len
                    self.receiveConn.send(data)  # 向接收方转发文件

            except socket.error as e:
                print(e.args)
            
    def run(self):
        self.receiveFile()


class VideoTransfer(threading.Thread):
    def __init__(self, sendID, receiveID, SessionManege):
        super(VideoTransfer, self).__init__()

        self.sendConn = SessionManege.videoConnTable[sendID]
        self.receiveConn = SessionManege.videoConnTable[receiveID]
        threading.Thread(target=self.receiveVideo,args=(self.sendConn,self.receiveConn)).start()
        
        self.receiveConn = SessionManege.videoConnTable[sendID]
        self.sendConn = SessionManege.videoConnTable[receiveID]
        threading.Thread(target=self.receiveVideo,args=(self.sendConn,self.receiveConn)).start()

    def receiveVideo(self,sendConn,receiveConn):
        while True:       
            try:
                image = sendConn.recv(1300000)
                receiveConn.sendall(image)
            except socket.error as e:
                print(e.args)
            


if __name__ == '__main__':
    ip_port = "127.0.0.1"
    massage_server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    massage_server.bind((ip_port, 8000))
    massage_server.listen(20)  

 
    file_server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    file_server.bind((ip_port, 8001))
    file_server.listen(20) 

    video_server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    video_server.bind((ip_port, 8002))
    video_server.listen(20) 

    sessionManege = SessionManege()
    print("监听开始..")
    try:
        while True:
            messageConn, addr = massage_server.accept()  # 等待连接
            fileConn, addr = file_server.accept()  # 等待连接
            videoConn, addr = video_server.accept()  # 等待连接
            # 利用handler来管理线程,实现线程之间的socket的相互通信
            sessionManege.newSession(messageConn, fileConn,videoConn)
    except socket.error:
        pass

客户端:

import os,io,socket,json,cv2,threading,time
import numpy as np 
import tkinter as tk 
import tkinter.scrolledtext as tst
from tkinter.messagebox import *
from tkinter.filedialog import *
from tkinter.simpledialog import * 
from PIL import Image,ImageTk

class Application(tk.Frame):
    #定义GUI应用程序类,派生于Frame类

    def __init__(self, master,sessionManege):  #构造函数,master为父窗口
        tk.Frame.__init__(self, master)#调用父类的构造函数
        self.root = master
        self.createWidgets()
        self.sessionManege = sessionManege
        self.clientReceiveThread = ClientReceiveThread(self.sessionManege.message_conn,self)
        self.clientReceiveThread.start()

    def createWidgets(self):
        tk.Label(self.root,text="本人ID:",width=6).place(x=1,y=1)
        self.selfID = tk.Entry(self.root,width=20)
        self.selfID.place(x=80,y =1)

        tk.Label(self.root,text="对方ID:",width=6).place(x=240,y=1)
        self.targetID = tk.Entry(self.root,width=20)
        self.targetID.place(x=300,y =1)

        self.IDSubmit = tk.Button(self.root,text='连通测试',width = 8,command=self.pairTest).place(x=450,y=1)

        self.chatRecord = tst.ScrolledText(self.root, width=80, height=20)#创建Text组件
        self.chatRecord.place(x=1,y=30)

        self.sendBtn = tk.Button(root,text="发送文件",width=8,command=self.sendFile)
        self.sendBtn.place(x=1,y=300)

        self.sendBtn = tk.Button(root,text="视频聊天",width=8,command = self.videoCharApply)
        self.sendBtn.place(x=100 ,y=300)

        self.chatMessage = tst.ScrolledText(self.root, width=80, height=10)#创建Text组件
        self.chatMessage.place(x=1,y=340)

        self.sendBtn = tk.Button(root,text="发送",width=8,command=self.sendMessage)
        self.sendBtn.place(x=500,y=480)

        
    def pairTest(self):     #进行配对测试
        selfID = self.selfID.get()
        targetID = self.targetID.get()
        self.sessionManege.testPair(selfID,targetID)
    
    def pairCheck(self):    #检查是否已经成功建立连接
        if self.sessionManege.piarState == False:
            tk.messagebox.showwarning("警告:",'您还未建立可靠连接!')
            return True
        

    def sendMessage(self): #发送消息
        if self.pairCheck():
            return
        content = self.chatMessage.get(1.0, tk.END).strip()  
        if  content =="":
            return
        self.sessionManege.sendMessage(content,self.selfID.get(),self.targetID.get())
        self.chatMessage.delete(1.0,tk.END)
        Message = "己方:\n"+content+"\n\n"
        self.chatRecord.insert("insert", Message)
        self.chatRecord.see(END)

    def sendFile(self):     #发送文件
        if self.pairCheck():
            return
        filePath = tk.filedialog.askopenfilename(filetypes=[('文本文件','.txt'),('所有文件','.*')])
        Message = "己方:\n 发送文件"+filePath+"\n\n"
        self.chatRecord.insert("insert", Message)
        self.chatRecord.see(END)
        selfID = self.selfID.get()
        targetID = self.targetID.get()
        jsDict = {'type': 'FileTransfer','selfID': selfID,'targetID':targetID}
        js = json.dumps(jsDict)
        self.sessionManege.message_conn.sendall( bytes(js,'utf-8'))
        threading.Thread(target=self.sendFileThread,args=(filePath,)).start()
            
    def sendFileThread(self,filePath): #文件发送的线程
        sendResult = self.sessionManege.sendFile(filePath)
    
    def receiveFile(self):
        threading.Thread(target=self.receiveFileThread,args=()).start()

    def receiveFileThread(self):
        filePath = self.sessionManege.receiveFile()
        fileName = os.path.basename(filePath)
        Message = "通知:\n 已成功接受文件"+fileName+",存放在D:\Receice Files\n\n"
        self.chatRecord.insert("insert", Message)
        self.chatRecord.see(END)

    def videoCharApply(self):
        if self.pairCheck():
            return
        Message = "己方:\n 发送视频通话请求\n\n"
        self.chatRecord.insert("insert", Message)
        self.chatRecord.see(END)
        selfID = self.selfID.get()
        targetID = self.targetID.get()
        jsDict = {'type': 'videoChat','selfID': selfID,'targetID':targetID}
        js = json.dumps(jsDict)
        self.sessionManege.message_conn.sendall( bytes(js,'utf-8'))
        self.videoChat()

    def videoChat(self):   
        top = Toplevel()   #创建Application的对象实例
        top.title("视频聊天:")
        top1 = videoFrame(top,self.sessionManege)

class videoFrame():
    #定义GUI应用程序类,派生于Frame类
    def __init__(self,toplevel,sessionManege):  #构造函数,master为父窗口
        self.top = toplevel
        self.createWidgets()
        self.video_conn = sessionManege.video_conn
        self.camera = cv2.VideoCapture(0)    #摄像头
        threading.Thread(target=self.sendVideo,args=(self.video_conn,)).start() 
        threading.Thread(target=self.receiveVideo,args=(self.video_conn,)).start() 
        self.top.protocol("WM_DELETE_WINDOW", self.s_destroy)

    def createWidgets(self):
        self.panel = Label(self.top)  # initialize image panel
        self.panel.pack(padx=10, pady=10)
        self.top.config(cursor="arrow")
       

    def s_destroy(self):
        self.camera.release()
        cv2.destroyAllWindows()
        self.top.destroy()

    def receiveVideo(self,video_conn): 
        while True:
            try:
                byte_stream = video_conn.recv(1300000) #1228800
                img =  cv2.imdecode(np.fromstring(byte_stream,np.uint8),cv2.IMREAD_COLOR) 
                print(type(img))
                cv2image = cv2.cvtColor(img, cv2.COLOR_BGR2RGBA)#转换颜色从BGR到RGBA
                print(type(cv2image))
                current_image = Image.fromarray(cv2image)#将图像转换成Image对象               
                imgtk = ImageTk.PhotoImage(image=current_image)
                self.panel.imgtk = imgtk
                self.panel.config(image=imgtk)
            except cv2.error as e:
                print(e.args)
                continue

    def sendVideo(self,video_conn): 
        while True:
            try:
                success, img = self.camera.read()  # 从摄像头读取照片
                cv2.waitKey(10)
                byte_stream = cv2.imencode('.jpg', img)[1]
                #这里可以获取一下byte_stream的长度,然后在服务器端口的image = sendConn.recv(1300000)设置大小
                video_conn.sendall(byte_stream)  # 发送数据         
            except socket.error as e:
                print(e.args)
                continue
class SessionManege:
    message_conn = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    file_conn = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    video_conn = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    piarState = False 
    def __init__(self):
        ip_port = "127.0.0.1"
        message_port = (ip_port, 8000)
        file_port = (ip_port, 8001)
        video_port = (ip_port, 8002)
        self.message_conn.connect(message_port)  
        self.file_conn.connect(file_port)  
        self.video_conn.connect(video_port)  

    def testPair(self,selfID,targetID):
        jsDict = {'type': 'PairTest','selfID': selfID,'targetID':targetID}
        js = json.dumps(jsDict)
        self.message_conn.sendall( bytes(js,'utf-8'))

    def sendMessage(self,content,selfID,targetID):
        jsDict = {'type': 'Message','content': content,'selfID':selfID,'targetID':targetID}
        js = json.dumps(jsDict)
        self.message_conn.sendall( bytes(js,'utf-8'))

    def sendFile(self,filePath):

        size = os.stat(filePath).st_size  #获取文件大小
        fileName = os.path.basename(filePath)
        fileInfo = fileName +"|"+str(size)
        self.file_conn.send(fileInfo.encode("utf-8")) 
        
        # 2.发送文件内容
        self.file_conn.recv(1024)  # 接收确认
       
        f = open(filePath, "rb")
        has_sent = 0
        while has_sent!= size :
            data = f.read(1024)
            self.file_conn.sendall(data)  # 发送数据
            has_sent+=len(data)

        f.close()

    def receiveFile(self):
        server_response = self.file_conn.recv(1024)
        fileInfo = server_response.decode("utf-8").split("|")
        filename = fileInfo[0]
        file_size = (int)(fileInfo[1])

        # 2.接收文件内容
        self.file_conn.send("ready".encode("utf-8"))  # 回复就绪信号
        filePath = "D:\\Receice Files\\"+ filename 
        f = open(filePath, "wb")
        received_size = 0

        while received_size < file_size:
            size = 0  
            if file_size - received_size > 1024: # 每次只接收 1024
                    size = 1024
            else:  # 最后一次接收
                    size = file_size - received_size

            data = self.file_conn.recv(size)  #多次接收内容
            data_len = len(data)
            received_size += data_len
            f.write(data)

        f.close()
        return filename
        


class ClientReceiveThread(threading.Thread):
    def __init__(self,message_conn,application):
        super(ClientReceiveThread, self).__init__()
        self.message_conn = message_conn
        self.application = application
    def run(self):
        self.receive_msg()
    def receive_msg(self):
        while True:
            msg = self.message_conn.recv(1024).decode('utf-8')
            if not msg:
                break
            js = json.loads(msg)
            self.msgHandle(js)
    def msgHandle(self,js):
        if js["type"] == "Message":
            Message = "对方:\n"+js["content"]+"\n\n"
            self.application.chatRecord.insert("insert", Message)

        if js["type"] == "FileTransfer":
            self.application.receiveFile()

        if js["type"] == "videoChat":
            message = "对方打开视频通讯:\n"
            self.application.chatRecord.insert("insert",message)
            self.application.videoChat()

        if js["type"] == "pairTestResult":
            if(js["success"]):
                self.application.sessionManege.piarState = True
                self.application.chatRecord.insert("insert", "系统消息:连接成功,可以开始聊天!\n")
            else :
                self.application.chatRecord.insert("insert", "系统消息:正在等待对方上线......\n")
        if js["type"] == "pairInterruption":
            self.application.sessionManege.pairState = False
            self.application.chatRecord.insert("insert", "系统消息:对方已中断连接.....\n")
        if js["type"] == "systemMassage":
            message = "系统消息:"+js["info"]+"\n"
            self.application.chatRecord.insert("insert",message)

        self.application.chatRecord.see(END)
           

if __name__ == "__main__":
    root = tk.Tk()                 #创建1个Tk根窗口组件root
    root.title('实时通信')     #设置窗口标题   
    sw = root.winfo_screenwidth()
    sh = root.winfo_screenheight()
    ww = 600
    wh = 510
    x = (sw-ww) / 2
    y = (sh-wh) / 2
    root.geometry("%dx%d+%d+%d" %(ww,wh,x,y))
    sessionManege = SessionManege()
    app = Application(root,sessionManege)   #创建Application的对象实例
    app.mainloop()                #调用组件的mainloop方法,进入事件循环


版权声明:本文为yxw908186797原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。