一、导包
import inspect
import itertools
import logging
import sys
import os
import gc
from ryu import cfg
from ryu import utils
from ryu.app import wsgi
from ryu.controller.handler import register_instance, get_dependent_services
from ryu.controller.controller import Datapath
from ryu.controller import event
from ryu.controller.event import EventRequestBase, EventReplyBase
from ryu.lib import hub
from ryu.ofproto import ofproto_protocol
LOG = logging.getLogger('ryu.base.app_manager')
SERVICE_BRICKS = {}
- app_manager引入的内容与刚刚cmd中的manager类似需要注意37行导入了datapath,用于管理网桥的库以及38-39行引入的ryu的事件管理库
- 42行进行了日志配置
- 45行文件全局变量SERVICE_BRICKS这个字典,是一个app名称映射到app实例的字典,但为什么不叫app,突然改叫brick,是阅读过程中比较困惑的地方。后面会多处出现对这个字典的操作
二、app管理器
注意45行后到351行间是app基类与工具方法,此处跳跃进行介绍
class AppManager(object):
# singleton
_instance = None
@staticmethod
def run_apps(app_lists):
"""Run a set of Ryu applications
A convenient method to load and instantiate apps.
This blocks until all relevant apps stop.
"""
app_mgr = AppManager.get_instance()
app_mgr.load_apps(app_lists)
contexts = app_mgr.create_contexts()
services = app_mgr.instantiate_apps(**contexts)
webapp = wsgi.start_service(app_mgr)
if webapp:
services.append(hub.spawn(webapp))
try:
hub.joinall(services)
finally:
app_mgr.close()
for t in services:
t.kill()
hub.joinall(services)
gc.collect()
@staticmethod
def get_instance():
if not AppManager._instance:
AppManager._instance = AppManager()
return AppManager._instance
def __init__(self):
self.applications_cls = {}
self.applications = {}
self.contexts_cls = {}
self.contexts = {}
self.close_sem = hub.Semaphore()
- 首先看379行即我们看到在上面的代码中调用的实例获取函数,也确实是单实例模式
- 再看384行的初始化函数,不需要传入任何参数,仅对app和context的加载所需的变量进行初始化
- 而这个run_apps函数,与我们在manager.py中的run_apps函数名字都一样,做的事情也—致,经过搜索调用此函数的文件后,发现此函数仅在ryu/ryu/cmd/ofa_neutron_agent.py被使,猜测当openstack的neutron插件使用ryu控制器时,会与平时ryu-manager的逻辑有所不同
def load_app(self, name):
mod = utils.import_module(name)
clses = inspect.getmembers(mod,
lambda cls: (inspect.isclass(cls) and
issubclass(cls, RyuApp) and
mod.__name__ ==
cls.__module__))
if clses:
return clses[0][1]
return None
def load_apps(self, app_lists):
app_lists = [app for app
in itertools.chain.from_iterable(app.split(',')
for app in app_lists)]
while len(app_lists) > 0:
app_cls_name = app_lists.pop(0)
context_modules = [x.__module__ for x in self.contexts_cls.values()]
if app_cls_name in context_modules:
continue
LOG.info('loading app %s', app_cls_name)
cls = self.load_app(app_cls_name)
if cls is None:
continue
self.applications_cls[app_cls_name] = cls
services = []
for key, context_cls in cls.context_iteritems():
v = self.contexts_cls.setdefault(key, context_cls)
assert v == context_cls
context_modules.append(context_cls.__module__)
if issubclass(context_cls, RyuApp):
services.extend(get_dependent_services(context_cls))
# we can't load an app that will be initiataed for
# contexts.
for i in get_dependent_services(cls):
if i not in context_modules:
services.append(i)
if services:
app_lists.extend([s for s in set(services)
if s not in app_lists])
接下来就是关键的app加载相关的函数
上面的分析我们知道manager.py调用app_manager的load_apps作为加载app的入口函数
首先403行对app_list中的内容进行扁平化处理,将逗号分隔的app都拆分成单独的app重新存入列表
接下的循环就是加载app类以及处理依赖的逻辑406行,可以看到循环的结束条件是app_list为空而在407行从app_list中取出一个app进行处理
一般来说这样使用while列表不为空来处理列表中的内容,都是列表的内容会随着处理而动态改变
而动态变化的原因,正是因为只有加载了一个app类之后,才知道他的依赖是什么,才能将其动态的加入app_list中ryu没有强硬的规定依赖关系必须显式的写在配置中或者app类中(除了context)而是通过调用关系,隐含式的给我们实现的类中添加了对应的依赖关系,存疑???这也是ryu实现非常优雅的地方之一
接下来409行确认当前处理的app是不是只是一个context而已
415行调用load_app加载单个app
419行可以看出,app_manager.applications_cls的键是类名,值是类,而不是实例,注意此阶段并没有对app类进行实例化,只是记录类的类别
421行初始化了一个列表叫services:这里的services与上文manage.py中用来hub.joinall的services含义完全不同,这里的services仅用于存储当前正在处理的app所依赖的app们
422行迭代app类所声明的context:如果是context只是一个其他的类,就加入到app_manager的contexts_cls字典中,注意同样仅存了类的类别,没有初始化
而425行的context_module列表既存储了之前app声明的context,也添加了当前app声明的context
427行,如果context中的类同样实现了ryu的app_base,那么把他添加到services里,后续添加到app_list里,说明如果context中声明的一个依赖同样实现了ryu的app基类,那么同样将其以app看待进行加载
432行的get_dependent_services函数并不在本文件中,但可以根据其名称知其含义,即获取本app所依赖的app们,后续会对此函数详细解释
433行的判断逻辑避免了context中的app类被重复添加至app_list,而维护context_modules变量的意义即如此
435行将services变量中的app,即本app所依赖的app,添加到待加载的app_list中去
而前面391行的load_app函数,也很简单,利用python动态的特性,从对应对的模块中找继承了app基类的那个用户实现的app类
def create_contexts(self):
for key, cls in self.contexts_cls.items():
if issubclass(cls, RyuApp):
# hack for dpset
context = self._instantiate(None, cls)
else:
context = cls()
LOG.info('creating context %s', key)
assert key not in self.contexts
self.contexts[key] = context
return self.contexts
- 接下来create_contexts,也是前文manage.py中调用的函数
- 从内容可以看出本函数对context中记录的类进行是初始化
- 将各类的实例以kv的形式存储在app_manager.context中,并最后返回这个字典
- 所以在load_apps中context进行了加载,而本函数进行了context的初始化,生成类的实例
def _update_bricks(self):
for i in SERVICE_BRICKS.values():
for _k, m in inspect.getmembers(i, inspect.ismethod):
if not hasattr(m, 'callers'):
continue
for ev_cls, c in m.callers.items():
if not c.ev_source:
continue
brick = _lookup_service_brick_by_mod_name(c.ev_source)
if brick:
brick.register_observer(ev_cls, i.name,
c.dispatchers)
# allow RyuApp and Event class are in different module
for brick in SERVICE_BRICKS.values():
if ev_cls in brick._EVENTS:
brick.register_observer(ev_cls, i.name,
c.dispatchers)
@staticmethod
def _report_brick(name, app):
LOG.debug("BRICK %s", name)
for ev_cls, list_ in app.observers.items():
LOG.debug(" PROVIDES %s TO %s", ev_cls.__name__, list_)
for ev_cls in app.event_handlers.keys():
LOG.debug(" CONSUMES %s", ev_cls.__name__)
@staticmethod
def report_bricks():
for brick, i in SERVICE_BRICKS.items():
AppManager._report_brick(brick, i)
- 上面的update_brickcs函数涉及到了ryu的一个魔法设计,即caller,后面还会详细说
- 下面的两个report_bricks是用来打印当前控制器所管理的bricks,而这个砖头就是从文件开头的全局变量取出
- 而bricks砖头,对应着我们指定的app,以及我们的app所依赖的app
- 如下是我运行ryu时verbose模式下打印的BRICK内容:其中openflow、hostapd_wif、hostapd_eth、pusher是我编写的app,而ofp_event对应着我们的app依赖的app:ryu.controller.ofp_handler
- 下面更重要的内容是每个BRICK,即app,消费以及产生的事件类型,对于了解多app情况下的event路由情况非常有帮助,其中PROVIDES EventX To {‘openflow’: {‘main’}}中的openflow指的是app名字,main指定是当前控制器所处于的状态disaptcher,如config配置状态,main运行状态
BRICK openflow
CONSUMES EventOFPPacketIn
CONSUMES EventJoin
CONSUMES EventLeave
CONSUMES EventOFPSwitchFeatures
BRICK hostapd_wif
PROVIDES EventJoin TO {'openflow': {'main'}, 'pusher': {'main'}}
PROVIDES EventLeave TO {'openflow': {'main'}, 'pusher': {'main'}}
BRICK hostapd_eth
PROVIDES EventJoin TO {'openflow': {'main'}, 'pusher': {'main'}}
PROVIDES EventLeave TO {'openflow': {'main'}, 'pusher': {'main'}}
BRICK pusher
CONSUMES EventJoin
CONSUMES EventLeave
BRICK ofp_event
PROVIDES EventOFPPacketIn TO {'openflow': {'main'}}
PROVIDES EventOFPSwitchFeatures TO {'openflow': {'config'}}
CONSUMES EventOFPEchoReply
CONSUMES EventOFPEchoRequest
CONSUMES EventOFPErrorMsg
CONSUMES EventOFPHello
CONSUMES EventOFPPortDescStatsReply
CONSUMES EventOFPPortStatus
CONSUMES EventOFPSwitchFeatures
def _instantiate(self, app_name, cls, *args, **kwargs):
# for now, only single instance of a given module
# Do we need to support multiple instances?
# Yes, maybe for slicing.
LOG.info('instantiating app %s of %s', app_name, cls.__name__)
if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None:
ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS)
if app_name is not None:
assert app_name not in self.applications
app = cls(*args, **kwargs)
register_app(app)
assert app.name not in self.applications
self.applications[app.name] = app
return app
def instantiate(self, cls, *args, **kwargs):
app = self._instantiate(None, cls, *args, **kwargs)
self._update_bricks()
self._report_brick(app.name, app)
return app
def instantiate_apps(self, *args, **kwargs):
for app_name, cls in self.applications_cls.items():
self._instantiate(app_name, cls, *args, **kwargs)
self._update_bricks()
self.report_bricks()
threads = []
for app in self.applications.values():
t = app.start()
if t is not None:
app.set_main_thread(t)
threads.append(t)
return threads
- 接下来是用来进行app初始化的函数
- 首先manager.py调用的app初始化入口函数是instantiate_apps
在本函数里可以看到遍历了app_manager的app class,并调用_instantiate实际初始化
后面调用_update_bricks更新app间的事件依赖关系,并通过report打印结果
最后如同我们上面分析manager.py中的猜测一样,instantiate_apps函数返回了注册的所有app的协程句柄,用于在main函数中统一join - 在_instantiate函数中实现了app类的动态的实例化,并在实例化后调用register_app注册,最后在app_manager自身的application字典中注册新初始化的app
@staticmethod
def _close(app):
close_method = getattr(app, 'close', None)
if callable(close_method):
close_method()
def uninstantiate(self, name):
app = self.applications.pop(name)
unregister_app(app)
for app_ in SERVICE_BRICKS.values():
app_.unregister_observer_all_event(name)
app.stop()
self._close(app)
events = app.events
if not events.empty():
app.logger.debug('%s events remains %d', app.name, events.qsize())
def close(self):
def close_all(close_dict):
for app in close_dict.values():
self._close(app)
close_dict.clear()
# This semaphore prevents parallel execution of this function,
# as run_apps's finally clause starts another close() call.
with self.close_sem:
for app_name in list(self.applications.keys()):
self.uninstantiate(app_name)
assert not self.applications
close_all(self.contexts)
- 最后是一些关闭、清理、删除当前app的函数
- app_manager至此就分析结束了
三、工具方法
def lookup_service_brick(name):
return SERVICE_BRICKS.get(name)
def _lookup_service_brick_by_ev_cls(ev_cls):
return _lookup_service_brick_by_mod_name(ev_cls.__module__)
def _lookup_service_brick_by_mod_name(mod_name):
return lookup_service_brick(mod_name.split('.')[-1])
def register_app(app):
assert isinstance(app, RyuApp)
assert app.name not in SERVICE_BRICKS
SERVICE_BRICKS[app.name] = app
register_instance(app)
def unregister_app(app):
SERVICE_BRICKS.pop(app.name)
def require_app(app_name, api_style=False):
"""
Request the application to be automatically loaded.
If this is used for "api" style modules, which is imported by a client
application, set api_style=True.
If this is used for client application module, set api_style=False.
"""
iterable = (inspect.getmodule(frame[0]) for frame in inspect.stack())
modules = [module for module in iterable if module is not None]
if api_style:
m = modules[2] # skip a frame for "api" module
else:
m = modules[1]
m._REQUIRED_APP = getattr(m, '_REQUIRED_APP', [])
m._REQUIRED_APP.append(app_name)
LOG.debug('require_app: %s is required by %s', app_name, m.__name__)
- 这些方法在app_manager的函数中有所调用
- 可以看出,注册方法对应字典的赋值,删除方法对应字典的删除,搜索方法对应字典的取值
- 读起来SERVICE_BRICKS与app_manager自身的appcalitions没有什么区别
- 可能随着理解的加深能会慢慢了解这样的用意叭
- 最后require_app函数的逻辑是添加依赖app名字到某个module的_REQUIRED_APP列表,_REQUIRED_APP下文我们还会见到
四、app基类
由于app在app_manager视角只是一个被管理者,在下文中的事件收发小节中介绍app基类更好,这里暂且跳过。
五、ryu/ryu/controller/handler.py
def get_dependent_services(cls):
services = []
for _k, m in inspect.getmembers(cls, _is_method):
if _has_caller(m):
for ev_cls, c in m.callers.items():
service = getattr(sys.modules[ev_cls.__module__],
'_SERVICE_NAME', None)
if service:
# avoid cls that registers the own events (like
# ofp_handler)
if cls.__module__ != service:
services.append(service)
m = sys.modules[cls.__module__]
services.extend(getattr(m, '_REQUIRED_APP', []))
services = list(set(services))
return services

