[总结篇4] l2-agent的细节

先看Ml2Plugin:

class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                external_net_db.External_net_db_mixin,
                sg_db_rpc.SecurityGroupServerRpcMixin,
                agentschedulers_db.DhcpAgentSchedulerDbMixin,
                addr_pair_db.AllowedAddressPairsMixin,
                extradhcpopt_db.ExtraDhcpOptMixin):
    ......
    
        def start_rpc_listener(self):
            self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
            self.topic = topics.PLUGIN # q-plugin
            self.conn = c_rpc.create_connection(new=True)
            self.dispatcher = self.callbacks.create_rpc_dispatcher()
            self.conn.create_consumer(self.topic, self.dispatcher,
                                  fanout=False)
            return self.conn.consume_in_thread()

    ......

看一看RpcCallbacks:

class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
                   sg_db_rpc.SecurityGroupServerRpcCallbackMixin,
                   type_tunnel.TunnelRpcCallbackMixin):

    RPC_API_VERSION = '1.1'
    # history
    #   1.0 Initial version (from openvswitch/linuxbridge)
    #   1.1 Support Security Group RPC

    def __init__(self, notifier, type_manager):
        super(RpcCallbacks, self).__init__(notifier, type_manager)

    def create_rpc_dispatcher(self):
        '''Get the rpc dispatcher for this manager.

        If a manager would like to set an rpc API version, or support more than
        one class as the target of rpc messages, override this method.
        '''
        return q_rpc.PluginRpcDispatcher([self,
                                          agents_db.AgentExtRpcCallback()])

其中比较重要的几个回调函数:

1.get_device_details(self, rpc_context, **kwargs):

    def get_device_details(self, rpc_context, **kwargs):
        """Agent requests device details."""
        agent_id = kwargs.get('agent_id')
        device = kwargs.get('device')
        LOG.debug(_("Device %(device)s details requested by agent "
                    "%(agent_id)s"),
                  {'device': device, 'agent_id': agent_id})
        port_id = self._device_to_port_id(device)

        session = db_api.get_session()
        with session.begin(subtransactions=True):
            port = db.get_port(session, port_id)
            if not port:
                LOG.warning(_("Device %(device)s requested by agent "
                              "%(agent_id)s not found in database"),
                            {'device': device, 'agent_id': agent_id})
                return {'device': device}

            segments = db.get_network_segments(session, port.network_id)
            if not segments:
                LOG.warning(_("Device %(device)s requested by agent "
                              "%(agent_id)s has network %(network_id)s with "
                              "no segments"),
                            {'device': device,
                             'agent_id': agent_id,
                             'network_id': port.network_id})
                return {'device': device}

            binding = db.ensure_port_binding(session, port.id)
            if not binding.segment:
                LOG.warning(_("Device %(device)s requested by agent "
                              "%(agent_id)s on network %(network_id)s not "
                              "bound, vif_type: %(vif_type)s"),
                            {'device': device,
                             'agent_id': agent_id,
                             'network_id': port.network_id,
                             'vif_type': binding.vif_type})
                return {'device': device}

            segment = self._find_segment(segments, binding.segment)
            if not segment:
                LOG.warning(_("Device %(device)s requested by agent "
                              "%(agent_id)s on network %(network_id)s "
                              "invalid segment, vif_type: %(vif_type)s"),
                            {'device': device,
                             'agent_id': agent_id,
                             'network_id': port.network_id,
                             'vif_type': binding.vif_type})
                return {'device': device}

            new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up
                          else q_const.PORT_STATUS_DOWN)
            if port.status != new_status:
                plugin = manager.NeutronManager.get_plugin()
                plugin.update_port_status(rpc_context,
                                          port_id,
                                          new_status)
                port.status = new_status
            entry = {'device': device,
                     'network_id': port.network_id,
                     'port_id': port.id,
                     'admin_state_up': port.admin_state_up,
                     'network_type': segment[api.NETWORK_TYPE],
                     'segmentation_id': segment[api.SEGMENTATION_ID],
                     'physical_network': segment[api.PHYSICAL_NETWORK]}
            LOG.debug(_("Returning: %s"), entry)
            return entry

