对于追求服务器程序性能的应用有什么适用的Python框架吗?那就是今天和大家分享的Twisted框架,它支持许多常见的传输及应用层协议,包括TCP、UDP、SSL/TLS、HTTP、FTP等,这也意味着能为客户端和服务器端提供自定义开发工具。那为什么就说它能保证高效能通信呢?
Twisted在不同的操作系统平台上利用了不同的底层技术:在Windows中,基于IO完成端口技术保证了底层高效地将I/O事件通知给框架及应用程序;在Linux中采用epoll技术,它能显著提高在大量并发连接中只有少量活跃的情况下CPU利用率。Twisted框架采用Reactor设计模式,它的核心是Reactor的事件循环,监听网络、文件系统以及定时器等事件,并提供统一处理接口,使得事件能被快速响应。
在上一篇事件驱动中介绍过:对于不需要同步处理的多任务,我们可以使用事件驱动。那么在Twisted中使得程序设计可以采用事件驱动机制得益于Deferred(延迟)对象,它是一个管理回调函数的对象,我们可以向该对象添加需要回调的函数,同时可以指定该组回调函数何时被调用。
from twisted.internet import deferfrom twisted.python import failureimport sysd = defer.Deferred() # 定义defer实例def printSquare(d): # 正常处理函数print("Square of %d is %d" % (d, d * d))def processError(f): # 错误处理函数print("Error with process ")d.addCallback(printSquare) # 添加正常处理的回调函数d.addErrback(processError) # 添加错误处理回调函数# 开始调用deferif len(sys.argv) > 1 and sys.argv[1] == 'call_error':f = failure.Failure(Exception('my Exception'))d.errback(f) # 调用错误处理函数processErrorelse:d.callback(4) # 调用正常处理函数printSquare(4)
代码围绕twisted.internet.defer.Deffered对象展开。
Defer中可以管理两种回调函数:Deffered.addCallback()正常处理函数和Deffered.addErrback错误处理函数。两种回调函数可以通过Deffered.callback()和Deffered.errback()进行调用。
另外可以给一个Deffer对象赋予多个正常或错误处理的回调函数,这样在Defer对象内部形成正常处理函数链和错误处理函数链,示例代码如下。
from twisted.internet import deferd = defer.Deferred()def printSquare(d):print("Square of %d is %d", (d, d * d))return ddef processError(f):print("error with process")def printTwice(d):print("Twice of %d is %d", (2 * d))return dd.addCallback(printSquare)d.addErrback(processError)d.addCallback(printTwice)d.callback(5)# Square of %d is %d (5, 25)# Twice of %d is %d 10
Deffered的主要成员函数包括:
| addCallback(self, callback, *args, **kwargs) | 给Defer对象添加正常处理回调函数,需要至少有一个输入参数 |
| addErrback(self, errback, *args, **kwargs) | 给Defer对象添加错误处理回调函数,errback为错误处理函数名,需要至少有一个输入参数 |
| addBoth(self, callback, *args, **kwargs) | 回调函数同时作为正常和错误处理回调函数添加到Defer对象中 |
| chainDeffered(self, d) | 将另一个Defer对象的正常和错误处理回调函数添加到本Defer对象中。本函数是单向的 |
| callback(self, result) | 调用正常处理函数链,result是传递给第一个正常处理回调函数的参数 |
| errback(self, fail=None) | 调用错误处理函数链,result是传递给第一个错误处理回调函数的参数。 |
| pause(self)和unpause(self) | pause(self)和unpause(self) 用来暂停和继续调用链 |
Defer为什么要分别管理两条回调函数调用链?因为调用链函数之间除了简单的顺序调用关系,还存在交叉调用关系,两条为了对回调过程提供更好的可控性,调用流程图如下:

