Source code for nettoolkit.yaml_facts.cisco.parsers.protocol_bgp_run

"""cisco running-config parser for bgp section output """

# ------------------------------------------------------------------------------
from .common import *
from .protocols import ProtocolsConfig, get_protocol_instance_dict
# ------------------------------------------------------------------------------

# ------------------------------------------------------------------------------
#  RIP ATTR FUNCS
# ------------------------------------------------------------------------------

def __update_item_for_next_index(attr_dict, pg_dict, spl, item, f=None):
	if item not in spl: return 
	n_pg_dict = pg_dict if spl[1] == attr_dict else pg_dict['peers'][spl[1]]
	if not f:
		n_pg_dict[item] = next_index_item(spl, item)
	else:
		n_pg_dict[item] = f(next_index_item(spl, item))

def _get_str_next_items(attr_dict, pg_dict, line, spl):
	items = ('local-as', 'remote-as', 'update-source', 'unsuppress-map', 
		'exist-map', 'non-exist-map',
	)
	for item in items:
		__update_item_for_next_index(attr_dict, pg_dict, spl, item, f=None)

def _get_int_next_items(attr_dict, pg_dict, line, spl):
	items = ('advertisement-interval', )
	for item in items:
		__update_item_for_next_index(attr_dict, pg_dict, spl, item, f=int)


def __update_stub_items(attr_dict, pg_dict, spl, item):
	if item not in spl: return 
	n_pg_dict = pg_dict if spl[1] == attr_dict else pg_dict['peers'][spl[1]]
	n_pg_dict[item] = True

def _get_stub_items(attr_dict, pg_dict, line, spl):
	items = ('shutdown', 'activate', 'suppress-inactive')
	for item in items:
		__update_stub_items(attr_dict, pg_dict, spl, item)



def _get_password(attr_dict, pg_dict, line, spl):
	if spl[2] == 'password': 
		pg_dict['password'] = decrypt_type7(spl[-1]) if spl[3] == "7" else spl[-1]

def _get_description(attr_dict, pg_dict, line, spl):
	item = 'description'
	if item not in spl: return 
	n_pg_dict = pg_dict if spl[1] == attr_dict else pg_dict['peers'][spl[1]]
	n_pg_dict[item] = " ".join(spl[spl.index(item)+1:])

def _get_peers(attr_dict, pg_dict, line, spl):
	if len(spl)<4: return
	if spl[2] == 'peer-group' and spl[3] == attr_dict: 
		peers_dict = add_blankdict_key(pg_dict, 'peers')
		add_blankdict_key(peers_dict, spl[1])

def _get_max_prefix_attrs(attr_dict, pg_dict, line, spl):
	if not 'maximum-prefix' in spl:  return
	n_pg_dict = pg_dict if spl[1] == attr_dict else pg_dict['peers'][spl[1]]
	#
	max_pfx_dict = add_blankdict_key(n_pg_dict, 'max-prefix')
	mpidx = spl.index('maximum-prefix')
	max_pfx_dict['max_prefix_allowed'] = int(spl[mpidx+1])
	try:
		max_pfx_dict['threshold'] = int(spl[mpidx+2])
	except: pass
	if 'restart' in spl:
		max_pfx_dict['restart_interval'] =  int(spl[spl.index('restart')+1])
	if 'warning-only' in spl:
		max_pfx_dict['warning_only'] = True


def _get_default_originate(attr_dict, pg_dict, line, spl):
	item = 'default-originate'
	if item not in spl: return
	if 'route-map' not in spl:
		__update_stub_items(attr_dict, pg_dict, spl, item)
	else:
		rm = next_index_item(spl, 'route-map')
		n_pg_dict = pg_dict if spl[1] == attr_dict else pg_dict['peers'][spl[1]]
		n_pg_dict[item] = {'route-map': rm}

def _get_soft_reconfiguration(attr_dict, pg_dict, line, spl):
	item = 'soft-reconfiguration'
	if item not in spl: return
	if 'inbound' not in spl:
		__update_stub_items(attr_dict, pg_dict, spl, item)
	else:
		n_pg_dict = pg_dict if spl[1] == attr_dict else pg_dict['peers'][spl[1]]
		n_pg_dict[item] = 'inbound'

