先看Ml2Plugin:
class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                external_net_db.External_net_db_mixin,
                sg_db_rpc.SecurityGroupServerRpcMixin,
                agentschedulers_db.DhcpAgentSchedulerDbMixin,
                addr_pair_db.AllowedAddressPairsMixin,
                extradhcpopt_db.ExtraDhcpOptMixin):
    """Excerpt of Ml2Plugin; only the RPC listener setup is shown."""

    # ...... (other members omitted in this excerpt)

    def start_rpc_listener(self):
        """Register the plugin's RPC callbacks and start consuming.

        Builds the ``RpcCallbacks`` object and its dispatcher, attaches them
        as a non-fanout consumer on ``topics.PLUGIN`` over a new connection,
        and returns the result of ``consume_in_thread()``.
        """
        callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
        self.callbacks = callbacks
        self.topic = topics.PLUGIN  # q-plugin
        connection = c_rpc.create_connection(new=True)
        self.conn = connection
        self.dispatcher = callbacks.create_rpc_dispatcher()
        connection.create_consumer(self.topic, self.dispatcher, fanout=False)
        return connection.consume_in_thread()

    # ...... (other members omitted in this excerpt)
看一看RpcCallbacks:
class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
                   sg_db_rpc.SecurityGroupServerRpcCallbackMixin,
                   type_tunnel.TunnelRpcCallbackMixin):
    """RPC callback target built by ``Ml2Plugin.start_rpc_listener``."""

    # Version history:
    #   1.0 Initial version (from openvswitch/linuxbridge)
    #   1.1 Support Security Group RPC
    RPC_API_VERSION = '1.1'

    def __init__(self, notifier, type_manager):
        """Pass ``notifier`` and ``type_manager`` on to the base classes."""
        super(RpcCallbacks, self).__init__(notifier, type_manager)

    def create_rpc_dispatcher(self):
        """Get the rpc dispatcher for this manager.

        If a manager would like to set an rpc API version, or support more
        than one class as the target of rpc messages, override this method.
        """
        # Two targets: this object and the agent-extension callback.
        rpc_targets = [self, agents_db.AgentExtRpcCallback()]
        return q_rpc.PluginRpcDispatcher(rpc_targets)
其中比较重要的几个回调函数:
1.get_device_details(self, rpc_context, **kwargs):
def get_device_details(self, rpc_context, **kwargs):
    """Agent requests device details.

    Reads ``agent_id`` and ``device`` from ``kwargs``, resolves the device
    to a port and looks up its network segments and binding inside one DB
    transaction.

    Returns:
        ``{'device': device}`` when the port, its segments, its bound
        segment or the matching segment cannot be found (a warning is
        logged in each case). Otherwise a dict with the keys ``device``,
        ``network_id``, ``port_id``, ``admin_state_up``, ``network_type``,
        ``segmentation_id`` and ``physical_network``.

    Side effect: if the stored port status differs from the status derived
    from ``admin_state_up`` (BUILD when up, DOWN otherwise), the plugin's
    ``update_port_status`` is called and ``port.status`` is updated.
    """
    agent_id = kwargs.get('agent_id')
    device = kwargs.get('device')
    who = {'device': device, 'agent_id': agent_id}
    LOG.debug(_("Device %(device)s details requested by agent "
                "%(agent_id)s"),
              dict(who))
    port_id = self._device_to_port_id(device)

    db_session = db_api.get_session()
    with db_session.begin(subtransactions=True):
        port = db.get_port(db_session, port_id)
        if not port:
            LOG.warning(_("Device %(device)s requested by agent "
                          "%(agent_id)s not found in database"),
                        dict(who))
            return {'device': device}

        segments = db.get_network_segments(db_session, port.network_id)
        if not segments:
            LOG.warning(_("Device %(device)s requested by agent "
                          "%(agent_id)s has network %(network_id)s with "
                          "no segments"),
                        dict(who, network_id=port.network_id))
            return {'device': device}

        binding = db.ensure_port_binding(db_session, port.id)
        if not binding.segment:
            LOG.warning(_("Device %(device)s requested by agent "
                          "%(agent_id)s on network %(network_id)s not "
                          "bound, vif_type: %(vif_type)s"),
                        dict(who,
                             network_id=port.network_id,
                             vif_type=binding.vif_type))
            return {'device': device}

        segment = self._find_segment(segments, binding.segment)
        if not segment:
            LOG.warning(_("Device %(device)s requested by agent "
                          "%(agent_id)s on network %(network_id)s "
                          "invalid segment, vif_type: %(vif_type)s"),
                        dict(who,
                             network_id=port.network_id,
                             vif_type=binding.vif_type))
            return {'device': device}

        # Status derived from the administrative state of the port.
        if port.admin_state_up:
            new_status = q_const.PORT_STATUS_BUILD
        else:
            new_status = q_const.PORT_STATUS_DOWN
        if port.status != new_status:
            core_plugin = manager.NeutronManager.get_plugin()
            core_plugin.update_port_status(rpc_context, port_id, new_status)
            port.status = new_status

        entry = {
            'device': device,
            'network_id': port.network_id,
            'port_id': port.id,
            'admin_state_up': port.admin_state_up,
            'network_type': segment[api.NETWORK_TYPE],
            'segmentation_id': segment[api.SEGMENTATION_ID],
            'physical_network': segment[api.PHYSICAL_NETWORK],
        }
        LOG.debug(_("Returning: %s"), entry)
        return entry
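注意返回的内容:正常情况下返回包含 device、network_id、port_id、admin_state_up、network_type、segmentation_id、physical_network 的 entry;端口、segment 或绑定信息缺失时只返回 {'device': device}。
然后到了 agent 端处理新增(added)或更新(updated)端口信息的时候: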
def process_network_ports(self, port_info, ovs_restarted):
    """Apply added/updated and removed ports described by ``port_info``.

    Args:
        port_info: dict that may hold the sets ``'added'``, ``'updated'``
            and ``'removed'``; ``'current'`` must be present whenever
            there are added or updated devices.
        ovs_restarted: passed through unchanged to
            ``treat_devices_added_or_updated``.

    Returns:
        bool: True if fetching port details raised
        ``DeviceListRetrievalError`` or ``treat_devices_removed`` returned
        a truthy value; False otherwise.
    """
    added = port_info.get('added', set())
    updated = port_info.get('updated', set())
    resync_added = False
    resync_removed = False

    self.sg_agent.setup_port_filters(added, updated)

    devices_added_updated = added | updated
    if devices_added_updated:
        try:
            # Key call: this is where the device details are fetched.
            skipped_devices = self.treat_devices_added_or_updated(
                devices_added_updated, ovs_restarted)
        except DeviceListRetrievalError:
            LOG.exception(_("process_network_ports - iteration:%d - "
                            "failure while retrieving port details "
                            "from server"), self.iter_num)
            resync_added = True
        else:
            # Devices that were skipped are dropped from 'current'.
            port_info['current'] = (port_info['current'] -
                                    set(skipped_devices))

    if 'removed' in port_info:
        resync_removed = self.treat_devices_removed(port_info['removed'])

    # Logical "or" (not bitwise "|") so a None result cannot raise.
    return bool(resync_added or resync_removed)
然后看上面 process_network_ports 中调用的 treat_devices_added_or_updated(即标注要注意的那一句):
def treat_devices_added_or_updated(self, devices, ovs_restarted):
    """Fetch details for each device and wire up its port.

    All details are fetched first through
    ``plugin_rpc.get_device_details``; only then is each device processed.

    Args:
        devices: iterable of device identifiers.
        ovs_restarted: passed through unchanged to ``treat_vif_port``.

    Returns:
        list: devices for which ``int_br.get_vif_port_by_id`` found no
        port; these are skipped.

    Raises:
        DeviceListRetrievalError: if fetching the details of any device
            raises.
    """
    skipped_devices = []
    devices_details_list = []
    for device in devices:
        try:
            # This call returns the details dict for the device.
            devices_details_list.append(
                self.plugin_rpc.get_device_details(
                    self.context, device, self.agent_id))
        except Exception as e:
            raise DeviceListRetrievalError(devices=devices, error=e)

    for details in devices_details_list:
        device = details['device']
        LOG.debug(_("Processing port %s"), device)
        port = self.int_br.get_vif_port_by_id(device)
        if not port:
            skipped_devices.append(device)
            continue

        if 'port_id' in details:
            self.treat_vif_port(port, details['port_id'],
                                details['network_id'],
                                details['network_type'],
                                details['physical_network'],
                                details['segmentation_id'],
                                details['admin_state_up'],
                                ovs_restarted)
            # Report the resulting status back to the plugin.
            if details.get('admin_state_up'):
                self.plugin_rpc.update_device_up(
                    self.context, device, self.agent_id, cfg.CONF.host)
            else:
                LOG.debug(_("Setting status for %s to DOWN"), device)
                self.plugin_rpc.update_device_down(
                    self.context, device, self.agent_id, cfg.CONF.host)
            LOG.info(_("Configuration for device %s completed."), device)
        else:
            # Details without 'port_id': the plugin does not know the device.
            LOG.warning(_("Device %s not defined on plugin"), device)
            # port is known to be truthy here (see the "continue" above).
            if port.ofport != -1:
                self.port_dead(port)
    return skipped_devices
然后是 treat_vif_port:
def treat_vif_port(self, vif_port, port_id, network_id, network_type,
                   physical_network, segmentation_id, admin_state_up,
                   ovs_restarted):
    """Bind the VIF port or mark it dead, depending on ``admin_state_up``.

    Args:
        vif_port: the VIF port object, or a falsy value if the agent has
            no such port.
        port_id: port identifier, used only in the log message.
        network_id, network_type, physical_network, segmentation_id,
        ovs_restarted: passed through unchanged to ``port_bound``.
        admin_state_up: when truthy the port is bound, otherwise
            ``port_dead`` is called.
    """
    # Check for a missing port before touching its attributes; the
    # original read vif_port.ofport first, which fails on None.
    if not vif_port:
        LOG.debug(_("No VIF port for port %s defined on agent."), port_id)
        return

    if not vif_port.ofport:
        LOG.warning(_("VIF port: %s has no ofport configured, and might "
                      "not be able to transmit"), vif_port.vif_id)

    if admin_state_up:
        self.port_bound(vif_port, network_id, network_type,
                        physical_network, segmentation_id,
                        ovs_restarted)
    else:
        self.port_dead(vif_port)
然后是port_bound:
def port_bound(self, port, net_uuid,
               network_type, physical_network, segmentation_id,
               ovs_restarted):
    """Attach ``port`` to the local VLAN of network ``net_uuid``.

    Provisions the local VLAN first when the network has no mapping yet or
    ``ovs_restarted`` is truthy, records the port in the mapping, and sets
    the port's "tag" column to the local VLAN id if it differs. When the
    tag is changed and the port has an ofport other than -1, flows
    matching that in_port are deleted.
    """
    if net_uuid not in self.local_vlan_map or ovs_restarted:
        self.provision_local_vlan(net_uuid, network_type,
                                  physical_network, segmentation_id)

    # NOTE(review): provision_local_vlan returns without adding a mapping
    # when no local VLAN is available, so this lookup can raise KeyError.
    mapping = self.local_vlan_map[net_uuid]
    mapping.vif_ports[port.vif_id] = port

    # Do not bind a port if it's already bound
    current_tag = self.int_br.db_get_val("Port", port.port_name, "tag")
    if current_tag == str(mapping.vlan):
        return

    self.int_br.set_db_attribute("Port", port.port_name, "tag",
                                 str(mapping.vlan))
    if port.ofport != -1:
        self.int_br.delete_flows(in_port=port.ofport)
然后就是self.provision_local_vlan:
provision_local_vlan(self, net_uuid, network_type, physical_network, segmentation_id):
def provision_local_vlan(self, net_uuid, network_type, physical_network,
                         segmentation_id):
    """Assign a local VLAN id to a network and install its flows.

    Reuses the VLAN id already stored in ``local_vlan_map`` for
    ``net_uuid``; otherwise pops one from ``available_local_vlans`` and
    stores a new ``LocalVLANMapping``. If no local VLAN is available, an
    error is logged and the method returns without installing anything.

    The flows installed depend on ``network_type``:
      * tunnel types: flows on ``tun_br`` (only if tunneling is enabled);
      * flat / VLAN: one flow on the physical bridge and one on ``int_br``
        (only if a bridge exists for ``physical_network``);
      * local: no flows;
      * anything else: an error is logged.
    """
    mapping = self.local_vlan_map.get(net_uuid)
    if not mapping:
        if not self.available_local_vlans:
            LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
            return
        local_vid = self.available_local_vlans.pop()
        self.local_vlan_map[net_uuid] = LocalVLANMapping(local_vid,
                                                         network_type,
                                                         physical_network,
                                                         segmentation_id)
    else:
        local_vid = mapping.vlan

    LOG.info(_("Assigning %(vlan_id)s as local vlan for "
               "net-id=%(net_uuid)s"),
             {'vlan_id': local_vid, 'net_uuid': net_uuid})

    if network_type in constants.TUNNEL_NETWORK_TYPES:
        if not self.enable_tunneling:
            LOG.error(_("Cannot provision %(network_type)s network for "
                        "net-id=%(net_uuid)s - tunneling disabled"),
                      {'network_type': network_type,
                       'net_uuid': net_uuid})
        else:
            tunnel_ports = ','.join(
                self.tun_br_ofports[network_type].values())
            if tunnel_ports:
                # Strip the local VLAN tag, set the tunnel id and send
                # the packet out of every tunnel port of this type.
                self.tun_br.mod_flow(
                    table=constants.FLOOD_TO_TUN,
                    dl_vlan=local_vid,
                    actions="strip_vlan,"
                            "set_tunnel:%s,output:%s" %
                            (segmentation_id, tunnel_ports))
            # Packets with this tunnel id get the local VLAN id and are
            # resubmitted to the LEARN_FROM_TUN table.
            self.tun_br.add_flow(
                table=constants.TUN_TABLE[network_type],
                priority=1,
                tun_id=segmentation_id,
                actions="mod_vlan_vid:%s,resubmit(,%s)" %
                        (local_vid, constants.LEARN_FROM_TUN))
    elif network_type == p_const.TYPE_FLAT:
        if physical_network not in self.phys_brs:
            LOG.error(_("Cannot provision flat network for "
                        "net-id=%(net_uuid)s - no bridge for "
                        "physical_network %(physical_network)s"),
                      {'net_uuid': net_uuid,
                       'physical_network': physical_network})
        else:
            # outbound: strip the local VLAN tag
            phys_br = self.phys_brs[physical_network]
            phys_br.add_flow(priority=4,
                             in_port=self.phys_ofports[physical_network],
                             dl_vlan=local_vid,
                             actions="strip_vlan,normal")
            # inbound: flat networks get a local VLAN id as well
            self.int_br.add_flow(
                priority=3,
                in_port=self.int_ofports[physical_network],
                dl_vlan=0xffff,
                actions="mod_vlan_vid:%s,normal" % local_vid)
    elif network_type == p_const.TYPE_VLAN:
        # The VLAN case is the main focus of this walkthrough.
        if physical_network not in self.phys_brs:
            LOG.error(_("Cannot provision VLAN network for "
                        "net-id=%(net_uuid)s - no bridge for "
                        "physical_network %(physical_network)s"),
                      {'net_uuid': net_uuid,
                       'physical_network': physical_network})
        else:
            # outbound: rewrite the local VLAN id to segmentation_id
            phys_br = self.phys_brs[physical_network]
            phys_br.add_flow(priority=4,
                             in_port=self.phys_ofports[physical_network],
                             dl_vlan=local_vid,
                             actions="mod_vlan_vid:%s,normal" %
                                     segmentation_id)
            # inbound: rewrite segmentation_id to the local VLAN id
            self.int_br.add_flow(
                priority=3,
                in_port=self.int_ofports[physical_network],
                dl_vlan=segmentation_id,
                actions="mod_vlan_vid:%s,normal" % local_vid)
    elif network_type == p_const.TYPE_LOCAL:
        # no flows needed for local networks
        pass
    else:
        LOG.error(_("Cannot provision unknown network type "
                    "%(network_type)s for net-id=%(net_uuid)s"),
                  {'network_type': network_type,
                   'net_uuid': net_uuid})
版权声明:本文为m0_37313888原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。