walrus/walrus.py
2024-11-23 10:51:55 +00:00

80 lines
2.3 KiB
Python

import asyncio
import aiohttp
import json
import ipaddress
import os
# JSON file holding the scan position; written by save_progress() and read by
# load_progress(). The path is relative, so it is resolved against the current
# working directory of the process.
PROGRESS_FILE = "progress.json"
# JSON file holding the list of IPs found to be up; written by save_ips_up()
# and read by load_ips_up(). Also relative to the current working directory.
IPS_UP_FILE = "ips_up.json"
# Number of scan_ip() coroutines that scan_range() launches per batch.
MAX_CONCURRENT_SCANS = 500 # Adjust based on system capabilities
async def save_progress(progress, path=None):
    """Persist the scan progress as JSON.

    The data is written to a temporary sibling file which is then moved over
    the destination with ``os.replace``. An interruption in the middle of the
    write (e.g. Ctrl+C) therefore cannot leave a truncated, unparsable
    progress file behind.

    Args:
        progress: JSON-serialisable mapping describing the scan position.
        path: Destination file. Defaults to ``PROGRESS_FILE``.

    Note:
        The file I/O is synchronous; the function is a coroutine only so that
        existing callers can keep ``await``-ing it.
    """
    if path is None:
        path = PROGRESS_FILE
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(progress, f)
    # Swap the finished file into place in a single step.
    os.replace(tmp_path, path)
def load_progress(path=None):
    """Load the saved scan progress.

    Args:
        path: Progress file to read. Defaults to ``PROGRESS_FILE``.

    Returns:
        The dict stored in the file, or ``{"current_ip": "0.0.0.0"}`` when the
        file is missing, is not valid JSON, or does not hold a JSON object.
    """
    if path is None:
        path = PROGRESS_FILE
    default = {"current_ip": "0.0.0.0"}
    # EAFP: open the file directly instead of testing os.path.exists() first,
    # which races with the file disappearing between the check and the open.
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return default
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # A write that was interrupted part-way leaves an empty or truncated
        # file; start over rather than crash on every subsequent run.
        print(f"Ignoring unreadable progress file {path}: {exc}")
        return default
    if not isinstance(data, dict):
        print(f"Ignoring progress file {path}: expected a JSON object")
        return default
    return data
def load_ips_up(path=None):
    """Load the list of IPs that were recorded as up.

    Args:
        path: File to read. Defaults to ``IPS_UP_FILE``.

    Returns:
        The list stored in the file, or ``[]`` when the file is missing or
        unusable. An unusable file (invalid JSON, or JSON that is not a list)
        is renamed to ``<path>.corrupt`` so that a later save does not
        overwrite whatever it still contains.
    """
    if path is None:
        path = IPS_UP_FILE
    # EAFP: open the file directly instead of testing os.path.exists() first,
    # which races with the file disappearing between the check and the open.
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return []
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        problem = str(exc)
    else:
        if isinstance(data, list):
            return data
        problem = f"expected a JSON list, got {type(data).__name__}"
    # Keep the bad file for manual inspection instead of silently losing it.
    backup_path = f"{path}.corrupt"
    os.replace(path, backup_path)
    print(f"Unreadable IP list {path} ({problem}); moved to {backup_path}")
    return []
async def save_ips_up(ips_up, path=None):
    """Persist the list of IPs that are up as JSON.

    The list is written to a temporary sibling file which is then moved over
    the destination with ``os.replace``. An interruption in the middle of the
    write therefore cannot leave a truncated file that would discard the
    results collected so far.

    Args:
        ips_up: JSON-serialisable list of IP address strings.
        path: Destination file. Defaults to ``IPS_UP_FILE``.

    Note:
        The file I/O is synchronous; the function is a coroutine only so that
        existing callers can keep ``await``-ing it.
    """
    if path is None:
        path = IPS_UP_FILE
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(ips_up, f)
    # Swap the finished file into place in a single step.
    os.replace(tmp_path, path)
