mirror of
https://github.com/nixietab/picodulce.git
synced 2025-04-15 21:08:56 +01:00
367 lines
13 KiB
Python
367 lines
13 KiB
Python
import sys
|
|
import json
|
|
import os
|
|
import uuid
|
|
import asyncio
|
|
import aiohttp
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from PyQt5.QtWidgets import (QApplication, QDialog, QLabel, QVBoxLayout,
|
|
QPushButton, QLineEdit, QMessageBox)
|
|
from PyQt5.QtCore import QThread, pyqtSignal, Qt, QUrl, QObject
|
|
from PyQt5.QtGui import QDesktopServices
|
|
from picomc.logging import logger
|
|
from picomc.launcher import get_default_root, Launcher
|
|
|
|
# Constants for Microsoft Authentication
|
|
URL_DEVICE_AUTH = "https://login.microsoftonline.com/consumers/oauth2/v2.0/devicecode"
|
|
URL_TOKEN = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token"
|
|
URL_XBL = "https://user.auth.xboxlive.com/user/authenticate"
|
|
URL_XSTS = "https://xsts.auth.xboxlive.com/xsts/authorize"
|
|
URL_MC = "https://api.minecraftservices.com/authentication/login_with_xbox"
|
|
URL_PROFILE = "https://api.minecraftservices.com/minecraft/profile"
|
|
|
|
CLIENT_ID = "c52aed44-3b4d-4215-99c5-824033d2bc0f"
|
|
SCOPE = "XboxLive.signin offline_access"
|
|
GRANT_TYPE = "urn:ietf:params:oauth:grant-type:device_code"
|
|
|
|
class AuthDialog(QDialog):
|
|
def __init__(self, url, code, parent=None, error_mode=False):
|
|
super().__init__(parent)
|
|
self.setWindowTitle("Microsoft Authentication")
|
|
self.setWindowFlags(self.windowFlags() & ~Qt.WindowContextHelpButtonHint)
|
|
self.setModal(True)
|
|
self.setup_ui(url, code, error_mode)
|
|
|
|
def setup_ui(self, url, code, error_mode):
|
|
layout = QVBoxLayout(self)
|
|
|
|
if error_mode:
|
|
error_label = QLabel("Error in Login - Please try again")
|
|
error_label.setStyleSheet("QLabel { color: red; font-weight: bold; }")
|
|
layout.addWidget(error_label)
|
|
|
|
instructions = QLabel(
|
|
"To authenticate your Microsoft Account:\n\n"
|
|
"1. Click 'Open Authentication Page' or visit:\n"
|
|
"2. Copy the code below\n"
|
|
"3. Paste the code on the Microsoft website\n"
|
|
"4. After completing authentication, click 'I've Completed Authentication'"
|
|
)
|
|
instructions.setWordWrap(True)
|
|
layout.addWidget(instructions)
|
|
|
|
url_label = QLabel(url)
|
|
url_label.setTextInteractionFlags(Qt.TextSelectableByMouse)
|
|
url_label.setWordWrap(True)
|
|
layout.addWidget(url_label)
|
|
|
|
self.code_input = QLineEdit(code)
|
|
self.code_input.setReadOnly(True)
|
|
self.code_input.setAlignment(Qt.AlignCenter)
|
|
self.code_input.setStyleSheet("""
|
|
QLineEdit {
|
|
font-size: 16pt;
|
|
font-weight: bold;
|
|
padding: 5px;
|
|
}
|
|
""")
|
|
layout.addWidget(self.code_input)
|
|
|
|
copy_button = QPushButton("Copy Code")
|
|
copy_button.clicked.connect(self.copy_code)
|
|
layout.addWidget(copy_button)
|
|
|
|
open_url_button = QPushButton("Open Authentication Page")
|
|
open_url_button.clicked.connect(lambda: self.open_url(url))
|
|
layout.addWidget(open_url_button)
|
|
|
|
continue_button = QPushButton("I've Completed Authentication")
|
|
continue_button.clicked.connect(self.accept)
|
|
layout.addWidget(continue_button)
|
|
|
|
def copy_code(self):
|
|
clipboard = QApplication.clipboard()
|
|
clipboard.setText(self.code_input.text())
|
|
|
|
def open_url(self, url):
|
|
QDesktopServices.openUrl(QUrl(url))
|
|
|
|
class AuthenticationThread(QThread):
|
|
auth_data_received = pyqtSignal(dict)
|
|
error_occurred = pyqtSignal(str)
|
|
auth_error_detected = pyqtSignal(str)
|
|
finished = pyqtSignal()
|
|
access_token_received = pyqtSignal(dict)
|
|
|
|
def __init__(self, username):
|
|
super().__init__()
|
|
self.username = username
|
|
self.device_code = None
|
|
self.is_running = True
|
|
|
|
async def _ms_oauth(self):
|
|
data = {"client_id": CLIENT_ID, "scope": SCOPE}
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(URL_DEVICE_AUTH, data=data) as resp:
|
|
if resp.status != 200:
|
|
raise Exception(f"Failed to get device code: {await resp.text()}")
|
|
j = await resp.json()
|
|
self.device_code = j["device_code"]
|
|
self.auth_data_received.emit({
|
|
'url': j["verification_uri"],
|
|
'code': j["user_code"]
|
|
})
|
|
|
|
while self.is_running:
|
|
data = {
|
|
"grant_type": GRANT_TYPE,
|
|
"client_id": CLIENT_ID,
|
|
"device_code": self.device_code
|
|
}
|
|
|
|
async with session.post(URL_TOKEN, data=data) as resp:
|
|
j = await resp.json()
|
|
if resp.status == 400:
|
|
if j["error"] == "authorization_pending":
|
|
await asyncio.sleep(2)
|
|
continue
|
|
else:
|
|
raise Exception(j["error_description"])
|
|
elif resp.status != 200:
|
|
raise Exception(f"Token request failed: {j}")
|
|
|
|
return j["access_token"], j["refresh_token"]
|
|
|
|
async def _xbl_auth(self, access_token):
|
|
data = {
|
|
"Properties": {
|
|
"AuthMethod": "RPS",
|
|
"SiteName": "user.auth.xboxlive.com",
|
|
"RpsTicket": f"d={access_token}"
|
|
},
|
|
"RelyingParty": "http://auth.xboxlive.com",
|
|
"TokenType": "JWT"
|
|
}
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(URL_XBL, json=data) as resp:
|
|
if resp.status != 200:
|
|
raise Exception(f"XBL auth failed: {await resp.text()}")
|
|
j = await resp.json()
|
|
return j["Token"], j["DisplayClaims"]["xui"][0]["uhs"]
|
|
|
|
async def _xsts_auth(self, xbl_token):
|
|
data = {
|
|
"Properties": {
|
|
"SandboxId": "RETAIL",
|
|
"UserTokens": [xbl_token]
|
|
},
|
|
"RelyingParty": "rp://api.minecraftservices.com/",
|
|
"TokenType": "JWT"
|
|
}
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(URL_XSTS, json=data) as resp:
|
|
if resp.status != 200:
|
|
raise Exception(f"XSTS auth failed: {await resp.text()}")
|
|
j = await resp.json()
|
|
return j["Token"]
|
|
|
|
async def _mc_auth(self, uhs, xsts_token):
|
|
data = {
|
|
"identityToken": f"XBL3.0 x={uhs};{xsts_token}"
|
|
}
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.post(URL_MC, json=data) as resp:
|
|
if resp.status != 200:
|
|
raise Exception(f"MC auth failed: {await resp.text()}")
|
|
j = await resp.json()
|
|
return j["access_token"]
|
|
|
|
async def _get_profile(self, mc_token):
|
|
headers = {
|
|
"Authorization": f"Bearer {mc_token}"
|
|
}
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(URL_PROFILE, headers=headers) as resp:
|
|
if resp.status != 200:
|
|
raise Exception(f"Profile request failed: {await resp.text()}")
|
|
return await resp.json()
|
|
|
|
async def _auth_flow(self):
|
|
try:
|
|
ms_access_token, refresh_token = await self._ms_oauth()
|
|
xbl_token, uhs = await self._xbl_auth(ms_access_token)
|
|
xsts_token = await self._xsts_auth(xbl_token)
|
|
mc_token = await self._mc_auth(uhs, xsts_token)
|
|
profile = await self._get_profile(mc_token)
|
|
|
|
self.access_token_received.emit({
|
|
'access_token': mc_token,
|
|
'refresh_token': refresh_token,
|
|
'profile': profile
|
|
})
|
|
|
|
except Exception as e:
|
|
self.error_occurred.emit(str(e))
|
|
|
|
def run(self):
|
|
try:
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
loop.run_until_complete(self._auth_flow())
|
|
except Exception as e:
|
|
self.error_occurred.emit(str(e))
|
|
finally:
|
|
self.finished.emit()
|
|
|
|
def stop(self):
|
|
self.is_running = False
|
|
|
|
class MinecraftAuthenticator(QObject):
|
|
auth_finished = pyqtSignal(bool)
|
|
|
|
def __init__(self, parent=None):
|
|
super().__init__(parent)
|
|
self.auth_thread = None
|
|
self.auth_dialog = None
|
|
self.success = False
|
|
self.username = None
|
|
|
|
# Initialize the launcher to get the correct config path
|
|
with Launcher.new() as launcher:
|
|
self.config_path = launcher.root
|
|
|
|
def authenticate(self, username):
|
|
self.username = username
|
|
self.success = False
|
|
|
|
# Create accounts.json if it doesn't exist
|
|
if not self.save_to_accounts_json():
|
|
return
|
|
|
|
self.auth_thread = AuthenticationThread(username)
|
|
self.auth_thread.auth_data_received.connect(self.show_auth_dialog)
|
|
self.auth_thread.error_occurred.connect(self.show_error)
|
|
self.auth_thread.access_token_received.connect(self.on_access_token_received)
|
|
self.auth_thread.finished.connect(self.on_authentication_finished)
|
|
self.auth_thread.start()
|
|
|
|
def show_auth_dialog(self, auth_data):
|
|
if self.auth_dialog is not None:
|
|
self.auth_dialog.close()
|
|
|
|
self.auth_dialog = AuthDialog(auth_data['url'], auth_data['code'])
|
|
|
|
result = self.auth_dialog.exec_()
|
|
|
|
if result != QDialog.Accepted:
|
|
self.auth_thread.stop()
|
|
|
|
def show_error(self, error_msg):
|
|
QMessageBox.critical(None, "Error", error_msg)
|
|
self.success = False
|
|
self.auth_finished.emit(False)
|
|
|
|
def save_to_accounts_json(self):
|
|
try:
|
|
accounts_file = Path(self.config_path) / "accounts.json"
|
|
|
|
if accounts_file.exists():
|
|
with open(accounts_file) as f:
|
|
config = json.load(f)
|
|
else:
|
|
config = {
|
|
"default": None,
|
|
"accounts": {},
|
|
"client_token": str(uuid.uuid4())
|
|
}
|
|
accounts_file.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Only create/update if account doesn't exist
|
|
if self.username not in config["accounts"]:
|
|
config["accounts"][self.username] = {
|
|
"uuid": "-",
|
|
"online": True,
|
|
"microsoft": True,
|
|
"gname": "-",
|
|
"access_token": "-",
|
|
"refresh_token": "-",
|
|
"is_authenticated": False
|
|
}
|
|
|
|
# Set as default if no default exists
|
|
if config["default"] is None:
|
|
config["default"] = self.username
|
|
|
|
with open(accounts_file, 'w') as f:
|
|
json.dump(config, f, indent=4)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize account data: {str(e)}")
|
|
QMessageBox.critical(None, "Error", f"Failed to initialize account data: {str(e)}")
|
|
return False
|
|
|
|
def on_access_token_received(self, data):
|
|
try:
|
|
accounts_file = Path(self.config_path) / "accounts.json"
|
|
|
|
with open(accounts_file) as f:
|
|
config = json.load(f)
|
|
|
|
if self.username in config["accounts"]:
|
|
config["accounts"][self.username].update({
|
|
"access_token": data['access_token'],
|
|
"refresh_token": data['refresh_token'],
|
|
"uuid": data['profile']['id'],
|
|
"gname": data['profile']['name'],
|
|
"is_authenticated": True
|
|
})
|
|
|
|
with open(accounts_file, 'w') as f:
|
|
json.dump(config, f, indent=4)
|
|
|
|
self.success = True
|
|
QMessageBox.information(None, "Success",
|
|
f"Successfully authenticated account: {self.username}")
|
|
else:
|
|
raise Exception("Account not found in configuration")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to update account data: {str(e)}")
|
|
QMessageBox.critical(None, "Error", f"Failed to update account data: {str(e)}")
|
|
self.success = False
|
|
|
|
self.auth_finished.emit(self.success)
|
|
|
|
def on_authentication_finished(self):
|
|
if self.auth_dialog is not None:
|
|
self.auth_dialog.close()
|
|
self.auth_dialog = None
|
|
|
|
if self.auth_thread:
|
|
self.auth_thread.stop()
|
|
self.auth_thread = None
|
|
|
|
if not self.success:
|
|
self.auth_finished.emit(False)
|
|
|
|
def cleanup(self):
|
|
if self.auth_dialog is not None:
|
|
self.auth_dialog.close()
|
|
self.auth_dialog = None
|
|
|
|
if self.auth_thread and self.auth_thread.isRunning():
|
|
self.auth_thread.stop()
|
|
self.auth_thread.wait()
|
|
|
|
def create_authenticator():
|
|
"""Factory function to create a new MinecraftAuthenticator instance"""
|
|
return MinecraftAuthenticator()
|