Fix the authentication system dont saving tokens (#13)

This commit is contained in:
OmeletGit 2025-04-15 03:12:53 -03:00 committed by GitHub
parent 32e4783218
commit dedc59d09c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,16 +1,26 @@
import sys
import re
import colorama
import requests
import json
import os
import uuid
import asyncio
import aiohttp
from datetime import datetime, timezone
from pathlib import Path
from PyQt5.QtWidgets import (QApplication, QDialog, QLabel, QVBoxLayout,
QPushButton, QLineEdit, QMessageBox)
from PyQt5.QtCore import QThread, pyqtSignal, Qt, QUrl, QObject, QTimer
QPushButton, QLineEdit, QMessageBox)
from PyQt5.QtCore import QThread, pyqtSignal, Qt, QUrl, QObject
from PyQt5.QtGui import QDesktopServices
from picomc.logging import logger
from picomc.launcher import get_default_root, Launcher
# Constants
# Constants for Microsoft Authentication
URL_DEVICE_AUTH = "https://login.microsoftonline.com/consumers/oauth2/v2.0/devicecode"
URL_TOKEN = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token"
URL_XBL = "https://user.auth.xboxlive.com/user/authenticate"
URL_XSTS = "https://xsts.auth.xboxlive.com/xsts/authorize"
URL_MC = "https://api.minecraftservices.com/authentication/login_with_xbox"
URL_PROFILE = "https://api.minecraftservices.com/minecraft/profile"
CLIENT_ID = "c52aed44-3b4d-4215-99c5-824033d2bc0f"
SCOPE = "XboxLive.signin offline_access"
GRANT_TYPE = "urn:ietf:params:oauth:grant-type:device_code"
@ -22,10 +32,10 @@ class AuthDialog(QDialog):
self.setWindowFlags(self.windowFlags() & ~Qt.WindowContextHelpButtonHint)
self.setModal(True)
self.setup_ui(url, code, error_mode)
def setup_ui(self, url, code, error_mode):
layout = QVBoxLayout(self)
if error_mode:
error_label = QLabel("Error in Login - Please try again")
error_label.setStyleSheet("QLabel { color: red; font-weight: bold; }")
@ -82,75 +92,132 @@ class AuthenticationThread(QThread):
error_occurred = pyqtSignal(str)
auth_error_detected = pyqtSignal(str)
finished = pyqtSignal()
access_token_received = pyqtSignal(str, str)
access_token_received = pyqtSignal(dict)
def __init__(self, account):
def __init__(self, username):
super().__init__()
self.account = account
self.username = username
self.device_code = None
self.is_running = True
def run(self):
async def _ms_oauth(self):
data = {"client_id": CLIENT_ID, "scope": SCOPE}
async with aiohttp.ClientSession() as session:
async with session.post(URL_DEVICE_AUTH, data=data) as resp:
if resp.status != 200:
raise Exception(f"Failed to get device code: {await resp.text()}")
j = await resp.json()
self.device_code = j["device_code"]
self.auth_data_received.emit({
'url': j["verification_uri"],
'code': j["user_code"]
})
while self.is_running:
data = {
"grant_type": GRANT_TYPE,
"client_id": CLIENT_ID,
"device_code": self.device_code
}
async with session.post(URL_TOKEN, data=data) as resp:
j = await resp.json()
if resp.status == 400:
if j["error"] == "authorization_pending":
await asyncio.sleep(2)
continue
else:
raise Exception(j["error_description"])
elif resp.status != 200:
raise Exception(f"Token request failed: {j}")
return j["access_token"], j["refresh_token"]
async def _xbl_auth(self, access_token):
data = {
"Properties": {
"AuthMethod": "RPS",
"SiteName": "user.auth.xboxlive.com",
"RpsTicket": f"d={access_token}"
},
"RelyingParty": "http://auth.xboxlive.com",
"TokenType": "JWT"
}
async with aiohttp.ClientSession() as session:
async with session.post(URL_XBL, json=data) as resp:
if resp.status != 200:
raise Exception(f"XBL auth failed: {await resp.text()}")
j = await resp.json()
return j["Token"], j["DisplayClaims"]["xui"][0]["uhs"]
async def _xsts_auth(self, xbl_token):
data = {
"Properties": {
"SandboxId": "RETAIL",
"UserTokens": [xbl_token]
},
"RelyingParty": "rp://api.minecraftservices.com/",
"TokenType": "JWT"
}
async with aiohttp.ClientSession() as session:
async with session.post(URL_XSTS, json=data) as resp:
if resp.status != 200:
raise Exception(f"XSTS auth failed: {await resp.text()}")
j = await resp.json()
return j["Token"]
async def _mc_auth(self, uhs, xsts_token):
data = {
"identityToken": f"XBL3.0 x={uhs};{xsts_token}"
}
async with aiohttp.ClientSession() as session:
async with session.post(URL_MC, json=data) as resp:
if resp.status != 200:
raise Exception(f"MC auth failed: {await resp.text()}")
j = await resp.json()
return j["access_token"]
async def _get_profile(self, mc_token):
headers = {
"Authorization": f"Bearer {mc_token}"
}
async with aiohttp.ClientSession() as session:
async with session.get(URL_PROFILE, headers=headers) as resp:
if resp.status != 200:
raise Exception(f"Profile request failed: {await resp.text()}")
return await resp.json()
async def _auth_flow(self):
try:
self.authenticate(self.account)
ms_access_token, refresh_token = await self._ms_oauth()
xbl_token, uhs = await self._xbl_auth(ms_access_token)
xsts_token = await self._xsts_auth(xbl_token)
mc_token = await self._mc_auth(uhs, xsts_token)
profile = await self._get_profile(mc_token)
self.access_token_received.emit({
'access_token': mc_token,
'refresh_token': refresh_token,
'profile': profile
})
except Exception as e:
self.error_occurred.emit(str(e))
self.finished.emit()
def authenticate(self, account):
def run(self):
try:
data = {"client_id": CLIENT_ID, "scope": SCOPE}
# Request device code
resp = requests.post(URL_DEVICE_AUTH, data)
resp.raise_for_status()
j = resp.json()
self.device_code = j["device_code"]
user_code = j["user_code"]
link = j["verification_uri"]
# Format message with colorama
msg = j["message"]
msg = msg.replace(
user_code, colorama.Fore.RED + user_code + colorama.Fore.RESET
).replace(link, colorama.Style.BRIGHT + link + colorama.Style.NORMAL)
# Emit auth data received signal
self.auth_data_received.emit({'url': link, 'code': user_code})
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {e}")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._auth_flow())
except Exception as e:
self.error_occurred.emit(str(e))
finally:
self.finished.emit()
def poll_for_token(self):
try:
data = {"code": self.device_code, "grant_type": GRANT_TYPE, "client_id": CLIENT_ID}
resp = requests.post(URL_TOKEN, data)
if resp.status_code == 400:
j = resp.json()
logger.debug(j)
if j["error"] == "authorization_pending":
logger.warning(j["error_description"])
self.auth_error_detected.emit(j["error_description"])
return
else:
raise Exception(j["error_description"])
resp.raise_for_status()
j = resp.json()
access_token = j["access_token"]
refresh_token = j["refresh_token"]
logger.debug("OAuth device code flow successful")
self.access_token_received.emit(access_token, refresh_token)
self.finished.emit()
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {e}")
self.error_occurred.emit(str(e))
self.finished.emit()
def send_enter(self):
self.poll_for_token()
def stop(self):
self.is_running = False
@ -161,54 +228,117 @@ class MinecraftAuthenticator(QObject):
def __init__(self, parent=None):
super().__init__(parent)
self.auth_thread = None
self.current_auth_data = None
self.auth_dialog = None
self.success = False
self.username = None
# Initialize the launcher to get the correct config path
with Launcher.new() as launcher:
self.config_path = launcher.root
def authenticate(self, username):
self.username = username
self.success = False
# Create accounts.json if it doesn't exist
if not self.save_to_accounts_json():
return
self.auth_thread = AuthenticationThread(username)
self.auth_thread.auth_data_received.connect(self.show_auth_dialog)
self.auth_thread.auth_error_detected.connect(self.handle_auth_error)
self.auth_thread.error_occurred.connect(self.show_error)
self.auth_thread.access_token_received.connect(self.on_access_token_received)
self.auth_thread.finished.connect(self.on_authentication_finished)
self.auth_thread.start()
def show_auth_dialog(self, auth_data):
self.current_auth_data = auth_data
if self.auth_dialog is not None:
self.auth_dialog.close()
self.auth_dialog = None
self.auth_dialog = AuthDialog(auth_data['url'], auth_data['code'])
if self.auth_dialog.exec_() == QDialog.Accepted:
self.auth_thread.send_enter()
def handle_auth_error(self, output):
if self.current_auth_data:
if self.auth_dialog is not None:
self.auth_dialog.close()
self.auth_dialog = None
self.auth_dialog = AuthDialog(
self.current_auth_data['url'],
self.current_auth_data['code'],
error_mode=True
)
if self.auth_dialog.exec_() == QDialog.Accepted:
self.auth_thread.send_enter()
self.auth_dialog = AuthDialog(auth_data['url'], auth_data['code'])
result = self.auth_dialog.exec_()
if result != QDialog.Accepted:
self.auth_thread.stop()
def show_error(self, error_message):
QMessageBox.critical(None, "Error", f"Authentication error: {error_message}")
def show_error(self, error_msg):
QMessageBox.critical(None, "Error", error_msg)
self.success = False
self.auth_finished.emit(False)
def on_access_token_received(self, access_token, refresh_token):
QMessageBox.information(None, "Success", "Authentication successful!")
self.success = True
self.auth_finished.emit(True)
def save_to_accounts_json(self):
try:
accounts_file = Path(self.config_path) / "accounts.json"
if accounts_file.exists():
with open(accounts_file) as f:
config = json.load(f)
else:
config = {
"default": None,
"accounts": {},
"client_token": str(uuid.uuid4())
}
accounts_file.parent.mkdir(parents=True, exist_ok=True)
# Only create/update if account doesn't exist
if self.username not in config["accounts"]:
config["accounts"][self.username] = {
"uuid": "-",
"online": True,
"microsoft": True,
"gname": "-",
"access_token": "-",
"refresh_token": "-",
"is_authenticated": False
}
# Set as default if no default exists
if config["default"] is None:
config["default"] = self.username
with open(accounts_file, 'w') as f:
json.dump(config, f, indent=4)
return True
except Exception as e:
logger.error(f"Failed to initialize account data: {str(e)}")
QMessageBox.critical(None, "Error", f"Failed to initialize account data: {str(e)}")
return False
def on_access_token_received(self, data):
try:
accounts_file = Path(self.config_path) / "accounts.json"
with open(accounts_file) as f:
config = json.load(f)
if self.username in config["accounts"]:
config["accounts"][self.username].update({
"access_token": data['access_token'],
"refresh_token": data['refresh_token'],
"uuid": data['profile']['id'],
"gname": data['profile']['name'],
"is_authenticated": True
})
with open(accounts_file, 'w') as f:
json.dump(config, f, indent=4)
self.success = True
QMessageBox.information(None, "Success",
f"Successfully authenticated account: {self.username}")
else:
raise Exception("Account not found in configuration")
except Exception as e:
logger.error(f"Failed to update account data: {str(e)}")
QMessageBox.critical(None, "Error", f"Failed to update account data: {str(e)}")
self.success = False
self.auth_finished.emit(self.success)
def on_authentication_finished(self):
if self.auth_dialog is not None:
@ -231,9 +361,6 @@ class MinecraftAuthenticator(QObject):
self.auth_thread.stop()
self.auth_thread.wait()
# Example usage
if __name__ == '__main__':
app = QApplication(sys.argv)
authenticator = MinecraftAuthenticator()
authenticator.authenticate("TestUser")
sys.exit(app.exec_())
def create_authenticator():
"""Factory function to create a new MinecraftAuthenticator instance"""
return MinecraftAuthenticator()