Ryu源码阅读(三)——app依赖管理

一、导包

import inspect
import itertools
import logging
import sys
import os
import gc

from ryu import cfg
from ryu import utils
from ryu.app import wsgi
from ryu.controller.handler import register_instance, get_dependent_services
from ryu.controller.controller import Datapath
from ryu.controller import event
from ryu.controller.event import EventRequestBase, EventReplyBase
from ryu.lib import hub
from ryu.ofproto import ofproto_protocol

LOG = logging.getLogger('ryu.base.app_manager')

SERVICE_BRICKS = {}
  • app_manager引入的内容与刚刚cmd中的manager类似需要注意37行导入了datapath,用于管理网桥的库以及38-39行引入的ryu的事件管理库
  • 42行进行了日志配置
  • 45行文件全局变量SERVICE_BRICKS这个字典,是一个app名称映射到app实例的字典,但为什么不叫app,突然改叫brick,是阅读过程中比较困惑的地方。后面会多处出现对这个字典的操作

二、app管理器

注意45行后到351行间是app基类与工具方法,此处跳跃进行介绍

class AppManager(object):
    # singleton
    _instance = None

    @staticmethod
    def run_apps(app_lists):
        """Run a set of Ryu applications

        A convenient method to load and instantiate apps.
        This blocks until all relevant apps stop.
        """
        app_mgr = AppManager.get_instance()
        app_mgr.load_apps(app_lists)
        contexts = app_mgr.create_contexts()
        services = app_mgr.instantiate_apps(**contexts)
        webapp = wsgi.start_service(app_mgr)
        if webapp:
            services.append(hub.spawn(webapp))
        try:
            hub.joinall(services)
        finally:
            app_mgr.close()
            for t in services:
                t.kill()
            hub.joinall(services)
            gc.collect()

    @staticmethod
    def get_instance():
        if not AppManager._instance:
            AppManager._instance = AppManager()
        return AppManager._instance

    def __init__(self):
        self.applications_cls = {}
        self.applications = {}
        self.contexts_cls = {}
        self.contexts = {}
        self.close_sem = hub.Semaphore()
  • 首先看379行即我们看到在上面的代码中调用的实例获取函数,也确实是单实例模式
  • 再看384行的初始化函数,不需要传入任何参数,仅对app和context的加载所需的变量进行初始化
  • 而这个run_apps函数,与我们在manager.py中的run_apps函数名字都一样,做的事情也—致,经过搜索调用此函数的文件后,发现此函数仅在ryu/ryu/cmd/ofa_neutron_agent.py被使,猜测当openstack的neutron插件使用ryu控制器时,会与平时ryu-manager的逻辑有所不同
