Source code for nettoolkit.pyVig.maths

import math

import numpy as np
import pandas as pd

# -------------------------------------------------------------------------------------

def df_with_slops_and_angles(df, x1_col, x2_col, y1_col, y2_col):
    """Add slope and connector-angle columns for the line between two points.

    The slope is stored in a column named ``slop`` (spelling kept, it is the
    runtime column name) and is computed as ``(y2 - y1) / (x2 - x1)``.

    Args:
        df (DataFrame): Input DataFrame. The ``slop`` column is written onto
            this frame in place before the filled copy is made.
        x1_col (str): column name for point 1 - x axis
        x2_col (str): column name for point 2 - x axis
        y1_col (str): column name for point 1 - y axis
        y2_col (str): column name for point 2 - y axis

    Returns:
        DataFrame: a new DataFrame (result of ``fillna("")``) carrying the
        ``slop``, ``angle_angled_connector`` and ``angle_straight_connector``
        columns, with missing values replaced by empty strings.
    """
    rise = df[y2_col] - df[y1_col]
    run = df[x2_col] - df[x1_col]
    df['slop'] = rise / run

    # Missing values become "" so the angle helpers treat them as "no slope".
    filled = df.fillna("")
    slopes = filled.slop
    filled['angle_angled_connector'] = slopes.apply(slop_to_angled_connector)
    filled['angle_straight_connector'] = slopes.apply(slop_to_straight_connector)
    return filled.fillna("")
def slop_to_straight_connector(m):
    """Convert the slope ``m`` of a line to a straight-connector angle.

    Uses the standard-library ``math`` module; the former ``np.math`` alias
    was removed in NumPy 2.0.

    NOTE(review): the conversion uses the hyperbolic tangent (``tanh``), not
    the arctangent one would expect for slope-to-angle. It is kept unchanged
    so existing results do not shift - confirm the intent.

    Args:
        m (float): slope of a straight line. Any falsy value (``0``, ``""``,
            ``None``) is treated as "no slope".

    Returns:
        int: angle in whole degrees; ``0`` for a falsy slope.

    Raises:
        ValueError: if ``m`` is NaN (``int()`` cannot convert NaN).
    """
    if not m:
        return 0
    # int() truncates toward zero, so small slopes collapse to 0 degrees.
    angle = int(math.degrees(math.tanh(m)))
    if angle < 0:
        angle = 90 + angle
    if m <= 0:
        angle = 360 - angle
    return angle
def slop_to_angled_connector(m):
    """Convert the slope ``m`` of a line to an angled-connector angle.

    Uses the standard-library ``math`` module; the former ``np.math`` alias
    was removed in NumPy 2.0.

    NOTE(review): the conversion uses the hyperbolic tangent (``tanh``), not
    the arctangent one would expect for slope-to-angle. It is kept unchanged
    so existing results do not shift - confirm the intent.

    Args:
        m (float): slope of an angled line. Any falsy value (``0``, ``""``,
            ``None``) is treated as "no slope".

    Returns:
        int: angle in whole degrees; ``0`` for a falsy slope.

    Raises:
        ValueError: if ``m`` is NaN (``int()`` cannot convert NaN).
    """
    if not m:
        return 0
    # int() truncates toward zero, so small slopes collapse to 0 degrees.
    angle = int(math.degrees(math.tanh(m)))
    if angle < 0:
        angle = 180 - angle
    if m > 0:
        angle = 360 - angle
    return angle
