auth done even better!

This commit is contained in:
Nix 2025-03-31 19:06:12 -03:00 committed by GitHub
parent 18120360cb
commit 343788f38c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,43 +1,19 @@
import sys
import subprocess
import re
import colorama
import requests
from PyQt5.QtWidgets import (QApplication, QDialog, QLabel, QVBoxLayout,
QPushButton, QLineEdit, QMessageBox)
from PyQt5.QtCore import QThread, pyqtSignal, Qt, QUrl, QObject
from PyQt5.QtCore import QThread, pyqtSignal, Qt, QUrl, QObject, QTimer
from PyQt5.QtGui import QDesktopServices
from picomc.logging import logger
class AuthenticationParser:
@staticmethod
def clean_ansi(text):
ansi_clean = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
printable_clean = re.compile(r'[^\x20-\x7E\n]')
text = ansi_clean.sub('', text)
text = printable_clean.sub('', text)
return text.strip()
@staticmethod
def is_auth_error(output):
cleaned_output = AuthenticationParser.clean_ansi(output)
return "AADSTS70016" in cleaned_output and "not yet been authorized" in cleaned_output
@staticmethod
def parse_auth_output(output):
cleaned_output = AuthenticationParser.clean_ansi(output)
if AuthenticationParser.is_auth_error(cleaned_output):
return None
pattern = r"https://[^\s]+"
code_pattern = r"code\s+([A-Z0-9]+)"
url_match = re.search(pattern, cleaned_output)
code_match = re.search(code_pattern, cleaned_output, re.IGNORECASE)
if url_match and code_match:
return {
'url': url_match.group(0),
'code': code_match.group(1)
}
return None
# Constants
URL_DEVICE_AUTH = "https://login.microsoftonline.com/consumers/oauth2/v2.0/devicecode"
URL_TOKEN = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token"
CLIENT_ID = "c52aed44-3b4d-4215-99c5-824033d2bc0f"
SCOPE = "XboxLive.signin offline_access"
GRANT_TYPE = "urn:ietf:params:oauth:grant-type:device_code"
class AuthDialog(QDialog):
def __init__(self, url, code, parent=None, error_mode=False):
@ -106,66 +82,81 @@ class AuthenticationThread(QThread):
error_occurred = pyqtSignal(str)
auth_error_detected = pyqtSignal(str)
finished = pyqtSignal()
access_token_received = pyqtSignal(str, str)
def __init__(self, account):
super().__init__()
self.account = account
self.process = None
self.device_code = None
self.is_running = True
self.current_output = ""
self.waiting_for_auth = False
def run(self):
try:
command = f'picomc account authenticate {self.account}'
self.process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True
)
self.current_output = ""
while self.is_running and self.process.poll() is None:
line = self.process.stdout.readline()
if line:
self.current_output += line
if not self.waiting_for_auth:
parsed_data = AuthenticationParser.parse_auth_output(self.current_output)
if parsed_data:
self.auth_data_received.emit(parsed_data)
self.waiting_for_auth = True
self.current_output = ""
elif AuthenticationParser.is_auth_error(self.current_output):
self.auth_error_detected.emit(self.current_output)
self.waiting_for_auth = False
self.current_output = ""
self.process.wait()
self.finished.emit()
self.authenticate(self.account)
except Exception as e:
self.error_occurred.emit(str(e))
self.finished.emit()
def authenticate(self, account):
try:
data = {"client_id": CLIENT_ID, "scope": SCOPE}
# Request device code
resp = requests.post(URL_DEVICE_AUTH, data)
resp.raise_for_status()
j = resp.json()
self.device_code = j["device_code"]
user_code = j["user_code"]
link = j["verification_uri"]
# Format message with colorama
msg = j["message"]
msg = msg.replace(
user_code, colorama.Fore.RED + user_code + colorama.Fore.RESET
).replace(link, colorama.Style.BRIGHT + link + colorama.Style.NORMAL)
# Emit auth data received signal
self.auth_data_received.emit({'url': link, 'code': user_code})
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {e}")
self.error_occurred.emit(str(e))
self.finished.emit()
def poll_for_token(self):
try:
data = {"code": self.device_code, "grant_type": GRANT_TYPE, "client_id": CLIENT_ID}
resp = requests.post(URL_TOKEN, data)
if resp.status_code == 400:
j = resp.json()
logger.debug(j)
if j["error"] == "authorization_pending":
logger.warning(j["error_description"])
self.auth_error_detected.emit(j["error_description"])
return
else:
raise Exception(j["error_description"])
resp.raise_for_status()
j = resp.json()
access_token = j["access_token"]
refresh_token = j["refresh_token"]
logger.debug("OAuth device code flow successful")
self.access_token_received.emit(access_token, refresh_token)
self.finished.emit()
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {e}")
self.error_occurred.emit(str(e))
self.finished.emit()
def send_enter(self):
if self.process and self.process.poll() is None:
self.process.stdin.write("\n")
self.process.stdin.flush()
self.poll_for_token()
def stop(self):
self.is_running = False
if self.process:
self.process.terminate()
class MinecraftAuthenticator(QObject): # Changed to inherit from QObject
auth_finished = pyqtSignal(bool) # Add signal for completion
class MinecraftAuthenticator(QObject):
auth_finished = pyqtSignal(bool)
def __init__(self, parent=None):
super().__init__(parent)
@ -175,15 +166,12 @@ class MinecraftAuthenticator(QObject): # Changed to inherit from QObject
self.success = False
def authenticate(self, username):
"""
Start the authentication process for the given username
Returns immediately, authentication result will be emitted via auth_finished signal
"""
self.success = False
self.auth_thread = AuthenticationThread(username)
self.auth_thread.auth_data_received.connect(self.show_auth_dialog)
self.auth_thread.auth_error_detected.connect(self.handle_auth_error)
self.auth_thread.error_occurred.connect(self.show_error)
self.auth_thread.access_token_received.connect(self.on_access_token_received)
self.auth_thread.finished.connect(self.on_authentication_finished)
self.auth_thread.start()
@ -217,6 +205,11 @@ class MinecraftAuthenticator(QObject): # Changed to inherit from QObject
self.success = False
self.auth_finished.emit(False)
def on_access_token_received(self, access_token, refresh_token):
QMessageBox.information(None, "Success", "Authentication successful!")
self.success = True
self.auth_finished.emit(True)
def on_authentication_finished(self):
if self.auth_dialog is not None:
self.auth_dialog.close()
@ -226,8 +219,8 @@ class MinecraftAuthenticator(QObject): # Changed to inherit from QObject
self.auth_thread.stop()
self.auth_thread = None
self.success = True
self.auth_finished.emit(True)
if not self.success:
self.auth_finished.emit(False)
def cleanup(self):
if self.auth_dialog is not None:
@ -240,5 +233,7 @@ class MinecraftAuthenticator(QObject): # Changed to inherit from QObject
# Example usage
if __name__ == '__main__':
app = QApplication(sys.argv)
authenticator = MinecraftAuthenticator()
authenticator.authenticate("TestUser")
sys.exit(app.exec_())