Subir archivos a "/"

This commit is contained in:
nix 2025-11-08 00:19:44 +00:00
parent 50c4c9669f
commit 0cf44045c9
3 changed files with 215 additions and 125 deletions

View File

@ -1,94 +1,73 @@
import os import os
import hashlib
import json import json
import hashlib
import requests import requests
import subprocess
from datetime import datetime, timedelta from datetime import datetime, timedelta
from flask import Flask, jsonify, Response, request from flask import Flask, jsonify, Response, request
from urllib.parse import urljoin
import tokenext
from healthcheck import ensure_config_exists
BASE_API = "https://ws1.smn.gob.ar" config = ensure_config_exists()
CACHE_DIR = "json.api.cache" server_cfg = config["server"]
PORT = int(server_cfg.get("port", 6942))
PASSWORD = server_cfg.get("password", "").strip()
CACHE_DIR = server_cfg.get("cache_dir", "cache")
BASE_API = server_cfg.get("base_api", "https://ws1.smn.gob.ar")
LOG_FILE = server_cfg.get("log_file", "").strip()
SMN_TOKEN_FILE = "token"
CACHE_TTL = timedelta(minutes=60) CACHE_TTL = timedelta(minutes=60)
SMN_TOKEN_FILE = "token.txt" AUTH_ENABLED = PASSWORD != ""
ACCESS_TOKEN = "i.hate.smn"
TIMEOUT = 10
app = Flask(__name__) app = Flask(__name__)
app.config["JSONIFY_PRETTYPRINT_REGULAR"] = False
@app.after_request
def remove_server_header(response):
response.headers["Server"] = ""
response.headers["X-Powered-By"] = ""
return response
def log(msg: str):
if LOG_FILE:
with open(LOG_FILE, "a") as f:
f.write(f"[{datetime.now().isoformat()}] {msg}\n")
print(msg)
def get_cache_filename(url: str) -> str: def get_cache_filename(url: str) -> str:
h = hashlib.sha256(url.encode()).hexdigest() h = hashlib.sha256(url.encode()).hexdigest()
return os.path.join(CACHE_DIR, f"{h}.json") return os.path.join(CACHE_DIR, f"{h}.json")
def load_cache(url: str): def load_cache(url: str):
path = get_cache_filename(url)
if not os.path.exists(path):
return None
mtime = datetime.fromtimestamp(os.path.getmtime(path))
if datetime.now() - mtime > CACHE_TTL:
return None
try: try:
path = get_cache_filename(url)
if not os.path.exists(path):
return None
mtime = datetime.fromtimestamp(os.path.getmtime(path))
if datetime.now() - mtime > CACHE_TTL:
return None
with open(path, "r", encoding="utf-8") as f: with open(path, "r", encoding="utf-8") as f:
return json.load(f) return json.load(f)
except Exception: except Exception:
return None return None
def save_cache(url: str, data: dict): def save_cache(url: str, data: dict):
try: os.makedirs(CACHE_DIR, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True) path = get_cache_filename(url)
path = get_cache_filename(url) with open(path, "w", encoding="utf-8") as f:
with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False)
json.dump(data, f, ensure_ascii=False)
except Exception as e:
print(f"[CACHE] Failed to save cache: {e}")
def load_smn_token(): def load_smn_token():
try: with open(SMN_TOKEN_FILE, "r") as f:
with open(SMN_TOKEN_FILE, "r", encoding="utf-8") as f: return f.read().strip()
token = f.read().strip()
if not token:
raise ValueError("Empty token file.")
return token
except Exception as e:
print(f"[TOKEN] Error loading token: {e}")
return ""
def refresh_smn_token(): def refresh_smn_token():
print("[TOKEN] Refreshing SMN token...") log("[TOKEN] Refreshing SMN token...")
try: ok = tokenext.refresh_token(output_file=SMN_TOKEN_FILE, headless=True, wait_seconds=8)
result = subprocess.run( if ok:
["python3", "tokenext.py"], log("[TOKEN] Token refreshed successfully.")
capture_output=True, else:
text=True, log("[TOKEN] Failed to refresh token.")
timeout=60
)
if result.returncode == 0:
print("[TOKEN] Token refreshed successfully.")
else:
print(f"[TOKEN] Refresh failed: {result.stderr.strip()}")
except Exception as e:
print(f"[TOKEN] Error running tokenext.py: {e}")
def check_access_token(): def check_access_token():
if not AUTH_ENABLED:
return True
header_token = request.headers.get("Authorization", "").strip() header_token = request.headers.get("Authorization", "").strip()
if header_token != ACCESS_TOKEN: return header_token == PASSWORD
# Drop unauthorized message (empty response)
return Response("", status=401)
return None
def fetch_from_smn(url: str, retry: bool = True): def fetch_from_smn(url: str, retry: bool = True):
token = load_smn_token() token = load_smn_token()
@ -99,52 +78,50 @@ def fetch_from_smn(url: str, retry: bool = True):
} }
try: try:
resp = requests.get(url, headers=headers, timeout=TIMEOUT) resp = requests.get(url, headers=headers, timeout=10)
except requests.RequestException as e: except requests.RequestException as e:
print(f"[ERROR] Request failed: {e}") return Response(str(e), status=502)
return None
if resp.status_code == 401 and retry: if resp.status_code == 401 and retry:
print("[AUTH] SMN token expired, refreshing...") log("[AUTH] SMN token expired, trying to refresh...")
refresh_smn_token() refresh_smn_token()
return fetch_from_smn(url, retry=False) return fetch_from_smn(url, retry=False)
return resp return resp
@app.route("/smn/<path:subpath>")
@app.route("/<path:subpath>")
def smn_proxy(subpath): def smn_proxy(subpath):
unauthorized = check_access_token() if not check_access_token():
if unauthorized: return jsonify({"error": "Unauthorized"}), 401
return unauthorized
url = f"{BASE_API}/{subpath}" if ".." in subpath or subpath.startswith("/"):
return jsonify({"error": "Invalid path"}), 400
url = urljoin(BASE_API + "/", subpath)
# Cache check
cached = load_cache(url) cached = load_cache(url)
if cached: if cached:
print(f"[CACHE] Loaded {subpath}") log(f"[CACHE] Loaded {subpath}")
return jsonify(cached) return jsonify(cached)
print(f"[FETCH] {url}") log(f"[FETCH] {url}")
resp = fetch_from_smn(url) resp = fetch_from_smn(url)
if not resp:
return Response("", status=502)
if resp.status_code >= 400: if not hasattr(resp, "status_code"):
print(f"[WARN] {resp.status_code} for {url}") return Response("Upstream error", status=502)
return Response("", status=resp.status_code)
if resp.status_code != 200:
return Response(resp.text, status=resp.status_code,
content_type=resp.headers.get("Content-Type", "text/plain"))
try: try:
data = resp.json() data = resp.json()
save_cache(url, data)
return jsonify(data)
except Exception: except Exception:
print("[ERROR] Invalid JSON response.") return Response("Invalid JSON from SMN", status=502)
return Response("", status=502)
save_cache(url, data)
return jsonify(data)
if __name__ == "__main__": if __name__ == "__main__":
os.makedirs(CACHE_DIR, exist_ok=True) os.makedirs(CACHE_DIR, exist_ok=True)
app.run(host="0.0.0.0", port=6942, debug=False, use_reloader=False) log(f"[STARTUP] Server starting on port {PORT}")
app.run(host="0.0.0.0", port=PORT)

