Traffic Monitor

Routine Examination of Network

网络已经成为许多服务和业务的基础设施,因此保持正常稳定的运行是很有必要的。但是也免不了会有问题发生。
当网络上发生错误时,必须找出原因并迅速恢复操作。不用说,为了检测错误和识别原因,有必要定期了解网络状态。例如,假设某一网络设备的某个端口的流量值非常高,如果没有连续测量该端口的流量,则无法确定该端口是处于异常状态还是通常处于异常状态,何时变为异常状态。
因此,对网络健康状况的持续监控对于使用该网络的服务或企业的持续和安全运行至关重要。当然,简单地监视流量信息并不能提供完美的保证,但是本节描述了如何使用OpenFlow来获取交换机的统计信息。

Implementation of Traffic Monitor

下面是一个在switching hub中添加的流量监控功能源代码,即simple_minitor_13在ryu/app中。

from operator import attrgetter

from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub


class SimpleMonitor13(simple_switch_13.SimpleSwitch13):

    def __init__(self, *args, **kwargs):
        super(SimpleMonitor13, self).__init__(*args, **kwargs)
        self.datapaths = {}
        self.monitor_thread = hub.spawn(self._monitor)

    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if datapath.id not in self.datapaths:
                self.logger.debug('register datapath: %016x', datapath.id)
                self.datapaths[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                self.logger.debug('unregister datapath: %016x', datapath.id)
                del self.datapaths[datapath.id]

    def _monitor(self):
        while True:
            for dp in self.datapaths.values():
                self._request_stats(dp)
            hub.sleep(10)

    def _request_stats(self, datapath):
        self.logger.debug('send stats request: %016x', datapath.id)
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        req = parser.OFPFlowStatsRequest(datapath)
        datapath.send_msg(req)

        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
        datapath.send_msg(req)

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        body = ev.msg.body

        self.logger.info('datapath         '
                         'in-port  eth-dst           '
                         'out-port packets  bytes')
        self.logger.info('---------------- '
                         '-------- ----------------- '
                         '-------- -------- --------')
        for stat in sorted([flow for flow in body if flow.priority == 1],
                           key=lambda flow: (flow.match['in_port'],
                                             flow.match['eth_dst'])):
            self.logger.info('%016x %8x %17s %8x %8d %8d',
                             ev.msg.datapath.id,
                             stat.match['in_port'], stat.match['eth_dst'],
                             stat.instructions[0].actions[0].port,
                             stat.packet_count, stat.byte_count)

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev):
        body = ev.msg.body

        self.logger.info('datapath         port     '
                         'rx-pkts  rx-bytes rx-error '
                         'tx-pkts  tx-bytes tx-error')
        self.logger.info('---------------- -------- '
                         '-------- -------- -------- '
                         '-------- -------- --------')
        for stat in sorted(body, key=attrgetter('port_no')):
            self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
                             ev.msg.datapath.id, stat.port_no,
                             stat.rx_packets, stat.rx_bytes, stat.rx_errors,
                             stat.tx_packets, stat.tx_bytes, stat.tx_errors)

Fixed-Cycle Processing

在对switching hub的流量监控中,将创建一个线程定期向OpenFlow交换机发送请求,以获得统计信息。

class SimpleMonitor13(simple_switch_13.SimpleSwitch13):

    def __init__(self, *args, **kwargs):
        super(SimpleMonitor13, self).__init__(*args, **kwargs)
        self.datapaths = {}
        self.monitor_thread = hub.spawn(self._monitor)

# ...

hub.spawn()为ryu.lib.hub类中的一个方法,用来实现创建一个线程。

# ...

@set_ev_cls(ofp_event.EventOFPStateChange,
            [MAIN_DISPATCHER, DEAD_DISPATCHER])
def _state_change_handler(self, ev):
    datapath = ev.datapath
    if ev.state == MAIN_DISPATCHER:
        if datapath.id not in self.datapaths:
            self.logger.debug('register datapath: %016x', datapath.id)
            self.datapaths[datapath.id] = datapath
    elif ev.state == DEAD_DISPATCHER:
        if datapath.id in self.datapaths:
            self.logger.debug('unregister datapath: %016x', datapath.id)
            del self.datapaths[datapath.id]

