# -----------------------------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------------------------
from inspect import signature
import pandas as pd
from abc import ABC, abstractproperty, abstractclassmethod
import datetime
from re import compile
from collections import OrderedDict
import os
import threading
from getpass import getpass
from .common import deprycation_warning
intBeginWith = compile(r'^\D+')
# ------------------------------------------------------------------------------
# Standard number of characters for identifing interface short-hand
# ------------------------------------------------------------------------------
PHYSICAL_IFS = OrderedDict()
PHYSICAL_IFS.update({
'Ethernet': 2,
'FastEthernet': 2,
'GigabitEthernet': 2,
'TenGigabitEthernet': 2,
'FortyGigabitEthernet':2,
'HundredGigE':2,
'AppGigabitEthernet': 2,
'mgmt': 4,
'Async': 5,
})
PHYSICAL_IFS['TwoGigabitEthernet'] = 3 # Do not alter sequence of these two.
PHYSICAL_IFS['TwentyFiveGigE'] = 3 # ....
CISCO_IFSH_IDENTIFIERS = {
"VLAN": {'Vlan':2,},
"TUNNEL": {'Tunnel':2,},
"LOOPBACK": {'Loopback':2,} ,
"AGGREGATED": {'Port-channel':2,},
"PHYSICAL": PHYSICAL_IFS,
"NVI": {'NVI':2,},
"NV": {'NV':2,},
}
JUNIPER_IFS_IDENTIFIERS = {
"VLAN": ('irb', 'vlan', 'iw'),
"LOOPBACK": ('lo', ),
"RANGE": ('interface-range', ),
"TUNNEL": ('lt', 'gr', 'ip', 'mt', 'vt', 'vtep', 'xt' ),
"AGGREGATED": ('ae', 'as', 'fc'),
"PHYSICAL": ('fe', 'ge', 'xe', 'et', 'xle', 'fte', ),
"MANAGEMENT": ('mg', 'em', 'me', 'fxp', 'vme', 'bme', 'bm'),
"INTERNAL": ('sxe', 'bcm', 'cp', 'demux', 'dsc', 'es', 'gre', 'ipip', 'ixgbe', 'lc','lsi', 'mo',
'ms', 'pd', 'pimd', 'rlsq', 'rms', 'rsp', 'sp', 'tap', 'umd', 'vsp', 'vc4', ),
"CIRCUIT": ('at', 'cau4', 'ce1', 'coc1', 'coc3', 'coc12', 'coc48', 'cstm1', 'cstm4', 'cstm16',
'ct1', 'ct3', 'ds', 'e1', 'e3', 'ls', 'ml', 'oc3', 'pip', 'se', 'si', 'so', 'stm1', 'stm4', 'stm16',
't1', 't3', ),
"MONITORING": ('dfc', ),
}
# -----------------------------------------------------------------------------
# Common Classes #
# -----------------------------------------------------------------------------
[docs]
def get_username():
"""input username prompt
Returns:
str: entered username
"""
return input("Enter Username: ")
[docs]
def get_password():
"""input password prompt
Returns:
str: entered password
"""
return getpass("Enter Password: ")
[docs]
class Default():
"""Default class representing class docString template"""
def __str__(self): return self.__doc__
def _repr(self):
fields = signature(self.__init__).parameters
values = ", ".join(repr(getattr(self, f)) for f in fields)
return f'{type(self).__name__}({values})'
[docs]
class Container(ABC):
"""Abstract base class providing template for standard dunder
methods template. Object should contain objVar property
"""
@abstractproperty
@property
def objVar(self): pass
def __bool__(self): return True if self.objVar else False
def __len__(self): return len(self.objVar)
def __dir__(self): return self.objVar # sorted
def __getitem__(self, i): return self.objVar[i]
def __setitem__(self, i, v): self.objVar[i] = v
def __delitem__(self, i): del(self.objVar[i])
def __contains__(self, i): return i in self.objVar
def __reversed__(self):
if isinstance(self.objVar, dict):
for k in reversed(tuple(self.objVar.keys())):
yield (k,self[k])
else:
return reversed(self.objVar)
def __missing__(self, i): raise Exception(f'[-] key {i} unavailable') # only for dict subclass
def __iter__(self):
if isinstance(self.objVar, (list, tuple, set, str)):
for line in self.objVar:
yield line
elif isinstance(self.objVar, (dict, OrderedDict)):
for key, value in self.objVar.items():
yield (key, value)
## TBD / NOT IMPLEMENTED YET ##
# class Numeric():
# """Support Numberic objects"""
# def __add__(self): pass
# def __sub__(self): pass
# def __mul__(self): pass
# def __truediv__(self): pass
# def __floordiv__(self): pass
# def __pow__(self): pass
# def __lshift__(self): pass
# def __rshift__(self): pass
# def __and__(self): pass
# def __xor__(self): pass
# def __or__(self): pass
# def __iadd__(self): pass
# def __isub__(self): pass
# def __imul__(self): pass
# def __itruediv__(self): pass
# def __ifloordiv__(self): pass
# def __ipow__(self): pass
# def __ilshift__(self): pass
# def __irshift__(self): pass
# def __iand__(self): pass
# def __ixor__(self): pass
# def __ior__(self): pass
# def __neg__(self): pass
# def __pos__(self): pass
# def __abs__(self): pass
# def __invert__(self): pass
# -----------------------------------------------------------------------------
# STRING OPERATIONS #
# -----------------------------------------------------------------------------
[docs]
class STR(Container):
"""Collection of static methods for string objects.
see more...
"""
[docs]
@staticmethod
def foundPos(s, sub, pos=0):
'''Search for substring in string and return index value result
Args:
s (str): main string to be search within
sub (str): substring which is to be search in to main string
pos (int, optional): position index, search to be start from. Defaults to 0.
Returns:
int: find index value
'''
return s.find(sub, pos)
[docs]
@staticmethod
def found(s, sub, pos=0):
"""Search for substring in string and return Boolean result
Args:
s (str): main string to be search within
sub (str): substring which is to be search in to main string
pos (int, optional): position index, search to be start from. Defaults to 0.
Returns:
bool: find or not
"""
try:
return True if s.find(sub, pos) > -1 else False
except:
return False
[docs]
@staticmethod
def find_within(s, prefix, suffix=None, pos=0):
"""finds characters between prefix and suffix substrings from string
Args:
s (str): main string to be search within
prefix (str): starting substring
suffix (str, optional): ending substring. Defaults to None.
pos (int, optional): position index, search to be start from. Defaults to 0.
Returns:
tuple: (str, int) (returned string, position of returned suffix position)
"""
p = STR.foundPos(s, prefix, pos=pos)+len(prefix)
if suffix is None:
ln = len(s)
else:
ln = STR.foundPos(s, suffix, pos=p+1)
if p == -1:
return None
if ln == -1:
ln = len(s)
return (s[p:ln], ln)
[docs]
@staticmethod
def string_within(line, prefix, suffix=None, pos=0):
"""finds characters between prefix and suffix substrings from string
Args:
line (str): main string to be search within
prefix (str): starting substring
suffix (str, optional): ending substring. Defaults to None.
pos (int, optional): position index, search to be start from. Defaults to 0.
Returns:
tuple: (str, int) (returned string, position of returned suffix position)
"""
return STR.find_within(line, prefix, suffix, pos)[0]
[docs]
@staticmethod
def suffix_index_within(line, prefix, suffix=None, pos=0):
"""finds characters between prefix and suffix substrings from string
Args:
line (str): main string to be search within
prefix (str): starting substring
suffix (str, optional): ending substring. Defaults to None.
pos (int, optional): position index, search to be start from. Defaults to 0.
Returns:
int: index of suffix
"""
return STR.find_within(line, prefix, suffix, pos)[1]
[docs]
@staticmethod
def find_multi(s, sub, start=0, count=None, index=True, beginwith=False):
"""search for multiple substrings 'sub' within string 's'.
Usage: find_multi(s, sub, [start=n, [count=c], index=True])
Args:
s (str): main string
sub (str, tuple, list): sub string ( to be search within main string )
start (int, optional): Optional: substring to be start search from index . Defaults to 0.
count (int, optional): Optional: count of character from start index. Defaults to None.
index (bool, optional): Optional: return index or boolean values. Defaults to True.
beginwith (bool, optional): Optional: check if substring is at beginning. Defaults to False.
Returns:
list: list of indexes/bool
"""
count = len(s) if count is None else count+start
if isinstance(sub, str):
i = s.find(sub, start, count)
if index:
if beginwith:
return i if i == 0 else -1
else:
return i
else:
if beginwith:
return True if i == 0 else False
else:
return False if i == -1 else True
elif isinstance(sub, (tuple, list)):
sl = []
for x in sub:
sl.append(STR.find_multi(s, x, start, count, index, beginwith))
return sl
else:
return None
[docs]
@staticmethod
def find_all(s, sub, start=0, count=None, beginwith=False):
"""search for multiple substrings 'sub' within string 's'
Usage: find_all(s, sub, [start=n, [count=c]])
Args:
s (str): main string
sub (str): sub string ( to be search within main string )
start (int, optional): Optional: substring to be start search from index. Defaults to 0.
count (int, optional): Optional: count of character from start index. Defaults to None.
beginwith (bool, optional): Optional: check if substring is at beginning. . Defaults to False.
Returns:
bool: all matches or not
"""
sl = STR.find_multi(s, sub, start, count, False, beginwith)
try:
return False if False in sl else True
except:
return sl
[docs]
@staticmethod
def find_any(s, sub, start=0, count=None, beginwith=False):
"""search for multiple substrings 'sub' within string 's'
Usage: find_any(s, sub, [start=n, [count=c]])
Args:
s (str): main string
sub (str): sub string ( to be search within main string )
start (int, optional): Optional: substring to be start search from index. Defaults to 0.
count (int, optional): Optional: count of character from start index. Defaults to None.
beginwith (bool, optional): Optional: check if substring is at beginning. . Defaults to False.
Returns:
bool: for atleast one matches
"""
sl = STR.find_multi(s, sub, start, count, False, beginwith)
try:
return True if True in sl else False
except:
return sl
[docs]
@staticmethod
def update(s, searchItem='', replaceItem=''):
"""find n replace string s
Args:
s (str): main string
searchItem (str, optional): search string. Defaults to ''.
replaceItem (str, optional): replacement string. Defaults to ''.
Returns:
str: updated string
"""
return s.replace(searchItem, replaceItem)
[docs]
@staticmethod
def replace_dual_and_split(s, duo=' ', strip=None):
"""Finds subsequent characters in string and replace those with single,
plus, splits the string using provided character (duo).
Args:
s (str): main string
duo (str, optional): characters which requires reductions if susequent. Defaults to ' '.
strip (int, optional): values (-1=lstrip ,0=strip ,1=rstrip) . Defaults to None.
Returns:
list: split using dual characters
"""
return STR.finddualnreplacesingle(s, duo, strip=strip).split(duo)
[docs]
@staticmethod
def finddualnreplacesingle(s, duo=' ', strip=None):
"""Finds subsequent characters in string and replace those with single.
Args:
s (str): Source string
duo (str, optional): characters which requires reductions if susequent. Defaults to ' '.
strip (int, optional): values (-1=lstrip ,0=strip ,1=rstrip). Defaults to None.
Returns:
_type_: _description_
"""
while s.find(duo+duo) > -1:
s = s.replace(duo+duo, duo)
if strip is not None and isinstance(strip, int):
if strip == -1:
return s.lstrip()
elif strip == 0:
return s.strip()
elif strip == 1:
return s.rstrip()
else:
pass
# print('invalid strip value detected', strip)
else:
pass
# print('invalid strip value detected', strip)
return s
[docs]
@staticmethod
def indention(s):
"""get string indention value
Args:
s (str): input string
Returns:
int: number of indentants
"""
return len(s)-len(s.lstrip())
[docs]
@staticmethod
def is_blank_line(s):
"""is provided string/line a blank line
Args:
s (str): input string
Returns:
bool: boolean value for result
"""
try:
return True if len(s.strip()) == 0 else False
except Exception: pass
[docs]
@staticmethod
def is_hostname_line(s, host):
"""string/line containing hostname of device
Args:
s (str): input string
host (str): hostname to be find in provided string
Returns:
bool: boolean value for result
"""
return s.find(host) == 0
[docs]
@staticmethod
def hostname(net_connect):
"""input paramiko netconnection, returns hostname from device.
Args:
net_connect (connection object): paramiko connection object
Returns:
str: hostname from connection
"""
try:
hns = net_connect.find_prompt()[:-1]
atPos = STR.foundPos(hns, "@")
if atPos > -1: hns = hns[atPos+1:]
return hns
except:
pass
[docs]
@staticmethod
def hostname_from_cli(line, command):
"""input standard text input line, for which command was entered
Args:
line (str): input line string
command (str): command
Returns:
str: hostname from command line
"""
if not STR.found(line, command): return None
cmdPos = STR.foundPos(line, command)
hn = line[:cmdPos].strip()[:-1]
return hn
[docs]
@staticmethod
def shrink_if(intName, length=2):
"""Interface Name shortening, input length will decide number of
charactes to be included in shortened output
Args:
intName (str): interface
length (int, optional): interface identifier prefix length. Defaults to 2.
Returns:
str: short name of interface
"""
if not intName: return ""
if intName.lower().startswith("tw"): length=3
iBW = intBeginWith.match(intName)
return iBW.group()[:length]+intName[iBW.span()[1]:]
[docs]
@staticmethod
def if_prefix(intName):
"""Interface beginning Name
Args:
intName (str): interface
Returns:
str: interface prefix
"""
if not intName: return ""
iBW = intBeginWith.match(intName)
return intName[iBW.start(): iBW.end()]
[docs]
@staticmethod
def if_suffix(intName):
"""Interface ending ports
Args:
intName (str): interface
Returns:
str: interface suffix
"""
if not intName: return ""
try:
iBW = intBeginWith.match(intName)
return intName[iBW.end():]
except:
return ""
[docs]
@staticmethod
def if_standardize(intName, expand=True):
"""standardize the interface for uneven length strings.
expand will give fulllength, otherwise it will shrink it to its standard size given
Args:
intName (str): interface
expand (bool, optional): expansion of interface. Defaults to True.
Returns:
str: standardized interface
"""
if not intName: return intName
intName = intName.replace(" ", "")
pfx = STR.if_prefix(standardize_if(intName))
sfx = STR.if_suffix(intName)
for _, inttype_length in CISCO_IFSH_IDENTIFIERS.items():
for int_type, length in inttype_length.items():
if int_type.lower().startswith(pfx.lower()):
if expand:
return f"{int_type}{sfx}"
else:
return f"{int_type[:length]}{sfx}"
return intName
[docs]
@staticmethod
def intf_standardize_or_null(intName, intf_type=None, expand=True):
"""standardize the interface for uneven length strings.
expand will give fulllength, otherwise it will shrink it to its standard size given
for incorrect interface returns blank (None)
Args:
intName (str): interface
expand (bool, optional): expansion of interface. Defaults to True.
Returns:
str: standardized interface or blank
"""
try:
return standardize_if(intName)
except:
return ""
[docs]
@staticmethod
def update_str(s, searchItem='', replaceItem=''):
"""Updates line for search item with replace item
(Find/Repalace)
Args:
s (str): input string
searchItem (str, optional): search item. Defaults to ''.
replaceItem (str, optional): replacement item. Defaults to ''.
Returns:
str: updated string
"""
return s.replace(searchItem, replaceItem)
[docs]
@staticmethod
def get_logfile_name(folder, hn, cmd='', ts='', separator="_@_", extn='.log'):
"""return log file name for the command on device with/wo provided time_stamp
Args:
folder (str): path to where file should be saved
hn (str): file name starting with provided host-name
cmd (str, optional): file name containing additional commands string. Defaults to ''.
ts (str, optional): file name containing additional time stamp. Defaults to ''.
separator (str, optional): hn-cmd-ts separator . Defaults to "_@_".
extn (str, optional): extension of filename. Defaults to '.log'.
Returns:
str: filename along with full path
"""
if ts: ts = separator + ts
cmd += ts
if cmd:
replaceCandidates = ('|', '\\', '/', ':', '*', '?', '"', '<', '>')
for x in replaceCandidates:
cmd = STR.update_str(cmd, x, "_")
cmd = separator + cmd
if folder[-1] not in ( "/", "\\"):
folder += "/"
return folder+hn+cmd+extn
[docs]
@staticmethod
def string_concate(s, s1, conj=''):
"""Concatenate strings s and s1 with conjuctor conj
Args:
s (str): input string
s1 (str): adder string
conj (str, optional): conjuctor. Defaults to ''.
Returns:
str: updated string
"""
return s + s1 if s == '' else s + conj + s1
[docs]
@staticmethod
def right(strg, n):
"""N-number of characters from right side of string
Args:
strg (str): input string
n (int): number of characters from right
Returns:
str: string portion from right
"""
l = len(strg)
return strg[l-n:l]
[docs]
@staticmethod
def mid(strg, pos, n=0):
"""N-number of characters from position in string; default n is till end
Args:
strg (str): input string
pos (int): position from where slice to begin
n (int): number of characters from from slice(pos)
Returns:
str: string portion from middle
"""
l = len(strg)
if n > 0 :
return strg[pos-1:pos+n-1]
else:
return strg[pos-1:]
[docs]
@staticmethod
def to_list(s):
"""Returns list for the provided string - s,
splits string by lines
Args:
s (str): multiline input string
Returns:
list: splitted lines
"""
s = s.split("\n")
for i, x in enumerate(s):
s[i] = x + "\n"
return s
[docs]
@staticmethod
def to_set(s):
"""Return set of values for the provided string - s.
splits string by lines and comma
Args:
s (str): multiline input string
Returns:
set: splitted lines/items
"""
if isinstance(s, str):
_s = []
for _ in s.split('\n'):
_s.extend(_.split(','))
return set(LST.remove_empty_members((_s)))
else:
return set(s)
[docs]
@staticmethod
def prepend_bgp_as(bgp_as, n):
"""`n` number of BGP AS Number prepending string.
Args:
bgp_as (str): bgp as number
n (int): to repeat with
Returns:
str: as-path prepend string
"""
s = ''
for x in range(n): s += str(bgp_as) + " "
return s[:-1]
[docs]
@staticmethod
def ending(line, c):
"""check if line ends with c or not, same as native string.endswith()
addition is it first strips the line and then checks
Args:
line (str): input string
c (str): condition string
Returns:
bool: boolean value for result
"""
return line.strip().endswith(c)
[docs]
@staticmethod
def starting(line, c):
"""check if line starts with c or not, same as native string.startswith()
addition is it first strips the line and then checks
Args:
line (str): input string
c (str): condition string
Returns:
bool: boolean value for result
"""
return line.strip().startswith(c)
# -----------------------------------------------------------------------------
[docs]
def interface_type(ifname):
"""get the interface type from interface string
Args:
ifname (str): interface name/string
Raises:
ValueError: raise error if input missing
Returns:
tuple: tuple with interface type (e.g PHYSICAL, VLAN...) and sub interface type
(e.g FastEthernet, .. ). None if not detected
"""
iname = ''
for i in ifname:
if not i.isdigit(): iname += i
else: break
ifname = iname
if ifname:
for int_type, int_types in CISCO_IFSH_IDENTIFIERS.items():
for sub_int_type in int_types:
if sub_int_type.startswith(ifname):
return (int_type, sub_int_type)
return ("", "")
[docs]
def standardize_if(ifname, expand=False):
"""standardized interface naming
Args:
ifname (str): variable length interface name
expand (bool, optional): expand will make it full length name. Defaults to False.
Raises:
ValueError: if missing with mandatory input
TypeError: if invalid value detected
KeyError: if invalid shorthand key detected
Returns:
str: updated interface string
"""
if not ifname:
raise ValueError("Missing mandatory input ifname")
if not isinstance(expand, bool):
raise TypeError(f"Invalid value detected for input expand, "
f"should be bool.")
if not isinstance(ifname, str):
raise TypeError(f"Invalid value detected for input ifname, "
f"should be str.")
srcifname = ''
for i in ifname:
if not i.isdigit(): srcifname += i
else: break
if not srcifname: return None
try:
it = interface_type(srcifname)
if it:
int_type, int_pfx = it[0], it[1]
else:
return ifname
except:
raise TypeError(f"unable to detect interface type for {srcifname}")
try:
shorthand_len = CISCO_IFSH_IDENTIFIERS[int_type][int_pfx]
except:
if get_juniper_int_type(ifname): return ifname
raise KeyError(f"Invalid shorthand Key detected {int_type}, {int_pfx}")
if expand: return int_pfx+ifname[len(srcifname):]
return int_pfx[:shorthand_len]+ifname[len(srcifname):]
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# FILE OPERATIONS/ CONVERSIONS #
# -----------------------------------------------------------------------------
[docs]
class IO():
"""Collection of static methods for IO objects.
see more...
"""
[docs]
@staticmethod
def copy_text_file(file):
"""copy file.txt to file-copy.txt
Args:
file (str): file name
"""
dst = file[:-4] + "-copy.txt"
with open(file, 'r') as sf:
with open(dst, 'w') as dt:
dt.write(sf.read())
[docs]
@staticmethod
def file_list_for_time_stamp(hn, ts, folder, splitter="_@_" ):
"""collection of files from given folder where hostname (hn)
and timestamp (ts) found in the file name.
Args:
hn (str): hostname
ts (str): time-stamp
folder (str): folder ptah
splitter (str, optional): splitter. Defaults to "_@_".
Returns:
set: files from log files
"""
files = set()
for file in os.listdir(folder):
if not splitter in file: continue
if hn in file and ts in file:
files.add(file)
return files
[docs]
@staticmethod
def devices_on_log_files(folder, splitter="_@_"):
"""collection of files from given folder where file extensions are .log
Args:
folder (str): folder path
splitter (str, optional): splitter. Defaults to "_@_".
Returns:
set: devices names from log files
"""
devices = set()
for file in os.listdir(folder):
if not splitter in file: continue
hn = file.split(splitter)
if hn[0][-4:] == '.log': hn[0] = hn[0][:-4]
devices.add(hn[0])
return devices
[docs]
@staticmethod
def timestamps_for_device(devname, folder, splitter="_@_"):
"""collection of time stamps of files from given folder
for given hostnames.
Args:
devname (str): hostname
folder (str): folder path
splitter (str, optional): splitter. Defaults to "_@_".
Returns:
set: time stampls from log files.
"""
stamps = set()
for file in os.listdir(folder):
if not splitter in file: continue
if devname in file:
stamp = file.split(splitter)
if stamp[-1][-4:] == '.log': stamp[-1] = stamp[-1][:-4]
stamps.add(stamp[-1])
return stamps
[docs]
@staticmethod
def file_to_str(file):
"""Returns string output content of the provided file
Args:
file (str): input file
Returns:
str: content of file
"""
with open(file, 'r') as f:
s = f.read()
return s
[docs]
@staticmethod
def file_to_list(file):
"""Returns list output content of the provided file
Args:
file (str): input file
Returns:
list: content of file
"""
file = file.strip()
if file is None: return None
with open(file, 'r') as f:
lines = f.readlines()
return lines
[docs]
@staticmethod
def csv_to_tuple(csv):
"""Returns tuple from the provided comma separated text values
Args:
csv (str): input string
Returns:
tuple: comma separated values
"""
if csv.find('"') and not csv.find('\"'):
ln = csv.lstrip().split('"')
return tuple([x for i, x in enumerate(ln) if i % 2 != 0])
else:
return tuple(csv.split(','))
[docs]
@staticmethod
def to_file(filename, matter):
"""Creates a file with matter
Args:
filename (str): filename with path to be creaed.
matter (str, list, tuple): matter to write to new created file.
"""
with open(filename, 'w') as f:
if isinstance(matter, str):
f.write(matter)
elif isinstance(matter, (list, tuple, set)):
f.write("\n".join(matter))
return filename
[docs]
@staticmethod
def add_to_file(filename, matter, cr=True):
"""Writes List/text to output filename.
Args:
filename (str): input file
matter (str, tuple, list): matter to write to new created file.
cr (bool, optional): carriage return to add at end of each string/line. Defaults to True.
Returns:
_type_: _description_
"""
if not filename: return None
if isinstance(matter, str):
if cr and matter and matter[-1] != "\n": matter += "\n"
with open(filename, 'a') as f:
f.write(matter)
elif isinstance(matter, (list, tuple ,set)):
for i in matter:
IO.add_to_file(filename, i)
[docs]
@staticmethod
def update(file, find_item, replace_item):
"""Find and Replace on provided file and saves file
Args:
file (str): input file
find_item (str): search item
replace_item (str): replacement item
"""
with open(file, 'r') as f:
filedata = f.read()
replace_item = str(replace_item)
if replace_item == 'nan': replace_item = ''
newdata = filedata.replace(find_item, str(replace_item))
with open(file, 'w') as f:
f.write(newdata)
[docs]
@staticmethod
def jinja_verification(folder):
"""check all text files from provided folder for verification of jinja strings descrepencies
Args:
folder (str): folder to check all jinja files from
Returns:
str: verification outcomes
"""
s = ''
for file in os.listdir(folder):
goahead = {'GOAHEAD FOR': 0, 'GOAHEAD END': 0,}
repeatfor = {'REPEAT EACH': 0, 'REPEAT STOP': 0,}
if not file.endswith(".txt"): continue
with open(folder + "/" + file, 'r') as f:
rf = f.read()
for k, v in goahead.items(): goahead[k] = rf.count(k)
for k, v in repeatfor.items(): repeatfor[k] = rf.count(k)
bg, eg = goahead['GOAHEAD FOR'], goahead['GOAHEAD END']
br, er = repeatfor['REPEAT EACH'], repeatfor['REPEAT STOP']
if bg != eg or br != er: s += f'Descrepencies found in file: <{file}>\n'
if bg != eg: s += f"\tGOAHEAD conditions : begins {bg} v/s ends {eg}\n"
if br != er: s += f"\tREPEAT conditions : begins {br} v/s ends {er}\n\n"
return s
# -----------------------------------------------------------------------------
# LIST MODIFICATIONS #
# -----------------------------------------------------------------------------
[docs]
class LST():
""" Collection of static methods for list objects.
see more...
"""
[docs]
@staticmethod
def remove_empty_members(lst):
"""house keeping of list, removes empty members from list
Args:
lst (list): input list
Returns:
list: updated list
"""
empty_members = ('', None, 'N/A', 'nil', [])
tmp_lst = [m for m in lst if not m in empty_members]
return tmp_lst
[docs]
@staticmethod
def expand_vlan_list(vlan_list):
"""takes input vlan list, expands it's ranges if any within
Args:
vlan_list (list): input vlans list
Raises:
Exception: Invalid vlan number
Returns:
list: list of vlans (individual)
"""
exp_vl_list = set()
for v in vlan_list:
if not v: continue
try:
vl = int(v)
exp_vl_list.add(vl)
continue
except:
s, e = v.split("-")
try:
s, e = int(s), int(e)
r = set(range(s, e+1))
exp_vl_list = exp_vl_list.union(r)
except:
raise Exception(f"[-] Invalid vlan number. Expected int got {type(s)}, {type(e)}")
return exp_vl_list
[docs]
@staticmethod
def convert_vlans_list_to_range_of_vlans_list(vlan_list):
"""converts list of individual vlans to a list of range of vlans
Args:
vlan_list (str): input list
Returns:
list: list of vlans (with ranges)
"""
vlans_dict, vlans_list, prev, last_key = {}, [], "", ""
vlan_list = sorted(LST.expand_vlan_list(vlan_list))
for i, vlan in enumerate(vlan_list):
if not vlan: continue
if i == 0:
prev = vlan
last_key = vlan
continue
if vlan == prev + 1:
prev = vlan
continue
else:
vlans_dict[last_key] = prev
prev = vlan
last_key = vlan
else:
vlans_dict[last_key] = prev
for k, v in vlans_dict.items():
if k == v:
vlans_list.append(str(k))
else:
vlans_list.append(str(k) + "-" + str(v))
return vlans_list
[docs]
@staticmethod
def list_variants(input_list):
"""list of vlans in different format list of vlans, space separated string, comma separated string,
Args:
input_list (list): input list
Returns:
dict: list variants (str, csv, ssv)
"""
str_list = [str(_)
for _ in LST.convert_vlans_list_to_range_of_vlans_list(input_list)]
ssv_list = " ".join(str_list)
csv_list = ",".join(str_list)
return {
'str_list': str_list,
'ssv_list': ssv_list,
'csv_list': csv_list,
}
[docs]
@staticmethod
def list_of_devices(list_of_files):
"""get hostnames (first index item) from list of files.
Args:
list_of_files (list): input file list
Returns:
set: devices from file list
"""
devices = set()
for file in list_of_files:
if not file.strip(): continue
f = ".".join(os.path.basename(file).split(".")[:-1])
hn = f.split("_")[0]
if not hn in devices: devices.add(hn)
return devices
[docs]
@staticmethod
def split(lst, n):
"""yield provided list with group of n number of items
Args:
lst (list): list of items
n (int): items per group
Yields:
list generator: group of items
"""
s = 0
lst = tuple(lst)
for _ in range(s, len(lst), n):
yield lst[_: s+n]
s += n
[docs]
@staticmethod
def list_to_octet(lst):
"""joins and return string with provided list with '.', helpful in created ipv4 string with list of 4 numeric items. Doesn't verify correctness of ip. any number can be concatenated with any numbers of instances.
Args:
lst (list): input ip address splitted by "."
Returns:
str: concatenated ip address
"""
"""
Returns str
"""
l = ''
for x in lst: l = str(x) if l == '' else l +'.'+ str(x)
return l
[docs]
@staticmethod
def flatten(lst):
"""flattens nested list to single list
Returns:
list: flattened list
"""
l = []
for _ in lst:
if isinstance(_, str):
l.append(_)
elif isinstance(_, (set, tuple, list)):
l.extend( LST.flatten(_) )
return l
[docs]
@staticmethod
def longest_str_len(lst):
"""returns length of longest string from provided lst , members shoudl be of string type
Args:
lst (list): list of strings
Returns:
int: length of maximum length command
"""
if lst:
return max(len(str(_)) for _ in lst)
else:
return 0
# -----------------------------------------------------------------------------
# DICTIONARY MODIFICATIONS #
# -----------------------------------------------------------------------------
[docs]
class DIC():
"""Collection of static methods for dictionary objects.
see more...
"""
# INTERNAL : update dictionary d for provided keyvalue pairs
# param: d: dest dictionary
# param: kv: src dictionary with key value pairs
# --> updated destn dict
@staticmethod
def __update_keyValue(d, kv):
if isinstance(d, dict):
for k, v in kv.items():
if isinstance(v, dict):
for x, y in v.items():
d = DIC.merge_dict(d, kv)
else:
d[k] = v
return d
[docs]
@staticmethod
def merge_dict(dx, dy):
"""Merges two dictionaries for identical keys
Args:
dx (dict): first dictionary
dy (dict): second dictionary
Returns:
dict: merged dictionary
"""
for k, v in dy.items():
try:
dx[k] = DIC.__update_keyValue(dx[k], dy[k])
except:
dx[k] = dy[k]
return dx
[docs]
@staticmethod
def recursive_dic(dic, indention=0):
"""convert dictionary (dic) to string. recursive dictionary increases indention. (deprycated, will be replaced with `dict_to_str`)
Args:
dic (dict): input dictionary
indention (int, optional): number of spaces. Defaults to 0.
Returns:
str: Multiline string
"""
s = ""
if isinstance(dic, dict):
for k, v in dic.items():
s += f"{' '*indention}{k}\n"
indention += 1
s += DIC.recursive_dic(v, indention)
indention -= 1
elif isinstance(dic, (tuple,list,set)):
for x in dic:
if x: s += str(x)+'\n'
elif isinstance(dic, str):
if dic: s+= f" {' '*indention}{dic}\n"
return s
[docs]
@staticmethod
def dict_to_str(dic, indention=0):
"""convert dictionary (dic) to string. recursive dictionary increases indention.
Args:
dic (dict): input dictionary
indention (int, optional): number of spaces. Defaults to 0.
Returns:
str: Multiline string
"""
LST.recursive_dic(dic, indention)
# -----------------------------------------------------------------------------
# DICTIONARY DIFFERECES #
# -----------------------------------------------------------------------------
[docs]
class DifferenceDict(dict):
"""Template class to get difference in two dictionary objects.
use dunder +/- for adds/removes.
"""
missing = "- "
additive = "+ "
def __init__(self, d):
self.d = d
def __sub__(self, d): return self.get_change(d, self.missing)
def __add__(self, d): return self.get_change(d, self.additive)
[docs]
def get_change(self, d, change):
"""compare current object/dict with provided new object/dict (ie: d)
and return differences based on change required ("- "/"+ ")
Args:
d (dict): dictionary of differences
change (str): change type from ("- "/"+ ") i.e. removed/added
Returns:
_type_: _description_
"""
"""
"""
if isinstance(d, DifferenceDict):
return dict_differences(self.d, d.d, change)
elif isinstance(d, dict):
return dict_differences(self.d, d, change)
# INTERNAL / RECURSIVE
# returns differences for provided subnet/change
# input subject can be of string/int/float/set/dictionary
# input change is change type prefix
# return value type depends on input subject type.
def _get_differences(subject, change):
if isinstance(subject, (str, int, float)):
diff = change + str(subject)
elif isinstance(subject, (list, tuple)):
diff = []
for item in subject:
df = _get_differences(item, change)
diff.append(df)
elif isinstance(subject, set):
diff = set()
for item in subject:
df = _get_differences(item, change)
diff.add(df)
elif isinstance(subject, dict):
diff = dict()
for key, value in subject.items():
key = change + str(key)
if value:
diff[key] = _get_differences(value, change)
else:
diff[key] = ''
else:
raise Exception(f"[-] InvalidSubjectTypeError: {type(subject)}:{subject}")
return diff
[docs]
def dict_differences(d1, d2, change):
"""returns differences for provided two dictionaries
input d1, d2 type: string/int/float/set/dictionary
input change is change type prefix (ex: " -", " +")
return value type depends on input d1, d2 type.
Args:
d1 (str, int, float, set, dict): input 1
d2 (str, int, float, set, dict): input 2
change (str): change type from ("- "/"+ ") i.e. removed/added
Raises:
Exception: TypeMismatch
Returns:
dict: differences in dictionary format
"""
diff = {}
if d1 == d2: return None
if (not (isinstance(d1, (dict, set)) or isinstance(d2, (dict, set))) and
type(d1) != type(d2)):
raise Exception(f"[-] TypeMismatch- d1:{type(d1)}d2:{type(d2)} - {d1}{d2}")
if isinstance(d1, dict):
for k_d1, v_d1 in d1.items():
if k_d1 not in d2:
diff.update( _get_differences({k_d1: v_d1}, change) )
continue
if v_d1 == d2[k_d1]: continue
diff[k_d1] = dict_differences(v_d1, d2[k_d1], change)
elif isinstance(d1, set):
diff = _get_differences(d1.difference(d2), change)
else:
if d1:
diff = _get_differences(d1, change)
return diff
# -----------------------------------------------------------------------------
# Common Dictionary Methods #
# -----------------------------------------------------------------------------
[docs]
class DictMethods():
"""PAPA DUNDER EXTENSIONS FOR DICTIONARY OBJECTS
[self.dic is abstract property which gets iterates over]
"""
def __iter__(self):
for k, v in self.dic.items():
yield (k, v)
def __getitem__(self, item):
try:
return self.dic[item]
except KeyError:
return None
def __get__(self, key, item):
try:
return self[key][item]
except KeyError:
return None
def __setitem__(self, item, value):
self.dic[item] = value
def __delitem__(self, srno):
try:
for k in sorted(self.dic.keys()):
if k <= srno: continue
self.dic[k-1] = self.dic[k]
del(self.dic[k])
except:
raise KeyError
[docs]
def append(self, item, value):
"""appends value to self[item] dictionary. create new list if no value found for item, appends to list if available.
Args:
item (str, int): key of dictionary
value (list): value to be added to dictionary item
Raises:
Exception: WrongInput
"""
try:
if not self.dic.get(item):
self.dic[item] = []
elif isinstance(self.dic[item], (str, int)):
self.dic[item] = [self.dic[item],]
self.dic[item].append(value)
except:
raise Exception
# -----------------------------------------------------------------------------
# LOG OPERATIONS #
# -----------------------------------------------------------------------------
[docs]
class LOG():
"""Collection of static methods for logging.
see more...
"""
[docs]
@staticmethod
def time_stamp():
"""current time stamp (for log purpose)
Returns:
str: current datetime string
"""
return str(datetime.datetime.now())[:19]
# -----------------------------------------------------------------------------
# D-B OPERATIONS #
# -----------------------------------------------------------------------------
[docs]
class DB():
"""Collection of static methods for Database.
see more...
"""
[docs]
@staticmethod
def read_excel(file, sheet='Sheet1', **kwargs):
"""reads a sheet from an excel
Args:
file (str): input excel file
sheet (str, optional): sheet name. Defaults to 'Sheet1'.
Returns:
DataFrame: pandas Dataframe Object
"""
deprycation_warning("class: DB")
return pd.read_excel(file, sheet_name=sheet, **kwargs)
# ------------------------------------------------------------------------------
# Excel Data WRITE Class, use with context manager
# ------------------------------------------------------------------------------
[docs]
class XL_WRITE():
"""Excel file generate
Deprycated: use methods available in nettoolkit.nettoolkitdb ( append_to_xl, write_to_xl) instead.
This will be removed in future version
Args:
hostname (str): excel file
folder (str): folder path
index (bool, optional): write with index or not. Defaults to False.
Returns:
XL_WRITE: object instance
"""
# Object Initializer
def __init__(self, hostname, folder, index=False, **sht_df):
"""initializer
"""
deprycation_warning("class: XL_WRITE")
i = 0
self.hostname = hostname
self.folder = folder
while True:
try:
self.__create_excel(hostname, index, **sht_df)
break
except PermissionError:
i += 1
hostname = self.hostname+" ("+str(i)+")"
except Exception as e:
print(e)
break
def __repr__(self): return self.op_file
# write to Excel/ INTERNAL
def __create_excel(self, hostname, index, **sht_df):
try:
n = 0
XL_READ(self.folder + '/'+hostname+'.xlsx', 'tables')
while True:
n += 1
XL_READ(self.folder + '/'+hostname+'-'+str(n)+'.xlsx', 'tables')
except:
if n == 0:
op_file = self.folder + '/'+hostname+'.xlsx'
else:
op_file = self.folder + '/'+hostname+'-'+str(n)+'.xlsx'
self.op_file = op_file
with pd.ExcelWriter(op_file) as writer_file:
for sht_name, df in sht_df.items():
df.to_excel(writer_file, sheet_name=sht_name, index=index)
# ------------------------------------------------------------------------------
# Excel Data Read Class
# ------------------------------------------------------------------------------
[docs]
class XL_READ:
"""Excel file read
DEPRYCATED METHOD -- use - methods available in nettoolkit.nettoolkitdb ( read_xl_all_sheet, read_an_xl_sheet, ) instead.
This will be removed in future version
Args:
xl (str): Excel file name
shtName (str, optional): sheet name. Defaults to 'Sheet1'.
Returns:
XL_READ: object instance
Yields:
tuple: sheetname, dataframe of the sheet
"""
def __init__(self, xl, shtName='Sheet1'):
"""initializer
"""
deprycation_warning("class: XL_READ")
self.df = pd.read_excel(xl, sheet_name=shtName)
def __repr__(self):
return 'Excel data reprezenting class as DataFrame Object - obj.df'
def __len__(self): return self.df.last_valid_index()+1
def __iter__(self):
for header, value in self.df.items(): yield (header, value)
def __getitem__(self, key): return self.df[key]
[docs]
def filter(self, df=None, **kwarg):
"""Filter Records
Args:
df (DataFrame, optional): pandas DataFrame. Defaults to None.
Returns:
DataFrame: filtered DataFrame
"""
tmpdf = self.df if df is None else df
for k, v in kwarg.items():
try:
tmpdf = tmpdf[tmpdf[k]==v]
except: pass
return tmpdf
[docs]
def column_values(self, column, **kwarg):
"""selected column output, after filters applied
Args:
column (str, list): a single column name or , list of column names
Returns:
DataFrame: filtered columns
"""
return self.filter(**kwarg)[column]
# -----------------------------------------------------------------------------
# Execution secquences #
# -----------------------------------------------------------------------------
[docs]
class Multi_Execution(Default):
"""Template methods for multi-threaded executions.
[self.items items are eligible threaded candidates]
"""
max_connections = 100
def __str__(self): return self._repr()
def __init__(self, items=None):
self.items = items
[docs]
def execute_steps(self, multi_thread=True):
"""steps defining executions
Args:
multi_thread (bool, optional): Multithread or Sequencial. Defaults to True.
"""
self.start(multi_thread)
[docs]
def start(self, multi_thread=True):
"""starting up executins either threaded/sequencial
Args:
multi_thread (bool, optional): Multithread or Sequencial. Defaults to True.
Returns:
None: None
"""
if not self.items: return None
if multi_thread:
self.execute_mt()
else:
self.execute_sequencial()
[docs]
def end(self):
"""Closure process"""
pass
[docs]
def get_devices(self):
"""get devices names from list of files"""
self.devices = LST.list_of_devices(self.files)
[docs]
def execute_mt(self):
"""threaded execution in groups
(self.max_connections defines max threaded processes)
"""
for group, items in enumerate(LST.split(self.items, self.max_connections)):
self.execute_threads_max(items)
[docs]
def execute_threads_max(self, item_list):
"""threaded execution of a group
Args:
item_list (list, set, tuple): items to be iterated on
"""
ts = []
for hn in item_list:
t = threading.Thread(target=self.execute, args=(hn,) )
t.start()
ts.append(t)
for t in ts: t.join()
[docs]
def execute_sequencial(self):
"""sequencial execution of items
"""
for hn in self.items: self.execute(hn)
[docs]
@abstractclassmethod
def execute(self, hn):
"""abstract class method, to be executed for each item in self.items
Args:
hn (str, int, float): individual items to be executed for each
"""
pass
[docs]
def get_juniper_int_type(intf):
"""returns juniper interface type
Args:
intf (str): juniper interface
Returns:
str: interface type (ge, xe, etc)
"""
int_type = ""
for k, v in JUNIPER_IFS_IDENTIFIERS.items():
if intf.startswith(v):
int_type = k
break
return int_type
[docs]
def get_cisco_int_type(intf):
"""returns cisco interface type
Args:
intf (str): cisco interface
Returns:
str: interface type (FastEthernet, GigEthernet, etc..)
"""
return interface_type(intf)[0].lower()
[docs]
def get_device_manu(intf):
"""returns device type from provided interface
Args:
intf (str): interface
Returns:
device type: manufacturer (cisco, juniper)
"""
jit = get_juniper_int_type(intf)
cit = get_cisco_int_type(intf)
if jit.lower() == 'physical': return "juniper"
if cit.lower() == 'physical': return "cisco"
return ""