"""Resumable HTTP reachability sweep over IPv4 addresses.

Addresses are probed in batches of ``MAX_CONCURRENT_SCANS``. After every
batch the next address to scan and the list of reachable addresses are
written to JSON files so that a later run can pick up where this one stopped.
"""

import asyncio
import ipaddress
import json
import os

import aiohttp

PROGRESS_FILE = "progress.json"
IPS_UP_FILE = "ips_up.json"
MAX_CONCURRENT_SCANS = 500  # Adjust based on system capabilities
REQUEST_TIMEOUT_SECONDS = 2
LAST_IPV4 = int(ipaddress.IPv4Address("255.255.255.255"))


def _write_json_atomic(path, payload):
    """Write *payload* as JSON to *path* via a temporary file and a rename.

    Writing straight over the target would leave a truncated file if the
    process is interrupted mid-write, which would break the next resume.
    """
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(payload, f)
    os.replace(tmp_path, path)


async def save_progress(progress):
    """Save the progress dict to ``PROGRESS_FILE``.

    The write itself is synchronous; the function is a coroutine only so
    existing callers can keep awaiting it.
    """
    _write_json_atomic(PROGRESS_FILE, progress)


def load_progress(default_ip="0.0.0.0"):
    """Load the progress dict from ``PROGRESS_FILE``.

    Args:
        default_ip: Address stored under ``"current_ip"`` when no progress
            file exists yet.

    Returns:
        The parsed JSON content of the progress file, or
        ``{"current_ip": default_ip}`` when the file does not exist.
    """
    try:
        with open(PROGRESS_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {"current_ip": default_ip}


def load_ips_up():
    """Load the list of reachable IPs from ``IPS_UP_FILE``.

    Returns:
        The parsed JSON content of the file, or an empty list when the file
        does not exist.
    """
    try:
        with open(IPS_UP_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return []


async def save_ips_up(ips_up):
    """Save the list of reachable IPs to ``IPS_UP_FILE``.

    The write itself is synchronous; the function is a coroutine only so
    existing callers can keep awaiting it.
    """
    _write_json_atomic(IPS_UP_FILE, ips_up)


async def scan_ip(ip, session, ips_up):
    """Probe ``http://{ip}`` and record the address if it answers with 200.

    Args:
        ip: Address to probe, as a string.
        session: Open ``aiohttp.ClientSession`` used to issue the request.
        ips_up: List that *ip* is appended to when the probe succeeds.
    """
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT_SECONDS)
    try:
        async with session.get(f"http://{ip}", timeout=timeout) as response:
            if response.status == 200:  # Or use any other status indicating 'up'
                print(f"IP: {ip} is UP")
                ips_up.append(ip)
            else:
                print(f"IP: {ip} is DOWN, Status: {response.status}")
    except Exception as e:
        # Deliberately broad: any failure to get a response counts as DOWN,
        # and one bad address must not abort the rest of the batch.
        print(f"IP: {ip} is DOWN, Error: {e}")


async def scan_range(start_ip):
    """Scan IPv4 addresses up to 255.255.255.255, resuming saved progress.

    Args:
        start_ip: Address to start from when no progress file exists. When a
            progress file exists, its ``"current_ip"`` takes precedence.
    """
    progress = load_progress(start_ip)
    if progress.get("done"):
        print("Scan already completed.")
        return

    ips_up = load_ips_up()
    # Work with plain integers: IPv4Address arithmetic raises once the value
    # would pass 255.255.255.255, which made the final batch unreachable.
    next_ip = int(ipaddress.IPv4Address(progress.get("current_ip", start_ip)))

    # Let every request in a batch hold a connection at once. With a smaller
    # pool, queued requests would burn their timeout waiting for a slot.
    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT_SCANS)
    async with aiohttp.ClientSession(connector=connector) as session:
        while next_ip <= LAST_IPV4:
            batch_end = min(next_ip + MAX_CONCURRENT_SCANS, LAST_IPV4 + 1)
            await asyncio.gather(
                *(
                    scan_ip(str(ipaddress.IPv4Address(n)), session, ips_up)
                    for n in range(next_ip, batch_end)
                )
            )
            next_ip = batch_end

            if next_ip > LAST_IPV4:
                # No valid address follows the last one, so completion is
                # recorded with an explicit flag instead.
                progress["current_ip"] = str(ipaddress.IPv4Address(LAST_IPV4))
                progress["done"] = True
            else:
                progress["current_ip"] = str(ipaddress.IPv4Address(next_ip))

            await save_progress(progress)
            await save_ips_up(ips_up)


if __name__ == "__main__":
    try:
        asyncio.run(scan_range("0.0.0.0"))
    except KeyboardInterrupt:
        print("Scan interrupted. Progress and reachable IPs saved.")