# --------------------------------------------- # Co-ordinate calculator # ---------------------------------------------
class CalculateXY():
    """Co-ordinate calculator.

    Assigns ``x-axis`` / ``y-axis`` values to devices based on their
    ``hierarchical_order``, and optionally a per-sheet-filter set of
    ``<filter>-x-axis`` / ``<filter>-y-axis`` columns.

    Args:
        dev_df (DataFrame): Device DataFrame; must provide ``hostname`` and
            ``hierarchical_order`` columns.
        default_x_spacing (int, float): horizontal spacing between two devices
        default_y_spacing (int, float): vertical spacing between two devices
        cbl_df (DataFrame): Cabling DataFrame; must provide ``a_device`` and
            ``b_device`` columns plus one column per sheet-filter key.
        sheet_filter_dict (dict): sheet filters for multi tab drawing, mapping
            a cabling column name to the value selecting that tab. May be
            empty/None.
    """

    # Rows carrying this hierarchical_order are dropped by ``sort()``.
    # NOTE(review): presumably a "do not draw" marker - confirm with callers.
    EXCLUDED_HIERARCHICAL_ORDER = 100

    def __init__(self, dev_df, default_x_spacing, default_y_spacing, cbl_df, sheet_filter_dict):
        """initialize object by providing device DataFrame, default x & y - axis spacing values.
        """
        self.df = dev_df
        self.cbl_df = cbl_df
        self.spacing_x = default_x_spacing
        self.spacing_y = default_y_spacing
        self.sheet_filter_dict = sheet_filter_dict

    def calc(self):
        """Run the full calculation sequence.

        Sorts/filters the device frame, fills ``y-axis`` and ``x-axis``, and,
        when sheet filters are configured, computes and merges the per-filter
        co-ordinate columns. Results are available on ``self.df``.
        """
        self.sort()
        ho_dict = self.count_of_ho(self.df)
        self.update_ys(self.df, 'y-axis', ho_dict)
        self.update_xs(self.df, 'x-axis', ho_dict)
        if self.sheet_filter_dict:
            self.update_xy_for_sheet_filter_dict()
            self.merge_xy_filter_dfs_with_dev_df()

    def update_xy_for_sheet_filter_dict(self):
        """Create x-axis / y-axis columns for each filtered tab.

        For every ``column: value`` pair of the sheet filter, selects the
        matching cables, collects the devices on either end, and lays those
        devices out on their own. A filter that matches no cable yields an
        empty per-filter frame instead of raising.
        """
        self.sheet_filter_cbl_df_dict = {}
        self.sheet_filter_dev_df_dict = {}
        self.sheet_filter_dev_dict = {}
        for k, v in self.sheet_filter_dict.items():
            x_col, y_col = f'{k}-x-axis', f'{k}-y-axis'
            cables = self.cbl_df[self.cbl_df[k] == v]
            devices = set(cables['a_device']).union(cables['b_device'])
            # Work on a copy: the new columns must not be written into a
            # slice of the main device frame.
            dev_df = self.df[self.df.hostname.isin(devices)].copy()
            self.sheet_filter_cbl_df_dict[k] = cables
            self.sheet_filter_dev_dict[k] = devices
            ho_dict = self.count_of_ho(dev_df)
            self.update_ys(dev_df, y_col, ho_dict)
            self.update_xs(dev_df, x_col, ho_dict)
            self.sheet_filter_dev_df_dict[k] = dev_df[['hostname', x_col, y_col]]

    def merge_xy_filter_dfs_with_dev_df(self):
        """Merge the per-filter x,y columns into the main device DataFrame.

        Devices that are not part of a filter get missing values in that
        filter's columns (left join on ``hostname``).
        """
        for filter_df in self.sheet_filter_dev_df_dict.values():
            self.df = self.df.join(filter_df.set_index('hostname'), on='hostname')

    def sort(self):
        """Sort the Device DataFrame by ['hierarchical_order', 'hostname'].

        The sort is applied in place on the frame passed to the constructor;
        afterwards ``self.df`` is rebound to an independent copy without the
        rows whose ``hierarchical_order`` equals
        ``EXCLUDED_HIERARCHICAL_ORDER``.
        """
        self.df.sort_values(by=['hierarchical_order', 'hostname'], inplace=True)
        keep = self.df.hierarchical_order != self.EXCLUDED_HIERARCHICAL_ORDER
        # copy() so later column assignments do not target a slice.
        self.df = self.df[keep].copy()

    def count_of_ho(self, df):
        """Count devices per hierarchical_order in the given dataframe.

        Args:
            df (DataFrame): Device Dataframe with `hierarchical_order` column

        Returns:
            dict: ``{hierarchical_order: number of rows}``
        """
        return dict(df['hierarchical_order'].value_counts().items())

    def calc_ys(self, ho_dict):
        """Calculate y-axis references for each hierarchical order.

        Args:
            ho_dict (dict): ``{hierarchical_order: device count}``

        Returns:
            dict: ``{hierarchical_order: y value}``, already inverted by
            ``inverse_y`` (lowest order gets the largest y). Empty input
            gives an empty dict.
        """
        levels = sorted(ho_dict)
        if not levels:
            return {}
        first = levels[0]
        y = {first: 0}
        offset = 0
        for ho in levels[1:]:
            # NOTE(review): the step always uses the count of the FIRST level
            # (the original never advanced its "previous" level). Kept as-is
            # so existing layouts do not move - confirm whether the
            # immediately preceding level was intended.
            offset += (ho_dict[ho] + ho_dict[first]) / 2 * self.spacing_y
            y[ho] = offset
        return self.inverse_y(y)

    def inverse_y(self, y):
        """Invert the y axis values (turn upside down).

        Each value becomes ``max(y) - value + 2``.

        Args:
            y (dict): y axis placement values keyed by hierarchical_order

        Returns:
            dict: inverted placement values with the same keys; empty when
            ``y`` is empty.
        """
        if not y:
            return {}
        top = max(y.values())  # hoisted: identical for every entry
        return {ho: top - value + 2 for ho, value in y.items()}

    def get_y(self, ho):
        """Get the y axis value for the given hierarchical_order.

        Reads ``self.y`` as set by the latest ``update_ys`` call.

        Args:
            ho (int): hierarchical order number

        Returns:
            int, float: y axis value

        Raises:
            KeyError: if ``ho`` has no calculated y value.
        """
        return self.y[ho]

    def update_ys(self, df, y_axis, ho_dict):
        """Write the `y_axis` column onto the given Device DataFrame.

        Args:
            df (DataFrame): Device DataFrame
            y_axis (str): column name for y_axis
            ho_dict (dict): ``{hierarchical_order: device count}``
        """
        self.y = self.calc_ys(ho_dict)
        df[y_axis] = df['hierarchical_order'].apply(self.get_y)

    # -----------------------------------------------

    def get_x(self, ho):
        """Take the next free x axis value for the given hierarchical order.

        Each call consumes the lowest-numbered remaining slot of that order
        from ``self.xs``, so successive calls return successive positions.

        Args:
            ho (int): hierarchical order number

        Returns:
            int, float: x axis value

        Raises:
            KeyError: if ``ho`` has no slots at all.
            ValueError: if every slot of ``ho`` has already been consumed.
        """
        slots = self.xs[ho]
        if not slots:
            raise ValueError(f"no free x-axis slot left for hierarchical_order {ho!r}")
        return slots.pop(min(slots))

    def calc_xs(self, ho_dict):
        """Calculate x-axis slots for each hierarchical order.

        Requires ``self.full_width`` (set by ``update_xs``). For an order with
        ``c`` devices, slots start at
        ``full_width/2 - c/2*spacing_x - spacing_x/2`` and advance by
        ``spacing_x``.

        Args:
            ho_dict (dict): ``{hierarchical_order: device count}``

        Returns:
            dict: ``{hierarchical_order: {slot index: x value}}``
        """
        xs = {}
        middle = self.full_width / 2
        halfspacing = self.spacing_x / 2
        for ho in sorted(ho_dict):
            count = ho_dict[ho]
            start = middle - (count / 2 * self.spacing_x) - halfspacing
            xs[ho] = {i: i * self.spacing_x + start for i in range(count)}
        return xs

    def update_xs(self, df, x_axis, ho_dict):
        """Write the `x_axis` column onto the given Device DataFrame.

        The drawing width is derived from the most populated hierarchical
        order plus two extra spacings. An empty ``ho_dict`` (no devices) is
        accepted and leaves an empty column.

        Args:
            df (DataFrame): Device DataFrame
            x_axis (str): column name for x_axis
            ho_dict (dict): ``{hierarchical_order: device count}``
        """
        widest = max(ho_dict.values(), default=0)
        self.full_width = (widest + 2) * self.spacing_x
        self.xs = self.calc_xs(ho_dict)
        df[x_axis] = df['hierarchical_order'].apply(self.get_x)