其中实线为回调函数正常返回时的继续调用路径,虚线为处理函数中产生异常时的后续调用路径。
我们再将Deffer对象和reactor的延时调用机制结合在一起,来实现异步调用函数。
from twisted.internet import reactor, deferdef printSquare(d):print("Square of %d is %d" % (d, d * d))return ddef printTwice(d):print("Twice of %d is %d", (2 * d))return ddef makeDefer():d = defer.Deferred()d.addCallback(printSquare)d.addCallback(printTwice)reactor.callLater(2, d.callback, 5) # 配置延时2s调用makeDefer()reactor.run()# Square of 5 is 25# Twice of %d is %d 10# 挂起运行
makeDefer函数内定义了调用链执行的逻辑关系,其中 reactor.callLater(2, d.callback, 5)表示在reactor.run()运行后的2后,twisted框架才去调用callback对应的两个函数(printSquare,printTwice)。
callLater()函数原型如下
def callLater(delay, callable, *args, **kw):pass
delay定义延时调用秒数,如果为0则是立即调用;callable为被调用的函数名及其参数。
通过reactor.callLater(4, reactor.stop)定义4秒后调用函数reactor.stop(),还可以实现定时退出Twisted消息循环。
下面我们通过一个实时通信的广播系统模型介绍下用Twisted框架开发基于TCP的网络应用的方法:
首先Twisted提供了基本的通信编程封装,这里先介绍下Transports。它代表网络中两个通信结点之间的连接。Transports负责描述连接的细节,比如连接是面向流式的还是面向数据报的,流控以及可靠性,比如TCP、UDP和Unix套接字。对应方法如下:
| write | 以非阻塞的方式按顺序依次将数据写到物理连接上 |
| writeSequence | 将一个字符串列表写到物理连接上 |
| loseConnection | 将所有挂起的数据写入,然后关闭连接 |
| getPeer | 取得连接中对端的地址信息 |
| getHost | 取得连接中本端的地址信息 |
Protocols描述了如何以异步的方式处理网络中的事件。HTTP、DNS以及IMAP是应用层协议中的例子。Protocols实现了IProtocol接口,它包含如下的方法:
| makeConnection | 在transport对象和服务器之间建立一条连接 |
| connectionMade | 连接建立起来后调用 |
| dataReceived | 接收数据时调用 |
| connectionLost | 关闭连接时调用 |
广播系统服务器
针对Twisted的Protocol、Factory等类进行编程,定义它们的子类并重写connectionMade和dataReceived进行事件化处理。
from twisted.internet.protocol import Protocolfrom twisted.internet.protocol import Factoryfrom twisted.internet.endpoints import TCP4ServerEndpointfrom twisted.internet import reactorclients = [] # 保存所有客户端连接# Protocol的子类class Spreader(Protocol):def __init__(self, factory):self.factory = factorydef connectionMade(self):self.factory.numProtocols = self.factory.numProtocols + 1 # 对连接的客户端进行计数self.transport.write((u"欢迎来到Spread Site,您是第%d个客户端用户!\n" %(self.factory.numProtocols,).encode('utf8')))print(f"new connect {self.factory.numProtocols}")clients.append(self) # 将self保存到clients列表中def connectionLost(self, reason):clients.remove()print(f"lost connect: {self.connect_id}")def dataReceived(self, data):if data == "close":self.transport.loseConnection()print(f"{self.connect_id} closed")else:print(f"spreading message from {self.connect_id, self.data}")for client in clients:if client != self:# 将收到的数据通过Protocol.transport.write()函数分发给除自己以外的所有客户端client.transport.write(data)# Factory的子类class SpreadFactory(Factory):def __init__(self):self.numProtocols = 0 # 将客户端计数器置0def buildProtocol(self, addr):return Spreader(self) # 建立Protocol子类的实例endpoint = TCP4ServerEndpoint(reactor, 8007) # 定义服务器监听端口endpoint.listen(SpreadFactory()) # 指定子类实例reactor.run() # 挂起运行
广播客户端
Twisted同样提供了基于Protocol类的TCP客户端编程方法。
from twisted.internet.protocol import Protocol, ClientFactoryfrom twisted.internet import reactorimport sysimport datetimeclass Echo(Protocol):def connectionMade(self):print("connect to server!")def dataReceived(self, data):print("got message: ", data.decode('utf8'))reactor.callLater(5, self.say_hello)def connectionLost(self, reason):print("Disconnected from server!")def say_hello(self):if self.transport.connected:self.transport.write((u"hello, I'm %s %s" % (sys.argv[1], datetime.datetime.now())).encode('utf-8'))class EchoClientFactory(ClientFactory):def __init__(self):self.protocol = Nonedef startedConnecting(self, connector):print("Start to connect.")def buildProtocol(self, addr):self.protocol = Echo()return self.protocoldef clientConnectionLost(self, connector, reason):print("Lost Connection. Reason:", reason)def clientConnectionFailed(self, connector, reason):print("Connection failed. Reason:", reason)host = "127.0.0.1"port =8007factory = EchoClientFactory()reactor.connectTCP(host, port, factory)reactor.run()
执行顺序如下:
建立连接
ClientFactory.startedConnecting()
Protocol.connectionMade()
已连接
用Protocol.dataReceived()接受消息
用Protocol.transport.write()发送消息
连接断开
Protocol.connectionLost()
ClientFactory. clientConnectionLost()
即建立连接时先执行ClientFactory中回调,然后执行Protocol中回调,连接断开时正好相反。
欢迎你来我的公众号“才浅的每日python”和我一起讨论,也欢迎你来催更扯淡。日拱一卒,下一篇文不见不散。