asyncio异步库创建UDP和TCP连接

了解asyncio

  • 简介

asyncio是python3.4版本引入的标准库

异步IO:就是发起一个IO操作(如:网络请求,文件读写等),这些操作一般是比较耗时的,
不用等待它结束,可以继续做其他事情,结束时会发来通知。
协程:又称为微线程,在一个线程中执行,执行函数时可以随时中断,由程序(用户)自身控制,
执行效率极高,与多线程比较,没有切换线程的开销和多线程锁机制。
  • asyncio中重要的概念
1.事件循环
事件循环是每个asyncio应用的核心,管理所有的事件,在整个程序运行过程中不断循环执行并追踪事件发生的顺序
将它们放在队列中,空闲时调用相应的时间处理者来处理这些事件。
	
	创建事件循环
	loop = asyncio.get_event_loop()
	获取当前事件循环。 如果当前 OS 线程没有设置当前事件循环并且 set_event_loop() 还没有被调用,
	asyncio 将创建一个新的事件循环并将其设置为当前循环。
	
	启动线程创建事件循环
	from threading import Thread
	import asyncio
	
	def start_thread_loop(loop):
	    asyncio.set_event_loop(loop)
	    loop.run_forever()
	    
	new_loop = asyncio.new_event_loop()
	loop_thread = Thread(target=start_thread_loop, args=(new_loop,))
	loop_thread.setDaemon(True) # 守护线程
	loop_thread.start()
2.Futurn
Futurn对象表示尚未完成的计算,还未完成的结果,它和task没有本质上的区别
3.Task
是Futurn的子类,作用是在运行某个任务的同时可以并发的运行多个任务

asyncio.Task用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数创建:
loop.create_task() 或 asyncio.ensure_future()

loop.create_task() ,要在定义loop对象之后,调用将方法对象转化成了task的对象
asyncio.ensure_future() 直接调用asyncio 的ensure_future() 方法,返回的也是task 对象
(我们还没有声明 loop 也可以提前定义好 task 对象)
4.async/await 关键字
asyncio实现了TCP、UDP、SSL等协议,async定义一个协程,await用于挂起阻塞的异步调用接口。
对于异步io你需要知道的重点,要注意的是,await语法只能出现在通过async修饰的函数中,否则
会报SyntaxError错误。而且await后面的对象需要是一个Awaitable,或者实现了相关的协议。

注意所有需要异步执行的函数,都需要asyncio中的轮询器去轮询执行,如果函数阻塞,轮询器就会去执行下一个函数。所以所有需要异步执行的函数都需要加入到这个轮询器中。
若在协程中需要有延时操作,应该使用 await asyncio.sleep(),而不是使用time.sleep(),因为使用time.sleep()后会释放GIL,阻塞整个主线程,从而阻塞整个事件循环。

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,
就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环
将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行

耗时的操作一般是一些IO操作,例如网络请求,文件读取等。我们使用asyncio.sleep函数来模拟
IO操作。协程的目的也是让这些IO操作异步化。

创建UDP

直接上代码了

server端

# 使用asyncio创建UDP连接

#  创建udp服务
import asyncio
import socket
import time
now = lambda :time.time()
async def handler(host):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind(host)
    while True:
        print("sadsadsad")
        data, addr = s.recvfrom(1024)
        print("ddddddddddd")
        data = data.decode()
        print('[Recieved] {} {}'.format(data, addr))
        s.sendto(data.encode(), addr)
        await asyncio.sleep(10)


async def print_str():
    while 1:
        print("its print_str handler")
        await asyncio.sleep(1)

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))


addr = ('', 8080)
tasks = [
    asyncio.ensure_future(handler(addr)), #  将函数变为task obj
    asyncio.ensure_future(print_str()),
]
loop = asyncio.get_event_loop() #  获取事件循环
# loop.run_until_complete(asyncio.gather(*tasks))  #
loop.run_until_complete(asyncio.wait(tasks))  #  
try:
    loop.run_forever()  #  一直轮询
except Exception as e:
    print(e)
finally:
    loop.close()  #  出故障最终关闭事件循环

asyncio.gather(*tasks)和asyncio.wait(tasks)的区别

# asyncio.wait返回dones和pendings
# dones:表示已经完成的任务
# pendings:表示未完成的任务
# 手动获取结果
dones, pendings = await asyncio.wait(tasks)
print(dones, pendings)
for task in dones:
    print("Task ret:", task.result())