def _monitor(self):
    while True:
        for dp in self.datapaths.values():
            self._request_stats(dp)
        hub.sleep(10)

# ...

为了确保所连接的交换机处于监控状态,使用EventOFPStateChange事件来检测连接和断开连接。此事件由Ryu框架发出,并在Datapath状态更改时发出。
如果Datapath状态处于MAIN_DISPATCHER,则该交换机加入流量监测列表中,若处于DEAD_DISPATCHER,则将流量监测列表中的该交换机删除。
monitor()函数用于每10秒对流量检测列表中的交换机执行一次请求,以获取统计信息。

def _request_stats(self, datapath):
    self.logger.debug('send stats request: %016x', datapath.id)
    ofproto = datapath.ofproto
    parser = datapath.ofproto_parser

    req = parser.OFPFlowStatsRequest(datapath)
    datapath.send_msg(req)

    req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
    datapath.send_msg(req)

通过定期调用_request_stats()函数, 则OFPFlowStatsRequest和OFPPortStatsRequest被发送给交换机。

OFPFlowStatsRequest请求交换机提供与流条目相关的统计信息。可以通过表ID、输出端口、cookie值和匹配等条件来缩小请求的目标流条目的范围。

OFPPortStatsRequest请求交换机提供端口相关的统计信息。可以指定所需的端口号来获取信息。这里,OFPP_ANY被指定为请求获取所有端口的信息。

FlowStats

为了能接收到交换机的应答,需要创建一个事件处理函数来接受FlowStatsReply信息。

@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
    body = ev.msg.body

    self.logger.info('datapath         '
                     'in-port  eth-dst           '
                     'out-port packets  bytes')
    self.logger.info('---------------- '
                     '-------- ----------------- '
                     '-------- -------- --------')
    for stat in sorted([flow for flow in body if flow.priority == 1],
                       key=lambda flow: (flow.match['in_port'],
                                         flow.match['eth_dst'])):
        self.logger.info('%016x %8x %17s %8x %8d %8d',
                         ev.msg.datapath.id,
                         stat.match['in_port'], stat.match['eth_dst'],
                         stat.instructions[0].actions[0].port,
                         stat.packet_count, stat.byte_count)

PortStats

下面的函数用于接受交换机的端口信息。

@set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
def _port_stats_reply_handler(self, ev):
    body = ev.msg.body

    self.logger.info('datapath         port     '
                     'rx-pkts  rx-bytes rx-error '
                     'tx-pkts  tx-bytes tx-error')
    self.logger.info('---------------- -------- '
                     '-------- -------- -------- '
                     '-------- -------- --------')
    for stat in sorted(body, key=attrgetter('port_no')):
        self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
                         ev.msg.datapath.id, stat.port_no,
                         stat.rx_packets, stat.rx_bytes, stat.rx_errors,
                         stat.tx_packets, stat.tx_bytes, stat.tx_errors)

Executing Traffic Monitor

首先在mininet中创建简单的拓扑结构

# sudo mn --topo single,3 --mac --switch ovsk --controller remote -x

然后在s1终端中将OpenFlow版本设置为OpenFlow13

# ovs-vsctl set Bridge s1 protocols=OpenFlow13

接着在控制器c0中执行simple_monitor_13

# ryu-manager --verbose ryu.app.simple_monitor_13
loading app ryu.app.simple_monitor_13
loading app ryu.controller.ofp_handler
instantiating app ryu.app.simple_monitor_13 of SimpleMonitor13
instantiating app ryu.controller.ofp_handler of OFPHandler
BRICK SimpleMonitor13
  CONSUMES EventOFPPacketIn
  CONSUMES EventOFPPortStatsReply
  CONSUMES EventOFPStateChange
  CONSUMES EventOFPFlowStatsReply
  CONSUMES EventOFPSwitchFeatures
