Source code for nettoolkit.capture_it.exec_device

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from time import sleep
from dataclasses import dataclass
import typing

from copy import deepcopy
import nettoolkit.facts_finder as ff
from nettoolkit.detect import DeviceType
from nettoolkit.capture_it.conn import conn
from nettoolkit.capture_it.captures import Captures
from nettoolkit.capture_it.common import cmd_line_pfx
from nettoolkit.nettoolkit_common import STR, LST, IP

# -----------------------------------------------------------------------------
# Execution of Show Commands on a single device. 
# -----------------------------------------------------------------------------
[docs] @dataclass(init=True, repr=False, eq=False) class Execute_Device(): """Execute a device capture Args: ip (str): device ip auth (dict): authentication parameters cmds (list, set, tuple): set of commands to be executed. capture_path (str): path where output to be stored cumulative (bool, optional): True,False,both. Defaults to False. forced_login (bool): True will try login even if device ping fails. parsed_output (bool): parse output and generate Excel or not. CustomClass(class): Custom class definition to provide additinal custom variable commands fg(bool): facts generation mandatory_cmds_retries(int): number of retries for missing mandatory commands captures append_capture(bool): append capture to existing file instead of creating new. missing_captures_only(bool): capture only missing command outputs from existing output """ ip : str auth : dict cmds : list capture_path : str cumulative : bool forced_login : bool parsed_output : bool standard_output : bool CustomClass : 'typing.Any' fg : bool mandatory_cmds_retries: int append_capture : bool missing_captures_only : bool def __post_init__(self): self.all_cmds = {'cisco_ios': set(), 'juniper_junos':set(), 'arista_eos': set()} self.cumulative_filename = None self.delay_factor, self.dev = None, None self.cmd_exec_logs = [] self.failed_reason = '' self.tmp_device_exec_log = '' # self.ip = self.ip.strip() if not self.ip: self.failed_reason = f"Missing device ip: [{self.ip}]" self._device_exec_log(display=True, msg=f"[-] {self.failed_reason} - skipping it") return None # self.pinging = self._check_ping(self.ip) self._start_execution(self.ip) def _device_exec_log(self, display, msg): if display: print(msg) self.tmp_device_exec_log += msg +"\n" def _check_ping(self, ip): """check device reachability Args: ip (str): device ip Returns: int: delay factor if device reachable, else False """ try: self._device_exec_log(display=True, msg=f"[+] {ip} - Checking ping response") self.delay_factor = IP.ping_average (ip)/100 + 3 self._device_exec_log(display=True, msg=f"[+] {ip} - Delay Factor={self.delay_factor}") return self.delay_factor except: self._device_exec_log(display=True, msg=f"[-] {ip} - Ping was unsuccessful") return False def _start_execution(self, ip): if not (self.forced_login or self.pinging): return if not self.pinging: self._device_exec_log(display=True, msg=f"[+] {ip} - Attempt login") dtype_result = self._get_device_type(ip) if not dtype_result: return if self.dev is None: return try: self._execute(ip) except: if self.dev.dtype != 'cisco_ios': return self._device_exec_log(display=True, msg=f"[-] {ip} - sleeping progress for 65 seconds due to known cisco ios bug") sleep(65) self._execute(ip) def _get_device_type(self, ip): """detect device type (cisco, juniper) Args: ip (str): device ip Returns: str: device type if detected, else None """ try: self.dev = DeviceType(dev_ip=ip, un=self.auth['un'], pw=self.auth['pw'], ) self._device_exec_log(display=False, msg=self.dev.tmp_device_detection_log) return self.dev except Exception as e: self.failed_reason = f"[-] [{ip}] - Device Type Detection Failed with Exception \n{e}" self._device_exec_log(display=True, msg=f"{'- '*40}\n{self.failed_reason}\n{'- '*40}") return None def _is_not_connected(self, c, ip): """check if connection is successful Args: c (conn): connection object ip(str): ip address of connection Returns: conn: connection object if successful, otherwise None """ connected = True if STR.found(str(c), "FAILURE"): connected = None if c.hn == None or c.hn == 'dummy': connected = None return not connected def _execute(self, ip): """login to given device(ip) using authentication parameters from uservar (u). if success start command captuers Args: ip (str): device ip """ self._device_exec_log(display=True, msg=f"[+] {ip} - Initializing") with conn(ip=ip, device=self) as c: if self.verify_connection(c, ip) == None: return None self.update_obj_properties(c) self.update_cmds_for_missing_captures_only(c) cc = self.run_cmds(c) self.run_facts_generation_required_commands(c, cc) self.run_custom_commands(c, cc) self.add_exec_logs(cc) self.write_facts_to_excel(cc) self.add_cmd_exec_logs(cc)
[docs] def verify_connection(self, c, ip): if self._is_not_connected(c, ip): self.failed_reason = self.failed_reason or "Connection Failure" return None return True
[docs] def update_obj_properties(self, c): self.c = c self.hostname = c.hn c.capture_path = self.capture_path c.dev_type = self.dev.dtype
# -- get the missing commands list if it is to do only missing captures
[docs] def update_cmds_for_missing_captures_only(self, c): if not self.missing_captures_only: return # missed_cmds = [] if isinstance(self.cmds, dict): missed_cmds = self.get_missing_commands(c, set(self.cmds[self.dev.dtype]), purpose='missing') if missed_cmds is not None: self.cmds[self.dev.dtype] = missed_cmds elif isinstance(self.cmds, (list, set, tuple)): missed_cmds = self.get_missing_commands(c, set(self.cmds), purpose='missing') if missed_cmds is not None: self.cmds = missed_cmds if missed_cmds: self._device_exec_log(display=True, msg=f"[+] {c.hn} : INFO : Missed Cmds = {missed_cmds}") elif missed_cmds is None: self._device_exec_log(display=True, msg=f"[+] {c.hn} : INFO : Cumulative file missing, new file will be generated.") else: self._device_exec_log(display=True, msg=f"[+] {c.hn} : INFO : No missing Command found in existing capture..")
[docs] def run_cmds(self, c): cc = self.command_capture(c) self.get_max_cmd_length(c, self.cmds) cc.grp_cmd_capture(self.cmds) if self.cmds: self.add_cmd_to_all_cmd_dict(self.cmds) return cc
# -- for facts generation -- presence of mandary commands, and capture if not --
[docs] def run_facts_generation_required_commands(self, c, cc): if not self.fg or not self.mandatory_cmds_retries: return # self._device_exec_log(display=True, msg=f"[+] {c.hn} : INFO : Starting with Mandatory commands capture (if any missing).") missed_cmds = self.check_facts_finder_requirements(c) self.retry_missed_cmds(c, cc, missed_cmds) self.add_cmds_to_self(missed_cmds) if missed_cmds: self.add_cmd_to_all_cmd_dict(missed_cmds)
# -- custom commands -- only log entries, no parser --
[docs] def run_custom_commands(self, c, cc): if not self.CustomClass: return # self._device_exec_log(display=True, msg=f"[+] {c.hn} : INFO : Starting with custom commands capture.") CC = self.CustomClass(c.capture_path+"/"+c.hn+".log", self.dev.dtype) self.get_max_cmd_length(c, CC.cmds) cc.grp_cmd_capture(CC.cmds) self.add_cmds_to_self(CC.cmds) if CC.cmds: self.add_cmd_to_all_cmd_dict(CC.cmds)
# -- add command execution logs dataframe --
[docs] def add_exec_logs(self, cc): cc.add_exec_logs()
# -- write facts to excel --
[docs] def write_facts_to_excel(self, cc): if not self.cumulative_filename: self.cumulative_filename = cc.cumulative_filename if self.parsed_output: self.xl_file = cc.write_facts()
# -- add execution logs
[docs] def add_cmd_exec_logs(self, cc): self.cmd_exec_logs = cc.cmd_exec_logs
[docs] def add_cmd_to_all_cmd_dict(self, cmds): """add command to all cmd dictionary Args: cmds (str, list, tuple, set, dict): commands in any format """ if self.dev.dtype not in self.all_cmds.keys(): self.all_cmds[self.dev.dtype] = set() if isinstance(cmds, (set, list, tuple)): self.all_cmds[self.dev.dtype] = self.all_cmds[self.dev.dtype].union(set(cmds)) elif isinstance(cmds, dict): for dt, _cmds in cmds.items(): if dt != self.dev.dtype: continue self.add_cmd_to_all_cmd_dict(_cmds) elif isinstance(cmds, str): self.add_cmd_to_all_cmd_dict([cmds,])
[docs] def add_cmds_to_self(self, cmds): """add additional commands to cmds list Args: cmds (list): list of additinal or missed mandatory cmds to be captured """ if isinstance(self.cmds, list): for cmd in cmds: if cmd not in self.cmds: self.cmds.append(cmd) elif isinstance(self.cmds, set): for cmd in cmds: if cmd not in self.cmds: self.cmds.add(cmd) elif isinstance(self.cmds, tuple): for cmd in cmds: if cmd not in self.cmds: self.cmds = list(self.cmds).append(cmd) elif isinstance(self.cmds, dict): for cmd in cmds: if cmd not in self.cmds[self.dev.dtype]: self.cmds[self.dev.dtype].append(cmd) else: self._device_exec_log(display=True, msg=f"[-] {self.c.hn} : ERROR : Non standard command input {type(self.cmds)}\n{self.cmds}")
[docs] def get_max_cmd_length(self, c, cmds): """returns the length of longest command Args: c (conn): connection object cmds (str,iterable): commands list """ if isinstance(cmds, (list, set, tuple)): c.max_cmd_len = LST.longest_str_len(cmds) elif isinstance(cmds, dict): for dt, _cmds in cmds.items(): if dt != self.dev.dtype: continue self.get_max_cmd_length(c, _cmds) elif isinstance(cmds, str): self.get_max_cmd_length(c, [cmds,]) else: return
[docs] def command_capture(self, c): """start command captures on connection object Args: c (conn): connection object """ self._device_exec_log(display=True, msg=f"[+] {c.hn} : INFO : Starting Capture in `{'append' if self.append_capture else 'add'}` mode") cc = Captures( conn=c, cumulative=self.cumulative, parsed_output=self.parsed_output, standard_output=self.standard_output, append_capture=self.append_capture, ) return cc
[docs] def missed_commands_capture(self, c, cc, missed_cmds, x=""): """recaptures missed commands Args: c (conn): connection object cc(Captures): Capture / Command line processing object missed_cmds (set): list/set of commands for which output to be recapture x (int, optional): iteration value """ self._device_exec_log(display=True, msg=f"[+] {c.hn} - Retrying missed_cmds({x+1}): {missed_cmds}") self.get_max_cmd_length(c, missed_cmds) cc.grp_cmd_capture(missed_cmds)
[docs] def is_any_ff_cmds_missed(self, c): """checks and returns missed mandatory capture commands Args: c (conn): connection object Returns: set: missed mandatory commands """ necessary_cmds = ff.get_necessary_cmds(self.dev.dtype) return self.get_missing_commands(c, necessary_cmds, purpose='factsgen')
[docs] def check_facts_finder_requirements(self, c): """checks and returns missed mandatory capture commands clone to is_any_ff_cmds_missed Args: c (conn): connection object Returns: set: missed mandatory commands """ return self.is_any_ff_cmds_missed(c)
[docs] def retry_missed_cmds(self, c, cc, missed_cmds): """retry missed commands captures Args: c (conn): connection object instance cc(Captures): Capture / Command line processing object missed_cmds (set): missed commands Returns: None: No retuns """ for x in range(self.mandatory_cmds_retries): if not missed_cmds: return None self.missed_commands_capture(c, cc, missed_cmds, x) missed_cmds = self.is_any_ff_cmds_missed(c) if missed_cmds: self._device_exec_log(display=True, msg=f"[-] {c.hn} - Error capture all mandatory commands, try do manually..")
[docs] def get_missing_commands(self, c, cmds, purpose): """checks and returns missed capture commands Args: c (conn): connection object cmds (list): list of commands to check Returns: set: missed mandatory commands """ try: file = c.capture_path+"/"+c.hn+".log" with open(file, 'r') as f: log_lines = f.readlines() except: if purpose == 'missing': self._device_exec_log(display=True, msg=f'[-] {c.hn} : Error: File not found {c.capture_path+"/"+c.hn+".log"}: Cumulative capture file required ') return [] if purpose == 'factsgen': return cmds captured_cmds = set() for log_line in log_lines: if log_line[1:].startswith(cmd_line_pfx): captured_cmd = ff.get_absolute_command(self.dev.dtype, log_line.split(cmd_line_pfx)[-1]) captured_cmds.add(captured_cmd.strip()) missed_cmds = cmds.difference(captured_cmds) return list(missed_cmds)