def _get_route_map(attr_dict, pg_dict, line, spl):
	item = 'route-map'
	if item != spl[2]: return
	rm = spl[3]
	way = spl[4]
	n_pg_dict = pg_dict if spl[1] == attr_dict else pg_dict['peers'][spl[1]]
	n_pg_dict[item] = {rm: way}

def _get_inherited_template(attr_dict, pg_dict, line, spl):
	item = 'inherit'
	if item not in spl: return
	items = ( 'peer-session', 'peer-policy' )
	for item in items:
		if item not in spl: continue
		n_pg_dict = pg_dict if spl[1] == attr_dict else pg_dict['peers'][spl[1]]
		inherited_template_name = next_index_item(spl, item)
		n_pg_dict[f'inherit-{item}'] = inherited_template_name


# ====================================================================================================

def _get_router_id(vrf_dict, line, spl):
	if line.startswith("bgp router-id "):
		vrf_dict['router-id'] = spl[-1]

def _get_networks(attr_dict, line, spl):
	if spl[0] == 'network' and spl[2] == 'mask': 
		network = str(addressing(spl[1], spl[3]))
		networks_dict = add_blankdict_key(attr_dict, 'networks')
		network_dict = add_blankdict_key(networks_dict, network)
		if 'route-map' in spl:
			append_attribute(network_dict, 'route-map', spl[spl.index('route-map')+1])
		if 'backdoor' in spl:
			append_attribute(network_dict, 'backdoor-route', True)

def _get_aggregates(attr_dict, line, spl):
	if spl[0] == 'aggregate-address': 
		try:
			network = addressing(spl[1], spl[2])
		except:
			network = addressing(spl[1])
		networks_dict = add_blankdict_key(attr_dict, 'aggregates')
		if network.version == 4:
			networks_dict = add_blankdict_key(networks_dict, 'ipv4')
		elif network.version == 6:
			networks_dict = add_blankdict_key(networks_dict, 'ipv6')
		network_dict = add_blankdict_key(networks_dict, str(network))
		if len(spl) > 3:
			stub_items = ( 'as-set', 'summary-only')
			for item in stub_items:
				if item in spl: network_dict[item] = True
			nxt_items = ( 'attribute-map', 'route-map', 'suppress-map', )
			for item in nxt_items:
				if item in spl: network_dict[item] = next_index_item(spl, item)

def _get_timers(attr_dict, line, spl):
	if line.startswith("timers bgp "):
		d = { 'keepalive': spl[2], 'holdtime': spl[3] }
		append_attribute(attr_dict, 'timers', d)

def _get_redistributions(attr_dict, line, spl):
	if spl[0] != 'redistribute': return
	redistribute = spl[1]
	redis_dict = add_blankdict_key(attr_dict, 'redistribute')
	redis_dict_l1 = add_blankdict_key(redis_dict, redistribute)
	items = ('route-map', 'ospf', 'rip', 'eigrp', 'bgp', 'isis',)
	get_instance_parameter_for_items(redis_dict_l1, line, spl, items)

# ====================================================================================================
#  BGP Config extractor Class
# ====================================================================================================