def load_app(self, name):
        mod = utils.import_module(name)
        clses = inspect.getmembers(mod,
                                   lambda cls: (inspect.isclass(cls) and
                                                issubclass(cls, RyuApp) and
                                                mod.__name__ ==
                                                cls.__module__))
        if clses:
            return clses[0][1]
        return None

    def load_apps(self, app_lists):
        app_lists = [app for app
                     in itertools.chain.from_iterable(app.split(',')
                                                      for app in app_lists)]
        while len(app_lists) > 0:
            app_cls_name = app_lists.pop(0)

            context_modules = [x.__module__ for x in self.contexts_cls.values()]
            if app_cls_name in context_modules:
                continue

            LOG.info('loading app %s', app_cls_name)

            cls = self.load_app(app_cls_name)
            if cls is None:
                continue

            self.applications_cls[app_cls_name] = cls

            services = []
            for key, context_cls in cls.context_iteritems():
                v = self.contexts_cls.setdefault(key, context_cls)
                assert v == context_cls
                context_modules.append(context_cls.__module__)

                if issubclass(context_cls, RyuApp):
                    services.extend(get_dependent_services(context_cls))

            # we can't load an app that will be initiataed for
            # contexts.
            for i in get_dependent_services(cls):
                if i not in context_modules:
                    services.append(i)
            if services:
                app_lists.extend([s for s in set(services)
                                  if s not in app_lists])
  • 接下来就是关键的app加载相关的函数

  • 上面的分析我们知道manager.py调用app_manager的load_apps作为加载app的入口函数

  • 首先403行对app_list中的内容进行扁平化处理,将逗号分隔的app都拆分成单独的app重新存入列表
    接下的循环就是加载app类以及处理依赖的逻辑

  • 406行,可以看到循环的结束条件是app_list为空而在407行从app_list中取出一个app进行处理

  • 一般来说这样使用while列表不为空来处理列表中的内容,都是列表的内容会随着处理而动态改变
    而动态变化的原因,正是因为只有加载了一个app类之后,才知道他的依赖是什么,才能将其动态的加入app_list中

  • ryu没有强硬的规定依赖关系必须显式的写在配置中或者app类中(除了context)而是通过调用关系,隐含式的给我们实现的类中添加了对应的依赖关系,存疑???这也是ryu实现非常优雅的地方之一

  • 接下来409行确认当前处理的app是不是只是一个context而已

  • 415行调用load_app加载单个app

  • 419行可以看出,app_manager.applications_cls的键是类名,值是类,而不是实例,注意此阶段并没有对app类进行实例化,只是记录类的类别

  • 421行初始化了一个列表叫services:这里的services与上文manage.py中用来hub.joinall的services含义完全不同,这里的services仅用于存储当前正在处理的app所依赖的app们

  • 422行迭代app类所声明的context:如果是context只是一个其他的类,就加入到app_manager的contexts_cls字典中,注意同样仅存了类的类别,没有初始化

  • 而425行的context_module列表既存储了之前app声明的context,也添加了当前app声明的context

  • 427行,如果context中的类同样实现了ryu的app_base,那么把他添加到services里,后续添加到app_list里,说明如果context中声明的一个依赖同样实现了ryu的app基类,那么同样将其以app看待进行加载

  • 432行的get_dependent_services函数并不在本文件中,但可以根据其名称知其含义,即获取本app所依赖的app们,后续会对此函数详细解释

  • 433行的判断逻辑避免了context中的app类被重复添加至app_list,而维护context_modules变量的意义即如此

  • 435行将services变量中的app,即本app所依赖的app,添加到待加载的app_list中去

  • 而前面391行的load_app函数,也很简单,利用python动态的特性,从对应对的模块中找继承了app基类的那个用户实现的app类

 def create_contexts(self):
        for key, cls in self.contexts_cls.items():
            if issubclass(cls, RyuApp):
                # hack for dpset
                context = self._instantiate(None, cls)
            else:
                context = cls()
            LOG.info('creating context %s', key)
            assert key not in self.contexts
            self.contexts[key] = context
        return self.contexts
  • 接下来create_contexts,也是前文manage.py中调用的函数
  • 从内容可以看出本函数对context中记录的类进行是初始化
  • 将各类的实例以kv的形式存储在app_manager.context中,并最后返回这个字典
  • 所以在load_apps中context进行了加载,而本函数进行了context的初始化,生成类的实例

    def _update_bricks(self):
        for i in SERVICE_BRICKS.values():
            for _k, m in inspect.getmembers(i, inspect.ismethod):
                if not hasattr(m, 'callers'):
                    continue
                for ev_cls, c in m.callers.items():
                    if not c.ev_source:
                        continue

                    brick = _lookup_service_brick_by_mod_name(c.ev_source)
                    if brick:
                        brick.register_observer(ev_cls, i.name,
                                                c.dispatchers)

                    # allow RyuApp and Event class are in different module
                    for brick in SERVICE_BRICKS.values():
                        if ev_cls in brick._EVENTS:
                            brick.register_observer(ev_cls, i.name,
                                                    c.dispatchers)

    @staticmethod
    def _report_brick(name, app):
        LOG.debug("BRICK %s", name)
        for ev_cls, list_ in app.observers.items():
            LOG.debug("  PROVIDES %s TO %s", ev_cls.__name__, list_)
        for ev_cls in app.event_handlers.keys():
            LOG.debug("  CONSUMES %s", ev_cls.__name__)

    @staticmethod
    def report_bricks():
        for brick, i in SERVICE_BRICKS.items():
            AppManager._report_brick(brick, i)
  • 上面的update_brickcs函数涉及到了ryu的一个魔法设计,即caller,后面还会详细说
  • 下面的两个report_bricks是用来打印当前控制器所管理的bricks,而这个砖头就是从文件开头的全局变量取出
  • 而bricks砖头,对应着我们指定的app,以及我们的app所依赖的app
  • 如下是我运行ryu时verbose模式下打印的BRICK内容:其中openflow、hostapd_wif、hostapd_eth、pusher是我编写的app,而ofp_event对应着我们的app依赖的app:ryu.controller.ofp_handler
  • 下面更重要的内容是每个BRICK,即app,消费以及产生的事件类型,对于了解多app情况下的event路由情况非常有帮助,其中PROVIDES EventX To {‘openflow’: {‘main’}}中的openflow指的是app名字,main指定是当前控制器所处于的状态disaptcher,如config配置状态,main运行状态