注意返回的内容

然后到了检查新的或者增加的端口信息的时候。。

    def process_network_ports(self, port_info, ovs_restarted):
        resync_a = False
        resync_b = False
        
        self.sg_agent.setup_port_filters(port_info.get('added', set()),
                                         port_info.get('updated', set()))
        
        devices_added_updated = (port_info.get('added', set()) |
                                 port_info.get('updated', set()))
        if devices_added_updated:
            start = time.time()
            try:
                skipped_devices = self.treat_devices_added_or_updated(
                    devices_added_updated, ovs_restarted) # 注意这一句
               
                port_info['current'] = (port_info['current'] -
                                        set(skipped_devices))
            except DeviceListRetrievalError:
            
                LOG.exception(_("process_network_ports - iteration:%d - "
                                "failure while retrieving port details "
                                "from server"), self.iter_num)
                resync_a = True
        if 'removed' in port_info:
            start = time.time()
            resync_b = self.treat_devices_removed(port_info['removed'])


        return (resync_a | resync_b)

然后是注意的这句:

    def treat_devices_added_or_updated(self, devices, ovs_restarted):
        skipped_devices = []
        devices_details_list = []
        for device in devices:
            try:
                devices_details_list.append(
                    self.plugin_rpc.get_device_details(
                        self.context, device, self.agent_id)) ## 这句会返回details
            except Exception as e:

                raise DeviceListRetrievalError(devices=devices, error=e)

        for details in devices_details_list:
            device = details['device']
            LOG.debug(_("Processing port %s"), device)
            port = self.int_br.get_vif_port_by_id(device)
            if not port:
                skipped_devices.append(device)
                continue

            if 'port_id' in details:
                self.treat_vif_port(port, details['port_id'],
                                    details['network_id'],
                                    details['network_type'],
                                    details['physical_network'],
                                    details['segmentation_id'],
                                    details['admin_state_up'],
                                    ovs_restarted)

                if details.get('admin_state_up'):
                    self.plugin_rpc.update_device_up(
                        self.context, device, self.agent_id, cfg.CONF.host)
                else:
                    LOG.debug(_("Setting status for %s to DOWN"), device)
                    self.plugin_rpc.update_device_down(
                        self.context, device, self.agent_id, cfg.CONF.host)
                LOG.info(_("Configuration for device %s completed."), device)
            else:
                LOG.warn(_("Device %s not defined on plugin"), device)
                if (port and port.ofport != -1):
                    self.port_dead(port)
        return skipped_devices

然后是 treat_vif_port:

    def treat_vif_port(self, vif_port, port_id, network_id, network_type,
                       physical_network, segmentation_id, admin_state_up,
                       ovs_restarted):

        if not vif_port.ofport:
            LOG.warn(_("VIF port: %s has no ofport configured, and might not "
                       "be able to transmit"), vif_port.vif_id)
        if vif_port:
            if admin_state_up:
                self.port_bound(vif_port, network_id, network_type,
                                physical_network, segmentation_id,
                                ovs_restarted)
            else:
                self.port_dead(vif_port)
        else:
            LOG.debug(_("No VIF port for port %s defined on agent."), port_id)

然后是port_bound:

    def port_bound(self, port, net_uuid,
                   network_type, physical_network, segmentation_id,
                   ovs_restarted):

        if net_uuid not in self.local_vlan_map or ovs_restarted:
            self.provision_local_vlan(net_uuid, network_type,
                                      physical_network, segmentation_id)
        lvm = self.local_vlan_map[net_uuid]
        lvm.vif_ports[port.vif_id] = port
        # Do not bind a port if it's already bound
        cur_tag = self.int_br.db_get_val("Port", port.port_name, "tag")
        if cur_tag != str(lvm.vlan):
            self.int_br.set_db_attribute("Port", port.port_name, "tag",
                                         str(lvm.vlan))
            if port.ofport != -1:
                self.int_br.delete_flows(in_port=port.ofport)

