Added clean SIGINIT support

This commit is contained in:
Nixietab 2025-11-13 11:31:59 -03:00
parent 3f9f6042cf
commit 684e8b4735
5 changed files with 26 additions and 465 deletions

View File

@ -1,42 +0,0 @@
import os
import configparser
CONFIG_FILE = "config.ini"
DEFAULT_CONFIG = {
"server": {
"port": "6942",
"password": "debug",
"cache_dir": "cache",
"base_api": "https://ws1.smn.gob.ar",
"log_file": "",
"base_path": "/smn"
}
}
def ensure_config_exists():
config = configparser.ConfigParser()
if not os.path.exists(CONFIG_FILE):
print("[CONFIG] config.ini not found - creating with default values")
config.read_dict(DEFAULT_CONFIG)
with open(CONFIG_FILE, "w") as f:
config.write(f)
else:
config.read(CONFIG_FILE)
changed = False
for section, values in DEFAULT_CONFIG.items():
if section not in config:
config[section] = values
changed = True
else:
for key, val in values.items():
if key not in config[section]:
config[section][key] = val
changed = True
if changed:
with open(CONFIG_FILE, "w") as f:
config.write(f)
print("[CONFIG] Missing keys added to config.ini")
return config

View File

@ -1,103 +0,0 @@
import argparse
import os
import sys
import json
from configparser import ConfigParser
from healthcheck import ensure_config_exists
def configparser_to_dict(cp: ConfigParser) -> dict:
d = {}
for section in cp.sections():
d[section] = {}
for key, val in cp.items(section):
d[section][key] = val
return d
def load_config(config_path=None):
if config_path:
if not os.path.exists(config_path):
print(f"[ERROR] Config file not found: {config_path}")
sys.exit(1)
try:
with open(config_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data
except json.JSONDecodeError:
cp = ConfigParser()
cp.read(config_path)
return configparser_to_dict(cp)
except Exception as e:
print(f"[ERROR] Failed to read config file: {e}")
sys.exit(1)
else:
cfg = ensure_config_exists()
if isinstance(cfg, ConfigParser):
return configparser_to_dict(cfg)
return cfg
def parse_args():
parser = argparse.ArgumentParser(
prog="OpenSMN",
description="Run the SMN API with custom configuration."
)
parser.add_argument(
"-c", "--config",
type=str,
help="Path to a custom configuration file (INI or JSON)."
)
parser.add_argument(
"-p", "--port",
type=int,
help="Override the server port defined in config."
)
parser.add_argument(
"--no-auth",
action="store_true",
help="Disable authentication even if password is defined in config."
)
parser.add_argument(
"--show-config",
action="store_true",
help="Print the loaded configuration and exit."
)
parser.add_argument(
"--base-path",
type=str,
help="Override the base API path."
)
return parser.parse_args()
def main():
args = parse_args()
config = load_config(args.config)
server_cfg = config.get("server", {})
# apply cli overrides
if args.port:
server_cfg["port"] = str(args.port)
if args.base_path:
server_cfg["base_path"] = args.base_path
if args.no_auth:
server_cfg["password"] = ""
if args.show_config:
print(json.dumps(config, indent=2, ensure_ascii=False))
sys.exit(0)
# run the server
import server
server.run_from_cli(config)
if __name__ == "__main__":
main()

View File

@ -1,183 +0,0 @@
import os
import json
import hashlib
import requests
from datetime import datetime, timedelta
from flask import Flask, jsonify, Response, request
from urllib.parse import urljoin
import tokenext
from healthcheck import ensure_config_exists
app = Flask(__name__)
# Global Data
PORT = 6942
PASSWORD = ""
CACHE_DIR = "cache"
BASE_API = "https://ws1.smn.gob.ar"
LOG_FILE = ""
BASE_PATH = "/smn"
AUTH_ENABLED = False
CACHE_TTL = timedelta(minutes=60)
SMN_TOKEN_FILE = "token"
def log(msg: str):
if LOG_FILE:
with open(LOG_FILE, "a") as f:
f.write(f"[{datetime.now().isoformat()}] {msg}\n")
print(msg)
def get_cache_filename(url: str) -> str:
h = hashlib.sha256(url.encode()).hexdigest()
return os.path.join(CACHE_DIR, f"{h}.json")
def load_cache(url: str):
path = get_cache_filename(url)
if not os.path.exists(path):
return None
mtime = datetime.fromtimestamp(os.path.getmtime(path))
if datetime.now() - mtime > CACHE_TTL:
return None
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return None
def save_cache(url: str, data: dict):
os.makedirs(CACHE_DIR, exist_ok=True)
path = get_cache_filename(url)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def load_smn_token():
if not os.path.exists(SMN_TOKEN_FILE):
log("[TOKEN] Token file not found — refreshing token...")
refresh_smn_token()
if not os.path.exists(SMN_TOKEN_FILE):
raise FileNotFoundError("Token file could not be created.")
with open(SMN_TOKEN_FILE, "r") as f:
return f.read().strip()
def refresh_smn_token():
log("[TOKEN] Refreshing SMN token...")
ok = tokenext.refresh_token(output_file=SMN_TOKEN_FILE, headless=True, wait_seconds=8)
if ok:
log("[TOKEN] Token refreshed successfully.")
else:
log("[TOKEN] Failed to refresh token.")
def check_access_token():
if not AUTH_ENABLED:
return True
header_token = request.headers.get("Authorization", "").strip()
return header_token == PASSWORD
def fetch_from_smn(url: str, retry: bool = True):
token = load_smn_token()
headers = {
"Authorization": f"JWT {token}",
"Accept": "application/json",
"User-Agent": "Mozilla/5.0"
}
try:
resp = requests.get(url, headers=headers, timeout=10)
except requests.RequestException as e:
return Response(str(e), status=502)
if resp.status_code == 401 and retry:
log("[AUTH] SMN token expired, trying to refresh...")
refresh_smn_token()
return fetch_from_smn(url, retry=False)
return resp
@app.route(f"{BASE_PATH}/<path:subpath>")
def smn_proxy(subpath):
if not check_access_token():
return jsonify({"error": "Unauthorized"}), 401
if ".." in subpath or subpath.startswith("/"):
return jsonify({"error": "Invalid path"}), 400
url = urljoin(BASE_API + "/", subpath)
cached = load_cache(url)
if cached:
log(f"[CACHE] Loaded {subpath}")
return jsonify(cached)
log(f"[FETCH] {url}")
resp = fetch_from_smn(url)
if not hasattr(resp, "status_code"):
return Response("Upstream error", status=502)
if resp.status_code != 200:
return Response(resp.text, status=resp.status_code,
content_type=resp.headers.get("Content-Type", "text/plain"))
try:
data = resp.json()
save_cache(url, data)
return jsonify(data)
except Exception:
return Response("Invalid JSON from SMN", status=502)
@app.errorhandler(404)
def handle_not_found(e):
return jsonify({
"error": "Endpoint not found",
"message": f"The requested URL '{request.path}' is not a valid API endpoint.",
}), 200
@app.errorhandler(405)
def handle_method_not_allowed(e):
return jsonify({
"error": "Method not allowed",
"allowed": ["GET"],
"path": request.path
}), 200
@app.errorhandler(Exception)
def handle_general_error(e):
log(f"[ERROR] Unexpected exception: {e}")
return jsonify({
"error": "Internal error",
"message": str(e),
"path": request.path
}), 200
def run_from_cli(config=None):
global PORT, PASSWORD, CACHE_DIR, BASE_API, LOG_FILE, BASE_PATH, AUTH_ENABLED
if config is None:
config = ensure_config_exists()
server_cfg = config.get("server", {})
PORT = int(server_cfg.get("port", 6942))
PASSWORD = server_cfg.get("password", "").strip()
CACHE_DIR = server_cfg.get("cache_dir", "cache")
BASE_API = server_cfg.get("base_api", "https://ws1.smn.gob.ar")
LOG_FILE = server_cfg.get("log_file", "").strip()
BASE_PATH = server_cfg.get("base_path", "/smn").strip()
if not BASE_PATH.startswith("/"):
BASE_PATH = "/" + BASE_PATH
if BASE_PATH.endswith("/"):
BASE_PATH = BASE_PATH[:-1]
AUTH_ENABLED = PASSWORD != ""
os.makedirs(CACHE_DIR, exist_ok=True)
if not os.path.exists(SMN_TOKEN_FILE):
log("[STARTUP] No token file found, generating a new one.")
refresh_smn_token()
log(f"[STARTUP] Server starting on port {PORT}")
log(f"[STARTUP] Base path set to '{BASE_PATH}/<path>'")
app.run(host="0.0.0.0", port=PORT)
if __name__ == "__main__":
run_from_cli()

View File

@ -1,136 +0,0 @@
# tokenext.py
import re
import time
from typing import Optional
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
DEFAULT_URL = "https://www.smn.gob.ar/"
DEFAULT_OUTPUT_FILE = "token"
def extract_token_from_source(source: str) -> Optional[str]:
m = re.search(
r"localStorage\.setItem\(\s*['\"]token['\"]\s*,\s*['\"]([^'\"]+)['\"]\s*\)",
source,
)
if m:
return m.group(1)
m = re.search(r"localStorage\.token\s*=\s*['\"]([^'\"]+)['\"]", source)
if m:
return m.group(1)
return None
def make_chrome_options(headless: bool = True) -> Options:
opts = Options()
if headless:
try:
opts.add_argument("--headless=new")
except Exception:
opts.add_argument("--headless")
opts.add_argument("--no-sandbox")
opts.add_argument("--disable-dev-shm-usage")
opts.add_argument("--disable-gpu")
opts.add_argument("--disable-blink-features=AutomationControlled")
opts.add_argument("--window-size=1920,1080")
opts.add_argument(
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/118.0.5993.90 Safari/537.36"
)
return opts
def get_token(
url: str = DEFAULT_URL,
headless: bool = True,
wait_seconds: int = 8,
driver_wait_timeout: int = 20,
chrome_driver_path: Optional[str] = None,
) -> Optional[str]:
options = make_chrome_options(headless=headless)
driver = None
try:
if chrome_driver_path:
driver = webdriver.Chrome(executable_path=chrome_driver_path, options=options)
else:
driver = webdriver.Chrome(options=options)
# load page
driver.get(url)
if wait_seconds:
time.sleep(wait_seconds)
try:
WebDriverWait(driver, driver_wait_timeout).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
except Exception:
pass
token = None
try:
token = driver.execute_script("return window.localStorage.getItem('token');")
except Exception:
token = None
# fallback to searching page source
if not token:
token = extract_token_from_source(driver.page_source)
return token
except Exception as ex:
return None
finally:
if driver:
try:
driver.quit()
except Exception:
pass
def refresh_token(
output_file: str = DEFAULT_OUTPUT_FILE,
**get_token_kwargs,
) -> bool:
token = get_token(**get_token_kwargs)
if token:
try:
with open(output_file, "w", encoding="utf-8") as f:
f.write(token)
return True
except Exception:
return False
return False
if __name__ == "__main__":
import sys
url = DEFAULT_URL
if len(sys.argv) > 1:
url = sys.argv[1]
print(f"Loading URL {url}")
token = get_token(headless=True)
if token:
print(f"\n[+] Token found:\n{token}\n")
try:
with open(DEFAULT_OUTPUT_FILE, "w", encoding="utf-8") as fh:
fh.write(token)
print(f"[+] Saved to {DEFAULT_OUTPUT_FILE}")
except Exception as e:
print("[!] Failed to save token:", e)
else:
print("[!] No token found in localStorage or page source.")

View File

@ -2,6 +2,8 @@ import os
import json import json
import hashlib import hashlib
import requests import requests
import signal
import sys
from datetime import datetime, timedelta from datetime import datetime, timedelta
from flask import Flask, jsonify, Response, request from flask import Flask, jsonify, Response, request
from urllib.parse import urljoin from urllib.parse import urljoin
@ -149,6 +151,13 @@ def handle_general_error(e):
"path": request.path "path": request.path
}), 200 }), 200
def handle_sigint(signum, frame):
log("shutting down gracefully...")
sys.exit(0)
signal.signal(signal.SIGINT, handle_sigint)
def run_from_cli(config=None): def run_from_cli(config=None):
global PORT, PASSWORD, CACHE_DIR, BASE_API, LOG_FILE, BASE_PATH, AUTH_ENABLED global PORT, PASSWORD, CACHE_DIR, BASE_API, LOG_FILE, BASE_PATH, AUTH_ENABLED
@ -177,7 +186,23 @@ def run_from_cli(config=None):
log(f"[STARTUP] Server starting on port {PORT}") log(f"[STARTUP] Server starting on port {PORT}")
log(f"[STARTUP] Base path set to '{BASE_PATH}/<path>'") log(f"[STARTUP] Base path set to '{BASE_PATH}/<path>'")
app.run(host="0.0.0.0", port=PORT)
try:
app.run(host="0.0.0.0", port=PORT)
except KeyboardInterrupt:
log("[SHUTDOWN] Interrupted. server stopped cleanly.")
except Exception as e:
log(f"[ERROR] Unhandled exception: {e}")
finally:
log("[SHUTDOWN] Goodbye.")
if __name__ == "__main__": if __name__ == "__main__":
import signal
import sys
def handle_sigint(signum, frame):
log("[SHUTDOWN] shutting down gracefully...")
sys.exit(0)
signal.signal(signal.SIGINT, handle_sigint)
run_from_cli() run_from_cli()