BRICK openflow
  CONSUMES EventOFPPacketIn
  CONSUMES EventJoin
  CONSUMES EventLeave
  CONSUMES EventOFPSwitchFeatures
BRICK hostapd_wif
  PROVIDES EventJoin TO {'openflow': {'main'}, 'pusher': {'main'}}
  PROVIDES EventLeave TO {'openflow': {'main'}, 'pusher': {'main'}}
BRICK hostapd_eth
  PROVIDES EventJoin TO {'openflow': {'main'}, 'pusher': {'main'}}
  PROVIDES EventLeave TO {'openflow': {'main'}, 'pusher': {'main'}}
BRICK pusher
  CONSUMES EventJoin
  CONSUMES EventLeave
BRICK ofp_event
  PROVIDES EventOFPPacketIn TO {'openflow': {'main'}}
  PROVIDES EventOFPSwitchFeatures TO {'openflow': {'config'}}
  CONSUMES EventOFPEchoReply
  CONSUMES EventOFPEchoRequest
  CONSUMES EventOFPErrorMsg
  CONSUMES EventOFPHello
  CONSUMES EventOFPPortDescStatsReply
  CONSUMES EventOFPPortStatus
  CONSUMES EventOFPSwitchFeatures
 def _instantiate(self, app_name, cls, *args, **kwargs):
        # for now, only single instance of a given module
        # Do we need to support multiple instances?
        # Yes, maybe for slicing.
        LOG.info('instantiating app %s of %s', app_name, cls.__name__)

        if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None:
            ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS)

        if app_name is not None:
            assert app_name not in self.applications
        app = cls(*args, **kwargs)
        register_app(app)
        assert app.name not in self.applications
        self.applications[app.name] = app
        return app

    def instantiate(self, cls, *args, **kwargs):
        app = self._instantiate(None, cls, *args, **kwargs)
        self._update_bricks()
        self._report_brick(app.name, app)
        return app

    def instantiate_apps(self, *args, **kwargs):
        for app_name, cls in self.applications_cls.items():
            self._instantiate(app_name, cls, *args, **kwargs)

        self._update_bricks()
        self.report_bricks()

        threads = []
        for app in self.applications.values():
            t = app.start()
            if t is not None:
                app.set_main_thread(t)
                threads.append(t)
        return threads
  • 接下来是用来进行app初始化的函数
  • 首先manager.py调用的app初始化入口函数是instantiate_apps
    在本函数里可以看到遍历了app_manager的app class,并调用_instantiate实际初始化
    后面调用_update_bricks更新app间的事件依赖关系,并通过report打印结果
    最后如同我们上面分析manager.py中的猜测一样,instantiate_apps函数返回了注册的所有app的协程句柄,用于在main函数中统一join
  • 在_instantiate函数中实现了app类的动态的实例化,并在实例化后调用register_app注册,最后在app_manager自身的application字典中注册新初始化的app
@staticmethod
    def _close(app):
        close_method = getattr(app, 'close', None)
        if callable(close_method):
            close_method()

    def uninstantiate(self, name):
        app = self.applications.pop(name)
        unregister_app(app)
        for app_ in SERVICE_BRICKS.values():
            app_.unregister_observer_all_event(name)
        app.stop()
        self._close(app)
        events = app.events
        if not events.empty():
            app.logger.debug('%s events remains %d', app.name, events.qsize())

    def close(self):
        def close_all(close_dict):
            for app in close_dict.values():
                self._close(app)
            close_dict.clear()

        # This semaphore prevents parallel execution of this function,
        # as run_apps's finally clause starts another close() call.
        with self.close_sem:
            for app_name in list(self.applications.keys()):
                self.uninstantiate(app_name)
            assert not self.applications
            close_all(self.contexts)
  • 最后是一些关闭、清理、删除当前app的函数
  • app_manager至此就分析结束了

三、工具方法

