diff --git a/authser.py b/authser.py index 544d3a5..0d4f615 100644 --- a/authser.py +++ b/authser.py @@ -1,43 +1,19 @@ import sys -import subprocess import re +import colorama +import requests from PyQt5.QtWidgets import (QApplication, QDialog, QLabel, QVBoxLayout, QPushButton, QLineEdit, QMessageBox) -from PyQt5.QtCore import QThread, pyqtSignal, Qt, QUrl, QObject +from PyQt5.QtCore import QThread, pyqtSignal, Qt, QUrl, QObject, QTimer from PyQt5.QtGui import QDesktopServices +from picomc.logging import logger -class AuthenticationParser: - @staticmethod - def clean_ansi(text): - ansi_clean = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') - printable_clean = re.compile(r'[^\x20-\x7E\n]') - text = ansi_clean.sub('', text) - text = printable_clean.sub('', text) - return text.strip() - - @staticmethod - def is_auth_error(output): - cleaned_output = AuthenticationParser.clean_ansi(output) - return "AADSTS70016" in cleaned_output and "not yet been authorized" in cleaned_output - - @staticmethod - def parse_auth_output(output): - cleaned_output = AuthenticationParser.clean_ansi(output) - if AuthenticationParser.is_auth_error(cleaned_output): - return None - - pattern = r"https://[^\s]+" - code_pattern = r"code\s+([A-Z0-9]+)" - - url_match = re.search(pattern, cleaned_output) - code_match = re.search(code_pattern, cleaned_output, re.IGNORECASE) - - if url_match and code_match: - return { - 'url': url_match.group(0), - 'code': code_match.group(1) - } - return None +# Constants +URL_DEVICE_AUTH = "https://login.microsoftonline.com/consumers/oauth2/v2.0/devicecode" +URL_TOKEN = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token" +CLIENT_ID = "c52aed44-3b4d-4215-99c5-824033d2bc0f" +SCOPE = "XboxLive.signin offline_access" +GRANT_TYPE = "urn:ietf:params:oauth:grant-type:device_code" class AuthDialog(QDialog): def __init__(self, url, code, parent=None, error_mode=False): @@ -106,66 +82,81 @@ class AuthenticationThread(QThread): error_occurred = pyqtSignal(str) auth_error_detected = pyqtSignal(str) finished = pyqtSignal() + access_token_received = pyqtSignal(str, str) def __init__(self, account): super().__init__() self.account = account - self.process = None + self.device_code = None self.is_running = True - self.current_output = "" - self.waiting_for_auth = False def run(self): try: - command = f'picomc account authenticate {self.account}' - - self.process = subprocess.Popen( - command, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - stdin=subprocess.PIPE, - text=True, - bufsize=1, - universal_newlines=True - ) - - self.current_output = "" - while self.is_running and self.process.poll() is None: - line = self.process.stdout.readline() - if line: - self.current_output += line - - if not self.waiting_for_auth: - parsed_data = AuthenticationParser.parse_auth_output(self.current_output) - if parsed_data: - self.auth_data_received.emit(parsed_data) - self.waiting_for_auth = True - self.current_output = "" - elif AuthenticationParser.is_auth_error(self.current_output): - self.auth_error_detected.emit(self.current_output) - self.waiting_for_auth = False - self.current_output = "" - - self.process.wait() - self.finished.emit() - + self.authenticate(self.account) except Exception as e: self.error_occurred.emit(str(e)) self.finished.emit() + def authenticate(self, account): + try: + data = {"client_id": CLIENT_ID, "scope": SCOPE} + + # Request device code + resp = requests.post(URL_DEVICE_AUTH, data) + resp.raise_for_status() + + j = resp.json() + self.device_code = j["device_code"] + user_code = j["user_code"] + link = j["verification_uri"] + + # Format message with colorama + msg = j["message"] + msg = msg.replace( + user_code, colorama.Fore.RED + user_code + colorama.Fore.RESET + ).replace(link, colorama.Style.BRIGHT + link + colorama.Style.NORMAL) + + # Emit auth data received signal + self.auth_data_received.emit({'url': link, 'code': user_code}) + + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + self.error_occurred.emit(str(e)) + self.finished.emit() + + def poll_for_token(self): + try: + data = {"code": self.device_code, "grant_type": GRANT_TYPE, "client_id": CLIENT_ID} + resp = requests.post(URL_TOKEN, data) + if resp.status_code == 400: + j = resp.json() + logger.debug(j) + if j["error"] == "authorization_pending": + logger.warning(j["error_description"]) + self.auth_error_detected.emit(j["error_description"]) + return + else: + raise Exception(j["error_description"]) + resp.raise_for_status() + j = resp.json() + access_token = j["access_token"] + refresh_token = j["refresh_token"] + logger.debug("OAuth device code flow successful") + self.access_token_received.emit(access_token, refresh_token) + self.finished.emit() + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + self.error_occurred.emit(str(e)) + self.finished.emit() + def send_enter(self): - if self.process and self.process.poll() is None: - self.process.stdin.write("\n") - self.process.stdin.flush() + self.poll_for_token() def stop(self): self.is_running = False - if self.process: - self.process.terminate() -class MinecraftAuthenticator(QObject): # Changed to inherit from QObject - auth_finished = pyqtSignal(bool) # Add signal for completion +class MinecraftAuthenticator(QObject): + auth_finished = pyqtSignal(bool) def __init__(self, parent=None): super().__init__(parent) @@ -175,15 +166,12 @@ class MinecraftAuthenticator(QObject): # Changed to inherit from QObject self.success = False def authenticate(self, username): - """ - Start the authentication process for the given username - Returns immediately, authentication result will be emitted via auth_finished signal - """ self.success = False self.auth_thread = AuthenticationThread(username) self.auth_thread.auth_data_received.connect(self.show_auth_dialog) self.auth_thread.auth_error_detected.connect(self.handle_auth_error) self.auth_thread.error_occurred.connect(self.show_error) + self.auth_thread.access_token_received.connect(self.on_access_token_received) self.auth_thread.finished.connect(self.on_authentication_finished) self.auth_thread.start() @@ -217,6 +205,11 @@ class MinecraftAuthenticator(QObject): # Changed to inherit from QObject self.success = False self.auth_finished.emit(False) + def on_access_token_received(self, access_token, refresh_token): + QMessageBox.information(None, "Success", "Authentication successful!") + self.success = True + self.auth_finished.emit(True) + def on_authentication_finished(self): if self.auth_dialog is not None: self.auth_dialog.close() @@ -226,8 +219,8 @@ class MinecraftAuthenticator(QObject): # Changed to inherit from QObject self.auth_thread.stop() self.auth_thread = None - self.success = True - self.auth_finished.emit(True) + if not self.success: + self.auth_finished.emit(False) def cleanup(self): if self.auth_dialog is not None: @@ -240,5 +233,7 @@ class MinecraftAuthenticator(QObject): # Changed to inherit from QObject # Example usage if __name__ == '__main__': + app = QApplication(sys.argv) authenticator = MinecraftAuthenticator() authenticator.authenticate("TestUser") + sys.exit(app.exec_())