39
healthcheck.py Normal file
View File

@ -0,0 +1,39 @@
import os
import configparser
CONFIG_FILE = "config.ini"
DEFAULT_CONFIG = {
"server": {
"port": "6942",
"password": "debug",
"cache_dir": "cache",
"base_api": "https://ws1.smn.gob.ar",
"log_file": ""
}
}
def ensure_config_exists():
config = configparser.ConfigParser()
if not os.path.exists(CONFIG_FILE):
print("[CONFIG] config.ini not found - creating with default values")
config.read_dict(DEFAULT_CONFIG)
with open(CONFIG_FILE, "w") as f:
config.write(f)
else:
config.read(CONFIG_FILE)
changed = False
for section, values in DEFAULT_CONFIG.items():
if section not in config:
config[section] = values
changed = True
else:
for key, val in values.items():
if key not in config[section]:
config[section][key] = val
changed = True
if changed:
with open(CONFIG_FILE, "w") as f:
config.write(f)
print("[CONFIG] Missing keys added to config.ini")
return config

View File

@ -1,62 +1,136 @@
# tokenext.py
import re import re
import time import time
from typing import Optional
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
URL = "https://www.smn.gob.ar/" DEFAULT_URL = "https://www.smn.gob.ar/"
OUTPUT_FILE = "token.txt" DEFAULT_OUTPUT_FILE = "token"
def extract_token_from_source(source: str):
m = re.search(r"localStorage\.setItem\(\s*['\"]token['\"]\s*,\s*['\"]([^'\"]+)['\"]\s*\)", source)
return m.group(1) if m else None
chrome_options = Options() def extract_token_from_source(source: str) -> Optional[str]:
chrome_options.add_argument("--headless=new") m = re.search(
chrome_options.add_argument("--no-sandbox") r"localStorage\.setItem\(\s*['\"]token['\"]\s*,\s*['\"]([^'\"]+)['\"]\s*\)",
chrome_options.add_argument("--disable-dev-shm-usage") source,
chrome_options.add_argument("--disable-gpu") )
chrome_options.add_argument("--disable-blink-features=AutomationControlled") if m:
chrome_options.add_argument("--window-size=1920,1080") return m.group(1)
chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/118.0.5993.90 Safari/537.36")
driver = webdriver.Chrome(options=chrome_options) m = re.search(r"localStorage\.token\s*=\s*['\"]([^'\"]+)['\"]", source)
if m:
return m.group(1)
try: return None
print(f"Loading URL {URL}")
driver.get(URL)
time.sleep(8)
def make_chrome_options(headless: bool = True) -> Options:
opts = Options()
if headless:
try:
opts.add_argument("--headless=new")
except Exception:
opts.add_argument("--headless")
opts.add_argument("--no-sandbox")
opts.add_argument("--disable-dev-shm-usage")
opts.add_argument("--disable-gpu")
opts.add_argument("--disable-blink-features=AutomationControlled")
opts.add_argument("--window-size=1920,1080")
opts.add_argument(
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/118.0.5993.90 Safari/537.36"
)
return opts
def get_token(
url: str = DEFAULT_URL,
headless: bool = True,
wait_seconds: int = 8,
driver_wait_timeout: int = 20,
chrome_driver_path: Optional[str] = None,
) -> Optional[str]:
options = make_chrome_options(headless=headless)
driver = None
try: try:
WebDriverWait(driver, 20).until( if chrome_driver_path:
EC.presence_of_element_located((By.TAG_NAME, "body")) driver = webdriver.Chrome(executable_path=chrome_driver_path, options=options)
) else:
except Exception: driver = webdriver.Chrome(options=options)
pass
# load page
driver.get(url)
if wait_seconds:
time.sleep(wait_seconds)
try:
WebDriverWait(driver, driver_wait_timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
except Exception:
pass
try:
token = driver.execute_script("return window.localStorage.getItem('token');")
except Exception:
token = None token = None
try:
token = driver.execute_script("return window.localStorage.getItem('token');")
except Exception:
token = None
if not token: # fallback to searching page source
token = extract_token_from_source(driver.page_source) if not token:
token = extract_token_from_source(driver.page_source)
return token
except Exception as ex:
return None
finally:
if driver:
try:
driver.quit()
except Exception:
pass
def refresh_token(
output_file: str = DEFAULT_OUTPUT_FILE,
**get_token_kwargs,
) -> bool:
token = get_token(**get_token_kwargs)
if token:
try:
with open(output_file, "w", encoding="utf-8") as f:
f.write(token)
return True
except Exception:
return False
return False
if __name__ == "__main__":
import sys
url = DEFAULT_URL
if len(sys.argv) > 1:
url = sys.argv[1]
print(f"Loading URL {url}")
token = get_token(headless=True)
if token: if token:
print(f"\n[+] Token found:\n{token}\n") print(f"\n[+] Token found:\n{token}\n")
with open(OUTPUT_FILE, "w", encoding="utf-8") as f: try:
f.write(token) with open(DEFAULT_OUTPUT_FILE, "w", encoding="utf-8") as fh:
print(f"[+] Saved to {OUTPUT_FILE}") fh.write(token)
print(f"[+] Saved to {DEFAULT_OUTPUT_FILE}")
except Exception as e:
print("[!] Failed to save token:", e)
else: else:
print("[!] No token found in localStorage or page source.") print("[!] No token found in localStorage or page source.")
with open("page_debug.html", "w", encoding="utf-8") as f:
f.write(driver.page_source)
print("[i] Saved full page source to page_debug.html for manual check.")
finally:
driver.quit()