async def scan_ip(ip, session, ips_up):
    """Send one HTTP GET to ``http://<ip>`` and record the IP if it answers 200.

    Args:
        ip: IPv4 address to probe, as a string.
        session: ``aiohttp.ClientSession`` used to send the request.
        ips_up: List that ``ip`` is appended to when the response status is
            200 (mutated in place).

    Any exception raised by the request is reported on stdout and swallowed,
    so a single failing probe never propagates to the caller.
    """
    # ClientTimeout is the documented type for ``timeout``; it keeps the same
    # 2-second budget for the whole request that the bare number expressed.
    timeout = aiohttp.ClientTimeout(total=2)
    try:
        async with session.get(f"http://{ip}", timeout=timeout) as response:
            if response.status == 200:  # Or use any other status indicating 'up'
                print(f"IP: {ip} is UP")
                ips_up.append(ip)
            else:
                print(f"IP: {ip} is DOWN, Status: {response.status}")
    except Exception as e:
        # Deliberately broad: this is a best-effort probe and any failure
        # simply means "not reachable". Some exceptions (timeouts in
        # particular) carry no message, so fall back to the class name
        # instead of printing an empty reason.
        reason = str(e) or type(e).__name__
        print(f"IP: {ip} is DOWN, Error: {reason}")
async def scan_range(start_ip):
    """Scan IPv4 addresses up to 255.255.255.255, resuming saved progress.

    Addresses are probed in batches of ``MAX_CONCURRENT_SCANS`` concurrent
    ``scan_ip`` calls. After every batch the reachable-IP list and then the
    progress record are written to disk, so an interrupted run only has to
    repeat the batch that was in flight.

    Args:
        start_ip: First address to scan, as a dotted-quad string. When the
            saved progress is further along, scanning resumes from the saved
            address instead.
    """
    progress = load_progress()
    if progress.get("completed"):
        print("Scan already completed.")
        return
    ips_up = load_ips_up()

    last_ip = int(ipaddress.IPv4Address("255.255.255.255"))
    # load_progress() reports "0.0.0.0" when nothing has been saved, so taking
    # the saved address unconditionally would ignore start_ip. Start from
    # whichever of the two is further along.
    saved_ip = ipaddress.IPv4Address(progress.get("current_ip", start_ip))
    next_ip = int(max(saved_ip, ipaddress.IPv4Address(start_ip)))
    # A non-positive batch size would never advance the loop.
    batch_size = max(1, MAX_CONCURRENT_SCANS)

    # aiohttp's default connector caps simultaneous connections at 100.
    # Requests queued behind that cap can use up their 2-second timeout just
    # waiting for a slot, so size the pool to the batch.
    connector = aiohttp.TCPConnector(limit=batch_size)
    async with aiohttp.ClientSession(connector=connector) as session:
        while next_ip <= last_ip:
            # Iterate over plain integers: IPv4Address arithmetic raises
            # AddressValueError when stepping past 255.255.255.255.
            batch_end = min(next_ip + batch_size, last_ip + 1)
            tasks = [
                scan_ip(str(ipaddress.IPv4Address(value)), session, ips_up)
                for value in range(next_ip, batch_end)
            ]
            await asyncio.gather(*tasks)
            next_ip = batch_end

            # Save results before the position: an interruption between the
            # two writes then repeats a batch (possibly re-recording its
            # hits) instead of silently losing them.
            await save_ips_up(ips_up)
            if next_ip > last_ip:
                # There is no "next" address to store; mark the scan done.
                progress["current_ip"] = str(ipaddress.IPv4Address(last_ip))
                progress["completed"] = True
            else:
                progress["current_ip"] = str(ipaddress.IPv4Address(next_ip))
            await save_progress(progress)
def _main():
    """Run the scan from 0.0.0.0 and print a short notice on Ctrl+C."""
    try:
        asyncio.run(scan_range("0.0.0.0"))
    except KeyboardInterrupt:
        print("Scan interrupted. Progress and reachable IPs saved.")


if __name__ == "__main__":
    _main()