"""
asyncio.wait它的返回值就是 return的结果 ,不用再task.result() 来获取

asyncio.wait带有控制功能
【控制运行任务数】:运行第一个任务就返回
FIRST_COMPLETED :第一个任务完全返回
FIRST_EXCEPTION:产生第一个异常返回
ALL_COMPLETED:所有任务完成返回 (默认选项)
"""
# 返回 task 方法的 返回值
results = await asyncio.gather(*tasks)
for result in results:
     print("Task ret:",result)

client

import random
import socket
import time
import ujson

address = ("192.168.210.255", 8080)  # 服务端地址和端口
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# s.setblocking(False)
# s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# s.bind(address)
while True:
    trigger = ''.join(random.sample(
            ['z', 'y', 'x', 'w', 'v', 'u', 't', 's', 'r', 'q', 'p', 'o', 'n', 'm', 'l', 'k', 'j', 'i', 'h', 'g', 'f', 'e',
             'd', 'c', 'b', 'a'], 15))
    # trigger = {"ip":"192.168.220.217","mac":"dc:a6:32:68:99:80","camera_count":2,"device_type":"CameraSensor","camera_1_status":4,"camera_2_status":1,"server_ip":"192.168.220.137"}
    print(trigger)
    s.sendto(trigger.encode(), address)
    data, addr = s.recvfrom(1024)  # 返回数据和接入连接的(服务端)地址
    data = data.decode()
    print('[Recieved] {} {}'.format(data, addr))
    # if trigger == '###': # 自定义结束字符串
    #     break
    time.sleep(1)

创建TCP

server

import asyncio
import socket
import random

#  创建异步TCP连接
async def handle(reader, writer): # 把socket请求当作参数传递进来,reader:STREAM_READER,writer:STREAM_WRITER
    while True:
        print(reader)
        print(writer)
        data = await reader.read(1024) # 从缓冲区读,消耗时间,因此它是一个生成器,yield from数据出来。是一个生成器对象。
        client = writer.get_extra_info('peername') # 获取客户的信息
        message = "{} Your msg {}".format(client,data.decode()).encode() # 打印客户的信息和读取到的数据
        writer.write(message)  # 写数据到缓冲区
        await writer.drain()  # 类似flush,发送数据到客户的,消耗时间,异步之

loop = asyncio.get_event_loop()
ip = '127.0.0.1'
port = 9999

crt = asyncio.start_server(handle, ip, port, loop=loop) # handle定义处理请求,它是异步处理的

server = loop.run_until_complete(crt)
print(server)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    server.close()
    loop.close()

client

import socket
import random
import time

sc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

sc.connect(("127.0.0.1", 9999))

while True:
    msg = ''.join(random.sample(
                    ['z', 'y', 'x', 'w', 'v', 'u', 't', 's', 'r', 'q', 'p', 'o', 'n', 'm', 'l', 'k', 'j', 'i', 'h', 'g', 'f', 'e',
                     'd', 'c', 'b', 'a'], 15))
    sc.sendall(msg.encode("UTF-8"))
    msg = sc.recv(1024)
    print("服务器回应:" + msg.decode("UTF-8"))
    time.sleep(1)

使用Process和asyncio异步io方式实现udp

udp服务端:
先收后发

import os
from multiprocessing import Process, Queue
import time
import asyncio

class MyUDP:

    def __init__(self, host, port):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(("0.0.0.0", port))

    def __enter__(self):
        return self.sock

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.sock.close()

class TCPProcess(Process):
    def __init__(self, host, port, *args, **kwargs):
        self.host = host
        self.port = port
        super(TCPProcess, self).__init__(*args, **kwargs)
        self.queue = Queue()

    def run(self):
        with MyUDP(self.host, self.port) as sock:
            tasks = [
                asyncio.ensure_future(self.read_data(sock=sock)),
                asyncio.ensure_future(self.write_data(sock=sock)),
            ]
            loop = asyncio.get_event_loop()
            # loop.run_in_executor()
            # loop.run_until_complete(asyncio.gather(*tasks))
            loop.run_until_complete(asyncio.wait(tasks))
            try:
                loop.run_forever()
            except Exception as e:
                print(e)
            finally:
                loop.close()

    async def read_data(self, sock):
        while True:
            print("jeishouxiaoxi")
            data, addr = sock.recvfrom(1024)  # 返回数据和接入连接的(服务端)地址
            data = data.decode()
            print('[Recieved] {} {}'.format(data, addr))
            self.queue.put((data, addr))
            await asyncio.sleep(0.1)

    async def write_data(self, sock):
        while True:
            print("fasongxiaoxi")
            if self.queue.empty():
                print("queue is empty")
            trigger, addr = self.queue.get()
            sock.sendto(trigger.encode(), addr)
            print("给{}发送消息{}成功".format(addr, trigger))
            await asyncio.sleep(0.1)
            