[docs] @dataclass class BGPConf(ProtocolsConfig): run_list: list[str] = field(default_factory=[]) supported_af_types = ('ipv4', 'vpnv4', 'ipv6', 'vpnv6') attr_functions = [ _get_router_id, _get_networks, _get_aggregates, _get_timers, _get_redistributions, ] peer_attr_functions = [ _get_peers, _get_str_next_items, _get_int_next_items, _get_stub_items, _get_description, _get_password, _get_max_prefix_attrs, _get_default_originate, _get_soft_reconfiguration, _get_route_map, _get_inherited_template, ] def __post_init__(self): self.bgp_peer_dict = {} self.protocol_config_initialize(protocol='bgp') self._get_bgp_peer_informations() self._get_template_peer_sessions_attr_dict() self._iterate_vrfs() self.remove_empty_vrfs(self.bgp_peer_dict) def _get_bgp_peer_informations(self): self._get_peer_group_names() self._get_peer_group_attr_dict() self._get_template_peer_sessions() def _get_peer_group_names(self): for vrf, vrf_dict in self.vrfs.items(): vrf_peer_grps = set() remove_eligibles = set() if not vrf_dict.get('lines'): continue for line in vrf_dict['lines']: if not line.startswith("neighbor"): continue spl = line.split() vrf_peer_grps.add(spl[1]) if len(spl) > 3 and spl[2] == 'peer-group' and spl[3] in vrf_peer_grps: remove_eligibles.add(spl[1]) vrf_dict['vrf_peer_grps'] = vrf_peer_grps - remove_eligibles def _get_peer_group_attr_dict(self): vrf_pg_dict = {} for vrf, vrf_dict in self.vrfs.items(): vrf_pg_dict[vrf] = {} if not vrf_dict.get('vrf_peer_grps'): continue for peer_grp in vrf_dict['vrf_peer_grps']: other = None if not vrf_pg_dict[vrf].get('peers'): vrf_pg_dict[vrf]['peers'] = {} vrf_pg_dict[vrf]['peers'][peer_grp] = {} pg_dict = vrf_pg_dict[vrf]['peers'][peer_grp] for line in vrf_dict['lines']: if not line.startswith("neighbor"): continue spl = line.split() valid_line = spl[1] == peer_grp or other in spl if not valid_line: if peer_grp in spl and spl[-1] == peer_grp: other = spl[1] valid_line = True else: other = None valid_line = False if not valid_line: continue for f in self.peer_attr_functions: f(peer_grp, pg_dict, line, spl) self.bgp_peer_dict = vrf_pg_dict def _get_template_peer_sessions(self): stmp = {} for vrf, vrf_dict in self.vrfs.items(): vrf_ps_dict = add_blankdict_key(stmp, vrf) start = False if not vrf_dict.get('lines'): continue for line in vrf_dict['lines']: if not start and line.startswith("template "): spl = line.strip().split() stmp_type, stmp_name, stmp_lines = spl[1], spl[2], [] start = True continue if start and line.startswith("exit"): start = False x_dict = add_blankdict_key(vrf_ps_dict, stmp_type) x_dict[stmp_name] = stmp_lines if not start: continue stmp_lines.append(line) self.stmp_dict = stmp def _get_template_peer_sessions_attr_dict(self): ps_dict = {} for vrf, stmp_dict in self.stmp_dict.items(): vrf_dict = add_blankdict_key(ps_dict, vrf) if not stmp_dict: continue x_pss_dict = add_blankdict_key(vrf_dict, 'template') for stmp_type, stmp_types_dict in stmp_dict.items(): stmp_type_dict = add_blankdict_key(x_pss_dict, stmp_type) for stmp_name, stmplines in stmp_types_dict.items(): vrf_ps_dict = add_blankdict_key(stmp_type_dict, stmp_name) for line in stmplines: spl = line.strip().split() if len(spl) == 2: vrf_ps_dict[spl[0]] = spl[1] elif len(spl) < 2: vrf_ps_dict[spl[0]] = True elif len(spl) > 2: x_dict = vrf_ps_dict for i in range(len(spl)): x_dict = add_blankdict_key(x_dict, spl[i]) self.stmp_attr_dict = ps_dict # ---------------------------------------------------------------------- # def _iterate_vrfs(self): for vrf, vrf_dict in self.vrfs.items(): if not vrf_dict.get('lines'): continue for line in vrf_dict['lines']: line = line.strip() spl = line.split() for f in self.attr_functions: f(self.bgp_peer_dict[vrf], line, spl) ## Add session template attributes dict ## if vrf in self.stmp_attr_dict: merge_dict( self.bgp_peer_dict[vrf], self.stmp_attr_dict[vrf] )
# ---------------------------------------------------------------------- # # ==================================================================================================== # RIP Config extractor function # ====================================================================================================
[docs] def get_bgp_running(command_output): """parse output of : show running-config Args: command_output (list): command output Returns: dict: protocols bgp level parsed output dictionary """ BC = BGPConf(command_output) return get_protocol_instance_dict(protocol='bgp', instances_dic=BC.bgp_peer_dict)
# ====================================================================================================