Source code for nettoolkit.configure.configure
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import pandas as pd
from netmiko import ConnectHandler
from itertools import zip_longest
from time import sleep
import traceback
from nettoolkit.nettoolkit_common import printmsg, STR, LST, Multi_Execution, print_banner
from nettoolkit.nettoolkit_db import read_xl_all_sheet
from nettoolkit.detect import DeviceType
# -----------------------------------------------------------------------------
[docs]
class ConfigEnvironmentals(Multi_Execution):
"""Configuration Object environmental properties. Inherits Multi_Execution
"""
def __init__(self, auth, log_folder, config_log, exec_log, exec_display):
self.auth = auth
self.log_folder = log_folder
self.config_log = config_log
self.exec_log = exec_log
self.exec_display = exec_display
self.config_env = {
'log_folder': log_folder,
'config_log': config_log,
'exec_log': exec_log,
'exec_display': exec_display,
}
# -----------------------------------------------------------------------------
[docs]
class Config_common():
"""Common Methods and properties for Configuration classes
"""
[docs]
def get_device_type(self, ip, auth):
"""detecting device type (cisco, juniper)
Args:
ip (str): device ip
auth (dict): authentication dicationary with 'un', 'pw'. 'en' keys.
Returns:
str: device type if detected, else None
"""
try:
dev = DeviceType(dev_ip=ip,
un=auth['un'],
pw=auth['pw'],
)
self.write_exec_log(ip, f"[+] {ip} - Device Type Detection successful - {dev.dtype}")
return dev.dtype
except Exception as e:
self.write_exec_log(ip, f"[-] {ip} - Device Type Detection Failed with Exception \n{e}")
return None
[docs]
def write_config_log(self, host, log):
"""send out the configuration log to a log file
Args:
host (str): host/device name
log (str/multiline): log to be write to
"""
if self.config_log and self.log_folder:
self.write_exec_log(host, f"[+] writing configuration application log @ {self.log_folder}/{host}-config-apply.log")
with open(f"{self.log_folder}/{host}-config-apply.log", 'a') as f:
f.write(log)
self.write_exec_log(host, f"[+] writing configuration application log @ {self.log_folder}/{host}-config-apply.log\t...done")
[docs]
def write_exec_log(self, host, s, ends='\n'):
"""writes execution log (internal)
Args:
host (str): host/device name
s (str/multiline): execution log content
ends (str, optional): End string. Defaults to enter.
"""
if self.exec_display: print(s)
if self.exec_log and self.log_folder:
with open(f"{self.log_folder}/{host}-exec.log", 'a') as f:
f.write(s+ends)
[docs]
def send_configuration(self, conf_list):
"""sends provided list of configuration to self device connection
Args:
conf_list (list): configuration change list
Returns:
bool: success/fail
"""
self.write_exec_log(self.conn.host, f"[+] applying config to {self.device_type} // {self.conn.host} // {self.ip}")
try:
self.op_return = self.conn.send_config_set(conf_list)
self.write_exec_log(self.conn.host, f"[+] applying config to {self.device_type} // {self.conn.host} // {self.ip}\t...done")
return True
except:
self.write_exec_log(self.conn.host, f"[-] applying config to {self.device_type} // {self.conn.host} // {self.ip}\t...Failed")
return False
[docs]
def get_connection(self):
"""retrive a new connection
Returns:
conn: connection object
"""
try:
conn = ConnectHandler(**self.dev_var)
self.connectionsuccess = True
return conn
except:
self.write_exec_log(self.ip, f"[-] Connection Failed to establish {self.device_type} // No connection // {self.ip}", ends="\n\n")
self.connectionsuccess = False
return None
[docs]
def terminate_connection(self):
"""terminate active connection
"""
try:
self.conn.disconnect()
except:
pass
[docs]
def set_hostname(self):
"""retrive hostname from current connection
"""
try:
self.dev_var['host'] = STR.hostname(self.conn).lower()
except:
self.write_exec_log(self.conn.host, f"[-] Hostname Retrival failed for device {self.ip} ")
self.dev_var['host'] = self.ip
self.hn = self.dev_var['host']
# ----------------------------------------------------------------------------------------------------
[docs]
class Configure(Config_common):
"""Configure class to do configuration on a Cisco IOS or Juniper Junos device
Inherits Config_common
Args:
ip (str): device ip address or FQDN
auth (dict): authentication dicationary with 'un', 'pw'. 'en' keys.
conf_list (list, optional): configuration change list. Defaults to None. Either
conf_file (str, optional): configuration change file. Defaults to None. Or
log_folder (str, optional): folder where logs to be stored. Defaults to None.
config_log (bool, optional): generate configuration log. Defaults to True.
exec_log (bool, optional): generate execution log. Defaults to True.
exec_display (bool, optional): on screen display execution log. Defaults to True.
"""
def __init__(self, ip, auth,
conf_list=None,
conf_file=None,
log_folder=None,
config_log=True,
exec_log=True,
exec_display=True,
):
self.ip = ip
self.auth = auth
self.conf_list = conf_list
self.conf_file = conf_file # prefered
self.log_folder = log_folder
self.config_log = config_log
self.exec_log = exec_log
self.exec_display = exec_display
self._get_conf_list_from_file()
def _get_conf_list_from_file(self):
if self.conf_file:
try:
with open(self.conf_file, 'r') as f:
conf_list = f.readlines()
conf_list = [ _.rstrip() for _ in conf_list ]
except:
self.write_exec_log(self.conn.host, f"[-] Error Reading file {self.conf_file}", ends="\n\n")
return None
if self.conf_list and conf_list:
_d = input(f"[-] DUAL INPUT DETECTED, conf_list as well as conf_file. configuration file will override list. Continue [Y/N]")
if _d.upper() != 'Y': quit()
if conf_list:
self.conf_list = conf_list
[docs]
def apply(self):
"""apply the configuration to active connection
"""
if not self.conf_list:
self.write_exec_log(self.conn.host, f"[-] No configurations to apply for {self.ip} // configuration=[{self.conf_list}]")
if isinstance(self.conf_list, str):
self.conf_list = [self.conf_list, ]
self.device_type = self.get_device_type(self.ip, self.auth)
self.dev_var = {'device_type': self.device_type,
'ip': self.ip,
'username': self.auth['un'],
'password': self.auth['pw'],
'secret': self.auth['en'] if self.auth.get('en') else self.auth['pw'],
}
self._start_push()
def _start_push(self):
if self.device_type == 'juniper_junos': self.juniper_push()
elif self.device_type == 'cisco_ios': self.cisco_push()
else: print(f"[-] Undetected device {self.ip}")
## -------------- Juniper ------------------
[docs]
def juniper_push(self):
"""method defining configuration push for Juniper devices
Returns:
bool/None: False if unable to connect, None after connection terminate
"""
if self.conf_list[-1] != 'commit check':
self.conf_list.append("commit check")
#
self.conn = self.get_connection()
if not self.connectionsuccess: return False
self.set_hostname()
#
send_conf = self.send_configuration(self.conf_list)
if not send_conf:
self.write_exec_log(self.conn.host, f"[-] Termination without configuration apply for {self.device_type} // {self.conn.host} // {self.ip}", ends="\n\n")
self.terminate_connection()
return None
self.write_config_log(self.conn.host, self.op_return)
#
check = self.juniper_verify_push_op(self.op_return)
if not check:
self.write_exec_log(self.conn.host, f"[-] ERROR: Termination without configuration apply for {self.device_type} // {self.conn.host} // {self.ip}", ends="\n\n")
self.terminate_connection()
return None
#
commit_return = self.juniper_commit()
self.juniper_verify_commit_op(commit_return)
#
self.terminate_connection()
[docs]
def juniper_verify_push_op(self, op):
"""verifications on juniper configuration push output
Args:
op (multiline str): configuaration log output
Returns:
bool: success or syntex error
"""
check = False
self.write_exec_log(self.conn.host, f"[+] checking applied configuration for {self.device_type} // {self.conn.host} // {self.ip}")
for line in op.splitlines():
if line.strip().startswith('syntax error'): break
check = line == "configuration check succeeds"
if check: break
if check:
self.write_exec_log(self.conn.host, f"[+] checking applied configuration for {self.device_type} // {self.conn.host} // {self.ip}\t...done" )
else:
self.write_exec_log(self.conn.host, f"[-] checking applied configuration for {self.device_type} // {self.conn.host} // {self.ip}\t...Failed\n. Re-Check configuration manually before commit\nGot:\n{self.op_return}")
return check
[docs]
def juniper_verify_commit_op(self, op):
"""verification of commit
Args:
op (multiline str): configuaration log output
"""
self.write_exec_log(self.conn.host, f"[+] verifying configuration commit to {self.device_type} // {self.conn.host} // {self.ip}")
check = 0
for line in op.splitlines():
if (line.strip().startswith("configuration check succeeds")
or line.strip().startswith("commit complete")
):
check+=1
#
if check == 2:
self.write_exec_log(self.conn.host, f"[+] verifying configuration commit to {self.device_type} // {self.conn.host} // {self.ip}\t...done")
else:
self.write_exec_log(self.conn.host, f"[-] verifying configuration commit to {self.device_type} // {self.conn.host} // {self.ip}\t...Failed\nGot\n{op}")
[docs]
def juniper_commit(self):
"""commiting the pushed juniper configurations.
Returns:
bool: success or fail
"""
self.write_exec_log(self.conn.host, f"[+] commiting configurations to {self.device_type} // {self.conn.host} // {self.ip}")
try:
commit_return = self.conn.commit()
self.write_exec_log(self.conn.host, f"[+] commiting configurations to {self.device_type} // {self.conn.host} // {self.ip}\t...done")
return commit_return
except:
self.write_exec_log(self.conn.host, f"[-] commiting configurations to {self.device_type} // {self.conn.host} // {self.ip}\t...failed\nGot\n{commit_return}")
return False
## -------------- Cisco ------------------
[docs]
def cisco_enable(self):
"""method to enable device mode
"""
if any( [
self.device_type == 'cisco_ios'
] ):
for tries in range(3):
try:
if self.conn.check_enable_mode():
break
self.conn.enable(cmd="enable")
break
except:
self.write_exec_log(self.hn, f"[-] {self.hn} - enable failed on attemp {tries}")
continue
[docs]
def cisco_push(self):
"""method defining configuration push for Cisco devices
Returns:
bool/None: False if unable to connect, None after connection terminate
"""
self.conn = self.get_connection()
if not self.connectionsuccess: return False
self.set_hostname()
self.cisco_enable()
#
send_conf = self.send_configuration(self.conf_list)
if not send_conf:
self.write_exec_log(self.conn.host, f"[-] Termination without configuration apply for {self.device_type} // {self.conn.host} // {self.ip}", ends="\n\n")
self.terminate_connection()
return None
#
self.write_config_log(self.conn.host, self.op_return)
#
error = self.cisco_verify_push_op(self.op_return)
if error:
self.write_exec_log(self.conn.host, f"[-] ERROR: Termination without configuration apply for {self.device_type} // {self.conn.host} // {self.ip}", ends="\n\n")
self.terminate_connection()
return None
#
_return = self.cisco_commit()
self.terminate_connection()
[docs]
def cisco_verify_push_op(self, op):
"""verifications on cisco configuration push output
Args:
op (multiline str): configuaration log output
Returns:
bool: success or syntex error
"""
error = False
self.write_exec_log(self.conn.host, f"[+] checking applied configuration for {self.device_type} // {self.conn.host} // {self.ip}" )
for line in op.splitlines():
error = line.strip().startswith("^")
if error: break
#
if error:
self.write_exec_log(self.conn.host, f"[-] checking applied configuration for {self.device_type} // {self.conn.host} // {self.ip}\t...Failed\n. Re-Check configuration manually and reapply\nGot:\n{self.op_return}")
else:
self.write_exec_log(self.conn.host, f"[+] checking applied configuration for {self.device_type} // {self.conn.host} // {self.ip}\t...done")
return error
# save config
[docs]
def cisco_commit(self):
"""write mem on cisco device
Returns:
bool: success or fail
"""
self.write_exec_log(self.conn.host, f"[+] saving configurations for {self.device_type} // {self.conn.host} // {self.ip}")
try:
_return = self.conn.save_config()
self.write_exec_log(self.conn.host, f"[+] saving configurations for {self.device_type} // {self.conn.host} // {self.ip}\t...done")
return _return
except:
self.write_exec_log(self.conn.host, f"[+] saving configurations for {self.device_type} // {self.conn.host} // {self.ip}\t...failed\nGot\n{_return}")
return False
# ----------------------------------------------------------------------------------------------------
[docs]
class GroupsConfigure(Multi_Execution):
"""Configure class to do configuration on a multiple group of devices at a time.
Inherits Multi_Execution
Args:
auth (dict): authentication dicationary with 'un', 'pw'. 'en' keys.
devices_config_dict (dict, optional): {device:[list of config], } . Defaults to {}.
config_by_order (bool, optional): if True follows execution in provided order_list entries. Defaults to True.
order_list (list, optional): order list in which execution to be done. Defaults to [].
dev_apply_at_dict (dict, optional): time to apply config at (under implementation). Defaults to {}.
log_folder (str, optional): folder where logs to be stored. Defaults to None.
config_log (bool, optional): generate configuration log. Defaults to True.
exec_log (bool, optional): generate execution log. Defaults to True.
exec_display (bool, optional): on screen display execution log. Defaults to True.
configure (bool, optional): configure or it is for test only. Defaults to False.
"""
def __init__(self, auth,
devices_config_dict={},
config_by_order=True,
order_list=[],
dev_apply_at_dict={},
log_folder=None,
config_log=True,
exec_log=True,
exec_display=True,
configure=False,
):
self.auth = auth
self.devices_config_dict = devices_config_dict
self.order_list = order_list
self.dev_apply_at_dict = dev_apply_at_dict
self.config_by_order = config_by_order
self.config_env = {
'log_folder': log_folder,
'config_log': config_log,
'exec_log': exec_log,
'exec_display': exec_display,
}
self.configure = configure
@printmsg(pre=f'{"-"*40}\n[+] INFO: A Group Configuration, Called...', post=f'{"-"*43}' )
def __call__(self):
self._verify_inputs()
if self.config_by_order: self.configure_by_orderlist()
@printmsg(pre='[+] INFO: Verifying inputs...',)
def _verify_inputs(self):
self._get_dev_conf_dict_ip_list()
self._get_order_list()
self.remove_empty_config_lines()
@printmsg(pre='[+] INFO: configuring devices in order by order_list...',
post='[+] INFO: configuration of this order_list completed...' )
def configure_by_orderlist(self):
"""configure devices as per sequence provided in order_list
"""
for i, order in enumerate(self.order_list):
if isinstance(order, (list, set, tuple)):
self.items = order
self.start()
elif isinstance(order, str):
self.execute(order)
[docs]
def execute(self, ip):
"""executor
Args:
ip (str): device ip or FQDN
"""
conf_list = self.devices_config_dict[ip]['cmds_list']
print(f"[+] \t\tStarting Configuration on: {ip}")
if self.configure:
CFG = Configure(ip, self.auth,
conf_list=conf_list,
**self.config_env
)
CFG.apply()
else:
print(f"[-] \t\tConfiguration skipped as `configure` parameter is set to `{self.configure}`",
", change it True in order to start configure process.")
def _get_dev_conf_dict_ip_list(self):
for ip, value in self.devices_config_dict.items():
if not isinstance(value, dict):
raise Exception(f"[-] ERROR: Incorrect input: configuration parameters in devices_config_dict",
f"Expected `dict` got {type(value)} for {ip}")
if 'cmds_list' in value and 'cmd_file' in value:
print(f"[-] WARNING: Dual configuration input detected for ip {ip}"
'file input will be prefered and considered')
if 'cmd_file' in value:
if not isinstance(value['cmd_file'], str):
raise Exception(f"[-] CRITICAL: Incorrect input: command file name for ip {ip}: {value['cmd_file']}")
try:
with open(value['cmd_file'], 'r') as f:
value['cmds_list'] = f.readlines()
value['cmds_list'] = [ _.rstrip() for _ in value['cmds_list'] ]
del(value['cmd_file'])
continue
except Exception as e:
print(f"[-] CRITICAL: Error Occured for ip {ip}: {e}")
quit()
if 'cmds_list' in value:
for _ in value['cmds_list']:
if isinstance(_, str): continue
print(f"[-] CRITICAL: Invalid input: command detected for ip {ip},{_}, Expected `str`. got {type(_)} ")
quit()
continue
print(f"[-] WARNING: No configuration input detected for ip {ip}: {value}. This device will be skipped")
value['skip'] = True
def _get_order_list(self):
if not self.order_list:
self.order_list = [{ip for ip, value in self.devices_config_dict.items() if not value.get('skip')},]
print(f"[+] INFO: No order_list provided, cretated one \n{self.order_list}")
return None
if not isinstance(self.order_list, (list, tuple)):
raise Exception(f"[-] CRITICAL: Incorrect input: order_list. expected (tuple/list), got {type(self.order_list)}")
self._cd_to_ol()
self._ol_to_cd()
def _cd_to_ol(self):
flatten_ol = set(LST.flatten(self.order_list))
missing_ones = tuple([ip for ip, value in self.devices_config_dict.items() if not value.get('skip') and ip not in flatten_ol])
print(f"[+] WARNING: Device(s) missing in order list [{missing_ones}] were appened.")
self.order_list.append(missing_ones)
self.flatten_ol = flatten_ol.union(set(missing_ones))
if missing_ones:
print(f"[+] \tINFO: updated order_list={self.order_list}")
def _ol_to_cd(self):
removed = False
for ol_item in self.flatten_ol:
if ol_item not in self.devices_config_dict:
self.remove_order_list_item(ol_item, self.order_list)
removed = True
if removed:
self.order_list = LST.remove_empty_members(self.order_list)
print(f"[+] INFO: updated order_list={self.order_list}")
[docs]
def remove_empty_config_lines(self):
"""sanitizer: removes empty lines from configuration
"""
for ip, value in self.devices_config_dict.items():
value['cmds_list'] = LST.remove_empty_members(value['cmds_list'])
[docs]
def remove_order_list_item(self, item, lst):
"""sanitizer: Remove device from configuration sequence where no configuration changes provided.
Args:
item (str): device ip or FQDN
lst (list): configuration change list
"""
for _ in lst:
if isinstance(_, str):
if _ == item:
lst.remove(_)
print(f"[+] WARNING: Addititional device `{_}`found in ordered list, which is missing in devices_config_dict.",
"Removed from order_list ")
elif isinstance(_, (set, tuple, list)):
self.remove_order_list_item(item, _)
# ----------------------------------------------------------------------------------------------------
[docs]
class ConfigureByExcel(ConfigEnvironmentals):
"""class to do configuration based on configuration changes provided in excel.
All listed devices in a single Excel tab will be executed at once (simultaneously).
Multiple Excel tabs/files can be provided to execute those in sequence.
Inherits ConfigEnvironmentals
Args:
auth (dict): authentication dicationary with 'un', 'pw'. 'en' keys.
files (list, optional): list of excel files, will be executed in provided sequence. Defaults to [].
tab_sort_order (list, optional): Excel tabs execution order. Defaults to []. ( options: privide in list manually, `ascending`, `reversed`)
log_folder (str, optional): folder where logs to be stored. Defaults to None.
config_log (bool, optional): generate configuration log. Defaults to True.
exec_log (bool, optional): generate execution log. Defaults to True.
exec_display (bool, optional): on screen display execution log. Defaults to True.
configure (bool, optional): configure or it is for test only. Defaults to False.
sleep_time_between_group (int, optional): sleep time between execution of two groups of executions. Defaults to 0.
"""
def __init__(self, auth,
files=[],
tab_sort_order=[],
log_folder=None,
config_log=True,
exec_log=True,
exec_display=True,
configure=False,
sleep_time_between_group=0,
):
super().__init__(auth, log_folder, config_log, exec_log, exec_display)
self.files = files
self.tab_sort_order = tab_sort_order
self.configure = configure
self.sleep_time_between_group = sleep_time_between_group
if not isinstance(files, list):
print(f"[-] Invalid argument `files`: should be of `list` type, got `{type(files)}`")
quit()
def __call__(self):
print_banner("Configure", 'red')
self._load_dfs()
self._define_sort_order()
self.cmds_groups = self._get_cmds_ordered_group()
self.run()
@printmsg(pre='[+] INFO: \tReading Excel file and Loading tabs...', pre_ends="\t", post='Done...' )
def _load_dfs(self):
self.ordered_configs_df_dict_list = []
if isinstance(self.files, list):
for file in self.files:
self.ordered_configs_df_dict_list.append(read_xl_all_sheet(file))
@printmsg(pre='[+] INFO: \tDefining sort order...', pre_ends="\t", post='Done...' )
def _define_sort_order(self):
if self.tab_sort_order in ('ascending', 'ordered', 'alphabetical', []):
self._set_sort_order('ascending')
elif self.tab_sort_order in ('reversed', 'descending'):
self._set_sort_order('descending')
else:
self._verify_sort_orders()
def _set_sort_order(self, how):
self.tab_sort_order = [
sorted(dfd.keys()) if how == 'ascending' else list(reversed(sorted(dfd.keys())))
for dfd in self.ordered_configs_df_dict_list
]
def _verify_sort_orders(self):
for tso, dfd in zip_longest(self.tab_sort_order, self.ordered_configs_df_dict_list):
if tso is None or dfd is None:
print("[-] CRITICAL: Length of Order sequences v/s excel files count mismatch, both should be with same length")
quit()
#
if set(tso) == set(dfd.keys()): continue
print(f"[-] CRITICAL: Mismatch Sheet Names with provided order, please check")
print(f"\tSort Order = {tso}")
print(f"\tSheets available = {dfd.keys()}")
quit()
@printmsg(pre='[+] INFO: \tDefining commands groups...', pre_ends="\t", post="Done...")
def _get_cmds_ordered_group(self):
cmds_groups = []
for tso, dfd in zip(self.tab_sort_order, self.ordered_configs_df_dict_list):
for tab in tso:
cmds_group = {}
df = dfd[tab]
for ip in df.columns:
cmds_group[ip] = {'cmds_list': list(df[ip])}
cmds_groups.append(cmds_group)
return cmds_groups
@printmsg(pre='[+] INFO: START: Configuing devices',
post='[+] INFO: END : Configuing devices')
def run(self):
"""starts configuration of devices
"""
tso_list = LST.flatten(self.tab_sort_order)
for i, cg in enumerate(self.cmds_groups):
if not self.get_concurrance(i, cg, tso_list): continue
GC = GroupsConfigure(self.auth,
devices_config_dict = cg,
configure=self.configure,
**self.config_env
)
GC()
if self.sleep_time_between_group:
sleep(self.sleep_time_between_group)
[docs]
@staticmethod
def get_concurrance(i, cg, tso_list):
user_concern = input(f"[+] Configuration on group of devices GROUP{i+1}: [{tso_list[i]}] :\n ({set(cg.keys())}) ready to process. \nWant to continue [y/n]")
if user_concern.lower() == 'y': return True
print(f"[-] Configuration on group of devices GROUP{i+1}: [{tso_list[i]}] : Not confirmed, Aborted !!!")
return False