"""common modifier functions/classes
"""
from nettoolkit.addressing import *
import pandas as pd
# ================================================================================================
# Functions
# ================================================================================================
def trunk_no_more(cmd):
"""trunkates command from pipe | and returns only command
Args:
cmd (str): command string (juniper)
Returns:
str: trunkated command
"""
spl = cmd.split("|")
ucmd = spl[0].strip()
return ucmd
# ================================================================================================
# Dynamic Cisco Key Exchanger
# ================================================================================================
[docs]
class KeyExchanger():
"""Dynamic Cisco Key Exchanger
Args:
column_mapper (dict): cisco commands column mapper dictionary
cmd_lst (list): commands capture list
"""
def __init__(self, column_mapper, cmd_lst):
"""instance initializer
"""
self.column_mapper = column_mapper
self.cmd_lst = cmd_lst
self.dfd = self.read_column_mapper()
self.update_cisco_cmd_lst()
[docs]
def read_column_mapper(self):
"""reads column mapper file for each sheet, and update dictionary for each sheet Dataframe
if sheet name found in command list
Returns:
dict: dictionary of DataFrame
"""
dfd = pd.read_excel(self.column_mapper, None)
for sheet, df in dfd.items():
trunkated_sheet = trunk_no_more(sheet)
if trunkated_sheet not in self.cmd_lst: continue
dfd[trunkated_sheet] = df.fillna("")
return dfd
[docs]
def update_cisco_cmd_lst(self):
"""updates cisco commands list with headers
"""
for sheet, df in self.dfd.items():
if sheet not in self.cmd_lst: continue
for head in df:
if not df[head][0]: continue
self.cmd_lst[sheet][head] = df[head][0]
# ================================================================================================
# Cisco Database Object
# ================================================================================================
[docs]
class DataFrameInit():
"""Database Object
Args:
capture (str): capture log file
"""
def __init__(self, capture):
"""instance initializer
"""
self.capture = capture
self.dfd = self.read_int_sheets()
[docs]
def read_int_sheets(self):
"""reads ntc parsed excel file, returns dictionary of dataframe if sheet found in command list
Returns:
dict: dictionary of dataframe
"""
try:
dfd = pd.read_excel(self.capture, None)
except:
dfd = {}
ndf = {}
for sheet, df in dfd.items():
trunkated_sheet = trunk_no_more(sheet)
if trunkated_sheet not in self.cmd_lst: continue
ndf[trunkated_sheet] = df.fillna("")
return ndf
@staticmethod
def _is_sheet_data_available(dataframe_dict, sheet_name):
"""checks if sheet data from given dataframe dictionary does exist or not or empty and returns based on findings.
Args:
dataframe_dict (dict): dictionary of dataframe
sheet_name (str): sheet name
Returns:
None, False, DataFrame: None if sheet empty, False if sheet not found, DataFrame for matching sheet.
"""
try:
sht_df = dataframe_dict[sheet_name]
if sht_df.empty:
print(f"{sheet_name}: sheet data empty.")
return None
return sht_df
except:
print(f"{sheet_name}: sheet data unavailable.")
return False
# ================================================================================================
# var Object common calls/methods class
# ================================================================================================
[docs]
class Var():
"""var Object common calls/methods class
"""
def __getitem__(self, k):
return self.var[k]
def __iter__(self):
for k, v in self.var.items():
yield k, v
[docs]
def update_var(self, key, value):
"""update self.var dicationary
Args:
key (str): key
value (str): value
"""
self.var[key] = value
@property
def pdf_dict(self):
"""dictionary of dataframe with `var` key
Returns:
dict: dictionary of dataframe
"""
return {'var': self.var_df }
## Calls
[docs]
def create_a_temp_v6_hext2_3_column(self, sht_df):
"""creates a temporary column 'ipaddr' for further processing
Args:
sht_df (DataFrame): pandas DataFrame
Returns:
DataFrame: updated DataFrame with 'ipaddr' column
"""
sht_df['temp'] = sht_df['ipaddr'].apply(get_hext2_3)
return sht_df
[docs]
def convert_to_dataframe(self):
"""creates a new dictionary of data frame from local variables
Dictionary of Pandas DataFrame as value
"""
df = pd.DataFrame(self.var, index=[0]).T
df = df.reset_index()
df.rename(columns={'index': 'var', 0:'default'}, inplace=True)
df.sort_values(['var'], inplace=True)
self.var_df = df
[docs]
def update_device(self, sht):
"""updates the multiple fields from show version output
Args:
sht (str): sheet name
Returns:
None: None
"""
sht_df = self._is_sheet_data_available(self.dfd, sht)
if not sht_df or sht_df.empty: return None
#
for k, v in self.cmd_lst[sht].items():
try:
x = eval(sht_df[k][0])
except:
x = sht_df[k][0]
if isinstance(x, (int, str)):
self.update_var(v, x)
elif isinstance(x, (list, set, tuple)):
self.update_var(v, "\n".join(x))
#
self.update_var('host-name', self['hostname']) ## duplicate entry for various support
self.hostname = self['hostname'] ## self.hostname for late access.
# ----------------------------------------------------------------------------------------------------
def get_hext2_3(ipaddr):
"""get the hextate 2 and hextate 3 from given ipv6 ip address.
Args:
ipaddr (list): string of list of IPV6 strings
Returns:
str, tuple: blank string if no eligible hextate found, else tuple of hextate 2 and hextate 3
"""
l = eval(ipaddr)
if isinstance(l, (list, set, tuple)):
try:
v6 = IPv6(l[-1][:-2]+"/64")
h2b = v6.get_hext(2)
h3b = v6.get_hext(3)
if h2b and h3b and h2b!='0'and h3b!='0':
return (h2b, h3b)
return ""
except:
pass
return ""
# ================================================================================================
# Interfaces Object common calls/methods class
# ================================================================================================
[docs]
class TableInterfaces():
"""Interfaces Object common calls/methods class
"""
@property
def pdf_dict(self):
"""tables dictionary of DataFrame
Returns:
dict: dictionary of dataframe
"""
return {'tables': self.pdf}
## Calls
# ================================================================================================
# VRF Object common calls/methods class
# ================================================================================================
[docs]
class TableVrfs():
"""VRF Object common calls/methods class
"""
@property
def pdf_dict(self):
"""vrf dictionary of DataFrame
Returns:
dict: dictionary of dataframe
"""
return {'vrf': self.vrf_df}
## Calls
[docs]
def update_column_names(self):
"""rename vrf column name from ``name`` to ``vrf``
"""
self.vrf_df.rename(columns={'name': 'vrf',}, inplace=True)
[docs]
def drop_mtmt_vrf(self):
"""drop rows where vrf is management vrf
"""
self.vrf_df.drop(self.vrf_df[
self.vrf_df["vrf"] == "Mgmt-vrf"
].index, axis=0, inplace=True)
[docs]
def add_filter_col(self):
"""add filter column for vrf
"""
self.vrf_df['filter'] = "vrf"
[docs]
def update_rt(self):
"""update vrf route targets value from rd values
"""
self.vrf_df['vrf_route_target'] = self.vrf_df['default_rd'].apply(get_rt)
# ----------------------------------------------------------------------------------------------------
def get_rt(default_rd):
"""get the route target number from rd value
Args:
default_rd (str): route-distinguisher value
Returns:
str: route-target if set else blank string
"""
try:
rt = default_rd.split(":")[-1]
if rt != "<not set>":
return rt
else:
return ""
except: return ""
# ================================================================================================
__all__ = [
'DataFrameInit', 'KeyExchanger',
'Var', 'TableInterfaces', 'TableVrfs',
]