BRICK ofp_event
  PROVIDES EventOFPPacketIn TO {'SimpleMonitor13': set(['main'])}
  PROVIDES EventOFPPortStatsReply TO {'SimpleMonitor13': set(['main'])}
  PROVIDES EventOFPStateChange TO {'SimpleMonitor13': set(['main', 'dead'])}
  PROVIDES EventOFPFlowStatsReply TO {'SimpleMonitor13': set(['main'])}
  PROVIDES EventOFPSwitchFeatures TO {'SimpleMonitor13': set(['config'])}
  CONSUMES EventOFPPortStatus
  CONSUMES EventOFPSwitchFeatures
  CONSUMES EventOFPEchoReply
  CONSUMES EventOFPPortDescStatsReply
  CONSUMES EventOFPErrorMsg
  CONSUMES EventOFPEchoRequest
  CONSUMES EventOFPHello
connected socket:<eventlet.greenio.base.GreenSocket object at 0x7fbab7189750> address:('127.0.0.1', 37934)
hello ev <ryu.controller.ofp_event.EventOFPHello object at 0x7fbab7179a90>
move onto config mode
EVENT ofp_event->SimpleMonitor13 EventOFPSwitchFeatures
switch features ev version=0x4,msg_type=0x6,msg_len=0x20,xid=0x21014c5c,OFPSwitchFeatures(auxiliary_id=0,capabilities=79,datapath_id=1,n_buffers=256,n_tables=254)
move onto main mode
EVENT ofp_event->SimpleMonitor13 EventOFPStateChange
register datapath: 0000000000000001
send stats request: 0000000000000001
EVENT ofp_event->SimpleMonitor13 EventOFPFlowStatsReply
EVENT ofp_event->SimpleMonitor13 EventOFPPortStatsReply
datapath         in-port  eth-dst           out-port packets  bytes
---------------- -------- ----------------- -------- -------- --------
datapath         port     rx-pkts  rx-bytes rx-error tx-pkts  tx-bytes tx-error
---------------- -------- -------- -------- -------- -------- -------- --------
0000000000000001        1        0        0        0        0        0        0
0000000000000001        2        0        0        0        0        0        0
0000000000000001        3        0        0        0        0        0        0
0000000000000001 fffffffe        0        0        0        0        0        0

此时还没有流条目以及每个端口的计数为0

然后执行h1 ping h2

host h1:

# ping -c1 10.0.0.2
PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
64 bytes from 10.0.0.2: icmp_req=1 ttl=64 time=94.4 ms

--- 10.0.0.2 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 94.489/94.489/94.489/0.000 ms
#

交换机转发包时将添加一条流条目并且更改统计信息。
此时c0的状态如下:

datapath         in-port  eth-dst           out-port packets  bytes
---------------- -------- ----------------- -------- -------- --------
0000000000000001        1 00:00:00:00:00:02        2        1       42
0000000000000001        2 00:00:00:00:00:01        1        2      140
datapath         port     rx-pkts  rx-bytes rx-error tx-pkts  tx-bytes tx-error
---------------- -------- -------- -------- -------- -------- -------- --------
0000000000000001        1        3      182        0        3      182        0
0000000000000001        2        3      182        0        3      182        0
0000000000000001        3        0        0        0        1       42        0
0000000000000001 fffffffe        0        0        0        1       42        0

根据流条目统计信息,与接收端口1流匹配的流量被记录为1个数据包,42字节。接收端口2是两个包,140字节。

根据端口统计信息,端口1的接收包数(rx-pkts)为3,接收字节数(rx-bytes)为182字节。对于端口2,它分别是3个包和182个字节。

流条目统计信息与端口统计信息不符。这是因为流条目统计信息是与条目匹配并被转发的包的信息。这意味着,当转发一个包而该包没有条目匹配时,这个包就不计入统计信息中。
比如在这种情况下,主机1首先广播的ARP请求、主机2向主机1返回的ARP应答和主机1向主机2发出的echo请求这三个数据包开始都没有条目匹配,即Table-miss,所以这三个包没有计入统计信息里。所以端口统计量大于流条目统计量。