diff --git a/berretin.py b/berretin.py index ef52168..f86ada3 100644 --- a/berretin.py +++ b/berretin.py @@ -1,94 +1,73 @@ import os -import hashlib import json +import hashlib import requests -import subprocess from datetime import datetime, timedelta from flask import Flask, jsonify, Response, request +from urllib.parse import urljoin +import tokenext +from healthcheck import ensure_config_exists -BASE_API = "https://ws1.smn.gob.ar" -CACHE_DIR = "json.api.cache" +config = ensure_config_exists() +server_cfg = config["server"] + +PORT = int(server_cfg.get("port", 6942)) +PASSWORD = server_cfg.get("password", "").strip() +CACHE_DIR = server_cfg.get("cache_dir", "cache") +BASE_API = server_cfg.get("base_api", "https://ws1.smn.gob.ar") +LOG_FILE = server_cfg.get("log_file", "").strip() +SMN_TOKEN_FILE = "token" CACHE_TTL = timedelta(minutes=60) -SMN_TOKEN_FILE = "token.txt" -ACCESS_TOKEN = "i.hate.smn" -TIMEOUT = 10 +AUTH_ENABLED = PASSWORD != "" app = Flask(__name__) -app.config["JSONIFY_PRETTYPRINT_REGULAR"] = False - - -@app.after_request -def remove_server_header(response): - response.headers["Server"] = "" - response.headers["X-Powered-By"] = "" - return response +def log(msg: str): + if LOG_FILE: + with open(LOG_FILE, "a") as f: + f.write(f"[{datetime.now().isoformat()}] {msg}\n") + print(msg) def get_cache_filename(url: str) -> str: h = hashlib.sha256(url.encode()).hexdigest() return os.path.join(CACHE_DIR, f"{h}.json") - def load_cache(url: str): + path = get_cache_filename(url) + if not os.path.exists(path): + return None + mtime = datetime.fromtimestamp(os.path.getmtime(path)) + if datetime.now() - mtime > CACHE_TTL: + return None try: - path = get_cache_filename(url) - if not os.path.exists(path): - return None - mtime = datetime.fromtimestamp(os.path.getmtime(path)) - if datetime.now() - mtime > CACHE_TTL: - return None with open(path, "r", encoding="utf-8") as f: return json.load(f) except Exception: return None - def save_cache(url: str, data: dict): - try: - os.makedirs(CACHE_DIR, exist_ok=True) - path = get_cache_filename(url) - with open(path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False) - except Exception as e: - print(f"[CACHE] Failed to save cache: {e}") - + os.makedirs(CACHE_DIR, exist_ok=True) + path = get_cache_filename(url) + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) def load_smn_token(): - try: - with open(SMN_TOKEN_FILE, "r", encoding="utf-8") as f: - token = f.read().strip() - if not token: - raise ValueError("Empty token file.") - return token - except Exception as e: - print(f"[TOKEN] Error loading token: {e}") - return "" - + with open(SMN_TOKEN_FILE, "r") as f: + return f.read().strip() def refresh_smn_token(): - print("[TOKEN] Refreshing SMN token...") - try: - result = subprocess.run( - ["python3", "tokenext.py"], - capture_output=True, - text=True, - timeout=60 - ) - if result.returncode == 0: - print("[TOKEN] Token refreshed successfully.") - else: - print(f"[TOKEN] Refresh failed: {result.stderr.strip()}") - except Exception as e: - print(f"[TOKEN] Error running tokenext.py: {e}") - + log("[TOKEN] Refreshing SMN token...") + ok = tokenext.refresh_token(output_file=SMN_TOKEN_FILE, headless=True, wait_seconds=8) + if ok: + log("[TOKEN] Token refreshed successfully.") + else: + log("[TOKEN] Failed to refresh token.") def check_access_token(): + if not AUTH_ENABLED: + return True header_token = request.headers.get("Authorization", "").strip() - if header_token != ACCESS_TOKEN: - # Drop unauthorized message (empty response) - return Response("", status=401) - return None - + return header_token == PASSWORD def fetch_from_smn(url: str, retry: bool = True): token = load_smn_token() @@ -99,52 +78,50 @@ def fetch_from_smn(url: str, retry: bool = True): } try: - resp = requests.get(url, headers=headers, timeout=TIMEOUT) + resp = requests.get(url, headers=headers, timeout=10) except requests.RequestException as e: - print(f"[ERROR] Request failed: {e}") - return None + return Response(str(e), status=502) if resp.status_code == 401 and retry: - print("[AUTH] SMN token expired, refreshing...") + log("[AUTH] SMN token expired, trying to refresh...") refresh_smn_token() return fetch_from_smn(url, retry=False) return resp - -@app.route("/") +@app.route("/smn/") def smn_proxy(subpath): - unauthorized = check_access_token() - if unauthorized: - return unauthorized + if not check_access_token(): + return jsonify({"error": "Unauthorized"}), 401 - url = f"{BASE_API}/{subpath}" + if ".." in subpath or subpath.startswith("/"): + return jsonify({"error": "Invalid path"}), 400 + + url = urljoin(BASE_API + "/", subpath) - # Cache check cached = load_cache(url) if cached: - print(f"[CACHE] Loaded {subpath}") + log(f"[CACHE] Loaded {subpath}") return jsonify(cached) - print(f"[FETCH] {url}") + log(f"[FETCH] {url}") resp = fetch_from_smn(url) - if not resp: - return Response("", status=502) - if resp.status_code >= 400: - print(f"[WARN] {resp.status_code} for {url}") - return Response("", status=resp.status_code) + if not hasattr(resp, "status_code"): + return Response("Upstream error", status=502) + + if resp.status_code != 200: + return Response(resp.text, status=resp.status_code, + content_type=resp.headers.get("Content-Type", "text/plain")) try: data = resp.json() + save_cache(url, data) + return jsonify(data) except Exception: - print("[ERROR] Invalid JSON response.") - return Response("", status=502) - - save_cache(url, data) - return jsonify(data) - + return Response("Invalid JSON from SMN", status=502) if __name__ == "__main__": os.makedirs(CACHE_DIR, exist_ok=True) - app.run(host="0.0.0.0", port=6942, debug=False, use_reloader=False) + log(f"[STARTUP] Server starting on port {PORT}") + app.run(host="0.0.0.0", port=PORT) diff --git a/healthcheck.py b/healthcheck.py new file mode 100644 index 0000000..52c9783 --- /dev/null +++ b/healthcheck.py @@ -0,0 +1,39 @@ +import os +import configparser + +CONFIG_FILE = "config.ini" + +DEFAULT_CONFIG = { + "server": { + "port": "6942", + "password": "debug", + "cache_dir": "cache", + "base_api": "https://ws1.smn.gob.ar", + "log_file": "" + } +} + +def ensure_config_exists(): + config = configparser.ConfigParser() + if not os.path.exists(CONFIG_FILE): + print("[CONFIG] config.ini not found - creating with default values") + config.read_dict(DEFAULT_CONFIG) + with open(CONFIG_FILE, "w") as f: + config.write(f) + else: + config.read(CONFIG_FILE) + changed = False + for section, values in DEFAULT_CONFIG.items(): + if section not in config: + config[section] = values + changed = True + else: + for key, val in values.items(): + if key not in config[section]: + config[section][key] = val + changed = True + if changed: + with open(CONFIG_FILE, "w") as f: + config.write(f) + print("[CONFIG] Missing keys added to config.ini") + return config diff --git a/tokenext.py b/tokenext.py index 88bec5b..055faca 100644 --- a/tokenext.py +++ b/tokenext.py @@ -1,62 +1,136 @@ +# tokenext.py import re import time +from typing import Optional from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC -URL = "https://www.smn.gob.ar/" -OUTPUT_FILE = "token.txt" +DEFAULT_URL = "https://www.smn.gob.ar/" +DEFAULT_OUTPUT_FILE = "token" -def extract_token_from_source(source: str): - m = re.search(r"localStorage\.setItem\(\s*['\"]token['\"]\s*,\s*['\"]([^'\"]+)['\"]\s*\)", source) - return m.group(1) if m else None -chrome_options = Options() -chrome_options.add_argument("--headless=new") -chrome_options.add_argument("--no-sandbox") -chrome_options.add_argument("--disable-dev-shm-usage") -chrome_options.add_argument("--disable-gpu") -chrome_options.add_argument("--disable-blink-features=AutomationControlled") -chrome_options.add_argument("--window-size=1920,1080") -chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/118.0.5993.90 Safari/537.36") +def extract_token_from_source(source: str) -> Optional[str]: + m = re.search( + r"localStorage\.setItem\(\s*['\"]token['\"]\s*,\s*['\"]([^'\"]+)['\"]\s*\)", + source, + ) + if m: + return m.group(1) -driver = webdriver.Chrome(options=chrome_options) + m = re.search(r"localStorage\.token\s*=\s*['\"]([^'\"]+)['\"]", source) + if m: + return m.group(1) -try: - print(f"Loading URL {URL}") - driver.get(URL) + return None - time.sleep(8) +def make_chrome_options(headless: bool = True) -> Options: + opts = Options() + if headless: + try: + opts.add_argument("--headless=new") + except Exception: + opts.add_argument("--headless") + opts.add_argument("--no-sandbox") + opts.add_argument("--disable-dev-shm-usage") + opts.add_argument("--disable-gpu") + opts.add_argument("--disable-blink-features=AutomationControlled") + opts.add_argument("--window-size=1920,1080") + opts.add_argument( + "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/118.0.5993.90 Safari/537.36" + ) + return opts + + +def get_token( + url: str = DEFAULT_URL, + headless: bool = True, + wait_seconds: int = 8, + driver_wait_timeout: int = 20, + chrome_driver_path: Optional[str] = None, +) -> Optional[str]: + + options = make_chrome_options(headless=headless) + + driver = None try: - WebDriverWait(driver, 20).until( - EC.presence_of_element_located((By.TAG_NAME, "body")) - ) - except Exception: - pass + if chrome_driver_path: + driver = webdriver.Chrome(executable_path=chrome_driver_path, options=options) + else: + driver = webdriver.Chrome(options=options) + + # load page + driver.get(url) + + if wait_seconds: + time.sleep(wait_seconds) + + try: + WebDriverWait(driver, driver_wait_timeout).until( + EC.presence_of_element_located((By.TAG_NAME, "body")) + ) + except Exception: + pass - try: - token = driver.execute_script("return window.localStorage.getItem('token');") - except Exception: token = None + try: + token = driver.execute_script("return window.localStorage.getItem('token');") + except Exception: + token = None - if not token: - token = extract_token_from_source(driver.page_source) + # fallback to searching page source + if not token: + token = extract_token_from_source(driver.page_source) + return token + + except Exception as ex: + return None + + finally: + if driver: + try: + driver.quit() + except Exception: + pass + + +def refresh_token( + output_file: str = DEFAULT_OUTPUT_FILE, + **get_token_kwargs, +) -> bool: + token = get_token(**get_token_kwargs) + if token: + try: + with open(output_file, "w", encoding="utf-8") as f: + f.write(token) + return True + except Exception: + return False + return False + + +if __name__ == "__main__": + import sys + + url = DEFAULT_URL + if len(sys.argv) > 1: + url = sys.argv[1] + + print(f"Loading URL {url}") + token = get_token(headless=True) if token: print(f"\n[+] Token found:\n{token}\n") - with open(OUTPUT_FILE, "w", encoding="utf-8") as f: - f.write(token) - print(f"[+] Saved to {OUTPUT_FILE}") + try: + with open(DEFAULT_OUTPUT_FILE, "w", encoding="utf-8") as fh: + fh.write(token) + print(f"[+] Saved to {DEFAULT_OUTPUT_FILE}") + except Exception as e: + print("[!] Failed to save token:", e) else: print("[!] No token found in localStorage or page source.") - with open("page_debug.html", "w", encoding="utf-8") as f: - f.write(driver.page_source) - print("[i] Saved full page source to page_debug.html for manual check.") - -finally: - driver.quit()