80 lines
2.3 KiB
Python
80 lines
2.3 KiB
Python
import asyncio
|
|
import aiohttp
|
|
import json
|
|
import ipaddress
|
|
import os
|
|
|
|
PROGRESS_FILE = "progress.json"
|
|
IPS_UP_FILE = "ips_up.json"
|
|
MAX_CONCURRENT_SCANS = 500 # Adjust based on system capabilities
|
|
|
|
|
|
async def save_progress(progress):
|
|
"""Save the progress to a JSON file."""
|
|
with open(PROGRESS_FILE, "w") as f:
|
|
json.dump(progress, f)
|
|
|
|
|
|
def load_progress():
|
|
"""Load the progress from a JSON file."""
|
|
if os.path.exists(PROGRESS_FILE):
|
|
with open(PROGRESS_FILE, "r") as f:
|
|
return json.load(f)
|
|
return {"current_ip": "0.0.0.0"}
|
|
|
|
|
|
def load_ips_up():
|
|
"""Load the list of IPs that are up."""
|
|
if os.path.exists(IPS_UP_FILE):
|
|
with open(IPS_UP_FILE, "r") as f:
|
|
return json.load(f)
|
|
return []
|
|
|
|
|
|
async def save_ips_up(ips_up):
|
|
"""Save the list of IPs that are up."""
|
|
with open(IPS_UP_FILE, "w") as f:
|
|
json.dump(ips_up, f)
|
|
|
|
|
|
async def scan_ip(ip, session, ips_up):
|
|
"""Simulate an IP scan and check if it's reachable."""
|
|
try:
|
|
async with session.get(f"http://{ip}", timeout=2) as response:
|
|
if response.status == 200: # Or use any other status indicating 'up'
|
|
print(f"IP: {ip} is UP")
|
|
ips_up.append(ip)
|
|
else:
|
|
print(f"IP: {ip} is DOWN, Status: {response.status}")
|
|
except Exception as e:
|
|
print(f"IP: {ip} is DOWN, Error: {e}")
|
|
|
|
|
|
async def scan_range(start_ip):
|
|
"""Scan all IPv4 addresses starting from a specific IP."""
|
|
progress = load_progress()
|
|
ips_up = load_ips_up()
|
|
current_ip = progress.get("current_ip", start_ip)
|
|
ip_pool = ipaddress.IPv4Address(current_ip)
|
|
total_ips = ipaddress.IPv4Address("255.255.255.255")
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
while ip_pool <= total_ips:
|
|
tasks = []
|
|
for _ in range(MAX_CONCURRENT_SCANS):
|
|
if ip_pool > total_ips:
|
|
break
|
|
tasks.append(scan_ip(str(ip_pool), session, ips_up))
|
|
ip_pool += 1
|
|
|
|
await asyncio.gather(*tasks)
|
|
progress["current_ip"] = str(ip_pool)
|
|
await save_progress(progress)
|
|
await save_ips_up(ips_up)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
asyncio.run(scan_range("0.0.0.0"))
|
|
except KeyboardInterrupt:
|
|
print("Scan interrupted. Progress and reachable IPs saved.") |