diff --git a/tech_docs/networking/networking_python.md b/tech_docs/networking/networking_python.md
index aadc1a3..f143ee4 100644
--- a/tech_docs/networking/networking_python.md
+++ b/tech_docs/networking/networking_python.md
@@ -228,4 +228,662 @@
 # Extract all interfaces with admin status up
 result = jq.compile('.interfaces[] | select(.admin_status=="up") | .name').input(data).all()
-```
\ No newline at end of file
+```
+
+---
+
+# Network Engineering Python Workflows
+
+## Network Discovery and Inventory Management
+
+### Workflow 1: Automated Device Inventory Collection
+**Scenario**: You need to maintain an accurate inventory of all network devices across multiple sites.
+
+```python
+from netmiko import ConnectHandler
+import pandas as pd
+
+# Seed devices from which to discover the network
+seed_devices = [
+    {"ip": "10.1.1.1", "device_type": "cisco_ios", "username": "admin", "password": "cisco_pass"},
+    {"ip": "10.2.1.1", "device_type": "cisco_ios", "username": "admin", "password": "cisco_pass"}
+]
+
+inventory = []
+
+for device in seed_devices:
+    try:
+        with ConnectHandler(**device) as conn:
+            # Get device facts; IOS has no "show hostname" command,
+            # so take the hostname from the device prompt
+            hostname = conn.find_prompt().rstrip("#>")
+            version = conn.send_command("show version | include Version")
+            serial = conn.send_command("show inventory | include SN:")
+
+            # Get CDP neighbors to expand discovery
+            neighbors_output = conn.send_command("show cdp neighbors detail")
+
+            inventory.append({
+                "hostname": hostname,
+                "ip": device["ip"],
+                "version": version.strip(),
+                "serial": serial.strip()
+            })
+
+            # Parse CDP output to find new devices and append them to
+            # seed_devices (see the sketch below)
+
+    except Exception as e:
+        print(f"Error connecting to {device['ip']}: {e}")
+
+# Convert to DataFrame for analysis
+df = pd.DataFrame(inventory)
+df.to_csv("network_inventory.csv", index=False)
+```
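+
+The discovery step above is only sketched in a comment. A minimal version of that parser, assuming an all-IOS estate with shared credentials (`parse_cdp_neighbors` and `known_ips` are illustrative names, not library APIs):
+
+```python
+import re
+
+def parse_cdp_neighbors(cdp_output, known_ips):
+    """Return device dicts for CDP neighbors not seen before."""
+    new_devices = []
+    # "show cdp neighbors detail" separates entries with a dashed line
+    # and reports Device ID, IP address, and Platform per neighbor
+    for entry in cdp_output.split("-------------------------"):
+        ip_match = re.search(r"IP address: (\S+)", entry)
+        if not ip_match or ip_match.group(1) in known_ips:
+            continue
+        known_ips.add(ip_match.group(1))
+        new_devices.append({
+            "ip": ip_match.group(1),
+            "device_type": "cisco_ios",  # assumption: all-IOS estate
+            "username": "admin",
+            "password": "cisco_pass"
+        })
+    return new_devices
+
+# Usage inside the discovery loop:
+#   known_ips = {d["ip"] for d in seed_devices}
+#   seed_devices.extend(parse_cdp_neighbors(neighbors_output, known_ips))
+```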
+
+## Configuration Management
+
+### Workflow 2: Compliance Verification and Remediation
+**Scenario**: You need to verify that all access switches have proper security configurations and fix any violations.
+
+```python
+from nornir import InitNornir
+from nornir_netmiko import netmiko_send_command, netmiko_send_config
+from nornir.core.filter import F
+
+# Initialize Nornir with inventory
+nr = InitNornir(config_file="config.yaml")
+
+# Filter only access switches
+access_switches = nr.filter(F(role="access_switch"))
+
+# Define compliance checks
+security_commands = {
+    "port_security": "show running-config | include switchport port-security",
+    "bpdu_guard": "show spanning-tree summary | include BPDU Guard",
+    "storm_control": "show running-config | include storm-control"
+}
+
+# Check compliance
+results = {}
+for check_name, command in security_commands.items():
+    results[check_name] = access_switches.run(netmiko_send_command, command_string=command)
+
+# Find non-compliant devices; each host maps to a MultiResult,
+# so index into the first task result before reading its output
+non_compliant = []
+for device_name, device_results in results["port_security"].items():
+    if "switchport port-security" not in device_results[0].result:
+        non_compliant.append(device_name)
+
+# Remediation configuration
+remediation_config = [
+    "interface range GigabitEthernet0/1 - 48",
+    "switchport port-security",
+    "switchport port-security maximum 3",
+    "switchport port-security violation restrict",
+    "spanning-tree bpduguard enable",
+    "storm-control broadcast level 50.00"
+]
+
+# Apply the fix only to non-compliant devices
+non_compliant_devices = nr.filter(filter_func=lambda h: h.name in non_compliant)
+remediation_results = non_compliant_devices.run(
+    netmiko_send_config, config_commands=remediation_config
+)
+
+print(f"Fixed {len(non_compliant)} non-compliant devices")
+```
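+
+A push alone does not prove compliance, so it is worth re-running the check against the remediated devices. A short verification pass, reusing the objects defined above:
+
+```python
+# Re-run the port-security check on the remediated devices only
+verify_results = non_compliant_devices.run(
+    netmiko_send_command,
+    command_string="show running-config | include switchport port-security"
+)
+
+still_broken = [
+    host for host, multi_result in verify_results.items()
+    if "switchport port-security" not in multi_result[0].result
+]
+
+if still_broken:
+    print(f"Remediation did not take effect on: {', '.join(still_broken)}")
+else:
+    print("All remediated devices verified compliant")
+```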
+
+## Network Monitoring and Troubleshooting
+
+### Workflow 3: Automated Troubleshooting for Circuit Issues
+**Scenario**: A WAN circuit is experiencing intermittent problems. You need to collect data continuously to identify patterns.
+
+```python
+import re
+import time
+from datetime import datetime
+
+import matplotlib.pyplot as plt
+import pandas as pd
+from netmiko import ConnectHandler
+
+# Connect to the edge router
+device = {
+    "device_type": "cisco_ios",
+    "host": "192.168.1.254",
+    "username": "admin",
+    "password": "cisco"
+}
+
+interface = "GigabitEthernet0/0"
+monitoring_period = 3600  # 1 hour
+interval = 30  # seconds
+metrics = []
+
+with ConnectHandler(**device) as conn:
+    print(f"Monitoring {interface} for {monitoring_period/60} minutes...")
+
+    start_time = time.time()
+    end_time = start_time + monitoring_period
+
+    while time.time() < end_time:
+        # Collect interface statistics
+        output = conn.send_command(f"show interface {interface}")
+
+        # Parse the counters with regex; naive string splitting breaks
+        # on the surrounding text of the "show interface" counter lines
+        input_rate = int(re.search(r"input rate (\d+)", output).group(1))
+        output_rate = int(re.search(r"output rate (\d+)", output).group(1))
+        input_errors = int(re.search(r"(\d+) input errors", output).group(1))
+        crc_errors = int(re.search(r"(\d+) CRC", output).group(1))
+
+        # Collect ping statistics to the provider edge; IOS reports
+        # "Success rate is X percent", so derive loss from success
+        ping_result = conn.send_command("ping 172.16.1.1 repeat 5")
+        success_rate = int(re.search(r"Success rate is (\d+) percent", ping_result).group(1))
+        packet_loss = 100 - success_rate
+
+        # Store metrics
+        metrics.append({
+            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+            "input_rate": input_rate,
+            "output_rate": output_rate,
+            "input_errors": input_errors,
+            "crc_errors": crc_errors,
+            "packet_loss": packet_loss
+        })
+
+        # Wait for the next interval
+        time.sleep(interval)
+
+# Analyze collected data
+df = pd.DataFrame(metrics)
+df.to_csv("circuit_metrics.csv", index=False)
+
+# Create time-series plots for analysis
+plt.figure(figsize=(12, 8))
+plt.subplot(3, 1, 1)
+plt.plot(df["timestamp"], df["input_rate"], label="Input Rate")
+plt.plot(df["timestamp"], df["output_rate"], label="Output Rate")
+plt.legend()
+
+plt.subplot(3, 1, 2)
+plt.plot(df["timestamp"], df["crc_errors"], label="CRC Errors")
+plt.legend()
+
+plt.subplot(3, 1, 3)
+plt.plot(df["timestamp"], df["packet_loss"], label="Packet Loss %")
+plt.legend()
+
+plt.tight_layout()
+plt.savefig("circuit_analysis.png")
+```
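+
+If the ntc-templates package is installed, Netmiko can return structured data via TextFSM and the regex scraping above disappears. A sketch of the collection step under that assumption (the field names come from the community cisco_ios show interfaces template, so verify them against your template version):
+
+```python
+# Inside the collection loop; requires: pip install ntc-templates
+parsed = conn.send_command(f"show interface {interface}", use_textfsm=True)
+
+# TextFSM returns a list of dicts, one per interface
+stats = parsed[0]
+metrics.append({
+    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+    "input_rate": int(stats["input_rate"]),
+    "output_rate": int(stats["output_rate"]),
+    "input_errors": int(stats["input_errors"]),
+    "crc_errors": int(stats["crc"])
+})
+```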
+
+## Network Configuration Backup and Restoration
+
+### Workflow 4: Automated Configuration Backup with Git Integration
+**Scenario**: You need daily backups of all network device configurations with version control.
+
+```python
+from napalm import get_network_driver
+import os
+import subprocess
+from datetime import datetime
+import yaml
+
+# Load device inventory
+with open("inventory.yaml", "r") as f:
+    inventory = yaml.safe_load(f)
+
+# Create backup directory structure
+backup_dir = f"backups/{datetime.now().strftime('%Y-%m-%d')}"
+os.makedirs(backup_dir, exist_ok=True)
+
+# Backup configurations
+for device in inventory["devices"]:
+    try:
+        # Initialize the appropriate driver
+        driver = get_network_driver(device["os"])
+        with driver(device["hostname"], device["username"], device["password"]) as device_conn:
+            # Get running configuration
+            running_config = device_conn.get_config()["running"]
+
+            # Save to file
+            device_file = f"{backup_dir}/{device['hostname']}.cfg"
+            with open(device_file, "w") as f:
+                f.write(running_config)
+
+            print(f"Backed up {device['hostname']}")
+    except Exception as e:
+        print(f"Failed to backup {device['hostname']}: {e}")
+
+# Git integration; check=True makes a failing git command raise
+try:
+    # Initialize the Git repo if it does not exist yet
+    if not os.path.exists(os.path.join("backups", ".git")):
+        subprocess.run(["git", "init"], cwd="backups", check=True)
+
+    # Stage all files
+    subprocess.run(["git", "add", "."], cwd="backups", check=True)
+
+    # Commit changes
+    commit_message = f"Daily backup {datetime.now().strftime('%Y-%m-%d')}"
+    subprocess.run(["git", "commit", "-m", commit_message], cwd="backups", check=True)
+
+    # Push to remote (if configured)
+    subprocess.run(["git", "push"], cwd="backups", check=True)
+
+    print("Backup committed to git repository")
+except Exception as e:
+    print(f"Git operation failed: {e}")
+```
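+
+With dated backup directories and Git history in place, spotting configuration drift between two runs is a few lines of standard-library work. A sketch, with illustrative paths for two consecutive backups:
+
+```python
+import difflib
+
+old_file = "backups/2024-05-01/core-sw1.cfg"  # hypothetical paths
+new_file = "backups/2024-05-02/core-sw1.cfg"
+
+with open(old_file) as f:
+    old_config = f.readlines()
+with open(new_file) as f:
+    new_config = f.readlines()
+
+# unified_diff yields output only where the two files differ
+for line in difflib.unified_diff(old_config, new_config,
+                                 fromfile=old_file, tofile=new_file):
+    print(line, end="")
+```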
+
+## Network Change Management
+
+### Workflow 5: Phased Network Change with Rollback
+**Scenario**: You need to deploy a new routing protocol across the WAN with automatic rollback if connectivity tests fail.
+
+```python
+from nornir import InitNornir
+from nornir_netmiko import netmiko_send_command, netmiko_send_config
+from nornir.core.filter import F
+import time
+
+# Initialize Nornir
+nr = InitNornir(config_file="config.yaml")
+
+# Filter core routers for the first phase
+core_routers = nr.filter(F(role="core_router"))
+
+# Define the change configuration
+ospf_config = [
+    "router ospf 100",
+    "network 10.0.0.0 0.255.255.255 area 0",
+    "redistribute connected subnets"
+]
+
+# Define validation tests
+def validate_connectivity(task):
+    # Test connectivity to critical hosts
+    result = task.run(
+        netmiko_send_command,
+        command_string="ping 10.100.1.1 repeat 5"
+    )
+
+    # Check the success criteria on the first task result
+    return "Success rate is 100 percent" in result[0].result
+
+# Phase 1: Deploy to core routers
+print("Phase 1: Deploying to core routers...")
+core_results = core_routers.run(netmiko_send_config, config_commands=ospf_config)
+
+# Save configurations on the hosts where the push succeeded
+saved_hosts = [host for host, result in core_results.items() if not result.failed]
+core_routers.filter(filter_func=lambda h: h.name in saved_hosts).run(
+    netmiko_send_command, command_string="write memory"
+)
+
+# Validate core router changes
+time.sleep(60)  # Allow for convergence
+validation = core_routers.run(task=validate_connectivity)
+validation_results = [validation[host][0].result for host in core_routers.inventory.hosts]
+
+# If validation fails, roll back
+if not all(validation_results):
+    print("Validation failed! Rolling back changes...")
+    rollback_commands = [
+        "no router ospf 100"
+    ]
+    core_routers.run(netmiko_send_config, config_commands=rollback_commands)
+    print("Rollback completed.")
+else:
+    # Continue with Phase 2 (distribution routers)
+    print("Phase 1 successful. Proceeding to Phase 2...")
+    dist_routers = nr.filter(F(role="distribution_router"))
+    # Similar implementation for Phase 2
+```
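+
+Where the platform supports it, NAPALM's candidate-config workflow gives a cleaner rollback than hand-written "no" commands, because the library restores the pre-change state rather than reconstructing it. A single-device sketch of the same Phase 1 change (host and credentials are illustrative; on IOS this relies on the archive feature being configured):
+
+```python
+from napalm import get_network_driver
+
+driver = get_network_driver("ios")
+device = driver("10.0.0.1", "admin", "cisco_pass")
+device.open()
+
+# Stage the change as a candidate instead of pushing it live
+device.load_merge_candidate(
+    config="router ospf 100\n network 10.0.0.0 0.255.255.255 area 0\n"
+)
+print(device.compare_config())  # review the pending diff first
+device.commit_config()
+
+# ... run the connectivity validation here ...
+validation_passed = False  # stand-in for the real test
+
+if not validation_passed:
+    device.rollback()  # restore the pre-commit configuration
+
+device.close()
+```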
+
+## Network Capacity Planning
+
+### Workflow 6: Interface Utilization Analysis for Capacity Planning
+**Scenario**: You need to identify potential bottlenecks by analyzing interface utilization trends.
+
+```python
+import re
+import time
+from datetime import datetime
+
+import numpy as np
+import pandas as pd
+from netmiko import ConnectHandler
+
+# Connect to core switch
+device = {
+    "device_type": "cisco_ios",
+    "host": "10.0.1.1",
+    "username": "admin",
+    "password": "cisco_pass"
+}
+
+# Get the list of uplink interfaces (matches on the port description)
+with ConnectHandler(**device) as conn:
+    interfaces_output = conn.send_command("show interface status | include uplink")
+    uplink_interfaces = [line.split()[0] for line in interfaces_output.splitlines() if line.strip()]
+
+# Collect utilization data over 24 hours (simplified - in the real world use SNMP or streaming telemetry)
+collection_period = 24 * 60 * 60  # 24 hours
+interval = 300  # 5 minutes
+data = []
+
+with ConnectHandler(**device) as conn:
+    start_time = time.time()
+    end_time = start_time + collection_period
+
+    # For demo purposes, simulate with a handful of iterations
+    for _ in range(5):  # In production, use an actual time-based loop
+        timestamp = datetime.now()
+
+        for interface in uplink_interfaces:
+            output = conn.send_command(f"show interface {interface} | include rate")
+
+            # Extract input and output rates with regex
+            input_rate = int(re.search(r"input rate (\d+)", output).group(1))
+            output_rate = int(re.search(r"output rate (\d+)", output).group(1))
+
+            # The counters are already bits/sec, so convert straight to Mbps
+            input_mbps = input_rate / 1_000_000
+            output_mbps = output_rate / 1_000_000
+
+            data.append({
+                "timestamp": timestamp,
+                "interface": interface,
+                "input_mbps": input_mbps,
+                "output_mbps": output_mbps
+            })
+
+        # In a real implementation, wait for the next interval
+        # time.sleep(interval)
+
+# Convert to DataFrame
+df = pd.DataFrame(data)
+
+# Summary statistics per interface
+utilization_summary = df.groupby("interface").agg({
+    "input_mbps": ["mean", "max", "std"],
+    "output_mbps": ["mean", "max", "std"]
+})
+
+# Find interfaces with high utilization (>70% of capacity)
+interface_capacity = {
+    "TenGigabitEthernet1/1": 10000,   # 10G in Mbps
+    "FortyGigabitEthernet2/1": 40000  # 40G in Mbps
+}
+
+for interface, capacity in interface_capacity.items():
+    if interface in utilization_summary.index:
+        max_util = max(
+            utilization_summary.loc[interface, ("input_mbps", "max")],
+            utilization_summary.loc[interface, ("output_mbps", "max")]
+        )
+        utilization_pct = (max_util / capacity) * 100
+
+        if utilization_pct > 70:
+            print(f"WARNING: {interface} reached {utilization_pct:.1f}% utilization")
+
+            # Forecast when 90% will be reached based on the trend
+            interface_data = df[df["interface"] == interface]
+
+            # Time-series forecast (simplified linear regression)
+            x = np.arange(len(interface_data))
+            y = interface_data["output_mbps"].values
+
+            # Fit a line to extrapolate the trend
+            slope = np.polyfit(x, y, 1)[0]
+
+            # Calculate days until 90% capacity
+            if slope > 0:
+                mbps_until_90pct = 0.9 * capacity - max_util
+                days_until_90pct = mbps_until_90pct / (slope * 288)  # 288 five-minute samples per day
+                print(f"  Forecast: Will reach 90% capacity in {days_until_90pct:.1f} days")
+```
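+
+Capacity planners usually size against a busy-period percentile rather than the absolute maximum, so a single burst does not trigger an upgrade. A short addition using the DataFrame built above:
+
+```python
+# 95th-percentile utilization smooths out one-off bursts
+p95 = df.groupby("interface")[["input_mbps", "output_mbps"]].quantile(0.95)
+
+for interface, capacity in interface_capacity.items():
+    if interface in p95.index:
+        p95_pct = p95.loc[interface].max() / capacity * 100
+        print(f"{interface}: 95th percentile at {p95_pct:.1f}% of capacity")
+```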
+
+## Network Security Automation
+
+### Workflow 7: Automated ACL Audit and Remediation
+**Scenario**: You need to audit all access control lists to ensure compliance with security policies.
+
+```python
+from napalm import get_network_driver
+import re
+import pandas as pd
+import yaml
+
+# Load security policy rules
+with open("security_policy.yaml", "r") as f:
+    security_policy = yaml.safe_load(f)
+
+# Load device inventory
+with open("inventory.yaml", "r") as f:
+    inventory = yaml.safe_load(f)
+
+# Define non-compliant patterns
+dangerous_patterns = [
+    r"permit ip any any",                 # Too permissive
+    r"permit tcp any any eq 23",          # Telnet access
+    r"permit tcp any any eq 21",          # FTP access
+    r"remark temporary|remark TEMPORARY"  # Temporary rules
+]
+
+results = []
+
+# Audit ACLs on each device. NAPALM has no ACL getter, so run
+# "show access-lists" through the cli() method and parse the output.
+for device in inventory["devices"]:
+    try:
+        driver = get_network_driver(device["os"])
+        with driver(device["hostname"], device["username"], device["password"]) as device_conn:
+            acl_output = device_conn.cli(["show access-lists"])["show access-lists"]
+
+            # Group rules by ACL name: header lines name the ACL,
+            # indented numbered lines are its rules
+            acls = {}
+            acl_name = None
+            for line in acl_output.splitlines():
+                header = re.match(r"^\S.*access list (\S+)", line)
+                if header:
+                    acl_name = header.group(1)
+                    acls.setdefault(acl_name, [])
+                    continue
+                rule = re.match(r"^\s+(\d+) (.+)$", line)
+                if rule and acl_name:
+                    acls[acl_name].append((rule.group(1), rule.group(2)))
+
+            for acl_name, rules in acls.items():
+                # Check each rule against the dangerous patterns
+                for rule_num, acl_line in rules:
+                    for pattern in dangerous_patterns:
+                        if re.search(pattern, acl_line):
+                            results.append({
+                                "device": device["hostname"],
+                                "acl_name": acl_name,
+                                "rule_number": rule_num,
+                                "rule_text": acl_line,
+                                "violation": pattern,
+                                "action": "Remove or restrict"
+                            })
+
+                # Check for required rules across the whole ACL
+                # (simplified string match; in practice this needs more
+                # sophisticated rule comparison)
+                rule_texts = [text for _, text in rules]
+                for required_rule in security_policy["required_rules"]:
+                    if required_rule["acl"] == acl_name and not any(
+                        required_rule["rule"] in text for text in rule_texts
+                    ):
+                        results.append({
+                            "device": device["hostname"],
+                            "acl_name": acl_name,
+                            "rule_number": "N/A",
+                            "rule_text": "Missing required rule",
+                            "violation": f"Missing: {required_rule['rule']}",
+                            "action": "Add required rule"
+                        })
+
+    except Exception as e:
+        print(f"Error auditing {device['hostname']}: {e}")
+
+# Create audit report
+audit_df = pd.DataFrame(results)
+audit_df.to_csv("acl_audit_results.csv", index=False)
+print(f"Found {len(results)} ACL compliance issues")
+
+# Generate remediation commands for review
+if results:
+    remediation_commands = {}
+
+    for issue in results:
+        device_name = issue["device"]
+        remediation_commands.setdefault(device_name, [])
+
+        if "Remove" in issue["action"]:
+            remediation_commands[device_name].append(f"ip access-list extended {issue['acl_name']}")
+            remediation_commands[device_name].append(f"no {issue['rule_number']}")
+        elif "Add required rule" in issue["action"]:
+            # Find the corresponding required rule
+            for required_rule in security_policy["required_rules"]:
+                if required_rule["acl"] == issue["acl_name"]:
+                    remediation_commands[device_name].append(f"ip access-list extended {issue['acl_name']}")
+                    remediation_commands[device_name].append(required_rule["rule"])
+
+    # Print remediation commands for review
+    for device_name, commands in remediation_commands.items():
+        print(f"\nRemediation commands for {device_name}:")
+        for cmd in commands:
+            print(f"  {cmd}")
+```
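+
+The audit deliberately stops at printing commands for review. One way to close the loop while keeping a human in it is an explicit approval gate before pushing; a sketch, assuming the same inventory fields as above and an all-IOS estate:
+
+```python
+from netmiko import ConnectHandler
+
+# Push the reviewed remediation only after operator approval
+if input("Apply remediation? [y/N] ").lower() == "y":
+    for device_name, commands in remediation_commands.items():
+        dev = next(d for d in inventory["devices"] if d["hostname"] == device_name)
+        with ConnectHandler(device_type="cisco_ios", host=dev["hostname"],
+                            username=dev["username"], password=dev["password"]) as conn:
+            print(conn.send_config_set(commands))
+```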
Skipping.") + continue + + # Calculate subnet CIDR (simplified) + subnet_cidr = f"10.100.{subnet_index}.0/24" + subnet_index += 1 + + # Create subnet via API + subnet_data = { + "name": subnet_name, + "cidr_block": subnet_cidr, + "vpc_id": vpc_id, + "tags": { + "corresponds_to_vlan": vlan["id"], + "environment": "production" + } + } + + try: + response = requests.post( + f"{api_url}/networks/{vpc_id}/subnets", + headers=headers, + data=json.dumps(subnet_data) + ) + + if response.status_code == 201: + print(f"Created subnet {subnet_name} with CIDR {subnet_cidr}") + else: + print(f"Failed to create subnet {subnet_name}: {response.text}") + + except Exception as e: + print(f"API error: {e}") + + # Rate limit API calls + time.sleep(1) + +# Setup VPC peering between on-prem and cloud (via API) +peering_data = { + "name": "on-prem-to-cloud-peer", + "vpc_id": vpc_id, + "peer_vpc_id": cloud_config["on_prem_gateway_id"], + "auto_accept": True +} + +try: + response = requests.post( + f"{api_url}/vpc-peerings", + headers=headers, + data=json.dumps(peering_data) + ) + + if response.status_code == 201: + print("VPC peering connection established") + + # Update on-prem router with routes to cloud subnets + with ConnectHandler(device_type="cisco_ios", host="10.1.1.1", + username="admin", password="cisco_pass") as conn: + + # Add static routes to cloud subnets + conn.send_config_set([ + f"ip route 10.100.0.0 255.255.0.0 {cloud_config['vpn_endpoint']}" + ]) + + print("Updated on-premises router with cloud routes") + else: + print(f"Failed to establish VPC peering: {response.text}") + +except Exception as e: + print(f"API error during peering: {e}") +``` + +These workflows showcase practical applications of Python for network engineering tasks spanning discovery, configuration management, monitoring, backup, change management, capacity planning, security auditing, and cloud integration. \ No newline at end of file