然后就是self.provision_local_vlan:

self.provision_local_vlan(self, net_uuid, network_type, physical_network, segmentation_id):

    def provision_local_vlan(self, net_uuid, network_type, physical_network,
                             segmentation_id):

        lvm = self.local_vlan_map.get(net_uuid)
        if lvm:
            lvid = lvm.vlan
        else:
            if not self.available_local_vlans:
                LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
                return
            lvid = self.available_local_vlans.pop()
            self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid,
                                                             network_type,
                                                             physical_network,
                                                             segmentation_id)

        LOG.info(_("Assigning %(vlan_id)s as local vlan for "
                   "net-id=%(net_uuid)s"),
                 {'vlan_id': lvid, 'net_uuid': net_uuid})

        if network_type in constants.TUNNEL_NETWORK_TYPES:
            if self.enable_tunneling:

                ofports = ','.join(self.tun_br_ofports[network_type].values())
                if ofports:
                    self.tun_br.mod_flow(table=constants.FLOOD_TO_TUN,
                                         dl_vlan=lvid,
                                         actions="strip_vlan,"
                                         "set_tunnel:%s,output:%s" %
                                         (segmentation_id, ofports))

                self.tun_br.add_flow(table=constants.TUN_TABLE[network_type],
                                     priority=1,
                                     tun_id=segmentation_id,
                                     actions="mod_vlan_vid:%s,resubmit(,%s)" %
                                     (lvid, constants.LEARN_FROM_TUN))
            else:
                LOG.error(_("Cannot provision %(network_type)s network for "
                          "net-id=%(net_uuid)s - tunneling disabled"),
                          {'network_type': network_type,
                           'net_uuid': net_uuid})
        elif network_type == p_const.TYPE_FLAT:
            if physical_network in self.phys_brs:
                # outbound
                br = self.phys_brs[physical_network]
                br.add_flow(priority=4,
                            in_port=self.phys_ofports[physical_network],
                            dl_vlan=lvid,
                            actions="strip_vlan,normal") # string vlan
                # inbound
                self.int_br.add_flow(
                    priority=3,
                    in_port=self.int_ofports[physical_network],
                    dl_vlan=0xffff,
                    actions="mod_vlan_vid:%s,normal" % lvid) ## flat网络也是有内部vlan号的
            else:
                LOG.error(_("Cannot provision flat network for "
                            "net-id=%(net_uuid)s - no bridge for "
                            "physical_network %(physical_network)s"),
                          {'net_uuid': net_uuid,
                           'physical_network': physical_network})
        elif network_type == p_const.TYPE_VLAN: # 重点是vlan这块的。。
            if physical_network in self.phys_brs:
                # outbound
                br = self.phys_brs[physical_network]
                br.add_flow(priority=4,
                            in_port=self.phys_ofports[physical_network],
                            dl_vlan=lvid,
                            actions="mod_vlan_vid:%s,normal" % segmentation_id) # 内部vlan改成外部vlan
                # inbound
                self.int_br.add_flow(priority=3,
                                     in_port=self.
                                     int_ofports[physical_network],
                                     dl_vlan=segmentation_id,
                                     actions="mod_vlan_vid:%s,normal" % lvid) # 外部vlan改成内部vlan
            else:
                LOG.error(_("Cannot provision VLAN network for "
                            "net-id=%(net_uuid)s - no bridge for "
                            "physical_network %(physical_network)s"),
                          {'net_uuid': net_uuid,
                           'physical_network': physical_network})
        elif network_type == p_const.TYPE_LOCAL:
            # no flows needed for local networks
            pass
        else:
            LOG.error(_("Cannot provision unknown network type "
                        "%(network_type)s for net-id=%(net_uuid)s"),
                      {'network_type': network_type,
                       'net_uuid': net_uuid})

 


版权声明:本文为m0_37313888原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。