Source code for nettoolkit.addressing.summary


import pandas as pd
from dataclasses import dataclass, field
from nettoolkit.addressing import addressing, dec2dotted_ip, inv_subnet_size_to_mask, IPv4, isSubset
from nettoolkit.nettoolkit_db import sort_dataframe_on_subnet, read_xl_all_sheet, read_an_xl_sheet

# =========================================================================================== 
# Static Global Variable
# =========================================================================================== 
# MASK_RANGE = {2**x:{'net':x , 'host':32-x} for x in range(32)}
MASK_RANGE = {
 1: {'host': 32, 'net': 0},
 2: {'host': 31, 'net': 1},
 4: {'host': 30, 'net': 2},
 8: {'host': 29, 'net': 3},
 16: {'host': 28, 'net': 4},
 32: {'host': 27, 'net': 5},
 64: {'host': 26, 'net': 6},
 128: {'host': 25, 'net': 7},
 256: {'host': 24, 'net': 8},
 512: {'host': 23, 'net': 9},
 1024: {'host': 22, 'net': 10},
 2048: {'host': 21, 'net': 11},
 4096: {'host': 20, 'net': 12},
 8192: {'host': 19, 'net': 13},
 16384: {'host': 18, 'net': 14},
 32768: {'host': 17, 'net': 15},
 65536: {'host': 16, 'net': 16},
 131072: {'host': 15, 'net': 17},
 262144: {'host': 14, 'net': 18},
 524288: {'host': 13, 'net': 19},
 1048576: {'host': 12, 'net': 20},
 2097152: {'host': 11, 'net': 21},
 4194304: {'host': 10, 'net': 22},
 8388608: {'host': 9, 'net': 23},
 16777216: {'host': 8, 'net': 24},
 33554432: {'host': 7, 'net': 25},
 67108864: {'host': 6, 'net': 26},
 134217728: {'host': 5, 'net': 27},
 268435456: {'host': 4, 'net': 28},
 536870912: {'host': 3, 'net': 29},
 1073741824: {'host': 2, 'net': 30},
 2147483648: {'host': 1, 'net': 31}
}

# =========================================================================================== 
# Functions
# =========================================================================================== 

# recursive, club adjascent ranges and split diverse ones. 
def _split_continuous_ranges(int_ranges):
	ranges = []
	for start, end in int_ranges:
		max_mask_range = max(_get_range_parts(end-start+1))
		ip = dec2dotted_ip(start)
		for _ in reversed(MASK_RANGE):
			if _ > max_mask_range: continue
			i = addressing( ip + "/" + str(MASK_RANGE[_]['host']))
			if i.NetworkIP() == str(i):
				break
		ranges.append( (i.network_number_int, i.broadcast_number_int) )
		if i.network_number_int == start and i.broadcast_number_int == end:
			continue
		ranges.extend( _split_continuous_ranges([(i.broadcast_number_int+1, end),]))
	return ranges

# get the mask ranges for provided differences.
def _get_range_parts(diff):
	mask_ranges = set() 
	for k in sorted(MASK_RANGE.keys()):
		if k > diff: break
		mask_ranges.add(k)
	return mask_ranges

