@@ -8,6 +8,7 @@ import time
import auth_webview
import os
import re
import hashlib
import subprocess
import threading
import tempfile
@@ -357,13 +358,30 @@ class UpdateChecker(QObject):
html_url = release_data . get ( " html_url " ) or releases_url
assets = release_data . get ( " assets " ) or [ ]
download_url = " "
download_name = " "
checksum_url = " "
for asset in assets :
url = asset . get ( " browser_download_url " , " " )
if url . lower ( ) . endswith ( " .zip " ) :
download_url = url
download_name = asset . get ( " name " , " " )
break
if not download_url and assets :
download_url = assets [ 0 ] . get ( " browser_download_url " , " " )
download_name = assets [ 0 ] . get ( " name " , " " )
for asset in assets :
name = asset . get ( " name " , " " ) . lower ( )
if not name :
continue
is_checksum_asset = name . endswith ( " .sha256 " ) or name . endswith ( " .sha256.txt " ) or name in ( " checksums.txt " , " sha256sums.txt " )
if not is_checksum_asset :
continue
if download_name and ( download_name . lower ( ) in name or name in ( f " { download_name . lower ( ) } .sha256 " , f " { download_name . lower ( ) } .sha256.txt " ) ) :
checksum_url = asset . get ( " browser_download_url " , " " )
break
if not checksum_url :
checksum_url = asset . get ( " browser_download_url " , " " )
self . check_finished . emit (
{
@@ -373,6 +391,8 @@ class UpdateChecker(QObject):
" latest_tag " : latest_tag ,
" release_url " : html_url ,
" download_url " : download_url ,
" download_name " : download_name ,
" checksum_url " : checksum_url ,
" has_update " : _is_newer_version ( latest_version , self . current_version ) ,
}
)
@@ -707,9 +727,11 @@ class VkChatManager(QMainWindow):
clicked = message_box . clickedButton ( )
download_url = result . get ( " download_url " )
checksum_url = result . get ( " checksum_url " )
download_name = result . get ( " download_name " )
release_url = result . get ( " release_url " )
if clicked is update_now_button and download_url :
if not self . _start_auto_update ( download_url , latest_version ) :
if not self . _start_auto_update ( download_url , latest_version , checksum_url , download_name ):
if release_url :
QDesktopServices . openUrl ( QUrl ( release_url ) )
return
@@ -754,6 +776,53 @@ class VkChatManager(QMainWindow):
with open ( destination_path , " wb " ) as f :
shutil . copyfileobj ( response , f )
def _download_update_text ( self , url ) :
request = urllib . request . Request (
url ,
headers = { " User-Agent " : " AnabasisManager-Updater " } ,
)
with urllib . request . urlopen ( request , timeout = 30 ) as response :
return response . read ( ) . decode ( " utf-8 " , errors = " replace " )
@staticmethod
def _sha256_file ( path ) :
digest = hashlib . sha256 ( )
with open ( path , " rb " ) as f :
for chunk in iter ( lambda : f . read ( 1024 * 1024 ) , b " " ) :
digest . update ( chunk )
return digest . hexdigest ( ) . lower ( )
@staticmethod
def _extract_sha256_from_text ( checksum_text , target_file_name ) :
target = ( target_file_name or " " ) . strip ( ) . lower ( )
for raw_line in checksum_text . splitlines ( ) :
line = raw_line . strip ( )
if not line :
continue
match = re . search ( r " \ b([A-Fa-f0-9] {64} ) \ b " , line )
if not match :
continue
checksum = match . group ( 1 ) . lower ( )
if not target :
return checksum
line_lower = line . lower ( )
if target in line_lower :
return checksum
if os . path . basename ( target ) in line_lower :
return checksum
return " "
def _verify_update_checksum ( self , zip_path , checksum_url , download_name ) :
if not checksum_url :
raise RuntimeError ( " В релизе нет файла SHA256. Автообновление остановлено." )
checksum_text = self . _download_update_text ( checksum_url )
expected_hash = self . _extract_sha256_from_text ( checksum_text , download_name or os . path . basename ( zip_path ) )
if not expected_hash :
raise RuntimeError ( " Н е удалось найти SHA256 для архива обновления." )
actual_hash = self . _sha256_file ( zip_path )
if actual_hash != expected_hash :
raise RuntimeError ( " SHA256 не совпадает, обновление отменено. " )
def _locate_extracted_root ( self , extracted_dir ) :
entries = [ ]
for name in os . listdir ( extracted_dir ) :
@@ -766,7 +835,7 @@ class VkChatManager(QMainWindow):
return candidate
return extracted_dir
def _build_update_script ( self , app_dir , source_dir , exe_name ) :
def _build_update_script ( self , app_dir , source_dir , exe_name , target_pid ):
script_path = os . path . join ( tempfile . gettempdir ( ) , " anabasis_apply_update.cmd " )
script_lines = [
" @echo off " ,
@@ -774,21 +843,42 @@ class VkChatManager(QMainWindow):
f " set APP_DIR= { app_dir } " ,
f " set SRC_DIR= { source_dir } " ,
f " set EXE_NAME= { exe_name } " ,
" timeout /t 2 /nobreak >nul " ,
" robocopy \" % SRC _DIR% \" \" % APP_DIR % \" /E /NFL /NDL /NJH /NJS /NP /R:3 /W:1 >nul " ,
f " set TARGET_PID= { target_pid } " ,
" set BACKUP _DIR= % TEMP % \\ anabasis_backup_ % RANDOM %% RANDOM % " ,
" :wait_for_exit " ,
" tasklist /FI \" PID eq % TARGET_PID % \" | find \" % TARGET_PID % \" >nul " ,
" if %E RRORLEVEL % E QU 0 ( " ,
" timeout /t 1 /nobreak >nul " ,
" goto :wait_for_exit " ,
" ) " ,
" timeout /t 1 /nobreak >nul " ,
" mkdir \" % BACKUP_DIR % \" >nul 2>&1 " ,
" robocopy \" % APP_DIR % \" \" % BACKUP_DIR % \" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul " ,
" set RC= %E RRORLEVEL % " ,
" if % RC % G EQ 8 goto :copy _error " ,
" if % RC % G EQ 8 goto :backup _error " ,
" robocopy \" % SRC_DIR % \" \" % APP_DIR % \" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul " ,
" set RC= %E RRORLEVEL % " ,
" if % RC % G EQ 8 goto :rollback " ,
" start \" \" \" % APP_DIR % \\ %E XE_NAME % \" " ,
" timeout /t 2 /nobreak >nul " ,
" tasklist /FI \" IMAGENAME eq %E XE_NAME % \" | find /I \" %E XE_NAME % \" >nul " ,
" if %E RRORLEVEL % NEQ 0 goto :rollback " ,
" rmdir /S /Q \" % BACKUP_DIR % \" >nul 2>&1 " ,
" exit /b 0 " ,
" :copy_error " ,
" echo Auto-update failed with code % RC % > \" % APP_DIR % \\ update_error.log \" " ,
" :rollback " ,
" robocopy \" % BACKUP_DIR % \" \" % APP_DIR % \" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul " ,
" start \" \" \" % APP_DIR % \\ %E XE_NAME % \" " ,
" echo Auto-update failed. Rollback executed. > \" % APP_DIR % \\ update_error.log \" " ,
" exit /b 2 " ,
" :backup_error " ,
" echo Auto-update failed during backup. Code % RC % > \" % APP_DIR % \\ update_error.log \" " ,
" exit /b % RC % " ,
]
with open ( script_path , " w " , encoding = " utf-8 " , newline = " \r \n " ) as f :
f . write ( " \r \n " . join ( script_lines ) + " \r \n " )
return script_path
def _start_auto_update ( self , download_url , latest_version ) :
def _start_auto_update ( self , download_url , latest_version , checksum_url = " " , download_name = " " ):
if os . name != " nt " :
QMessageBox . information (
self ,
@@ -814,6 +904,7 @@ class VkChatManager(QMainWindow):
unpack_dir = os . path . join ( work_dir , " extracted " )
try :
self . _download_update_archive ( download_url , zip_path )
self . _verify_update_checksum ( zip_path , checksum_url , download_name )
os . makedirs ( unpack_dir , exist_ok = True )
with zipfile . ZipFile ( zip_path , " r " ) as archive :
archive . extractall ( unpack_dir )
@@ -822,7 +913,7 @@ class VkChatManager(QMainWindow):
app_exe = sys . executable
app_dir = os . path . dirname ( app_exe )
exe_name = os . path . basename ( app_exe )
script_path = self . _build_update_script ( app_dir , source_dir , exe_name )
script_path = self . _build_update_script ( app_dir , source_dir , exe_name , os . getpid ( ) )
creation_flags = 0
if hasattr ( subprocess , " CREATE_NEW_PROCESS_GROUP " ) :
@@ -841,7 +932,7 @@ class VkChatManager(QMainWindow):
" Обновление запущено " ,
" Обновление скачано. Приложение будет перезапущено. " ,
)
QTimer . singleShot ( 150 , QApplication. instance ( ) . quit )
QApplication . instance ( ) . quit ( )
return True
except Exception as e :
self . _log_event ( " auto_update_failed " , str ( e ) , level = " ERROR " )