# ---------------------------------------------
class CalculateXYNative():
    """Calculate co-ordinates the default "native" way.

    Devices are walked in ascending order of their cabling-connection count
    and placed on a grid that advances in steps of 2. Iteration is done in
    sorted order so the same input always yields the same co-ordinates.

    Args:
        ddf (DataFrame): Devices DataFrame; must provide a ``hostname`` column.
            Co-ordinate columns are written onto this frame.
        cdf (DataFrame): Cabling DataFrame; must provide ``a_device`` and
            ``b_device`` columns.
        sfd (dict): sheet filter dictionary; only its keys are used, to name
            the extra ``<key>-x-axis`` / ``<key>-y-axis`` columns. May be
            empty or None.
    """

    # Returned by ``get_xy`` for a device that was never placed.
    MISSING_COORDINATE = -10

    def __init__(self, ddf, cdf, sfd):
        """Initializer
        """
        self.ddf = ddf
        self.df = self.ddf
        self.cdf = cdf
        self.sfd = sfd
        self.item_indexes = {}   # hostname -> (x, y)
        self.indexes = []        # occupied (x, y) positions
        self._x = 0
        self._y = 0

    def count_of_devices(self, dev):
        """Count how often a device appears in the cable matrix.

        Args:
            dev (str): Device hostname to match with

        Returns:
            int: number of rows where ``dev`` is the a-end plus the number of
            rows where it is the b-end.
        """
        as_a_end = self.cdf.a_device == dev
        as_b_end = self.cdf.b_device == dev
        return int(as_a_end.sum()) + int(as_b_end.sum())

    def dev_connectors_dict(self):
        """Build the device->count dictionary and its inverse.

        Sets ``self.dcd`` (``{hostname: connection count}``) and ``self.ddc``
        (``{connection count: set of hostnames}``).
        """
        self.dcd = {device: self.count_of_devices(device) for device in set(self.ddf.hostname)}
        self.ddc = {}
        for device, count in self.dcd.items():
            self.ddc.setdefault(count, set()).add(device)

    def calc(self):
        """Start the calculator: count connections, place devices, write columns.
        """
        self.dev_connectors_dict()
        self.iterate()
        self.add_x_y_to_df()

    def add(self, item, x, y):
        """Register an item at the provided co-ordinates.

        Nothing happens when the item is already registered.

        Args:
            item (str): device hostname
            x (int): x-coordinate
            y (int): y-coordinate
        """
        if item in self.item_indexes:
            return
        position = (x, y)
        self.item_indexes[item] = position
        self.indexes.append(position)

    def iterate(self):
        """Iterate through all devices and add their co-ordinates.

        Devices are visited by ascending connection count; devices sharing a
        count are visited in sorted order (sets have no stable order for
        strings across interpreter runs).
        """
        for count in sorted(self.ddc):
            for device in sorted(self.ddc[count], key=str):
                self.device_add(device, root=True)

    def device_add(self, device, root):
        """Place a device, or - for a root call - try to place its neighbours.

        If the device is not placed yet and the current cursor position is
        free, the device is placed there and the call ends. Otherwise, for a
        root call, the cursor moves 2 to the right before each neighbour is
        offered the position, and finally moves 2 along y.

        NOTE(review): the source this was recovered from had lost its
        indentation; the y advance is assumed to happen once per root call,
        after the neighbour loop - confirm against the packaged module.

        Args:
            device (str): hostname of device
            root (bool): is it root path or not

        Returns:
            None: No return
        """
        cursor = (self._x, self._y)
        if device not in self.item_indexes and cursor not in self.indexes:
            self.add(device, self._x, self._y)
            return None
        if root:
            # sorted for run-to-run reproducible placement
            for neighbour in sorted(self.get_nbr_devices(device), key=str):
                self._x += 2
                self.device_add(neighbour, root=False)
            self._y += 2
        return None

    def get_nbr_devices(self, device):
        """Get the set of neighbor devices for the provided host/device.

        Args:
            device (str): device hostname

        Returns:
            set: hostnames found on the other end of every cable that has
            ``device`` on one end.
        """
        far_ends_of_a = self.cdf[self.cdf.a_device == device].b_device
        far_ends_of_b = self.cdf[self.cdf.b_device == device].a_device
        return set(far_ends_of_a).union(far_ends_of_b)

    def add_x_y_to_df(self):
        """Add x,y axis columns to the devices data frame.

        Writes ``x-axis`` / ``y-axis`` and, for every sheet-filter key, a
        ``<key>-x-axis`` / ``<key>-y-axis`` pair holding the same values.
        A missing (None) sheet filter is treated as "no filters".
        """
        xs = self.ddf.hostname.apply(lambda host: self.get_xy(host, 0))
        ys = self.ddf.hostname.apply(lambda host: self.get_xy(host, 1))
        self.ddf['x-axis'] = xs
        self.ddf['y-axis'] = ys
        for k in (self.sfd or ()):
            self.ddf[f'{k}-x-axis'] = xs
            self.ddf[f'{k}-y-axis'] = ys

    def get_xy(self, dev, i):
        """Retrieve the x or y co-ordinate for the provided device.

        The lookup is tried with ``dev`` as given, then with surrounding
        whitespace stripped.

        Args:
            dev (str): device hostname
            i (int): index into the (x, y) tuple: 0 for x, 1 for y

        Returns:
            int: the co-ordinate, or ``MISSING_COORDINATE`` (-10) when the
            device has no position (or ``dev``/``i`` cannot be used for the
            lookup).
        """
        try:
            return self.item_indexes[dev][i]
        except (KeyError, TypeError, IndexError):
            pass
        try:
            return self.item_indexes[dev.strip()][i]
        except (KeyError, TypeError, IndexError, AttributeError):
            # best-effort lookup: unknown device -> sentinel, not an error
            return self.MISSING_COORDINATE