def lookup_service_brick(name):
    return SERVICE_BRICKS.get(name)


def _lookup_service_brick_by_ev_cls(ev_cls):
    return _lookup_service_brick_by_mod_name(ev_cls.__module__)


def _lookup_service_brick_by_mod_name(mod_name):
    return lookup_service_brick(mod_name.split('.')[-1])


def register_app(app):
    assert isinstance(app, RyuApp)
    assert app.name not in SERVICE_BRICKS
    SERVICE_BRICKS[app.name] = app
    register_instance(app)


def unregister_app(app):
    SERVICE_BRICKS.pop(app.name)


def require_app(app_name, api_style=False):
    """
    Request the application to be automatically loaded.

    If this is used for "api" style modules, which is imported by a client
    application, set api_style=True.

    If this is used for client application module, set api_style=False.
    """
    iterable = (inspect.getmodule(frame[0]) for frame in inspect.stack())
    modules = [module for module in iterable if module is not None]
    if api_style:
        m = modules[2]  # skip a frame for "api" module
    else:
        m = modules[1]
    m._REQUIRED_APP = getattr(m, '_REQUIRED_APP', [])
    m._REQUIRED_APP.append(app_name)
    LOG.debug('require_app: %s is required by %s', app_name, m.__name__)

  • 这些方法在app_manager的函数中有所调用
  • 可以看出,注册方法对应字典的赋值,删除方法对应字典的删除,搜索方法对应字典的取值
  • 读起来SERVICE_BRICKS与app_manager自身的appcalitions没有什么区别
  • 可能随着理解的加深能会慢慢了解这样的用意叭
  • 最后require_app函数的逻辑是添加依赖app名字到某个module的_REQUIRED_APP列表,_REQUIRED_APP下文我们还会见到

四、app基类

由于app在app_manager视角只是一个被管理者,在下文中的事件收发小节中介绍app基类更好,这里暂且跳过。

五、ryu/ryu/controller/handler.py

def get_dependent_services(cls):
    services = []
    for _k, m in inspect.getmembers(cls, _is_method):
        if _has_caller(m):
            for ev_cls, c in m.callers.items():
                service = getattr(sys.modules[ev_cls.__module__],
                                  '_SERVICE_NAME', None)
                if service:
                    # avoid cls that registers the own events (like
                    # ofp_handler)
                    if cls.__module__ != service:
                        services.append(service)

    m = sys.modules[cls.__module__]
    services.extend(getattr(m, '_REQUIRED_APP', []))
    services = list(set(services))
    return services

在这里插入图片描述

╭─root@testdevice ~/ryu/ryu ‹master›
╰─# grep -R require_app
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.rest_topology')
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.ws_topology')
app/gui_topology/gui_topology.py:app_manager.require_app('ryu.app.ofctl_rest')
app/ofctl/api.py:app_manager.require_app('ryu.app.ofctl.service', api_style=True)
base/app_manager.py:def require_app(app_name, api_style=False):
base/app_manager.py:    LOG.debug('require_app: %s is required by %s', app_name, m.__name__)
services/protocols/vrrp/api.py:app_manager.require_app('ryu.services.protocols.vrrp.manager', api_style=True)
topology/api.py:app_manager.require_app('ryu.topology.switches', api_style=True)
  • 除了get_dependent_services,ryu/ryu/controller/handler.py文件其他的函数或者类的定义,也都是围绕着caller有关

六、app管理小结

   420
   421              services = []
   422              for key, context_cls in cls.context_iteritems():
   423                  v = self.contexts_cls.setdefault(key, context_cls)
   424                  assert v == context_cls
   425                  context_modules.append(context_cls.__module__)
   426
   427                  if issubclass(context_cls, RyuApp):
   428                      services.extend(get_dependent_services(context_cls))
   429
   430              # we can't load an app that will be initiataed for
   431              # contexts.
   432              for i in get_dependent_services(cls):
   433                  if i not in context_modules:
   434                      services.append(i)
   435              if services:
   436                  app_lists.extend([s for s in set(services)
   437                                    if s not in app_lists])

在这里插入图片描述

参考:
[1] ohmyadd:https://www.jianshu.com/p/ff59c7c5f056
[2] Ryubook:http://osrg.github.io/ryu-book/en/html/
[3] Ryu官方文档:https://ryu.readthedocs.io/en/latest/