import pandas as pd
from copy import deepcopy, copy
from nettoolkit.nettoolkit_db import *
from nettoolkit.nettoolkit_common import *
from nettoolkit.nettoolkit.forms.formitems import get_cable_n_connectors, add_cable_n_connectors, CONNECTOR_TYPES_FILE
from .devices import AdevDevices, device_df_drop_empty_duplicates, update_var_df_details_to_table_df
from .cablings import ADevCablings
from .maths import CalculateXY, CalculateXYNative
from .general import *
pd.set_option('mode.chained_assignment', None)
# -------------------------------------------------------------------------
CABLING_COLUMNS = ['a_device', 'a_dev_model', 'a_dev_serial', 'aport', 'a_media_type', 'a_connector_type',
'b_device', 'b_dev_model', 'b_dev_serial', 'bport', 'b_media_type', 'b_connector_type',
'cable_type', 'speed', 'cable',
]
VISIO_DRAWING_COLUMNS= [
'connector_type', 'color', 'pattern', 'weight',
]
# ---------------------------------------------
# Data Frame Generator
# ---------------------------------------------
[docs]
class DFGen():
"""DataFrame generator
Args:
files (list): list of input excel -clean files
"""
def __init__(self, files):
"""initializer of DF Generator
"""
self.files = sorted(files)
self.default_stencil = None
self.default_x_spacing = 3.5
self.default_y_spacing = 2
self.line_pattern_style_separation_on = None
self.line_pattern_style_shift_no = 2
self.weight = DEFAULT_LINE_WT
self.func_dict = {}
self.var_func_dict = {
'hostname': get_dev_hostname,
'device_model': get_dev_model,
'serial_number': get_dev_serial,
}
self.pattern = 1
self.blank_dfs()
self.color_map = {
'aoc': 'red',
'copper': 'darkgray',
'fiber om3 mm': 'skyblue',
'fiber om4 mm': 'skyblue',
'fiber mm': 'green',
'fiber sm': 'yellow',
#
# ... add more as and when need ... #
# --- adding new color will require respective edit in visio.py ---
}
[docs]
def blank_dfs(self):
"""creates devices/cabling blank DataFrames
"""
self.devices_merged_df = pd.DataFrame({'hostname':[]})
self.cabling_merged_df = pd.DataFrame({'a_device':[]})
[docs]
def custom_attributes(self, **kwargs):
"""add/update custom attributes for object
"""
for k, v in kwargs.items():
if v:
self.__dict__[k] = v
[docs]
def custom_functions(self, **kwargs):
"""add/update custom functions for object
"""
for k, v in kwargs.items():
self.func_dict[k] = v
[docs]
def custom_var_functions(self, **kwargs):
"""add/update custom `var` tab functions for object
"""
for k, v in kwargs.items():
self.var_func_dict[k] = v
def __call__(self):
self.run()
[docs]
def run(self):
"""iterate over all files for generating devices/cabling DataFrame details.
"""
self.DCT = {}
for file in self.files:
DCT = DF_ConverT(file, self.default_stencil, self.line_pattern_style_separation_on, self.line_pattern_style_shift_no)
DCT.get_self_details(self.var_func_dict)
DCT.convert(self.func_dict)
self.update_devices_df(DCT, file)
self.update_cabling_df(DCT, file)
self.DCT[DCT.hostname] = DCT
#
self.devices_merged_df = device_df_drop_empty_duplicates(self.devices_merged_df)
self.devices_merged_df = update_var_df_details_to_table_df(self.devices_merged_df, self.DCT, self.var_func_dict)
#
self.cabling_merged_df.reset_index(inplace=True)
self.add_b_device_media_types()
self.add_cable_connector_details()
#
self.remove_subintf_from_ports()
self.standardize_intf_on_ports()
self.remove_duplicate_cablings()
self.add_model_n_serial_number_to_cabling()
#
#
self.update_weight()
if not self.__dict__.get('color'): self.update_color()
# ---------------------------------------- #
#
self.df_dict = {'Devices': self.devices_merged_df, 'Cablings': self.cabling_merged_df }
#
[docs]
def map_color(self, **kwargs):
"""add, edit line color map
"""
for k, v in kwargs.items():
self.color_map[k] = v
[docs]
def arrange_cablings(self, keep_all_cols=True):
"""arrange cabling tab in to appropriate order given in CABLING COLUMNS
"""
cabeling_columns = set(self.df_dict['Cablings'].columns)
extra_cols = cabeling_columns.difference(set(CABLING_COLUMNS)).difference(set(VISIO_DRAWING_COLUMNS))
arranged_cols = copy(CABLING_COLUMNS)
if keep_all_cols:
arranged_cols += VISIO_DRAWING_COLUMNS
arranged_cols += extra_cols
self.df_dict['Cablings'] = self.df_dict['Cablings'][arranged_cols]
[docs]
def update(self, *funcs):
for f in funcs:
f(self.df_dict)
[docs]
def update_devices_df(self, DCT, file):
"""update Devices DataFrame
Args:
DCT (DF_ConverT): DataFrame Convertor object
file (str): a single database. -clean excel file. ( not in use )
"""
ddf = DCT.update_devices()
#
ddf_dev = DCT.update_device_self_detils(self.func_dict)
ddf = pd.concat([ddf, ddf_dev], axis=0, join='outer')
#
self.devices_merged_df = pd.concat([self.devices_merged_df, ddf], axis=0, join='outer')
[docs]
def update_cabling_df(self, DCT, file):
"""update Cabling DataFrame
Args:
DCT (DF_ConverT): DataFrame Convertor object
file (str): a single database. -clean excel file.
"""
cdf = DCT.update_cablings(**self.__dict__)
#
self.cabling_merged_df = pd.concat([self.cabling_merged_df, cdf], axis=0, join='outer')
[docs]
def calculate_cordinates(self, sheet_filter_dict={}):
"""calculate the x,y coordinate values for each devices and keep Devices, Cablings DataFrame Dictionary ready.
Args:
sheet_filter_dict (dict): sheet filter dictionary for mutitab executions.
"""
if self.cabling_merged_df.empty: return
if 'hierarchical_order' in self.func_dict:
CXY = CalculateXY(self.devices_merged_df, self.default_x_spacing, self.default_y_spacing, self.cabling_merged_df, sheet_filter_dict)
CXY.calc()
else:
CXY = CalculateXYNative(self.devices_merged_df, self.cabling_merged_df, sheet_filter_dict)
CXY.calc()
self.df_dict = {'Devices': CXY.df, 'Cablings': self.cabling_merged_df }
[docs]
def remove_duplicate_cablings(self):
"""removes redundant cabling entries between two devices with same port identified
"""
self.cabling_merged_df["a_dev_duplicate"] = self.cabling_merged_df.b_device
self.cabling_merged_df["aport_duplicate"] = self.cabling_merged_df.bport
self.cabling_merged_df["b_dev_duplicate"] = self.cabling_merged_df.a_device
self.cabling_merged_df["bport_duplicate"] = self.cabling_merged_df.aport
self.cabling_merged_df = remove_duplicates(self.cabling_merged_df)
self.cabling_merged_df = self.cabling_merged_df.drop(
columns=["a_dev_duplicate", "aport_duplicate", "b_dev_duplicate", "bport_duplicate"]
)
self.cabling_merged_df = merge_port_for_duplicates(self.cabling_merged_df)
[docs]
def remove_duplicate_cabling_entries(self):
"""removes duplicate cabling entries between a-b devices / deprycated.
"""
a_to_b = {}
copy_full_df = deepcopy(self.cabling_merged_df)
for i, data in copy_full_df.iterrows():
if not a_to_b.get(data.a_device):
a_to_b[data.a_device] = {'remotedev':[]}
if data.b_device in a_to_b.keys() and data.a_device in a_to_b[data.b_device]['remotedev']:
self.cabling_merged_df.drop(i, inplace=True)
continue
if data.a_device in a_to_b.keys() and data.b_device in a_to_b[data.a_device]['remotedev']:
self.cabling_merged_df.drop(i, inplace=True)
continue
a_to_b[data.a_device]['remotedev'].append(data.b_device)
[docs]
def remove_undefined_cabling_entries(self):
"""removes undefined cabling entries where device doesn't exist in devices tab / deprycated
"""
dev_hosts = set(self.devices_merged_df.hostname)
copy_full_df = deepcopy(self.cabling_merged_df)
for i, data in copy_full_df.iterrows():
if not data.a_device in dev_hosts or not data.b_device in dev_hosts:
self.cabling_merged_df.drop(i, inplace=True)
continue
[docs]
def add_cable_connector_details(self):
"""add cable and connector details to database (such as: media_type, cable_type, connector_types, mediaspeed, cable-type )
"""
self.additional_sfp_cts = {'media_type':[], 'cable_type': [],'_connector_type': [], 'speed':[]}
self.cabling_merged_df['cable_type'] = self.cabling_merged_df.apply(lambda x: self.update_cable_connector(x, 'cable_type'), axis=1)
self.cabling_merged_df['a_connector_type'] = self.cabling_merged_df.apply(lambda x: self.update_cable_connector(x, '_connector_type', 'a_media_type'), axis=1)
self.cabling_merged_df['b_connector_type'] = self.cabling_merged_df.apply(lambda x: self.update_cable_connector(x, '_connector_type', 'b_media_type'), axis=1)
self.cabling_merged_df['speed'] = self.cabling_merged_df.apply(lambda x: self.update_cable_connector(x, 'speed',), axis=1)
self.cabling_merged_df['cable'] = self.cabling_merged_df.apply(lambda x: self.merge_cable_type(x), axis=1)
add_cable_n_connectors(CONNECTOR_TYPES_FILE, **self.additional_sfp_cts)
[docs]
def merge_cable_type(self, df):
"""checks a side and b side connectors, media speed and returns desired cable type
Args:
df (DataFrame): Pandas DataFrame (cabling)
Returns:
str: cable type (ex: lc to lc 10g Multimode OM3 Cable)
"""
ac = df.a_connector_type if df.a_connector_type else "??"
bc = df.b_connector_type if df.b_connector_type else "??"
s = f"{ac} to {bc} {df.speed} {df.cable_type}"
return s
[docs]
def update_cable_connector(self, cm_df, what, mt_col=None):
"""common definition to update attributes: cable_type, connector_types, mediaspeed
Args:
cm_df (DataFrame): Pandas DataFrame (cabling)
what (str): attribute name to be updated
mt_col (str, optional): mediatype column name. Defaults to None.
Returns:
str: found atribute value
"""
if mt_col is None:
media_type = cm_df.a_media_type if cm_df.a_media_type else cm_df.b_media_type
else:
media_type = cm_df[mt_col]
if not media_type: return ""
if media_type in self.additional_sfp_cts['media_type']:
try:
if self.additional_sfp_cts[what][self.additional_sfp_cts['media_type'].index(media_type)]:
return self.additional_sfp_cts[what][self.additional_sfp_cts['media_type'].index(media_type)]
except:
pass
#
c = get_cable_n_connectors(CONNECTOR_TYPES_FILE, what, media_type)
if c: return c
#
c = input(f"Provide [{what}] - New SFP identified: [{media_type}]:")
if c:
self._update_additional_sfp_cts(media_type, what, c)
return c
#
return ''
# support to above
def _update_additional_sfp_cts(self, media_type, what, c):
if media_type not in self.additional_sfp_cts['media_type']:
self.additional_sfp_cts['media_type'].append(media_type)
for _ in {'cable_type', '_connector_type', 'speed'}:
self.additional_sfp_cts[_].append("")
self.additional_sfp_cts[what][self.additional_sfp_cts['media_type'].index(media_type)] = c
[docs]
def update_weight(self):
"""define thickness of connector
"""
update_weight(self.cabling_merged_df, self.weight)
[docs]
def update_color(self):
"""define color of connector
"""
update_color(self.cabling_merged_df, self.color_map)
[docs]
def remove_subintf_from_ports(self):
"""removes subninterface value/string from ports
"""
self.cabling_merged_df['aport'] = self.cabling_merged_df['aport'].apply(lambda x: x.split(".")[0])
self.cabling_merged_df['bport'] = self.cabling_merged_df['bport'].apply(lambda x: x.split(".")[0])
[docs]
def standardize_intf_on_ports(self):
"""standardize all interfaces
"""
self.cabling_merged_df['aport'] = self.cabling_merged_df['aport'].apply(lambda x: STR.intf_standardize_or_null(x, intf_type='PHYSICAL'))
self.cabling_merged_df['bport'] = self.cabling_merged_df['bport'].apply(lambda x: STR.intf_standardize_or_null(x, intf_type='PHYSICAL'))
[docs]
def add_model_n_serial_number_to_cabling(self):
"""cabling tab - add device model and serial numbers
"""
self.cabling_merged_df['a_dev_model'] = self.cabling_merged_df.apply(lambda x: get_model_number(x, self.devices_merged_df, 'a_device'), axis=1)
self.cabling_merged_df['b_dev_model'] = self.cabling_merged_df.apply(lambda x: get_model_number(x, self.devices_merged_df, 'b_device'), axis=1)
self.cabling_merged_df['a_dev_serial'] = self.cabling_merged_df.apply(lambda x: get_serial_number(x, self.devices_merged_df, 'a_device'), axis=1)
self.cabling_merged_df['b_dev_serial'] = self.cabling_merged_df.apply(lambda x: get_serial_number(x, self.devices_merged_df, 'b_device'), axis=1)
CableMatrix = DFGen
# --------------------------------------------------------------------------------------------------
[docs]
def get_serial_number(df, devices_merged_df, device):
"""get serial number for match device
Args:
df (DataFrame): Cable Matrix DataFrame
devices_merged_df (DataFrame): Devices DetaFrame
device (str): device hostname
Returns:
str: serial number of device
"""
mini_dev_df = devices_merged_df[(devices_merged_df.hostname == df[device])]
if mini_dev_df.empty: return ""
idx = mini_dev_df.index[0]
return mini_dev_df['serial_number'][idx]
[docs]
def get_model_number(df, devices_merged_df, device):
"""get model number for match device
Args:
df (DataFrame): Cable Matrix DataFrame
devices_merged_df (DataFrame): Devices DetaFrame
device (str): device hostname
Returns:
str: model number of device
"""
mini_dev_df = devices_merged_df[(devices_merged_df.hostname == df[device])]
if mini_dev_df.empty: return ""
idx = mini_dev_df.index[0]
return mini_dev_df['device_model'][idx]
[docs]
def update_weight(df, base_weight=1):
"""update line thickness for all connectors, where found multiple connectivities between two devices
Args:
df (DataFrame): Pandas DataFrame (cabling)
base_weight (int, optional): base weight. Defaults to 1.
"""
for i, v in df.iterrows():
minidf = df[ (df.a_device == v.a_device) & (df.b_device == v.b_device) ]
minidf_len = len(minidf)
if minidf_len == 1:
continue
elif minidf_len > 1:
df.loc[ ((df.a_device == v.a_device) & (df.b_device == v.b_device)), 'weight' ] = minidf_len * base_weight
[docs]
def update_color(df, colors):
"""update line color for all connectors, based on the found cable type. Available options are
# ( white, black, red, green, skyblue, blue, yellow, gray, lightgray, darkgray, orange )
Args:
df (DataFrame): Pandas DataFrame (cabling)
"""
df.color = df.apply(lambda x: get_color(x, colors), axis=1)
[docs]
def get_color(df, colors):
"""get color of a connector
Args:
df (DataFrame): Pandas DataFrame (cabling)
colors (dict): fibertype v/s color dictionary
Returns:
str: color for given found cable (Default: black)
"""
if df.cable_type.lower() in colors:
return colors[df.cable_type.lower()]
return 'black'
[docs]
def remove_duplicates(df):
"""removes duplicate cabling entries between a-b devices
"""
#
df2 = deepcopy(df)
for i, v in df.iterrows():
bdev = v.b_device
bport = v.bport
adev = v.a_device
aport = v.aport
minidf = df[(df.a_dev_duplicate == adev) & (df.aport_duplicate == aport) & (df.b_dev_duplicate == bdev) & (df.bport_duplicate == bport)]
if minidf.empty:
continue
df2 = df2.drop(minidf.index)
break
if len(df) == len(df2):
return df2
return remove_duplicates(df2)
[docs]
def merge_port_for_duplicates(df):
"""merge multiple cabling for a-b devices
"""
df2 = deepcopy(df)
for i, r in df.iterrows():
mdf = df[((df.a_device == r.a_device) & (df.b_device == r.b_device))]
if len(mdf.a_device) == 1:
continue
idx = 0
for j, m in mdf.iterrows():
if idx == 0:
idx = j
prv_aport = m.aport
prv_bport = m.bport
continue
prv_aport += "\n"+m.aport
prv_bport += "\n"+m.bport
df2.at[idx, 'aport'] = prv_aport
df2.at[idx, 'bport'] = prv_aport
df2 = df2.drop(j)
break
if len(df) == len(df2):
return df2
return merge_port_for_duplicates(df2)
# ---------------------------------------------
# Data Frame Converter
# ---------------------------------------------
[docs]
class DF_ConverT():
"""Data Frame Converter
Args:
file (str): a single database. -clean excel file.
default_stencil (str): default visio stencil file.
line_pattern_style_separation_on (str): column name on which line pattern separation should be decided on
line_pattern_style_shift_no (int): line pattern change/shift number/steps
"""
def __init__(self, file,
default_stencil,
line_pattern_style_separation_on,
line_pattern_style_shift_no,
):
"""object initializer
"""
self.file = file
self.full_df = read_xl(file)
file = file.split("/")[-1].split("\\")[-1]
self.self_device = file.split("-clean")[0].split(".")[0]
#
self.stencils = default_stencil
self.line_pattern_style_separation_on = line_pattern_style_separation_on
self.line_pattern_style_shift_no = line_pattern_style_shift_no
[docs]
def get_self_details(self, var_func_dict):
"""update the value from var tab of var function dictionary
Args:
var_func_dict (dict): custom var functions dictionary
"""
self.var_func_dict = var_func_dict
for k, f in var_func_dict.items():
v = f(self.full_df['var'])
if not v: v = 'N/A'
self.__dict__[k] = v
[docs]
def convert(self, func_dict):
"""create physical DataFrame, update with patterns
Args:
func_dict (dict): custom functions dictionary
"""
# vlan
vlan_df = get_vlan_if_up(self.full_df['vlan'])
vlan_df = get_vlan_if_relevants(vlan_df)
self.vlan_df = vlan_df
# physical
df = get_physical_if_up(self.full_df['physical'])
df = get_physical_if_relevants(df)
#
df = self.update_devices_df_pattern_n_custom_func(df, func_dict)
#
self.u_ph_df = df
[docs]
def update_devices_df_pattern_n_custom_func(self, df, func_dict):
"""updates Devices DataFrame patterns as per custom functions provided in func_dict
Args:
df (DataFrame): pandas DataFrame for devices
func_dict (dict): custom functions dictionary
Returns:
DataFrame: updated DataFrame
"""
for k, f in func_dict.items():
df[k] = f(df)
#
self.patterns = get_patterns(df, self.line_pattern_style_separation_on, self.line_pattern_style_shift_no)
df = update_pattern(df, self.patterns, self.line_pattern_style_separation_on)
#
return df
[docs]
def update_cablings(self, **default_dic):
"""creates Cabling object and its DataFrame, adds cabling details
Returns:
DataFrame: pandas DataFrame
"""
self.C = ADevCablings(self.self_device, **default_dic)
for k, v in self.u_ph_df.iterrows():
kwargs = {}
for x, y in v.items():
kwargs[x] = y
self.C.add_to_cablings(**kwargs)
#
self.C.cabling_dataframe()
return self.C.merged_df
[docs]
def update_devices(self):
"""creates Devices object, and its DataFrame, adds vlan informations.
Returns:
DataFrame: updated pandas DataFrame for interface connected devices
"""
self.D = AdevDevices(self.stencils, self.var_func_dict, self.full_df['var'])
self.D.int_df = self.update_devices_for(df=self.u_ph_df, dic=self.D.devices)
self.D.add_vlan_info(self.vlan_df)
return self.D.merged_df
[docs]
def update_device_self_detils(self, func_dict):
"""create a pandas DataFrame object for the self object using `var` tab and custom functions
Args:
func_dict (dict): custom var functions
Returns:
DataFrame: pandas DataFrame for self device
"""
self_device_df = self.D.get_self_device_df()
self_dev_df = self.update_devices_for(df=self_device_df, dic=self.D.self_device)
self_dev_df = self.update_devices_df_pattern_n_custom_func(self_dev_df, func_dict)
return self_dev_df
[docs]
def update_devices_for(self, df, dic):
"""update DataFrame for the provided dictionary (dic) objects, and removes empty and duplicate hostname value rows.
Args:
df (DataFrame): variable DataFrame
dic (dict): variable dictionary
Returns:
DataFame: updated DataFrame
"""
for k, v in df.iterrows():
kwargs = {}
for x, y in v.items():
kwargs[x] = y
self.D.add_to_devices(what=dic, **kwargs)
u_df = device_df_drop_empty_duplicates(dic)
return u_df
# ---------------------------------------------