Source code for nettoolkit.nettoolkit_common.common

# """Common Definitions used across project"""
# ------------------------------------------------------------------------------
from dataclasses import dataclass, field
import yaml
from yaml import UnsafeLoader
import subprocess as sp
import pandas as pd
import os
from  pathlib import Path

# ------------------------------------------------------------------------------

### IDENTIFER OF COMMAND LINE ### >
CMD_LINE_START_WITH = "output for command: "
CISCO_ABS_COMMANDS = {
	'show lldp neighbors',
	'show cdp neighbors',
	'show interfaces status',
	'show interfaces description',
	'show mac address-table',
	'show ip arp',
	'show running-config',
	'show version',
}

JUNIPER_ABS_COMMANDS = {
	'show interfaces descriptions',
	'show lldp neighbors',
	'show configuration',
	'show version',
	'show chassis hardware',
	'show interfaces terse',
	'show arp',
	'show bgp summary',
}

# ------------------------------------------------------------------------------

[docs] def get_file_path(file): """returns folder of given file path Args: file (str): full string length file path Returns: str: folder location of file """ p = Path(file) return p.parent
[docs] def get_file_name(file, ext=False): """returns file name of given file path Args: file (str): full string length file path ext (bool, optional): include extension or not. Defaults to False. Returns: str: file name """ p = Path(file) return p.name if ext else p.stem
[docs] def create_folders(folders, *, silent=True): """Creates Folders Args: folders (list,str): folder(s) silent (bool, optional): Create without prompt. Defaults to True. Returns: bool: Success/Fail """ cf = 1 if isinstance(folders, str): folders = [folders,] for folder in folders: if not os.path.exists(folder): if not silent: print(f"[+] Creating: {folder}", end="\t") try: os.makedirs(folder) print("OK.") except: print("Failed.") cf = 0 return bool(cf)
# ------------------------------------------------------------------------------
[docs] def remove_domain(hn): """Removes domain suffix from provided hostname string Args: hn (str): fully qualified dns hostname Returns: str: hostname left by removing domain suffix """ return hn.split(".")[0]
[docs] def read_file(file): """read the provided text file and retuns output in list format Args: file (str): text file name Returns: list: output converted to list (separated by lines) """ with open(file, 'r') as f: file_lines = f.readlines() return file_lines
[docs] def read_yaml_mode_us(file): try: with open(file, 'r') as f: return yaml.load(f, Loader=UnsafeLoader) except Exception as e: raise Exception(f"[-] Unable to Read the file, or invalid data \n{e}")
# ------------------------------------------------------------------------------
[docs] def get_op(file, cmd): """filter the command output from given captured file. Note: output should be taken from capture_it utility or it should be in the format derived by it. Args: file (str): capture file cmd (str): show command for which output to capture Returns: list: filtered command output in list format """ file_lines = read_file(file) toggle, op_lst = False, [] for l in file_lines: if l.find(CMD_LINE_START_WITH)>0: toggle = l.find(cmd)>0 continue if toggle: op_lst.append(l.strip()) return op_lst
[docs] def get_ops(file, cmd_startswith): """filter the command outputs from given captured file. Note: output should be taken from capture_it utility or it should be in the format derived by it. Args: file (str): capture file cmd_startswith (str): show command start string Returns: dict: filtered command output in dict format """ file_lines = read_file(file) toggle, op_lst, op_dict = False, [], {} for l in file_lines: if toggle and l.find(CMD_LINE_START_WITH)>0: op_dict[cmd] = op_lst op_lst = [] toggle=False if l.find(CMD_LINE_START_WITH)>0: toggle = l.find(cmd_startswith)>0 cmd = l[l.find(cmd_startswith):].strip() continue if toggle: op_lst.append(l.rstrip()) return op_dict
# ------------------------------------------------------------------------------
[docs] def blank_line(line): """checks if provided line is blank line or not. Args: line (str): input line Returns: bool: is line blank or not """ return not line.strip()
[docs] def get_device_manufacturar(file): """finds out manufacturer (cisco/juniper) from given capture file. in case if not found, it will return as Unidentified. Args: file (str): input capture file Returns: str: Either one from - Cisco, Juniper, Unidentified """ file_lines = read_file(file) return detect_device_type(file_lines)
[docs] def detect_device_type(config_log_list): for line in config_log_list: if line[0] == "!": return 'Cisco' elif line[0] == "#": return "Juniper" return "Unidentified"
[docs] def verifid_output(cmd_op): """vefifies if command output is in valid state. Multiline string are splits with CR. and retuns as list. if input is a list, it will be returned as is. any other input will throw error. Args: cmd_op (list, str): Either list or Multiline string of output Raises: TypeError: Raise error if input is other than string or list. Returns: list: output in list format """ if isinstance(cmd_op, str): cmd_op = cmd_op.split("\n") if not isinstance(cmd_op, list): raise TypeError("Invalid Command Output Received.\n" f"Expected either multiline-string or list, received {type(cmd_op)}.") return cmd_op
# ------------------------------------------------------------------------------
[docs] def get_string_part(line, begin, end): """get the sub-string out of provided long string(line) Args: line (str): string line begin (int): sub-str start point end (int): sub-str end point Raises: TypeError: Raise error if input is invalid or sub-string falls outside Returns: str: sub-string """ try: return line[begin: end].strip() except: raise TypeError("Unrecognized Input")
[docs] def get_string_trailing(line, begin_at): """get the training part of sub-string starting from provided index Args: line (str): string line begin_at (int): sub-str start point Raises: TypeError: Raise error if input is invalid or sub-string falls outside Returns: str: sub-string """ try: return line[begin_at:].strip() except: raise TypeError("Unrecognized Input")
# ------------------------------------------------------------------------------
[docs] def standardize_mac(mac): """removes . or : from mac address and make it a standard Args: mac (str): mac address Returns: str: standard format of mac address """ return mac.replace(":","").replace(".","")
[docs] def mac_2digit_separated(mac): """converts input mac to 2 digit separated mac format, separator=`:` Args: mac (str): mac address Returns: str: 2 digit separated format of mac address """ mac = standardize_mac(mac) for x in range(6): if x == 0: s = mac[:2] else: s += ":" + mac[x*2:(x*2)+2] return s
[docs] def mac_4digit_separated(mac): """converts input mac to 4 digit separated mac format, separator=`.` Args: mac (str): mac address Returns: str: 4 digit separated format of mac address """ mac = standardize_mac(mac) for x in range(3): if x == 0: s = mac[:4] elif x == 1: s += "." + mac[4:8] elif x == 2: s += "." + mac[8:] return s
# ------------------------------------------------------------------------------ try: from collections import MutableMapping except: from collections.abc import MutableMapping
[docs] def flatten(d, parent_key='', sep='_'): """flattens the dictionary Args: d (dict): input can be multi-nested dictionary. parent_key (str, optional): key from previous dictionary to be prefixed with current keys. Defaults to ''. sep (str, optional): keys separator. Defaults to '_'. Returns: dict, list: dictionary of lists if input is dictinoary, list with input dictionary if input is anything else """ items = [] if isinstance(d, dict): for k, v in d.items(): new_key = parent_key + sep + k if parent_key else k if isinstance(v, MutableMapping): items.extend(flatten(v, new_key, sep=sep).items()) else: items.append((new_key, v)) return dict(items) else: return [d]
[docs] def dataframe_generate(d): """convert dicationary to dataframe. multi-level dictionary will be converted flattened first inorder to convert to DataFrame. Args: d (dict): input can be multi-nested dictionary. Returns: DataFrame: pandas DataFrame """ new_d = {} for k, v in d.items(): new_d[k] = flatten(v, "") return pd.DataFrame(new_d).fillna("").T
# ------------------------------------------------------------------------------
[docs] def deprycation_warning(fn): print(f"{'-'*80}\nDEPRYCATION WARNING:\n{fn} usage is getting deprycated. kindly refer documentation for alternate option.\n{'-'*80}")
# ------------------------------------------------------------------------
[docs] def printmsg(pre=None, *, post=None, pre_ends="\n", justify_pre=True, justification_len=80): def outer(func): def inner(*args, **kwargs): if pre: if justify_pre: print(pre.ljust(justification_len), end=pre_ends) else: print(pre, end=pre_ends) # fo = func(*args, **kwargs) # if post: print(post) return fo return inner return outer
# ------------------------------------------------------------------------
[docs] def open_text_file(file): """Open Text file in Notepad.exe Args: file (str): file name """ sp.Popen(["notepad.exe", file])
[docs] def open_excel_file(file): """Open Excel file in MS-Excel (excel.exe) Args: file (str): file Raises: Exception: Raise exception if unable to open excel. """ try: sp.Popen(["C:\\Program Files\\Microsoft Office\\root\\Office16\\EXCEL.EXE", file]) except: try: sp.Popen(["C:\\Program Files\\Microsoft Office\\root\\Office16\\EXCEL.EXE", file]) except Exception as e: raise Exception(f"[-] Unable to Open file {file} in excel\n{e}")
[docs] def open_folder(folder): """Open folder Args: file (str): file name """ path = os.path.realpath(folder) os.startfile(path)
# =======
[docs] def abs_command_cisco(cmd): """returns absolute full command for shorteened cmd Args: cmd (str): executed/ captured command ( can be trunked or full ) Returns: str: cisco command - full untrunked """ spl_cmd = cmd.split() for c_cmd in CISCO_ABS_COMMANDS: spl_c_cmd = c_cmd.split() if len(spl_cmd) == len(spl_c_cmd): for i, word in enumerate(spl_cmd): try: word_match = spl_c_cmd[i].startswith(word) if not word_match: break except: word_match = False break if word_match: break else: word_match = False if word_match: return c_cmd return cmd
[docs] def abs_command_juniper(cmd): """returns absolute truked command if any filter applied Args: cmd (str): executed/ captured command ( can be trunked or full ) Returns: str: juniper command - trunked """ abs_cmd = cmd.split("|")[0].strip() for j_cmd in JUNIPER_ABS_COMMANDS: match = abs_cmd == j_cmd if match: return abs_cmd return cmd
# ==========================================================================================
[docs] @dataclass class CapturesOut(): """Class define common methods and properties on captured output file. Args: capture_log_file (str): Output capture file Raises: Exception: _description_ """ # capture_log_file: list[str] = field(default_factory=[]) capture_log_file: str #### To be verify earlier it was given list which seems a typo abs_cmd_function_map = { 'Cisco': abs_command_cisco, 'Juniper': abs_command_juniper, } abs_cmd_map = { 'Cisco': CISCO_ABS_COMMANDS, 'Juniper': JUNIPER_ABS_COMMANDS, } def __post_init__(self): self._read_capture_log_file() self._get_hostname_from_file_name() self._set_device_type() self._device_parameters() self._gen_output_list_dict() # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
[docs] def cmd_output(self, cmd): """provides filtered output of provided command Args: cmd (str): command string Returns: list: list of outout for provided command """ op_list = self._has(cmd) return op_list if op_list else []
[docs] def has(self, cmd): """Checks if outout has provided command output or not. Args: cmd (str): command string Returns: bool: True / False based on match. """ return self._has(cmd) != None
@property def name(self): """Returns device hostname from capture output. Returns: str: hostname of device """ return self.hostname @property def device_manufacturar(self): """Returns device manufacturer from capture output. Returns: str: manufacturer of device """ return self.device_type # Absolute commands map @property def abs_commands(self): """Returns absolute commands Returns: dict: absolute command map """ return self.abs_cmd_map[self.device_type] @property def outputs(self): """returns dictionary of commands: output-list. Returns: dict: outputs splitted in dictionary by its command as key """ return self.output_list_dict # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - # read capture log file and store as full list def _read_capture_log_file(self): self.capture_log_list = read_file(self.capture_log_file) # extract hostname from list def _get_hostname_from_file_name(self): self.hostname = get_file_name(self.capture_log_file, ext=False) # extract device type from list def _set_device_type(self): self.device_type = detect_device_type(self.capture_log_list) # extract other device parameters from list def _device_parameters(self): if self.abs_cmd_function_map.get(self.device_type): self.abs_cmd_fn = self.abs_cmd_function_map[self.device_type] else: raise Exception(f"[-] Invalid configuration, Unable to determine Device type. {self.device_type} for provided capture log file") # generate dictionary by outputs splitted by its command as key def _gen_output_list_dict(self): toggle = 0 self.output_list_dict, op_lst = {}, [] for l in self.capture_log_list: if toggle and l.find(CMD_LINE_START_WITH)>0: self.output_list_dict[abs_cmd] = op_lst op_lst = [] toggle=0 # if l.find(CMD_LINE_START_WITH)>0: toggle = True if self.device_type == 'Juniper': cmd_line_trunked = l[l.find(CMD_LINE_START_WITH)+20:].split("|")[0].strip() else: cmd_line_trunked = l[l.find(CMD_LINE_START_WITH)+20:].strip() abs_cmd = self.abs_cmd_fn( cmd_line_trunked ) continue # if toggle: op_lst.append(l.rstrip()) if toggle: self.output_list_dict[abs_cmd] = op_lst # verify if output has provided command output or not. def _has(self, cmd): if self.device_type == 'Juniper': cmd = cmd.split("|")[0].strip() return self.output_list_dict.get(self.abs_cmd_fn(cmd))
# ==========================================================================================