mirror of
https://github.com/nixietab/picodulce.git
synced 2025-04-19 14:55:30 +01:00
Compare commits
15 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
6dff77c7e4 | ||
![]() |
5d55c33a22 | ||
![]() |
9cbcd6683b | ||
![]() |
d4d496d867 | ||
![]() |
9ae94befa7 | ||
![]() |
f32df5301b | ||
![]() |
a3c6b57283 | ||
![]() |
82d0122d75 | ||
![]() |
7062b43065 | ||
![]() |
c7b9dde2cc | ||
![]() |
5661627c9d | ||
![]() |
3c7d91fe82 | ||
![]() |
a36b177e26 | ||
![]() |
416ee639dd | ||
![]() |
d644a73a5c |
356
authser.py
356
authser.py
@ -1,29 +1,43 @@
|
||||
import sys
|
||||
import json
|
||||
import os
|
||||
import uuid
|
||||
import asyncio
|
||||
import aiohttp
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
import subprocess
|
||||
import re
|
||||
from PyQt5.QtWidgets import (QApplication, QDialog, QLabel, QVBoxLayout,
|
||||
QPushButton, QLineEdit, QMessageBox)
|
||||
QPushButton, QLineEdit, QMessageBox)
|
||||
from PyQt5.QtCore import QThread, pyqtSignal, Qt, QUrl, QObject
|
||||
from PyQt5.QtGui import QDesktopServices
|
||||
from picomc.logging import logger
|
||||
from picomc.launcher import get_default_root, Launcher
|
||||
|
||||
# Constants for Microsoft Authentication
|
||||
URL_DEVICE_AUTH = "https://login.microsoftonline.com/consumers/oauth2/v2.0/devicecode"
|
||||
URL_TOKEN = "https://login.microsoftonline.com/consumers/oauth2/v2.0/token"
|
||||
URL_XBL = "https://user.auth.xboxlive.com/user/authenticate"
|
||||
URL_XSTS = "https://xsts.auth.xboxlive.com/xsts/authorize"
|
||||
URL_MC = "https://api.minecraftservices.com/authentication/login_with_xbox"
|
||||
URL_PROFILE = "https://api.minecraftservices.com/minecraft/profile"
|
||||
class AuthenticationParser:
|
||||
@staticmethod
|
||||
def clean_ansi(text):
|
||||
ansi_clean = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
||||
printable_clean = re.compile(r'[^\x20-\x7E\n]')
|
||||
text = ansi_clean.sub('', text)
|
||||
text = printable_clean.sub('', text)
|
||||
return text.strip()
|
||||
|
||||
CLIENT_ID = "c52aed44-3b4d-4215-99c5-824033d2bc0f"
|
||||
SCOPE = "XboxLive.signin offline_access"
|
||||
GRANT_TYPE = "urn:ietf:params:oauth:grant-type:device_code"
|
||||
@staticmethod
|
||||
def is_auth_error(output):
|
||||
cleaned_output = AuthenticationParser.clean_ansi(output)
|
||||
return "AADSTS70016" in cleaned_output and "not yet been authorized" in cleaned_output
|
||||
|
||||
@staticmethod
|
||||
def parse_auth_output(output):
|
||||
cleaned_output = AuthenticationParser.clean_ansi(output)
|
||||
if AuthenticationParser.is_auth_error(cleaned_output):
|
||||
return None
|
||||
|
||||
pattern = r"https://[^\s]+"
|
||||
code_pattern = r"code\s+([A-Z0-9]+)"
|
||||
|
||||
url_match = re.search(pattern, cleaned_output)
|
||||
code_match = re.search(code_pattern, cleaned_output, re.IGNORECASE)
|
||||
|
||||
if url_match and code_match:
|
||||
return {
|
||||
'url': url_match.group(0),
|
||||
'code': code_match.group(1)
|
||||
}
|
||||
return None
|
||||
|
||||
class AuthDialog(QDialog):
|
||||
def __init__(self, url, code, parent=None, error_mode=False):
|
||||
@ -32,10 +46,10 @@ class AuthDialog(QDialog):
|
||||
self.setWindowFlags(self.windowFlags() & ~Qt.WindowContextHelpButtonHint)
|
||||
self.setModal(True)
|
||||
self.setup_ui(url, code, error_mode)
|
||||
|
||||
|
||||
def setup_ui(self, url, code, error_mode):
|
||||
layout = QVBoxLayout(self)
|
||||
|
||||
|
||||
if error_mode:
|
||||
error_label = QLabel("Error in Login - Please try again")
|
||||
error_label.setStyleSheet("QLabel { color: red; font-weight: bold; }")
|
||||
@ -92,254 +106,117 @@ class AuthenticationThread(QThread):
|
||||
error_occurred = pyqtSignal(str)
|
||||
auth_error_detected = pyqtSignal(str)
|
||||
finished = pyqtSignal()
|
||||
access_token_received = pyqtSignal(dict)
|
||||
|
||||
def __init__(self, username):
|
||||
def __init__(self, account):
|
||||
super().__init__()
|
||||
self.username = username
|
||||
self.device_code = None
|
||||
self.account = account
|
||||
self.process = None
|
||||
self.is_running = True
|
||||
|
||||
async def _ms_oauth(self):
|
||||
data = {"client_id": CLIENT_ID, "scope": SCOPE}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(URL_DEVICE_AUTH, data=data) as resp:
|
||||
if resp.status != 200:
|
||||
raise Exception(f"Failed to get device code: {await resp.text()}")
|
||||
j = await resp.json()
|
||||
self.device_code = j["device_code"]
|
||||
self.auth_data_received.emit({
|
||||
'url': j["verification_uri"],
|
||||
'code': j["user_code"]
|
||||
})
|
||||
|
||||
while self.is_running:
|
||||
data = {
|
||||
"grant_type": GRANT_TYPE,
|
||||
"client_id": CLIENT_ID,
|
||||
"device_code": self.device_code
|
||||
}
|
||||
|
||||
async with session.post(URL_TOKEN, data=data) as resp:
|
||||
j = await resp.json()
|
||||
if resp.status == 400:
|
||||
if j["error"] == "authorization_pending":
|
||||
await asyncio.sleep(2)
|
||||
continue
|
||||
else:
|
||||
raise Exception(j["error_description"])
|
||||
elif resp.status != 200:
|
||||
raise Exception(f"Token request failed: {j}")
|
||||
|
||||
return j["access_token"], j["refresh_token"]
|
||||
|
||||
async def _xbl_auth(self, access_token):
|
||||
data = {
|
||||
"Properties": {
|
||||
"AuthMethod": "RPS",
|
||||
"SiteName": "user.auth.xboxlive.com",
|
||||
"RpsTicket": f"d={access_token}"
|
||||
},
|
||||
"RelyingParty": "http://auth.xboxlive.com",
|
||||
"TokenType": "JWT"
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(URL_XBL, json=data) as resp:
|
||||
if resp.status != 200:
|
||||
raise Exception(f"XBL auth failed: {await resp.text()}")
|
||||
j = await resp.json()
|
||||
return j["Token"], j["DisplayClaims"]["xui"][0]["uhs"]
|
||||
|
||||
async def _xsts_auth(self, xbl_token):
|
||||
data = {
|
||||
"Properties": {
|
||||
"SandboxId": "RETAIL",
|
||||
"UserTokens": [xbl_token]
|
||||
},
|
||||
"RelyingParty": "rp://api.minecraftservices.com/",
|
||||
"TokenType": "JWT"
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(URL_XSTS, json=data) as resp:
|
||||
if resp.status != 200:
|
||||
raise Exception(f"XSTS auth failed: {await resp.text()}")
|
||||
j = await resp.json()
|
||||
return j["Token"]
|
||||
|
||||
async def _mc_auth(self, uhs, xsts_token):
|
||||
data = {
|
||||
"identityToken": f"XBL3.0 x={uhs};{xsts_token}"
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(URL_MC, json=data) as resp:
|
||||
if resp.status != 200:
|
||||
raise Exception(f"MC auth failed: {await resp.text()}")
|
||||
j = await resp.json()
|
||||
return j["access_token"]
|
||||
|
||||
async def _get_profile(self, mc_token):
|
||||
headers = {
|
||||
"Authorization": f"Bearer {mc_token}"
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(URL_PROFILE, headers=headers) as resp:
|
||||
if resp.status != 200:
|
||||
raise Exception(f"Profile request failed: {await resp.text()}")
|
||||
return await resp.json()
|
||||
|
||||
async def _auth_flow(self):
|
||||
try:
|
||||
ms_access_token, refresh_token = await self._ms_oauth()
|
||||
xbl_token, uhs = await self._xbl_auth(ms_access_token)
|
||||
xsts_token = await self._xsts_auth(xbl_token)
|
||||
mc_token = await self._mc_auth(uhs, xsts_token)
|
||||
profile = await self._get_profile(mc_token)
|
||||
|
||||
self.access_token_received.emit({
|
||||
'access_token': mc_token,
|
||||
'refresh_token': refresh_token,
|
||||
'profile': profile
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
self.error_occurred.emit(str(e))
|
||||
self.current_output = ""
|
||||
self.waiting_for_auth = False
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_until_complete(self._auth_flow())
|
||||
command = f'picomc account authenticate {self.account}'
|
||||
|
||||
self.process = subprocess.Popen(
|
||||
command,
|
||||
shell=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
stdin=subprocess.PIPE,
|
||||
text=True,
|
||||
bufsize=1,
|
||||
universal_newlines=True
|
||||
)
|
||||
|
||||
self.current_output = ""
|
||||
while self.is_running and self.process.poll() is None:
|
||||
line = self.process.stdout.readline()
|
||||
if line:
|
||||
self.current_output += line
|
||||
|
||||
if not self.waiting_for_auth:
|
||||
parsed_data = AuthenticationParser.parse_auth_output(self.current_output)
|
||||
if parsed_data:
|
||||
self.auth_data_received.emit(parsed_data)
|
||||
self.waiting_for_auth = True
|
||||
self.current_output = ""
|
||||
elif AuthenticationParser.is_auth_error(self.current_output):
|
||||
self.auth_error_detected.emit(self.current_output)
|
||||
self.waiting_for_auth = False
|
||||
self.current_output = ""
|
||||
|
||||
self.process.wait()
|
||||
self.finished.emit()
|
||||
|
||||
except Exception as e:
|
||||
self.error_occurred.emit(str(e))
|
||||
finally:
|
||||
self.finished.emit()
|
||||
|
||||
def send_enter(self):
|
||||
if self.process and self.process.poll() is None:
|
||||
self.process.stdin.write("\n")
|
||||
self.process.stdin.flush()
|
||||
|
||||
def stop(self):
|
||||
self.is_running = False
|
||||
if self.process:
|
||||
self.process.terminate()
|
||||
|
||||
class MinecraftAuthenticator(QObject):
|
||||
auth_finished = pyqtSignal(bool)
|
||||
class MinecraftAuthenticator(QObject): # Changed to inherit from QObject
|
||||
auth_finished = pyqtSignal(bool) # Add signal for completion
|
||||
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
self.auth_thread = None
|
||||
self.current_auth_data = None
|
||||
self.auth_dialog = None
|
||||
self.success = False
|
||||
self.username = None
|
||||
|
||||
# Initialize the launcher to get the correct config path
|
||||
with Launcher.new() as launcher:
|
||||
self.config_path = launcher.root
|
||||
|
||||
def authenticate(self, username):
|
||||
self.username = username
|
||||
"""
|
||||
Start the authentication process for the given username
|
||||
Returns immediately, authentication result will be emitted via auth_finished signal
|
||||
"""
|
||||
self.success = False
|
||||
|
||||
# Create accounts.json if it doesn't exist
|
||||
if not self.save_to_accounts_json():
|
||||
return
|
||||
|
||||
self.auth_thread = AuthenticationThread(username)
|
||||
self.auth_thread.auth_data_received.connect(self.show_auth_dialog)
|
||||
self.auth_thread.auth_error_detected.connect(self.handle_auth_error)
|
||||
self.auth_thread.error_occurred.connect(self.show_error)
|
||||
self.auth_thread.access_token_received.connect(self.on_access_token_received)
|
||||
self.auth_thread.finished.connect(self.on_authentication_finished)
|
||||
self.auth_thread.start()
|
||||
|
||||
def show_auth_dialog(self, auth_data):
|
||||
self.current_auth_data = auth_data
|
||||
|
||||
if self.auth_dialog is not None:
|
||||
self.auth_dialog.close()
|
||||
|
||||
self.auth_dialog = None
|
||||
|
||||
self.auth_dialog = AuthDialog(auth_data['url'], auth_data['code'])
|
||||
|
||||
result = self.auth_dialog.exec_()
|
||||
|
||||
if result != QDialog.Accepted:
|
||||
self.auth_thread.stop()
|
||||
if self.auth_dialog.exec_() == QDialog.Accepted:
|
||||
self.auth_thread.send_enter()
|
||||
|
||||
def show_error(self, error_msg):
|
||||
QMessageBox.critical(None, "Error", error_msg)
|
||||
def handle_auth_error(self, output):
|
||||
if self.current_auth_data:
|
||||
if self.auth_dialog is not None:
|
||||
self.auth_dialog.close()
|
||||
self.auth_dialog = None
|
||||
|
||||
self.auth_dialog = AuthDialog(
|
||||
self.current_auth_data['url'],
|
||||
self.current_auth_data['code'],
|
||||
error_mode=True
|
||||
)
|
||||
if self.auth_dialog.exec_() == QDialog.Accepted:
|
||||
self.auth_thread.send_enter()
|
||||
|
||||
def show_error(self, error_message):
|
||||
QMessageBox.critical(None, "Error", f"Authentication error: {error_message}")
|
||||
self.success = False
|
||||
self.auth_finished.emit(False)
|
||||
|
||||
def save_to_accounts_json(self):
|
||||
try:
|
||||
accounts_file = Path(self.config_path) / "accounts.json"
|
||||
|
||||
if accounts_file.exists():
|
||||
with open(accounts_file) as f:
|
||||
config = json.load(f)
|
||||
else:
|
||||
config = {
|
||||
"default": None,
|
||||
"accounts": {},
|
||||
"client_token": str(uuid.uuid4())
|
||||
}
|
||||
accounts_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Only create/update if account doesn't exist
|
||||
if self.username not in config["accounts"]:
|
||||
config["accounts"][self.username] = {
|
||||
"uuid": "-",
|
||||
"online": True,
|
||||
"microsoft": True,
|
||||
"gname": "-",
|
||||
"access_token": "-",
|
||||
"refresh_token": "-",
|
||||
"is_authenticated": False
|
||||
}
|
||||
|
||||
# Set as default if no default exists
|
||||
if config["default"] is None:
|
||||
config["default"] = self.username
|
||||
|
||||
with open(accounts_file, 'w') as f:
|
||||
json.dump(config, f, indent=4)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize account data: {str(e)}")
|
||||
QMessageBox.critical(None, "Error", f"Failed to initialize account data: {str(e)}")
|
||||
return False
|
||||
|
||||
def on_access_token_received(self, data):
|
||||
try:
|
||||
accounts_file = Path(self.config_path) / "accounts.json"
|
||||
|
||||
with open(accounts_file) as f:
|
||||
config = json.load(f)
|
||||
|
||||
if self.username in config["accounts"]:
|
||||
config["accounts"][self.username].update({
|
||||
"access_token": data['access_token'],
|
||||
"refresh_token": data['refresh_token'],
|
||||
"uuid": data['profile']['id'],
|
||||
"gname": data['profile']['name'],
|
||||
"is_authenticated": True
|
||||
})
|
||||
|
||||
with open(accounts_file, 'w') as f:
|
||||
json.dump(config, f, indent=4)
|
||||
|
||||
self.success = True
|
||||
QMessageBox.information(None, "Success",
|
||||
f"Successfully authenticated account: {self.username}")
|
||||
else:
|
||||
raise Exception("Account not found in configuration")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update account data: {str(e)}")
|
||||
QMessageBox.critical(None, "Error", f"Failed to update account data: {str(e)}")
|
||||
self.success = False
|
||||
|
||||
self.auth_finished.emit(self.success)
|
||||
|
||||
def on_authentication_finished(self):
|
||||
if self.auth_dialog is not None:
|
||||
self.auth_dialog.close()
|
||||
@ -349,8 +226,8 @@ class MinecraftAuthenticator(QObject):
|
||||
self.auth_thread.stop()
|
||||
self.auth_thread = None
|
||||
|
||||
if not self.success:
|
||||
self.auth_finished.emit(False)
|
||||
self.success = True
|
||||
self.auth_finished.emit(True)
|
||||
|
||||
def cleanup(self):
|
||||
if self.auth_dialog is not None:
|
||||
@ -361,6 +238,7 @@ class MinecraftAuthenticator(QObject):
|
||||
self.auth_thread.stop()
|
||||
self.auth_thread.wait()
|
||||
|
||||
def create_authenticator():
|
||||
"""Factory function to create a new MinecraftAuthenticator instance"""
|
||||
return MinecraftAuthenticator()
|
||||
# Example usage
|
||||
if __name__ == '__main__':
|
||||
authenticator = MinecraftAuthenticator()
|
||||
authenticator.authenticate("TestUser")
|
||||
|
@ -116,4 +116,37 @@ class HealthCheck:
|
||||
|
||||
# Check if both files exist and print OK message
|
||||
if os.path.isfile(dark_theme_file) and os.path.isfile(native_theme_file):
|
||||
print("Theme Integrity OK")
|
||||
print("Theme Integrity OK")
|
||||
|
||||
def locales_integrity(self):
|
||||
# Define the locales folder path
|
||||
locales_folder = "locales"
|
||||
version_url = "https://raw.githubusercontent.com/nixietab/picodulce/main/version.json"
|
||||
|
||||
# Step 1: Ensure the locales folder exists
|
||||
if not os.path.exists(locales_folder):
|
||||
print(f"Creating folder: {locales_folder}")
|
||||
os.makedirs(locales_folder)
|
||||
self.download_locales(version_url)
|
||||
else:
|
||||
print("Locales folder already exists.")
|
||||
|
||||
def download_locales(self, url):
|
||||
response = requests.get(url)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
locales_links = data.get("locales", [])
|
||||
|
||||
for link in locales_links:
|
||||
locale_name = os.path.basename(link)
|
||||
locale_path = os.path.join("locales", locale_name)
|
||||
locale_response = requests.get(link)
|
||||
|
||||
if locale_response.status_code == 200:
|
||||
with open(locale_path, "w", encoding="utf-8") as locale_file:
|
||||
locale_file.write(locale_response.text)
|
||||
print(f"Downloaded and created file: {locale_path}")
|
||||
else:
|
||||
print(f"Failed to download {link}")
|
||||
else:
|
||||
print("Failed to fetch version.json")
|
1
locales/locales-go-here
Normal file
1
locales/locales-go-here
Normal file
@ -0,0 +1 @@
|
||||
|
BIN
locales/picodulce_es.qm
Normal file
BIN
locales/picodulce_es.qm
Normal file
Binary file not shown.
BIN
locales/picodulce_it.qm
Normal file
BIN
locales/picodulce_it.qm
Normal file
Binary file not shown.
BIN
locales/picodulce_pt.qm
Normal file
BIN
locales/picodulce_pt.qm
Normal file
Binary file not shown.
30
modulecli.py
30
modulecli.py
@ -1,30 +0,0 @@
|
||||
import click
|
||||
from picomc.cli.main import picomc_cli
|
||||
from io import StringIO
|
||||
import sys
|
||||
|
||||
def run_command(command="picomc"):
|
||||
# Redirect stdout and stderr to capture the command output
|
||||
old_stdout, old_stderr = sys.stdout, sys.stderr
|
||||
sys.stdout = mystdout = StringIO()
|
||||
sys.stderr = mystderr = StringIO()
|
||||
|
||||
try:
|
||||
picomc_cli.main(args=command.split())
|
||||
except SystemExit as e:
|
||||
if e.code != 0:
|
||||
print(f"Command exited with code {e.code}", file=sys.stderr)
|
||||
except Exception as e:
|
||||
print(f"Unexpected error: {e}", file=sys.stderr)
|
||||
finally:
|
||||
# Restore stdout and stderr
|
||||
sys.stdout = old_stdout
|
||||
sys.stderr = old_stderr
|
||||
|
||||
output = mystdout.getvalue().strip()
|
||||
error = mystderr.getvalue().strip()
|
||||
|
||||
if not output:
|
||||
return f"Error: No output from command. Stderr: {error}"
|
||||
|
||||
return output
|
692
picodulce.py
692
picodulce.py
File diff suppressed because it is too large
Load Diff
@ -1,6 +1,5 @@
|
||||
picomc
|
||||
PyQt5
|
||||
requests
|
||||
aiohttp
|
||||
pypresence
|
||||
tqdm
|
||||
|
13
version.json
13
version.json
@ -1,5 +1,5 @@
|
||||
{
|
||||
"version": "0.13.1",
|
||||
"version": "0.13",
|
||||
"links": [
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/version.json",
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/picodulce.py",
|
||||
@ -7,9 +7,12 @@
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/drums.gif",
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/marroc.py",
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/holiday.ico",
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/authser.py",
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/healthcheck.py",
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/modulecli.py"
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/authser.py"
|
||||
],
|
||||
"versionBleeding": "0.13.1-202"
|
||||
"locales": [
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/locales/picodulce_es.qm",
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/locales/picodulce_it.qm",
|
||||
"https://raw.githubusercontent.com/nixietab/picodulce/main/locales/picodulce_pt.qm"
|
||||
],
|
||||
"versionBleeding": "0.12-174"
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user