[docs] def get_masks(lst, diff): l = sorted(lst) if diff == l[-1]: return {l[-1], } for x in l[:-1]: if l[-1] + x == diff: return {l[-1], x} return get_masks(l[:-1], diff-l[-1]) | {l[-1]}
# =========================================================================================== # Aggregation Class # ===========================================================================================
[docs] @dataclass class Aggregate(): """class Aggregating provided IPV4 prefixes.. Raises: Exception: Incorrect IP / Format Received Returns: Aggregate: Object instance of Aggregate """ prefixes: list[str] = field(default_factory=list) def __post_init__(self): self._sort_prefixes() self.pfxs_dict = {} self._get_ip_objects() self._count_start_stop() self.int_ranges = self._get_continuous_ranges() self.int_ranges = _split_continuous_ranges(self.int_ranges) self._get_aggregates_from_ranges() # Sorting prefixes on ascending order def _sort_prefixes(self): df = pd.DataFrame({'subnets': list(self.prefixes)}) df = sort_dataframe_on_subnet(df, 'subnets') self.prefixes = list(df['subnets']) # convert prefixes as IPv4 objects. def _get_ip_objects(self): for pfx in self.prefixes: try: if isinstance(pfx, str): self.pfxs_dict[IPv4(pfx)] = {} elif isinstance(pfx, IPv4): self.pfxs_dict[pfx] = {} else: raise Exception(f"[-] Incorrect IP / Format Received {pfx}\n{e}") except Exception as e: raise Exception(f"[-] Incorrect IP / Format Received {pfx}\n{e}") # count and store network and broadcast number for each prefixes. def _count_start_stop(self): for pfx in self.pfxs_dict.keys(): self.pfxs_dict[pfx]['net_num'] = pfx.network_number_int self.pfxs_dict[pfx]['bc_num'] = pfx.broadcast_number_int # sorted ranges def _get_continuous_ranges(self): ranges = [] for pfx, pfx_dict in self.pfxs_dict.items(): # -- First entry if not ranges: ranges.append( (pfx_dict['net_num'], pfx_dict['bc_num']) ) continue # -- adjuscent to previous network if ranges[-1][-1] + 1 == pfx_dict['net_num']: ranges[-1] = ( ranges[-1][0], pfx_dict['bc_num'] ) continue # -- network within or same as previous existing prefix if ( (ranges[-1][0] <= pfx_dict['net_num'] <= ranges[-1][-1]) and (ranges[-1][0] <= pfx_dict['bc_num'] <= ranges[-1][-1]) ): continue # -- overlaping to previous network if ( (ranges[-1][0] <= pfx_dict['net_num'] <= ranges[-1][-1]) and ( pfx_dict['bc_num'] > ranges[-1][-1]) ): ranges[-1] = ( ranges[-1][0], pfx_dict['bc_num'] ) continue # -- Far apart previous ranges.append( (pfx_dict['net_num'], pfx_dict['bc_num']) ) return ranges # get aggregate addresses from sorted ranges def _get_aggregates_from_ranges(self): self._summaries = [] for start, end in self.int_ranges: ip = addressing(f"{dec2dotted_ip(start)}/{inv_subnet_size_to_mask(end-start)}") if ip.network_number_int == start and ip.broadcast_number_int == end: self._summaries.append(ip) continue raise Exception(f"[-] Script Error: Invalid Summary Found: {ip}") ### ----- Available Properties --- ### @property def aggregates(self): """returns list of aggregate IPv4 objects Returns: list: list of IPv4 objects """ return self._summaries @property def summaries(self): """returns list of aggregate strings. Returns: list: list of summary address strings """ return [str(_) for _ in self._summaries]
# ===========================================================================================
[docs] def calc_summmaries(min_subnet_size, prefixes): """summarize the provided network prefixes, provide all networks as arguments. minimum subnet summarized to provided min_subnet_size parameter Args: min_subnet_size (int): minimuze subnet mask to be summarized up on prefixes (list): networks Returns: list: summaries """ summaries = Aggregate(prefixes).summaries nset = set() for subnet in summaries: if isinstance(subnet, IPv4) and subnet.mask > min_subnet_size: nset.add(subnet.expand(min_subnet_size)) elif isinstance(subnet, str) and int(subnet.split("/")[-1]) > min_subnet_size: nset.add(IPv4(subnet).expand(min_subnet_size)) summaries.extend(nset) nSummaries = Aggregate(summaries).summaries return nSummaries
# =========================================================================================== # Get unused prefixes details from provided summary subnet # ===========================================================================================
[docs] @dataclass class Subnet_Spare(): summary: str prefixes: list = field(default_factory=[]) def __post_init__(self): try: self.sbig = addressing(self.summary) except: self.sbig = None self.get_subset_subnets() self.remove_used_chunks()
[docs] def get_subset_subnets(self): self.subset_prefixes = [] if not self.sbig: return for subnet in self.prefixes: try: ssmall = addressing(subnet) except: ## invalid subnets not checked continue if ssmall.is_subset(self.sbig): self.subset_prefixes.append(ssmall)
@property def smallest_pfx_mask(self): return min({s.mask for s in self.subset_prefixes}) @property def smallest_pfx_host_count(self): return 2**(32-self.smallest_pfx_mask) @property def min_divisor(self): return int(len(self.sbig) / self.smallest_pfx_host_count) @property def smallest_divisions(self): return self.sbig / self.min_divisor
[docs] def remove_used_chunks(self): self.remaining_chunk_pfx = set(self.smallest_divisions) for chunk in self.smallest_divisions: for subset_pfx in self.subset_prefixes: if isSubset(chunk, subset_pfx): self.remaining_chunk_pfx.remove(chunk)
[docs] def unused_prefixes(self, obj=False): Agg = Aggregate(self.remaining_chunk_pfx) if obj: return Agg.aggregates return Agg.summaries
# =========================================================================================== if __name__ == "__main__": pass # ===========================================================================================