╭─root@testdevice ~/ryu/ryu ‹master›
╰─# grep -R require_app
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.rest_topology')
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.ws_topology')
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.ofctl_rest')
app/ofctl/api.py:app_manager.require_app('ryu.app.ofctl.service', api_style=True)
base/app_manager.py:def require_app(app_name, api_style=False):
base/app_manager.py: LOG.debug('require_app: %s is required by %s', app_name, m.__name__)
services/protocols/vrrp/api.py:app_manager.require_app('ryu.services.protocols.vrrp.manager', api_style=True)
topology/api.py:app_manager.require_app('ryu.topology.switches', api_style=True)
- 除了get_dependent_services,ryu/ryu/controller/handler.py文件其他的函数或者类的定义,也都是围绕着caller有关
六、app管理小结
420
421 services = []
422 for key, context_cls in cls.context_iteritems():
423 v = self.contexts_cls.setdefault(key, context_cls)
424 assert v == context_cls
425 context_modules.append(context_cls.__module__)
426
427 if issubclass(context_cls, RyuApp):
428 services.extend(get_dependent_services(context_cls))
429
430 # we can't load an app that will be initiataed for
431 # contexts.
432 for i in get_dependent_services(cls):
433 if i not in context_modules:
434 services.append(i)
435 if services:
436 app_lists.extend([s for s in set(services)
437 if s not in app_lists])

参考:
[1] ohmyadd:https://www.jianshu.com/p/ff59c7c5f056
[2] Ryubook:http://osrg.github.io/ryu-book/en/html/
[3] Ryu官方文档:https://ryu.readthedocs.io/en/latest/