From dedc59d09cfa95fda03adf9efa6c17af4d0d3a87 Mon Sep 17 00:00:00 2001 From: OmeletGit Date: Tue, 15 Apr 2025 03:12:53 -0300 Subject: [PATCH] Fix the authentication system dont saving tokens (#13) --- authser.py | 325 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 226 insertions(+), 99 deletions(-) diff --git a/authser.py b/authser.py index 0d4f615..d6dac9c 100644 --- a/authser.py +++ b/authser.py @@ -1,16 +1,26 @@ import sys -import re -import colorama -import requests +import json +import os +import uuid +import asyncio +import aiohttp +from datetime import datetime, timezone +from pathlib import Path from PyQt5.QtWidgets import (QApplication, QDialog, QLabel, QVBoxLayout, - QPushButton, QLineEdit, QMessageBox) -from PyQt5.QtCore import QThread, pyqtSignal, Qt, QUrl, QObject, QTimer + QPushButton, QLineEdit, QMessageBox) +from PyQt5.QtCore import QThread, pyqtSignal, Qt, QUrl, QObject from PyQt5.QtGui import QDesktopServices from picomc.logging import logger +from picomc.launcher import get_default_root, Launcher -# Constants +# Constants for Microsoft Authentication URL_DEVICE_AUTH = "https://login.microsoftonline.com/consumers/oauth2/v2.0/devicecode" URL_TOKEN = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token" +URL_XBL = "https://user.auth.xboxlive.com/user/authenticate" +URL_XSTS = "https://xsts.auth.xboxlive.com/xsts/authorize" +URL_MC = "https://api.minecraftservices.com/authentication/login_with_xbox" +URL_PROFILE = "https://api.minecraftservices.com/minecraft/profile" + CLIENT_ID = "c52aed44-3b4d-4215-99c5-824033d2bc0f" SCOPE = "XboxLive.signin offline_access" GRANT_TYPE = "urn:ietf:params:oauth:grant-type:device_code" @@ -22,10 +32,10 @@ class AuthDialog(QDialog): self.setWindowFlags(self.windowFlags() & ~Qt.WindowContextHelpButtonHint) self.setModal(True) self.setup_ui(url, code, error_mode) - + def setup_ui(self, url, code, error_mode): layout = QVBoxLayout(self) - + if error_mode: error_label = QLabel("Error in Login - Please try again") error_label.setStyleSheet("QLabel { color: red; font-weight: bold; }") @@ -82,75 +92,132 @@ class AuthenticationThread(QThread): error_occurred = pyqtSignal(str) auth_error_detected = pyqtSignal(str) finished = pyqtSignal() - access_token_received = pyqtSignal(str, str) + access_token_received = pyqtSignal(dict) - def __init__(self, account): + def __init__(self, username): super().__init__() - self.account = account + self.username = username self.device_code = None self.is_running = True - def run(self): + async def _ms_oauth(self): + data = {"client_id": CLIENT_ID, "scope": SCOPE} + + async with aiohttp.ClientSession() as session: + async with session.post(URL_DEVICE_AUTH, data=data) as resp: + if resp.status != 200: + raise Exception(f"Failed to get device code: {await resp.text()}") + j = await resp.json() + self.device_code = j["device_code"] + self.auth_data_received.emit({ + 'url': j["verification_uri"], + 'code': j["user_code"] + }) + + while self.is_running: + data = { + "grant_type": GRANT_TYPE, + "client_id": CLIENT_ID, + "device_code": self.device_code + } + + async with session.post(URL_TOKEN, data=data) as resp: + j = await resp.json() + if resp.status == 400: + if j["error"] == "authorization_pending": + await asyncio.sleep(2) + continue + else: + raise Exception(j["error_description"]) + elif resp.status != 200: + raise Exception(f"Token request failed: {j}") + + return j["access_token"], j["refresh_token"] + + async def _xbl_auth(self, access_token): + data = { + "Properties": { + "AuthMethod": "RPS", + "SiteName": "user.auth.xboxlive.com", + "RpsTicket": f"d={access_token}" + }, + "RelyingParty": "http://auth.xboxlive.com", + "TokenType": "JWT" + } + + async with aiohttp.ClientSession() as session: + async with session.post(URL_XBL, json=data) as resp: + if resp.status != 200: + raise Exception(f"XBL auth failed: {await resp.text()}") + j = await resp.json() + return j["Token"], j["DisplayClaims"]["xui"][0]["uhs"] + + async def _xsts_auth(self, xbl_token): + data = { + "Properties": { + "SandboxId": "RETAIL", + "UserTokens": [xbl_token] + }, + "RelyingParty": "rp://api.minecraftservices.com/", + "TokenType": "JWT" + } + + async with aiohttp.ClientSession() as session: + async with session.post(URL_XSTS, json=data) as resp: + if resp.status != 200: + raise Exception(f"XSTS auth failed: {await resp.text()}") + j = await resp.json() + return j["Token"] + + async def _mc_auth(self, uhs, xsts_token): + data = { + "identityToken": f"XBL3.0 x={uhs};{xsts_token}" + } + + async with aiohttp.ClientSession() as session: + async with session.post(URL_MC, json=data) as resp: + if resp.status != 200: + raise Exception(f"MC auth failed: {await resp.text()}") + j = await resp.json() + return j["access_token"] + + async def _get_profile(self, mc_token): + headers = { + "Authorization": f"Bearer {mc_token}" + } + + async with aiohttp.ClientSession() as session: + async with session.get(URL_PROFILE, headers=headers) as resp: + if resp.status != 200: + raise Exception(f"Profile request failed: {await resp.text()}") + return await resp.json() + + async def _auth_flow(self): try: - self.authenticate(self.account) + ms_access_token, refresh_token = await self._ms_oauth() + xbl_token, uhs = await self._xbl_auth(ms_access_token) + xsts_token = await self._xsts_auth(xbl_token) + mc_token = await self._mc_auth(uhs, xsts_token) + profile = await self._get_profile(mc_token) + + self.access_token_received.emit({ + 'access_token': mc_token, + 'refresh_token': refresh_token, + 'profile': profile + }) + except Exception as e: self.error_occurred.emit(str(e)) - self.finished.emit() - def authenticate(self, account): + def run(self): try: - data = {"client_id": CLIENT_ID, "scope": SCOPE} - - # Request device code - resp = requests.post(URL_DEVICE_AUTH, data) - resp.raise_for_status() - - j = resp.json() - self.device_code = j["device_code"] - user_code = j["user_code"] - link = j["verification_uri"] - - # Format message with colorama - msg = j["message"] - msg = msg.replace( - user_code, colorama.Fore.RED + user_code + colorama.Fore.RESET - ).replace(link, colorama.Style.BRIGHT + link + colorama.Style.NORMAL) - - # Emit auth data received signal - self.auth_data_received.emit({'url': link, 'code': user_code}) - - except requests.exceptions.RequestException as e: - logger.error(f"Request failed: {e}") + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self._auth_flow()) + except Exception as e: self.error_occurred.emit(str(e)) + finally: self.finished.emit() - - def poll_for_token(self): - try: - data = {"code": self.device_code, "grant_type": GRANT_TYPE, "client_id": CLIENT_ID} - resp = requests.post(URL_TOKEN, data) - if resp.status_code == 400: - j = resp.json() - logger.debug(j) - if j["error"] == "authorization_pending": - logger.warning(j["error_description"]) - self.auth_error_detected.emit(j["error_description"]) - return - else: - raise Exception(j["error_description"]) - resp.raise_for_status() - j = resp.json() - access_token = j["access_token"] - refresh_token = j["refresh_token"] - logger.debug("OAuth device code flow successful") - self.access_token_received.emit(access_token, refresh_token) - self.finished.emit() - except requests.exceptions.RequestException as e: - logger.error(f"Request failed: {e}") - self.error_occurred.emit(str(e)) - self.finished.emit() - - def send_enter(self): - self.poll_for_token() def stop(self): self.is_running = False @@ -161,54 +228,117 @@ class MinecraftAuthenticator(QObject): def __init__(self, parent=None): super().__init__(parent) self.auth_thread = None - self.current_auth_data = None self.auth_dialog = None self.success = False + self.username = None + # Initialize the launcher to get the correct config path + with Launcher.new() as launcher: + self.config_path = launcher.root + def authenticate(self, username): + self.username = username self.success = False + + # Create accounts.json if it doesn't exist + if not self.save_to_accounts_json(): + return + self.auth_thread = AuthenticationThread(username) self.auth_thread.auth_data_received.connect(self.show_auth_dialog) - self.auth_thread.auth_error_detected.connect(self.handle_auth_error) self.auth_thread.error_occurred.connect(self.show_error) self.auth_thread.access_token_received.connect(self.on_access_token_received) self.auth_thread.finished.connect(self.on_authentication_finished) self.auth_thread.start() def show_auth_dialog(self, auth_data): - self.current_auth_data = auth_data - if self.auth_dialog is not None: self.auth_dialog.close() - self.auth_dialog = None - - self.auth_dialog = AuthDialog(auth_data['url'], auth_data['code']) - if self.auth_dialog.exec_() == QDialog.Accepted: - self.auth_thread.send_enter() - - def handle_auth_error(self, output): - if self.current_auth_data: - if self.auth_dialog is not None: - self.auth_dialog.close() - self.auth_dialog = None - self.auth_dialog = AuthDialog( - self.current_auth_data['url'], - self.current_auth_data['code'], - error_mode=True - ) - if self.auth_dialog.exec_() == QDialog.Accepted: - self.auth_thread.send_enter() + self.auth_dialog = AuthDialog(auth_data['url'], auth_data['code']) + + result = self.auth_dialog.exec_() + + if result != QDialog.Accepted: + self.auth_thread.stop() - def show_error(self, error_message): - QMessageBox.critical(None, "Error", f"Authentication error: {error_message}") + def show_error(self, error_msg): + QMessageBox.critical(None, "Error", error_msg) self.success = False self.auth_finished.emit(False) - def on_access_token_received(self, access_token, refresh_token): - QMessageBox.information(None, "Success", "Authentication successful!") - self.success = True - self.auth_finished.emit(True) + def save_to_accounts_json(self): + try: + accounts_file = Path(self.config_path) / "accounts.json" + + if accounts_file.exists(): + with open(accounts_file) as f: + config = json.load(f) + else: + config = { + "default": None, + "accounts": {}, + "client_token": str(uuid.uuid4()) + } + accounts_file.parent.mkdir(parents=True, exist_ok=True) + + # Only create/update if account doesn't exist + if self.username not in config["accounts"]: + config["accounts"][self.username] = { + "uuid": "-", + "online": True, + "microsoft": True, + "gname": "-", + "access_token": "-", + "refresh_token": "-", + "is_authenticated": False + } + + # Set as default if no default exists + if config["default"] is None: + config["default"] = self.username + + with open(accounts_file, 'w') as f: + json.dump(config, f, indent=4) + + return True + + except Exception as e: + logger.error(f"Failed to initialize account data: {str(e)}") + QMessageBox.critical(None, "Error", f"Failed to initialize account data: {str(e)}") + return False + + def on_access_token_received(self, data): + try: + accounts_file = Path(self.config_path) / "accounts.json" + + with open(accounts_file) as f: + config = json.load(f) + + if self.username in config["accounts"]: + config["accounts"][self.username].update({ + "access_token": data['access_token'], + "refresh_token": data['refresh_token'], + "uuid": data['profile']['id'], + "gname": data['profile']['name'], + "is_authenticated": True + }) + + with open(accounts_file, 'w') as f: + json.dump(config, f, indent=4) + + self.success = True + QMessageBox.information(None, "Success", + f"Successfully authenticated account: {self.username}") + else: + raise Exception("Account not found in configuration") + + except Exception as e: + logger.error(f"Failed to update account data: {str(e)}") + QMessageBox.critical(None, "Error", f"Failed to update account data: {str(e)}") + self.success = False + + self.auth_finished.emit(self.success) def on_authentication_finished(self): if self.auth_dialog is not None: @@ -231,9 +361,6 @@ class MinecraftAuthenticator(QObject): self.auth_thread.stop() self.auth_thread.wait() -# Example usage -if __name__ == '__main__': - app = QApplication(sys.argv) - authenticator = MinecraftAuthenticator() - authenticator.authenticate("TestUser") - sys.exit(app.exec_()) +def create_authenticator(): + """Factory function to create a new MinecraftAuthenticator instance""" + return MinecraftAuthenticator()