# Extract all interfaces with admin status up
result = jq.compile('.interfaces[] | select(.admin_status=="up") | .name').input(data).all()
```
---
# Network Engineering Python Workflows
## Network Discovery and Inventory Management
### Workflow 1: Automated Device Inventory Collection
**Scenario**: You need to maintain an accurate inventory of all network devices across multiple sites.
```python
from netmiko import ConnectHandler
import pandas as pd

# Seed devices from which to discover the network
seed_devices = [
    {"ip": "10.1.1.1", "device_type": "cisco_ios", "username": "admin", "password": "cisco_pass"},
    {"ip": "10.2.1.1", "device_type": "cisco_ios", "username": "admin", "password": "cisco_pass"},
]

inventory = []
for device in seed_devices:
    try:
        with ConnectHandler(**device) as conn:
            # Get device facts (IOS has no "show hostname"; pull it from the config)
            hostname = conn.send_command("show running-config | include ^hostname").replace("hostname", "").strip()
            version = conn.send_command("show version | include Version").strip()
            serial = conn.send_command("show inventory | include PID").strip()
            # Get CDP neighbors to expand discovery
            neighbors_output = conn.send_command("show cdp neighbors detail")
            inventory.append({
                "hostname": hostname,
                "ip": device["ip"],
                "version": version,
                "serial": serial,
            })
            # Parse the CDP output to find new devices (implementation simplified;
            # see the sketch below) and append them to seed_devices
    except Exception as e:
        print(f"Error connecting to {device['ip']}: {e}")

# Convert to a DataFrame for analysis
df = pd.DataFrame(inventory)
df.to_csv("network_inventory.csv", index=False)
```
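The CDP parsing step is what turns this script into a crawler. A minimal sketch, assuming a plain regex over the `show cdp neighbors detail` output and reusing the seed credentials; a production version would also read the platform field to skip phones and access points:
```python
import re

def discover_cdp_neighbors(cdp_detail_output, known_ips):
    """Extract management IPs from 'show cdp neighbors detail' output."""
    new_devices = []
    for ip in re.findall(r"IP address: (\d+\.\d+\.\d+\.\d+)", cdp_detail_output):
        if ip not in known_ips:
            known_ips.add(ip)
            new_devices.append({
                "ip": ip, "device_type": "cisco_ios",
                "username": "admin", "password": "cisco_pass",
            })
    return new_devices

# Usage inside the discovery loop:
# known_ips = {d["ip"] for d in seed_devices}
# seed_devices.extend(discover_cdp_neighbors(neighbors_output, known_ips))
```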
## Configuration Management
### Workflow 2: Compliance Verification and Remediation
**Scenario**: You need to verify that all access switches have proper security configurations and fix any violations.
```python
from nornir import InitNornir
from nornir_netmiko import netmiko_send_command, netmiko_send_config
from nornir.core.filter import F

# Initialize Nornir with the inventory
nr = InitNornir(config_file="config.yaml")

# Filter only access switches
access_switches = nr.filter(F(role="access_switch"))

# Define compliance checks
security_commands = {
    "port_security": "show running-config | include switchport port-security",
    "bpdu_guard": "show spanning-tree summary | include BPDU Guard",
    "storm_control": "show running-config | include storm-control",
}

# Check compliance
results = {}
for check_name, command in security_commands.items():
    results[check_name] = access_switches.run(netmiko_send_command, command_string=command)

# Find non-compliant devices (each host maps to a MultiResult; the
# command output lives on the first Result in it)
non_compliant = []
for device_name, multi_result in results["port_security"].items():
    if "switchport port-security" not in multi_result[0].result:
        non_compliant.append(device_name)

# Remediation configuration for violations
remediation_config = [
    "interface range GigabitEthernet 0/1-48",
    "switchport port-security",
    "switchport port-security maximum 3",
    "switchport port-security violation restrict",
    "spanning-tree bpduguard enable",
    "storm-control broadcast level 50.00",
]

# Apply the fix only to non-compliant devices
non_compliant_devices = nr.filter(filter_func=lambda host: host.name in non_compliant)
remediation_results = non_compliant_devices.run(
    netmiko_send_config, config_commands=remediation_config
)
print(f"Fixed {len(non_compliant)} non-compliant devices")
```
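This workflow assumes a Nornir `SimpleInventory` in which each host carries a `role` data key for the `F()` filters to match on. A minimal sketch of the assumed `config.yaml` and `hosts.yaml` (hostnames and credentials are illustrative):
```yaml
# config.yaml -- points Nornir at a SimpleInventory
inventory:
  plugin: SimpleInventory
  options:
    host_file: hosts.yaml
    group_file: groups.yaml

# hosts.yaml -- one entry per device; the `role` key drives the filters
# sw-access-01:
#   hostname: 10.1.2.10
#   platform: cisco_ios
#   username: admin
#   password: cisco_pass
#   data:
#     role: access_switch
```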
## Network Monitoring and Troubleshooting
### Workflow 3: Automated Troubleshooting for Circuit Issues
**Scenario**: A WAN circuit is experiencing intermittent problems. You need to collect data continuously to identify patterns.
```python
import re
import time
from datetime import datetime

import matplotlib.pyplot as plt
import pandas as pd
from netmiko import ConnectHandler

# Connect to the edge router
device = {
    "device_type": "cisco_ios",
    "host": "192.168.1.254",
    "username": "admin",
    "password": "cisco",
}
interface = "GigabitEthernet0/0"
monitoring_period = 3600  # 1 hour
interval = 30  # seconds

metrics = []
with ConnectHandler(**device) as conn:
    print(f"Monitoring {interface} for {monitoring_period / 60} minutes...")
    start_time = time.time()
    end_time = start_time + monitoring_period
    while time.time() < end_time:
        # Collect interface statistics
        output = conn.send_command(f"show interface {interface}")
        # Parse the counters with regexes (naive string splitting is fragile here)
        input_rate = int(re.search(r"input rate (\d+)", output).group(1))
        output_rate = int(re.search(r"output rate (\d+)", output).group(1))
        input_errors = int(re.search(r"(\d+) input errors", output).group(1))
        crc_errors = int(re.search(r"(\d+) CRC", output).group(1))
        # Collect ping statistics to the provider edge (IOS reports a
        # success rate in percent, not a loss percentage)
        ping_result = conn.send_command("ping 172.16.1.1 repeat 5")
        success = re.search(r"Success rate is (\d+) percent", ping_result)
        packet_loss = 100 - int(success.group(1)) if success else 100
        # Store metrics
        metrics.append({
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "input_rate": input_rate,
            "output_rate": output_rate,
            "input_errors": input_errors,
            "crc_errors": crc_errors,
            "packet_loss": packet_loss,
        })
        # Wait for the next interval
        time.sleep(interval)

# Analyze the collected data
df = pd.DataFrame(metrics)
df.to_csv("circuit_metrics.csv", index=False)

# Create time-series plots for analysis
plt.figure(figsize=(12, 8))
plt.subplot(3, 1, 1)
plt.plot(df["timestamp"], df["input_rate"], label="Input Rate")
plt.plot(df["timestamp"], df["output_rate"], label="Output Rate")
plt.legend()
plt.subplot(3, 1, 2)
plt.plot(df["timestamp"], df["crc_errors"], label="CRC Errors")
plt.legend()
plt.subplot(3, 1, 3)
plt.plot(df["timestamp"], df["packet_loss"], label="Packet Loss %")
plt.legend()
plt.tight_layout()
plt.savefig("circuit_analysis.png")
```
## Network Configuration Backup and Restoration
### Workflow 4: Automated Configuration Backup with Git Integration
**Scenario**: You need daily backups of all network device configurations with version control.
```python
import os
import subprocess
from datetime import datetime

import yaml
from napalm import get_network_driver

# Load the device inventory
with open("inventory.yaml", "r") as f:
    inventory = yaml.safe_load(f)

# Create the backup directory structure
backup_dir = f"backups/{datetime.now().strftime('%Y-%m-%d')}"
os.makedirs(backup_dir, exist_ok=True)

# Back up configurations
for device in inventory["devices"]:
    try:
        # Initialize the appropriate driver
        driver = get_network_driver(device["os"])
        with driver(device["hostname"], device["username"], device["password"]) as device_conn:
            # Get the running configuration
            running_config = device_conn.get_config()["running"]
            # Save to file
            device_file = f"{backup_dir}/{device['hostname']}.cfg"
            with open(device_file, "w") as f:
                f.write(running_config)
            print(f"Backed up {device['hostname']}")
    except Exception as e:
        print(f"Failed to back up {device['hostname']}: {e}")

# Git integration
try:
    # Initialize a Git repo if one does not exist yet
    if not os.path.exists(os.path.join("backups", ".git")):
        subprocess.run(["git", "init"], cwd="backups", check=True)
    # Stage all files
    subprocess.run(["git", "add", "."], cwd="backups", check=True)
    # Commit the changes
    commit_message = f"Daily backup {datetime.now().strftime('%Y-%m-%d')}"
    subprocess.run(["git", "commit", "-m", commit_message], cwd="backups", check=True)
    # Push to the remote (if configured)
    subprocess.run(["git", "push"], cwd="backups", check=True)
    print("Backup committed to git repository")
except Exception as e:
    print(f"Git operation failed: {e}")
```
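Once the backups live in Git, the same repository doubles as a change report: diffing the two most recent commits shows exactly which device configurations changed since the previous run. A minimal sketch (assumes at least two commits exist in `backups`):
```python
import subprocess

# Summarize which device configs changed between the last two backups
diff = subprocess.run(
    ["git", "diff", "--stat", "HEAD~1", "HEAD"],
    cwd="backups", capture_output=True, text=True, check=True,
)
print("Configuration changes since the previous backup:")
print(diff.stdout)
```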
## Network Change Management
### Workflow 5: Phased Network Change with Rollback
**Scenario**: You need to deploy a new routing protocol across the WAN with automatic rollback if connectivity tests fail.
```python
from nornir import InitNornir
from nornir_netmiko import netmiko_send_command, netmiko_send_config
from nornir.core.filter import F
import time

# Initialize Nornir
nr = InitNornir(config_file="config.yaml")

# Filter core routers for the first phase
core_routers = nr.filter(F(role="core_router"))

# Define the change configuration ("no auto-summary" is an EIGRP/RIP
# command, not valid under OSPF, so it is omitted here)
ospf_config = [
    "router ospf 100",
    "network 10.0.0.0 0.255.255.255 area 0",
    "redistribute connected subnets",
]

# Define validation tests
def validate_connectivity(task):
    # Test connectivity to a critical host
    result = task.run(
        netmiko_send_command,
        command_string="ping 10.100.1.1 repeat 5",
    )
    # Check the success criteria against the subtask's output
    return "Success rate is 100 percent" in result[0].result

# Phase 1: deploy to core routers
print("Phase 1: Deploying to core routers...")
core_results = core_routers.run(netmiko_send_config, config_commands=ospf_config)

# Save configurations on the hosts where the push succeeded
for host, result in core_results.items():
    if not result.failed:
        core_routers.filter(name=host).run(
            netmiko_send_command, command_string="write memory"
        )

# Validate the core router changes
time.sleep(60)  # Allow for convergence
validation_results = []
for host in core_routers.inventory.hosts:
    success = core_routers.filter(name=host).run(task=validate_connectivity)[host][0].result
    validation_results.append(success)

# If validation fails, roll back
if not all(validation_results):
    print("Validation failed! Rolling back changes...")
    rollback_commands = ["no router ospf 100"]
    core_routers.run(netmiko_send_config, config_commands=rollback_commands)
    print("Rollback completed.")
else:
    # Continue with Phase 2 (distribution routers)
    print("Phase 1 successful. Proceeding to Phase 2...")
    dist_routers = nr.filter(F(role="distribution_router"))
    # Phase 2 repeats the same deploy/validate/rollback cycle (see the sketch below)
```
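Since every phase repeats the same deploy/validate/rollback cycle, it is worth factoring the cycle into a function and calling it once per device role. A minimal sketch reusing `validate_connectivity` from above (the role names are illustrative):
```python
def run_phase(device_group, config_commands, rollback_commands, settle_time=60):
    """Deploy a change to one group, validate it, and roll back on failure."""
    device_group.run(netmiko_send_config, config_commands=config_commands)
    time.sleep(settle_time)  # allow the routing protocol to converge
    checks = device_group.run(task=validate_connectivity)
    if not checks.failed and all(mr[0].result for mr in checks.values()):
        device_group.run(netmiko_send_command, command_string="write memory")
        return True
    device_group.run(netmiko_send_config, config_commands=rollback_commands)
    return False

# Usage: stop the rollout at the first phase that fails validation
for role in ["core_router", "distribution_router"]:
    if not run_phase(nr.filter(F(role=role)), ospf_config, ["no router ospf 100"]):
        print(f"Phase for {role} failed; change halted and rolled back")
        break
```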
## Network Capacity Planning
### Workflow 6: Interface Utilization Analysis for Capacity Planning
**Scenario**: You need to identify potential bottlenecks by analyzing interface utilization trends.
```python
import re
import time
from datetime import datetime

import numpy as np
import pandas as pd
from netmiko import ConnectHandler

# Connect to the core switch
device = {
    "device_type": "cisco_ios",
    "host": "10.0.1.1",
    "username": "admin",
    "password": "cisco_pass",
}

# Get the list of uplink interfaces (matched on the port description)
with ConnectHandler(**device) as conn:
    interfaces_output = conn.send_command("show interface status | include uplink")
    uplink_interfaces = [line.split()[0] for line in interfaces_output.splitlines() if line.strip()]

# Collect utilization data over 24 hours (simplified; in the real world use SNMP or streaming telemetry)
collection_period = 24 * 60 * 60  # 24 hours
interval = 300  # 5 minutes

data = []
with ConnectHandler(**device) as conn:
    start_time = time.time()
    end_time = start_time + collection_period
    # For demo purposes, simulate with a few iterations
    for _ in range(5):  # In production, loop until end_time is reached
        timestamp = datetime.now()
        for interface in uplink_interfaces:
            output = conn.send_command(f"show interface {interface} | include rate")
            # Extract the input and output rates (IOS already reports bits/sec)
            input_rate = int(re.search(r"input rate (\d+)", output).group(1))
            output_rate = int(re.search(r"output rate (\d+)", output).group(1))
            # Convert bits/sec to Mbps (no factor of 8: the counters are bits, not bytes)
            input_mbps = input_rate / 1_000_000
            output_mbps = output_rate / 1_000_000
            data.append({
                "timestamp": timestamp,
                "interface": interface,
                "input_mbps": input_mbps,
                "output_mbps": output_mbps,
            })
        # In a real implementation, wait for the next interval
        # time.sleep(interval)

# Convert to a DataFrame
df = pd.DataFrame(data)

# Summarize utilization per interface
utilization_summary = df.groupby("interface").agg({
    "input_mbps": ["mean", "max", "std"],
    "output_mbps": ["mean", "max", "std"],
})

# Find interfaces with high utilization (>70% of capacity)
interface_capacity = {
    "TenGigabitEthernet1/1": 10000,    # 10G in Mbps
    "FortyGigabitEthernet2/1": 40000,  # 40G in Mbps
}
for interface, capacity in interface_capacity.items():
    if interface in utilization_summary.index:
        max_util = max(
            utilization_summary.loc[interface, ("input_mbps", "max")],
            utilization_summary.loc[interface, ("output_mbps", "max")],
        )
        utilization_pct = (max_util / capacity) * 100
        if utilization_pct > 70:
            print(f"WARNING: {interface} reached {utilization_pct:.1f}% utilization")
            # Forecast when 90% will be reached based on the trend
            interface_data = df[df["interface"] == interface]
            # Time-series forecast (simplified linear regression)
            x = np.arange(len(interface_data))
            y = interface_data["output_mbps"].values
            # Fit a line to project the trend
            slope = np.polyfit(x, y, 1)[0]
            # Calculate days until 90% capacity
            if slope > 0:
                mbps_until_90pct = 0.9 * capacity - max_util
                days_until_90pct = mbps_until_90pct / (slope * 288)  # 288 samples/day at 5-minute intervals
                print(f"  Forecast: will reach 90% capacity in {days_until_90pct:.1f} days")
```
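The collection loop above points at SNMP for production use. A minimal sketch using pysnmp's synchronous high-level API (v4.x) to read the 64-bit interface octet counters; the host, community string, and ifIndex are illustrative, and utilization is the delta between two polls divided by the poll interval:
```python
from pysnmp.hlapi import (
    SnmpEngine, CommunityData, UdpTransportTarget, ContextData,
    ObjectType, ObjectIdentity, getCmd,
)

def get_if_octets(host, community, if_index):
    """Read ifHCInOctets/ifHCOutOctets for one interface via SNMPv2c."""
    error_indication, error_status, _, var_binds = next(getCmd(
        SnmpEngine(),
        CommunityData(community),
        UdpTransportTarget((host, 161)),
        ContextData(),
        ObjectType(ObjectIdentity("IF-MIB", "ifHCInOctets", if_index)),
        ObjectType(ObjectIdentity("IF-MIB", "ifHCOutOctets", if_index)),
    ))
    if error_indication or error_status:
        raise RuntimeError(f"SNMP error: {error_indication or error_status}")
    in_octets, out_octets = (int(vb[1]) for vb in var_binds)
    return in_octets, out_octets

# Usage: poll twice, then rate_bps = (delta_octets * 8) / poll_interval
# in_1, out_1 = get_if_octets("10.0.1.1", "public", 1)
```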
## Network Security Automation
### Workflow 7: Automated ACL Audit and Remediation
**Scenario**: You need to audit all access control lists to ensure compliance with security policies.
```python
import re

import pandas as pd
import yaml
from napalm import get_network_driver

# Load the security policy rules
with open("security_policy.yaml", "r") as f:
    security_policy = yaml.safe_load(f)

# Load the device inventory
with open("inventory.yaml", "r") as f:
    inventory = yaml.safe_load(f)

# Define non-compliant patterns
dangerous_patterns = [
    r"permit ip any any",                  # Too permissive
    r"permit tcp any any eq 23",           # Telnet access
    r"permit tcp any any eq 21",           # FTP access
    r"remark temporary|remark TEMPORARY",  # Temporary rules
]

results = []
# Audit the ACLs on each device
for device in inventory["devices"]:
    try:
        driver = get_network_driver(device["os"])
        with driver(device["hostname"], device["username"], device["password"]) as device_conn:
            # NAPALM has no ACL getter, so pull the running config and parse the named ACLs out of it
            running_config = device_conn.get_config(retrieve="running")["running"]
            acls = {}  # {acl_name: [rule_text, ...]}
            current_acl = None
            for line in running_config.splitlines():
                header = re.match(r"ip access-list (?:standard|extended) (\S+)", line)
                if header:
                    current_acl = header.group(1)
                    acls[current_acl] = []
                elif current_acl and line.startswith(" "):
                    acls[current_acl].append(line.strip())
                else:
                    current_acl = None
            for acl_name, rules in acls.items():
                # Check each rule against the dangerous patterns
                for rule_num, acl_line in enumerate(rules, start=1):
                    for pattern in dangerous_patterns:
                        if re.search(pattern, acl_line):
                            results.append({
                                "device": device["hostname"],
                                "acl_name": acl_name,
                                "rule_number": rule_num,
                                "rule_text": acl_line,
                                "violation": pattern,
                                "action": "Remove or restrict",
                            })
                # Check for required rules (simplified; a real comparison must
                # normalize sequence numbers, protocol keywords, and port names)
                for required_rule in security_policy["required_rules"]:
                    if required_rule["acl"] == acl_name and required_rule["rule"] not in rules:
                        results.append({
                            "device": device["hostname"],
                            "acl_name": acl_name,
                            "rule_number": "N/A",
                            "rule_text": "Missing required rule",
                            "violation": f"Missing: {required_rule['rule']}",
                            "action": "Add required rule",
                        })
    except Exception as e:
        print(f"Error auditing {device['hostname']}: {e}")

# Create the audit report
audit_df = pd.DataFrame(results)
audit_df.to_csv("acl_audit_results.csv", index=False)
print(f"Found {len(results)} ACL compliance issues")

# Generate remediation commands for review
if results:
    remediation_commands = {}
    for issue in results:
        device = issue["device"]
        remediation_commands.setdefault(device, [])
        if "Remove" in issue["action"]:
            remediation_commands[device].append(f"ip access-list extended {issue['acl_name']}")
            # In a named ACL, "no <rule text>" removes the matching entry
            remediation_commands[device].append(f"no {issue['rule_text']}")
        elif "Add required rule" in issue["action"]:
            # Find the corresponding required rule
            for required_rule in security_policy["required_rules"]:
                if required_rule["acl"] == issue["acl_name"]:
                    remediation_commands[device].append(f"ip access-list extended {issue['acl_name']}")
                    remediation_commands[device].append(required_rule["rule"])

    # Print the remediation commands for review
    for device, commands in remediation_commands.items():
        print(f"\nRemediation commands for {device}:")
        for cmd in commands:
            print(f"  {cmd}")
```
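The audit reads `required_rules` entries with `acl` and `rule` keys, which implies a policy file shaped like the following. A minimal sketch of the assumed `security_policy.yaml` (ACL names and rules are illustrative):
```yaml
required_rules:
  - acl: MGMT-ACCESS
    rule: permit tcp 10.99.0.0 0.0.255.255 any eq 22
  - acl: MGMT-ACCESS
    rule: deny ip any any log
```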
## Network API Automation
### Workflow 8: Automating Cloud Network Configuration via APIs
**Scenario**: You need to provision network resources in a cloud environment based on your on-premises configurations.
```python
import time

import requests
import yaml
from netmiko import ConnectHandler

# Load the on-premises VLANs that need to be mirrored in the cloud
with ConnectHandler(device_type="cisco_ios", host="10.1.1.1",
                    username="admin", password="cisco_pass") as conn:
    vlan_output = conn.send_command("show vlan brief")

# Parse the VLANs (simplified)
vlans = []
for line in vlan_output.splitlines()[2:]:  # Skip header lines
    if line.strip():
        parts = line.split()
        if parts[0].isdigit() and int(parts[0]) < 1000:  # Filter standard VLANs
            vlans.append({"id": parts[0], "name": parts[1]})

# Load the cloud provider API configuration
with open("cloud_config.yaml", "r") as f:
    cloud_config = yaml.safe_load(f)

# Set up the API connection
api_url = cloud_config["api_url"]
headers = {
    "Authorization": f"Bearer {cloud_config['api_key']}",
    "Content-Type": "application/json",
}

# Get the existing cloud VPCs/VNets
response = requests.get(f"{api_url}/networks", headers=headers)
cloud_networks = response.json()["networks"]

# Find the target VPC/VNet
target_vpc = next((n for n in cloud_networks if n["name"] == cloud_config["vpc_name"]), None)
if not target_vpc:
    print(f"VPC {cloud_config['vpc_name']} not found!")
    raise SystemExit(1)

# Provision subnets in the cloud that match the on-prem VLANs
vpc_id = target_vpc["id"]
existing_subnets_response = requests.get(f"{api_url}/networks/{vpc_id}/subnets", headers=headers)
existing_subnets = existing_subnets_response.json()["subnets"]
existing_subnet_names = [s["name"] for s in existing_subnets]

# Carve /24 subnets out of the VPC CIDR (example logic)
base_cidr = "10.100.0.0/16"  # Cloud VPC CIDR
subnet_index = 0
for vlan in vlans:
    subnet_name = f"subnet-{vlan['name']}"
    # Skip if the subnet already exists
    if subnet_name in existing_subnet_names:
        print(f"Subnet {subnet_name} already exists. Skipping.")
        continue
    # Calculate the subnet CIDR (simplified)
    subnet_cidr = f"10.100.{subnet_index}.0/24"
    subnet_index += 1
    # Create the subnet via the API
    subnet_data = {
        "name": subnet_name,
        "cidr_block": subnet_cidr,
        "vpc_id": vpc_id,
        "tags": {
            "corresponds_to_vlan": vlan["id"],
            "environment": "production",
        },
    }
    try:
        response = requests.post(
            f"{api_url}/networks/{vpc_id}/subnets",
            headers=headers,
            json=subnet_data,
        )
        if response.status_code == 201:
            print(f"Created subnet {subnet_name} with CIDR {subnet_cidr}")
        else:
            print(f"Failed to create subnet {subnet_name}: {response.text}")
    except Exception as e:
        print(f"API error: {e}")
    # Rate-limit API calls
    time.sleep(1)

# Set up VPC peering between on-prem and cloud (via the API)
peering_data = {
    "name": "on-prem-to-cloud-peer",
    "vpc_id": vpc_id,
    "peer_vpc_id": cloud_config["on_prem_gateway_id"],
    "auto_accept": True,
}
try:
    response = requests.post(f"{api_url}/vpc-peerings", headers=headers, json=peering_data)
    if response.status_code == 201:
        print("VPC peering connection established")
        # Update the on-prem router with routes to the cloud subnets
        with ConnectHandler(device_type="cisco_ios", host="10.1.1.1",
                            username="admin", password="cisco_pass") as conn:
            # Add a static route covering the cloud VPC CIDR
            conn.send_config_set([
                f"ip route 10.100.0.0 255.255.0.0 {cloud_config['vpn_endpoint']}"
            ])
        print("Updated on-premises router with cloud routes")
    else:
        print(f"Failed to establish VPC peering: {response.text}")
except Exception as e:
    print(f"API error during peering: {e}")
```
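The cloud API in this workflow is a generic placeholder rather than any specific provider's; the keys read from `cloud_config.yaml` imply a schema like the following (all values illustrative):
```yaml
api_url: https://api.cloud.example.com/v1
api_key: REPLACE_ME
vpc_name: prod-vpc
on_prem_gateway_id: vpc-gw-0123
vpn_endpoint: 172.31.255.1
```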
These workflows showcase practical applications of Python for network engineering tasks spanning discovery, configuration management, monitoring, backup, change management, capacity planning, security auditing, and cloud integration.