if __name__ == '__main__':
    host = "192.168.210.255"
    port = 9880
    tp = TCPProcess(host=host, port=port)
    tp.start()


udpl客户端:
先发在收

import socket, random, time
import socketserver

updclient =socket.socket(socket.AF_INET,socket.SOCK_DGRAM) #买手机
server_ip_port =('192.168.210.255', 9880) #找到服务端软件
updclient.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
updclient.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
while True:#通信循环
    print("监听192.168.210.206...")
    trigger = ''.join(random.sample(
        ['z', 'y', 'x', 'w', 'v', 'u', 't', 's', 'r', 'q', 'p', 'o', 'n', 'm', 'l', 'k', 'j', 'i', 'h', 'g', 'f',
         'e',
         'd', 'c', 'b', 'a'], 15))
    print(trigger)
    updclient.sendto(trigger.encode(), server_ip_port)
    data,server_ip =updclient.recvfrom(1024) # 接收信息
    print("收{}的数据{}".format(server_ip, data.decode('utf-8')))
    # time.sleep(1)

使用Process+gevent实现异步udp

server端:

from gevent import monkey
monkey.patch_all()
import os
from multiprocessing import Process, Queue
import gevent
import asyncio

class MyUDP:

    def __init__(self, host, port):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(("0.0.0.0", port))

    def __enter__(self):
        return self.sock

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.sock.close()


class TCPProcess(Process):
    def __init__(self, host, port, *args, **kwargs):
        self.host = host
        self.port = port
        self.dic = {}
        self.queue = Queue()
        super(TCPProcess, self).__init__(*args, **kwargs)

    def run(self):

        with MyUDP(self.host, self.port) as sock:
            tasks = [
                gevent.spawn(self.read_data, sock),
                gevent.spawn(self.write_data, sock)
            ]
            gevent.joinall(tasks)

            # gevent.sleep(0.1)


    def read_data(self, sock):
        while True:
            print("jeishouxiaoxi")
            data, addr = sock.recvfrom(1024)  # 返回数据和接入连接的(服务端)地址
            data = data.decode()
            print('[Recieved] {} {}'.format(data, addr))
            self.queue.put((data, addr))
            # await asyncio.sleep(0.1)
            gevent.sleep(0.5)

    def write_data(self, sock):
        while True:
            print("fasongxiaoxi")
            if self.queue.empty():
                print("queue is empty")
                gevent.sleep(0.1)
                continue
            trigger, addr = self.queue.get()
            sock.sendto(trigger.encode(), addr)
            print("给{}发送消息{}成功".format(addr, trigger))
            # await asyncio.sleep(0.1)
            gevent.sleep(0.5)

def mian():
    host = "192.168.210.255"
    port = 9880
    tp = TCPProcess(host=host, port=port)
    tp.start()

if __name__ == '__main__':
    mian()

客户端:

import socket, random, time
import socketserver

updclient =socket.socket(socket.AF_INET,socket.SOCK_DGRAM) #买手机
server_ip_port =('192.168.210.255', 9880) #找到服务端软件
updclient.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
updclient.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
while True:#通信循环
    print("监听192.168.210.206...")
    trigger = ''.join(random.sample(
        ['z', 'y', 'x', 'w', 'v', 'u', 't', 's', 'r', 'q', 'p', 'o', 'n', 'm', 'l', 'k', 'j', 'i', 'h', 'g', 'f',
         'e',
         'd', 'c', 'b', 'a'], 15))
    print(trigger)
    updclient.sendto(trigger.encode(), server_ip_port)
    data,server_ip =updclient.recvfrom(1024) # 接收信息
    print("收{}的数据{}".format(server_ip, data.decode('utf-8')))
    # time.sleep(1)

版权声明:本文为qq_43067962原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。