Compare commits
73 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a3e4c188e | ||
|
|
ac2013bcca | ||
| c77ca4652b | |||
| 4ec24c6d0f | |||
| 72edfffd9e | |||
| db5d901435 | |||
| cd5e6e1f6b | |||
| 4b3347a069 | |||
| e0628b1792 | |||
| 201184700f | |||
| 5253c942e8 | |||
| c645d964bf | |||
| 13890fbbfc | |||
| d7494c1092 | |||
| 67f6910435 | |||
| 2c502fe3bf | |||
| 02078282bc | |||
| b1ed97a826 | |||
| 5be8ab9af7 | |||
| 0f07fe250c | |||
| fc0c98ee49 | |||
| e22eac6de3 | |||
| bf7e5e599e | |||
| d1714a86c7 | |||
| 781bf679ff | |||
| 813dafd6b8 | |||
| 965d09d47c | |||
| 1b4760167f | |||
| 039c1fa38a | |||
| df3a4c49c5 | |||
| 8d4bc10cb7 | |||
| a6cee33cf6 | |||
| b30437faef | |||
| 147988242f | |||
| 44deba1382 | |||
| eda8d43b9c | |||
| cf6d6bcbd0 | |||
| 61948a51c6 | |||
| 97c52c5a51 | |||
| 862c2c8899 | |||
| 1013a1ce38 | |||
| f15e71996b | |||
| 34272d01c8 | |||
| 4e6502bab7 | |||
| 89237590c7 | |||
| aca2bdfa85 | |||
| 9d40f0017e | |||
| 798eacbf9a | |||
| a9a394cf7d | |||
| e1e2f8f0e8 | |||
| 4d84d2ebe5 | |||
| 02350cfca1 | |||
| 68fa841857 | |||
| bca9007463 | |||
| 52b1301982 | |||
| 90b3b4fc9d | |||
| 190e67c931 | |||
| 2eb4c52b81 | |||
| 3d73a504d2 | |||
| 1524271be7 | |||
| 561cf43e09 | |||
| e8930f7550 | |||
| c8da0f9191 | |||
| 37ce500fd2 | |||
| 098a84e5bd | |||
| 5aa17c1a84 | |||
| dde14f3714 | |||
| fa5d4c6993 | |||
| f9e0225243 | |||
| c42b23bea5 | |||
| b52cdea425 | |||
| b7fad78a71 | |||
| e590a6cde0 |
35
.gitea/workflows/ci.yml
Normal file
35
.gitea/workflows/ci.yml
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
name: Desktop CI
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
pull_request:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
tests:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: https://git.daemonlord.ru/actions/checkout@v4
|
||||||
|
with:
|
||||||
|
fetch-depth: 0
|
||||||
|
|
||||||
|
- name: Set up Python
|
||||||
|
uses: https://git.daemonlord.ru/actions/setup-python@v5
|
||||||
|
with:
|
||||||
|
python-version: "3.13"
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
run: |
|
||||||
|
python -m pip install --upgrade pip
|
||||||
|
pip install -r requirements.txt
|
||||||
|
|
||||||
|
- name: Validate syntax
|
||||||
|
run: |
|
||||||
|
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_main_contracts.py tests/test_token_store.py tests/test_update_reentry_runtime.py tests/test_update_service.py tests/test_updater_gui.py
|
||||||
|
|
||||||
|
- name: Run tests
|
||||||
|
run: |
|
||||||
|
python -m unittest discover -s tests -p "test_*.py" -v
|
||||||
148
.gitea/workflows/release-dev.yml
Normal file
148
.gitea/workflows/release-dev.yml
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
name: Desktop Dev Pre-release
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- dev
|
||||||
|
workflow_dispatch:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
prerelease:
|
||||||
|
runs-on: windows
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: https://git.daemonlord.ru/actions/checkout@v4
|
||||||
|
with:
|
||||||
|
fetch-depth: 0
|
||||||
|
tags: true
|
||||||
|
|
||||||
|
- name: Ensure Python 3.13
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
if (Get-Command python -ErrorAction SilentlyContinue) {
|
||||||
|
python --version
|
||||||
|
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
|
||||||
|
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
|
||||||
|
if (-not $pyExe) {
|
||||||
|
throw "Python 3.13 launcher is available, but interpreter was not found."
|
||||||
|
}
|
||||||
|
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
|
||||||
|
python --version
|
||||||
|
} else {
|
||||||
|
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
python -m pip install --upgrade pip
|
||||||
|
pip install -r requirements.txt pyinstaller
|
||||||
|
|
||||||
|
- name: Extract prerelease metadata
|
||||||
|
id: meta
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
|
||||||
|
$commit = (git rev-parse --short HEAD).Trim()
|
||||||
|
$tag = "v$version-$commit"
|
||||||
|
$archive = "AnabasisManager-$version-$commit"
|
||||||
|
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "commit=$commit`n", $utf8NoBom)
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "tag=$tag`n", $utf8NoBom)
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "archive=$archive`n", $utf8NoBom)
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
|
||||||
|
Write-Host "Detected tag: $tag"
|
||||||
|
|
||||||
|
- name: Stop if prerelease already exists
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$tag = "${{ steps.meta.outputs.tag }}"
|
||||||
|
$apiUrl = "https://git.daemonlord.ru/api/v1/repos/${{ gitea.repository }}/releases?page=1&limit=100"
|
||||||
|
$headers = @{ Authorization = "token ${{ secrets.API_TOKEN }}" }
|
||||||
|
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||||
|
try {
|
||||||
|
$response = Invoke-RestMethod -Uri $apiUrl -Headers $headers -Method Get
|
||||||
|
$found = $false
|
||||||
|
foreach ($release in $response) {
|
||||||
|
if ($release.tag_name -eq $tag) {
|
||||||
|
$found = $true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ($found) {
|
||||||
|
Write-Host "Pre-release $tag already exists, stopping job."
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
|
||||||
|
} else {
|
||||||
|
Write-Host "Pre-release $tag not found, continuing workflow..."
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
Write-Host "Failed to query releases list, continuing workflow..."
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Run tests
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
|
||||||
|
python -m unittest discover -s tests -p "test_*.py" -v
|
||||||
|
|
||||||
|
- name: Build release zip
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
python build.py
|
||||||
|
|
||||||
|
- name: Prepare prerelease artifacts
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = "${{ steps.meta.outputs.version }}"
|
||||||
|
$archiveBase = "${{ steps.meta.outputs.archive }}"
|
||||||
|
$srcZip = "dist/AnabasisManager-$version.zip"
|
||||||
|
$dstZip = "dist/$archiveBase.zip"
|
||||||
|
if (-not (Test-Path $srcZip)) {
|
||||||
|
throw "Archive not found: $srcZip"
|
||||||
|
}
|
||||||
|
Copy-Item -Path $srcZip -Destination $dstZip -Force
|
||||||
|
$hash = (Get-FileHash -Path $dstZip -Algorithm SHA256).Hash.ToLower()
|
||||||
|
"$hash $archiveBase.zip" | Set-Content -Path "dist/$archiveBase.zip.sha256" -Encoding UTF8
|
||||||
|
|
||||||
|
- name: Configure git identity
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
git config user.name "gitea-actions"
|
||||||
|
git config user.email "gitea-actions@daemonlord.ru"
|
||||||
|
|
||||||
|
- name: Create git tag
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$tag = "${{ steps.meta.outputs.tag }}"
|
||||||
|
$tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1)
|
||||||
|
if ([string]::IsNullOrWhiteSpace($tagLine)) {
|
||||||
|
git tag "$tag"
|
||||||
|
git push origin "$tag"
|
||||||
|
} else {
|
||||||
|
Write-Host "Tag $tag already exists on origin, skipping tag push."
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Create Gitea Pre-release
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
|
||||||
|
with:
|
||||||
|
server_url: https://git.daemonlord.ru
|
||||||
|
repository: ${{ gitea.repository }}
|
||||||
|
token: ${{ secrets.API_TOKEN }}
|
||||||
|
tag_name: ${{ steps.meta.outputs.tag }}
|
||||||
|
name: Anabasis Manager ${{ steps.meta.outputs.version }} (dev ${{ steps.meta.outputs.commit }})
|
||||||
|
prerelease: true
|
||||||
|
body: |
|
||||||
|
Development pre-release for commit ${{ steps.meta.outputs.commit }}
|
||||||
|
Version base: ${{ steps.meta.outputs.version }}
|
||||||
|
files: |
|
||||||
|
dist/${{ steps.meta.outputs.archive }}.zip
|
||||||
|
dist/${{ steps.meta.outputs.archive }}.zip.sha256
|
||||||
208
.gitea/workflows/release.yml
Normal file
208
.gitea/workflows/release.yml
Normal file
@@ -0,0 +1,208 @@
|
|||||||
|
name: Desktop Release
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
release:
|
||||||
|
runs-on: windows
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: https://git.daemonlord.ru/actions/checkout@v4
|
||||||
|
with:
|
||||||
|
fetch-depth: 0
|
||||||
|
tags: true
|
||||||
|
|
||||||
|
- name: Ensure Python 3.13
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
if (Get-Command python -ErrorAction SilentlyContinue) {
|
||||||
|
python --version
|
||||||
|
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
|
||||||
|
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
|
||||||
|
if (-not $pyExe) {
|
||||||
|
throw "Python 3.13 launcher is available, but interpreter was not found."
|
||||||
|
}
|
||||||
|
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
|
||||||
|
python --version
|
||||||
|
} else {
|
||||||
|
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
python -m pip install --upgrade pip
|
||||||
|
pip install -r requirements.txt pyinstaller
|
||||||
|
|
||||||
|
- name: Ensure Inno Setup 6
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$isccPath = ""
|
||||||
|
$inPath = Get-Command iscc.exe -ErrorAction SilentlyContinue
|
||||||
|
if ($inPath) {
|
||||||
|
$isccPath = $inPath.Source
|
||||||
|
Write-Host "Inno Setup compiler found in PATH."
|
||||||
|
} elseif (Test-Path "C:\Program Files (x86)\Inno Setup 6\ISCC.exe") {
|
||||||
|
"C:\Program Files (x86)\Inno Setup 6" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
|
||||||
|
$isccPath = "C:\Program Files (x86)\Inno Setup 6\ISCC.exe"
|
||||||
|
Write-Host "Inno Setup compiler found in Program Files (x86)."
|
||||||
|
} elseif (Test-Path "C:\Program Files\Inno Setup 6\ISCC.exe") {
|
||||||
|
"C:\Program Files\Inno Setup 6" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
|
||||||
|
$isccPath = "C:\Program Files\Inno Setup 6\ISCC.exe"
|
||||||
|
Write-Host "Inno Setup compiler found in Program Files."
|
||||||
|
} else {
|
||||||
|
throw "Inno Setup 6 is not installed on runner. Install Inno Setup and restart runner service."
|
||||||
|
}
|
||||||
|
Write-Host "Using ISCC: $isccPath"
|
||||||
|
exit 0
|
||||||
|
|
||||||
|
- name: Extract app version
|
||||||
|
id: extract_version
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
|
||||||
|
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
|
||||||
|
Write-Host "Detected version: $version"
|
||||||
|
|
||||||
|
- name: Initialize release flow
|
||||||
|
id: flow_init
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
|
||||||
|
exit 0
|
||||||
|
|
||||||
|
- name: Stop if release already exists
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = "${{ steps.extract_version.outputs.version }}"
|
||||||
|
$tag = "v$version"
|
||||||
|
$apiUrl = "https://git.daemonlord.ru/api/v1/repos/${{ gitea.repository }}/releases?page=1&limit=100"
|
||||||
|
$headers = @{ Authorization = "token ${{ secrets.API_TOKEN }}" }
|
||||||
|
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||||
|
try {
|
||||||
|
$response = Invoke-RestMethod -Uri $apiUrl -Headers $headers -Method Get
|
||||||
|
$found = $false
|
||||||
|
foreach ($release in $response) {
|
||||||
|
if ($release.tag_name -eq $tag) {
|
||||||
|
$found = $true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ($found) {
|
||||||
|
Write-Host "Release $tag already exists, stopping job."
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
|
||||||
|
} else {
|
||||||
|
Write-Host "Release $tag not found, continuing workflow..."
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
Write-Host "Failed to query releases list, continuing workflow..."
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Run tests
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
|
||||||
|
python -m unittest discover -s tests -p "test_*.py" -v
|
||||||
|
|
||||||
|
- name: Build release zip
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
env:
|
||||||
|
PYTHONUTF8: "1"
|
||||||
|
PYTHONIOENCODING: "utf-8"
|
||||||
|
run: |
|
||||||
|
$ErrorActionPreference = "Continue"
|
||||||
|
$repoRoot = (git rev-parse --show-toplevel).Trim()
|
||||||
|
if (-not [string]::IsNullOrWhiteSpace($repoRoot)) {
|
||||||
|
Set-Location $repoRoot
|
||||||
|
}
|
||||||
|
$logDir = Join-Path $env:RUNNER_TEMP "anabasis-build"
|
||||||
|
New-Item -ItemType Directory -Force -Path $logDir | Out-Null
|
||||||
|
$buildLog = Join-Path $logDir "build.log"
|
||||||
|
python build.py *>&1 | Tee-Object -FilePath $buildLog
|
||||||
|
$code = $LASTEXITCODE
|
||||||
|
if ($code -ne 0) {
|
||||||
|
Write-Host "Build failed with exit code $code. Dumping build log:"
|
||||||
|
if (Test-Path $buildLog) {
|
||||||
|
Get-Content -Path $buildLog -Raw
|
||||||
|
} else {
|
||||||
|
Write-Host "Build log was not created: $buildLog"
|
||||||
|
}
|
||||||
|
exit $code
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Ensure artifacts exist
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = "${{ steps.extract_version.outputs.version }}"
|
||||||
|
$archivePath = "dist/AnabasisManager-$version.zip"
|
||||||
|
$installerPath = "dist/AnabasisManager-setup-$version.exe"
|
||||||
|
if (-not (Test-Path $archivePath)) {
|
||||||
|
throw "Archive not found: $archivePath"
|
||||||
|
}
|
||||||
|
if (-not (Test-Path $installerPath)) {
|
||||||
|
throw "Installer not found: $installerPath"
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Generate SHA256 checksum
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = "${{ steps.extract_version.outputs.version }}"
|
||||||
|
$archiveName = "AnabasisManager-$version.zip"
|
||||||
|
$installerName = "AnabasisManager-setup-$version.exe"
|
||||||
|
foreach ($name in @($archiveName, $installerName)) {
|
||||||
|
$path = "dist/$name"
|
||||||
|
$checksumPath = "dist/$name.sha256"
|
||||||
|
$hash = (Get-FileHash -Path $path -Algorithm SHA256).Hash.ToLower()
|
||||||
|
"$hash $name" | Set-Content -Path $checksumPath -Encoding UTF8
|
||||||
|
Write-Host "Checksum created: $checksumPath"
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Configure git identity
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
git config user.name "gitea-actions"
|
||||||
|
git config user.email "gitea-actions@daemonlord.ru"
|
||||||
|
|
||||||
|
- name: Create git tag
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = "${{ steps.extract_version.outputs.version }}"
|
||||||
|
$tag = "v$version"
|
||||||
|
$sha = "${{ gitea.sha }}"
|
||||||
|
$tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1)
|
||||||
|
if ([string]::IsNullOrWhiteSpace($tagLine)) {
|
||||||
|
git tag -a "$tag" -m "Release $tag" "$sha"
|
||||||
|
git push origin "$tag"
|
||||||
|
} else {
|
||||||
|
Write-Host "Tag $tag already exists on origin, skipping tag push."
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Create Gitea Release
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
|
||||||
|
with:
|
||||||
|
server_url: https://git.daemonlord.ru
|
||||||
|
repository: ${{ gitea.repository }}
|
||||||
|
token: ${{ secrets.API_TOKEN }}
|
||||||
|
tag_name: v${{ steps.extract_version.outputs.version }}
|
||||||
|
target_commitish: ${{ gitea.sha }}
|
||||||
|
name: Anabasis Manager ${{ steps.extract_version.outputs.version }}
|
||||||
|
body: |
|
||||||
|
Desktop release v${{ steps.extract_version.outputs.version }}
|
||||||
|
files: |
|
||||||
|
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip
|
||||||
|
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip.sha256
|
||||||
|
dist/AnabasisManager-setup-${{ steps.extract_version.outputs.version }}.exe
|
||||||
|
dist/AnabasisManager-setup-${{ steps.extract_version.outputs.version }}.exe.sha256
|
||||||
9
.gitignore
vendored
9
.gitignore
vendored
@@ -3,4 +3,11 @@
|
|||||||
/build_cx/
|
/build_cx/
|
||||||
/build_linux/
|
/build_linux/
|
||||||
/build_win32/
|
/build_win32/
|
||||||
/build_darwin/
|
/build_darwin/
|
||||||
|
.idea/
|
||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
tests/__pycache__/
|
||||||
|
build/
|
||||||
|
dist/
|
||||||
|
AnabasisManager.spec
|
||||||
|
|||||||
15
README.md
15
README.md
@@ -13,9 +13,12 @@
|
|||||||
* Моментальная загрузка всех доступных чатов пользователя.
|
* Моментальная загрузка всех доступных чатов пользователя.
|
||||||
* Групповой выбор чатов («Выбрать все» / «Снять выбор»).
|
* Групповой выбор чатов («Выбрать все» / «Снять выбор»).
|
||||||
* Быстрое обновление списка бесед.
|
* Быстрое обновление списка бесед.
|
||||||
|
* Выполнение массовых действий в фоновом потоке без подвисания интерфейса.
|
||||||
|
* Визуальный прогресс-бар по ходу операции.
|
||||||
* **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`).
|
* **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`).
|
||||||
* **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно.
|
* **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно.
|
||||||
* **🛡 Стабильность:** Улучшенная обработка ошибок VK API и автоматическая реакция на смену IP-адреса.
|
* **🔄 Безопасные обновления:** Проверка SHA256 и защищенная распаковка архива обновления.
|
||||||
|
* **🛡 Стабильность и безопасность:** Улучшенная обработка ошибок VK API, автоматическая реакция на смену IP-адреса и безопасное хранение токена с шифрованием DPAPI в Windows.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -47,7 +50,7 @@
|
|||||||
|
|
||||||
3. **Установите зависимости:**
|
3. **Установите зависимости:**
|
||||||
```bash
|
```bash
|
||||||
pip install PySide6 vk_api
|
pip install -r requirements.txt
|
||||||
```
|
```
|
||||||
|
|
||||||
4. **Запустите приложение:**
|
4. **Запустите приложение:**
|
||||||
@@ -68,6 +71,12 @@
|
|||||||
|
|
||||||
## 📂 Техническая информация
|
## 📂 Техническая информация
|
||||||
|
|
||||||
|
### Последние обновления
|
||||||
|
- Массовые операции VK (`remove/add/admin`) выполняются в фоновом потоке, чтобы интерфейс не зависал; добавлен визуальный прогресс-бар.
|
||||||
|
- Распаковка архива автообновления теперь валидирует пути перед извлечением для защиты от path traversal.
|
||||||
|
- Проверка обновлений переведена на `QThread` (модель потоков Qt) вместо Python `threading.Thread`.
|
||||||
|
- В Windows сохранение токена требует успешного шифрования через DPAPI; при ошибке шифрования сессия продолжается, но токен не сохраняется на диск.
|
||||||
|
|
||||||
### Сборка проекта (для разработчиков)
|
### Сборка проекта (для разработчиков)
|
||||||
Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`.
|
Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`.
|
||||||
|
|
||||||
@@ -101,4 +110,4 @@ python build.py
|
|||||||
---
|
---
|
||||||
Проект распространяется под лицензией MIT.
|
Проект распространяется под лицензией MIT.
|
||||||
|
|
||||||
Сэкономьте часы ручного труда с Anabasis VK Chat Manager.
|
Сэкономьте часы ручного труда с Anabasis VK Chat Manager.
|
||||||
|
|||||||
1
app_version.py
Normal file
1
app_version.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
APP_VERSION = "2.2.5"
|
||||||
@@ -74,5 +74,21 @@ def main_auth(auth_url, output_path):
|
|||||||
webview.start(private_mode=False, storage_path=storage_path)
|
webview.start(private_mode=False, storage_path=storage_path)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
# Supports both: `python auth_webview.py <auth_url> <output_path>`
|
||||||
|
# and: `python auth_webview.py --auth <auth_url> <output_path>`
|
||||||
|
args = sys.argv[1:]
|
||||||
|
if len(args) == 3 and args[0] == "--auth":
|
||||||
|
auth_url, output_path = args[1], args[2]
|
||||||
|
elif len(args) == 2:
|
||||||
|
auth_url, output_path = args[0], args[1]
|
||||||
|
else:
|
||||||
|
print("Usage: auth_webview.py [--auth] <auth_url> <output_path>")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
main_auth(auth_url, output_path)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
sys.exit(main())
|
||||||
|
|||||||
201
build.py
201
build.py
@@ -2,15 +2,20 @@ import os
|
|||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
from app_version import APP_VERSION
|
||||||
|
|
||||||
# --- Конфигурация ---
|
# --- Configuration ---
|
||||||
APP_NAME = "AnabasisManager"
|
APP_NAME = "AnabasisManager"
|
||||||
VERSION = "1.5" # Ваша версия
|
UPDATER_NAME = "AnabasisUpdater"
|
||||||
|
VERSION = APP_VERSION # Единая версия приложения
|
||||||
MAIN_SCRIPT = "main.py"
|
MAIN_SCRIPT = "main.py"
|
||||||
|
UPDATER_SCRIPT = "updater_gui.py"
|
||||||
ICON_PATH = "icon.ico"
|
ICON_PATH = "icon.ico"
|
||||||
|
INSTALLER_SCRIPT = os.path.join("installer", "AnabasisManager.iss")
|
||||||
DIST_DIR = os.path.join("dist", APP_NAME)
|
DIST_DIR = os.path.join("dist", APP_NAME)
|
||||||
ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия
|
ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия
|
||||||
SAFE_CLEAN_ROOT_FILES = {"main.py", "requirements.txt", "build.py"}
|
INSTALLER_NAME = f"{APP_NAME}-setup-{VERSION}.exe"
|
||||||
|
SAFE_CLEAN_ROOT_FILES = {"main.py", "updater_gui.py", "requirements.txt", "build.py"}
|
||||||
REMOVE_LIST = [
|
REMOVE_LIST = [
|
||||||
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
|
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
|
||||||
"Qt6VirtualKeyboard.dll", "Qt6Positioning.dll",
|
"Qt6VirtualKeyboard.dll", "Qt6Positioning.dll",
|
||||||
@@ -21,16 +26,46 @@ REMOVE_LIST = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def write_version_marker():
|
||||||
|
marker_path = os.path.join(DIST_DIR, "version.txt")
|
||||||
|
try:
|
||||||
|
os.makedirs(DIST_DIR, exist_ok=True)
|
||||||
|
with open(marker_path, "w", encoding="utf-8") as f:
|
||||||
|
f.write(str(VERSION).strip() + "\n")
|
||||||
|
print(f"[OK] Version marker written: {marker_path}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] Failed to write version.txt: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def copy_icon_to_dist():
|
||||||
|
icon_abs_path = os.path.abspath(ICON_PATH)
|
||||||
|
if not os.path.exists(icon_abs_path):
|
||||||
|
print("[WARN] icon.ico not found, skipping icon copy into dist.")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
os.makedirs("dist", exist_ok=True)
|
||||||
|
os.makedirs(DIST_DIR, exist_ok=True)
|
||||||
|
shutil.copy2(icon_abs_path, os.path.join("dist", "icon.ico"))
|
||||||
|
shutil.copy2(icon_abs_path, os.path.join(DIST_DIR, "icon.ico"))
|
||||||
|
print("[OK] Icon copied to dist/icon.ico and dist/AnabasisManager/icon.ico")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] Failed to copy icon.ico into dist: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
def ensure_project_root():
|
def ensure_project_root():
|
||||||
missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)]
|
missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)]
|
||||||
if missing:
|
if missing:
|
||||||
print("[ERROR] Скрипт нужно запускать из корня проекта.")
|
print("[ERROR] Run this script from the project root.")
|
||||||
print(f"[ERROR] Не найдены: {', '.join(missing)}")
|
print(f"[ERROR] Missing files: {', '.join(missing)}")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
def run_build():
|
def run_build():
|
||||||
print(f"--- 1. Запуск PyInstaller для {APP_NAME} v{VERSION} ---")
|
print(f"--- 1. Running PyInstaller for {APP_NAME} v{VERSION} ---")
|
||||||
|
icon_abs_path = os.path.abspath(ICON_PATH)
|
||||||
|
has_icon = os.path.exists(icon_abs_path)
|
||||||
|
|
||||||
command = [
|
command = [
|
||||||
"pyinstaller",
|
"pyinstaller",
|
||||||
@@ -41,8 +76,8 @@ def run_build():
|
|||||||
"--exclude-module", "PySide6.QtWebEngineWidgets",
|
"--exclude-module", "PySide6.QtWebEngineWidgets",
|
||||||
"--exclude-module", "PySide6.QtWebEngineQuick",
|
"--exclude-module", "PySide6.QtWebEngineQuick",
|
||||||
f"--name={APP_NAME}",
|
f"--name={APP_NAME}",
|
||||||
f"--icon={ICON_PATH}" if os.path.exists(ICON_PATH) else "",
|
f"--icon={icon_abs_path}" if has_icon else "",
|
||||||
f"--add-data={ICON_PATH}{os.pathsep}." if os.path.exists(ICON_PATH) else "",
|
f"--add-data={icon_abs_path}{os.pathsep}." if has_icon else "",
|
||||||
f"--add-data=auth_webview.py{os.pathsep}.",
|
f"--add-data=auth_webview.py{os.pathsep}.",
|
||||||
MAIN_SCRIPT
|
MAIN_SCRIPT
|
||||||
]
|
]
|
||||||
@@ -51,14 +86,44 @@ def run_build():
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
subprocess.check_call(command)
|
subprocess.check_call(command)
|
||||||
print("\n[OK] Сборка PyInstaller завершена.")
|
print("\n[OK] PyInstaller build completed.")
|
||||||
except subprocess.CalledProcessError as e:
|
except subprocess.CalledProcessError as e:
|
||||||
print(f"\n[ERROR] Ошибка при сборке: {e}")
|
print(f"\n[ERROR] Build failed: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def run_updater_build():
|
||||||
|
print(f"\n--- 1.2 Building {UPDATER_NAME} ---")
|
||||||
|
icon_abs_path = os.path.abspath(ICON_PATH)
|
||||||
|
has_icon = os.path.exists(icon_abs_path)
|
||||||
|
updater_spec_dir = os.path.join("build", "updater_spec")
|
||||||
|
updater_spec_path = os.path.join(updater_spec_dir, f"{UPDATER_NAME}.spec")
|
||||||
|
if os.path.exists(updater_spec_path):
|
||||||
|
os.remove(updater_spec_path)
|
||||||
|
command = [
|
||||||
|
"pyinstaller",
|
||||||
|
"--noconfirm",
|
||||||
|
"--clean",
|
||||||
|
"--onefile",
|
||||||
|
"--windowed",
|
||||||
|
f"--name={UPDATER_NAME}",
|
||||||
|
"--distpath", DIST_DIR,
|
||||||
|
"--workpath", os.path.join("build", "updater"),
|
||||||
|
"--specpath", updater_spec_dir,
|
||||||
|
f"--icon={icon_abs_path}" if has_icon else "",
|
||||||
|
UPDATER_SCRIPT,
|
||||||
|
]
|
||||||
|
command = [arg for arg in command if arg]
|
||||||
|
try:
|
||||||
|
subprocess.check_call(command)
|
||||||
|
print(f"[OK] {UPDATER_NAME} built.")
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
print(f"[ERROR] Failed to build {UPDATER_NAME}: {e}")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
def run_cleanup():
|
def run_cleanup():
|
||||||
print(f"\n--- 2. Оптимизация папки {APP_NAME} ---")
|
print(f"\n--- 2. Optimizing {APP_NAME} folder ---")
|
||||||
|
|
||||||
# Пытаемся найти папку PySide6 внутри сборки
|
# Пытаемся найти папку PySide6 внутри сборки
|
||||||
pyside_path = os.path.join(DIST_DIR, "PySide6")
|
pyside_path = os.path.join(DIST_DIR, "PySide6")
|
||||||
@@ -73,21 +138,116 @@ def run_cleanup():
|
|||||||
shutil.rmtree(path)
|
shutil.rmtree(path)
|
||||||
else:
|
else:
|
||||||
os.remove(path)
|
os.remove(path)
|
||||||
print(f"Удалено: {item}")
|
print(f"Removed: {item}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Пропуск {item}: {e}")
|
print(f"Skipped {item}: {e}")
|
||||||
|
|
||||||
|
|
||||||
def create_archive():
|
def create_archive():
|
||||||
print(f"\n--- 3. Создание архива {ARCHIVE_NAME}.zip ---")
|
print(f"\n--- 3. Creating archive {ARCHIVE_NAME}.zip ---")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Создаем zip-архив из папки DIST_DIR
|
# Создаем zip-архив из папки DIST_DIR
|
||||||
# base_name - имя файла без расширения, format - 'zip', root_dir - что упаковываем
|
# base_name - имя файла без расширения, format - 'zip', root_dir - что упаковываем
|
||||||
shutil.make_archive(os.path.join("dist", ARCHIVE_NAME), 'zip', DIST_DIR)
|
shutil.make_archive(os.path.join("dist", ARCHIVE_NAME), 'zip', DIST_DIR)
|
||||||
print(f"[OK] Архив создан: dist/{ARCHIVE_NAME}.zip")
|
print(f"[OK] Archive created: dist/{ARCHIVE_NAME}.zip")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] Не удалось создать архив: {e}")
|
print(f"[ERROR] Failed to create archive: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def _find_iscc():
|
||||||
|
candidates = []
|
||||||
|
iscc_env = os.getenv("ISCC_PATH", "").strip()
|
||||||
|
if iscc_env:
|
||||||
|
candidates.append(iscc_env)
|
||||||
|
candidates.append(shutil.which("iscc"))
|
||||||
|
candidates.append(shutil.which("ISCC.exe"))
|
||||||
|
candidates.append(r"C:\Program Files (x86)\Inno Setup 6\ISCC.exe")
|
||||||
|
candidates.append(r"C:\Program Files\Inno Setup 6\ISCC.exe")
|
||||||
|
for candidate in candidates:
|
||||||
|
if candidate and os.path.exists(candidate):
|
||||||
|
return candidate
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_process_output(raw_bytes):
|
||||||
|
if raw_bytes is None:
|
||||||
|
return ""
|
||||||
|
if isinstance(raw_bytes, str):
|
||||||
|
return raw_bytes
|
||||||
|
for enc in ("utf-8-sig", "utf-16", "utf-16-le", "cp1251", "cp866", "latin-1"):
|
||||||
|
try:
|
||||||
|
return raw_bytes.decode(enc)
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
return raw_bytes.decode("utf-8", errors="replace")
|
||||||
|
|
||||||
|
|
||||||
|
def build_installer():
|
||||||
|
print(f"\n--- 4. Building installer {INSTALLER_NAME} ---")
|
||||||
|
if os.name != "nt":
|
||||||
|
print("[INFO] Inno Setup installer is built only on Windows. Step skipped.")
|
||||||
|
return
|
||||||
|
if not os.path.exists(INSTALLER_SCRIPT):
|
||||||
|
print(f"[ERROR] Installer script not found: {INSTALLER_SCRIPT}")
|
||||||
|
sys.exit(1)
|
||||||
|
if not os.path.exists(DIST_DIR):
|
||||||
|
print(f"[ERROR] Build output folder not found: {DIST_DIR}")
|
||||||
|
sys.exit(1)
|
||||||
|
iscc_path = _find_iscc()
|
||||||
|
if not iscc_path:
|
||||||
|
print("[ERROR] Inno Setup Compiler (ISCC.exe) not found.")
|
||||||
|
print("[ERROR] Install Inno Setup 6 or set ISCC_PATH environment variable.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
project_root = os.path.abspath(".")
|
||||||
|
source_dir = os.path.abspath(DIST_DIR)
|
||||||
|
output_dir = os.path.abspath("dist")
|
||||||
|
iss_path = os.path.abspath(INSTALLER_SCRIPT)
|
||||||
|
icon_path = os.path.abspath(ICON_PATH)
|
||||||
|
print(f"[INFO] ISCC source dir: {source_dir}")
|
||||||
|
print(f"[INFO] ISCC output dir: {output_dir}")
|
||||||
|
print(f"[INFO] ISCC script: {iss_path}")
|
||||||
|
print(f"[INFO] ISCC icon path: {icon_path}")
|
||||||
|
if not os.path.exists(source_dir):
|
||||||
|
print(f"[ERROR] Source dir does not exist: {source_dir}")
|
||||||
|
sys.exit(1)
|
||||||
|
if not os.path.exists(iss_path):
|
||||||
|
print(f"[ERROR] Installer script does not exist: {iss_path}")
|
||||||
|
sys.exit(1)
|
||||||
|
if not os.path.exists(icon_path):
|
||||||
|
print(f"[ERROR] Icon file does not exist: {icon_path}")
|
||||||
|
sys.exit(1)
|
||||||
|
command = [
|
||||||
|
iscc_path,
|
||||||
|
f"/DMyAppVersion={VERSION}",
|
||||||
|
f"/DMyIconFile={icon_path}",
|
||||||
|
f"/O{output_dir}",
|
||||||
|
iss_path,
|
||||||
|
]
|
||||||
|
try:
|
||||||
|
completed = subprocess.run(
|
||||||
|
command,
|
||||||
|
capture_output=True,
|
||||||
|
cwd=project_root,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
stdout_text = _decode_process_output(completed.stdout)
|
||||||
|
stderr_text = _decode_process_output(completed.stderr)
|
||||||
|
if stdout_text:
|
||||||
|
print(stdout_text.rstrip())
|
||||||
|
if stderr_text:
|
||||||
|
print(stderr_text.rstrip())
|
||||||
|
if completed.returncode != 0:
|
||||||
|
raise RuntimeError(f"ISCC exited with code {completed.returncode}")
|
||||||
|
installer_path = os.path.join("dist", INSTALLER_NAME)
|
||||||
|
if not os.path.exists(installer_path):
|
||||||
|
print(f"[ERROR] Installer was not created: {installer_path}")
|
||||||
|
sys.exit(1)
|
||||||
|
print(f"[OK] Installer created: {installer_path}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] Failed to build installer: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
@@ -98,10 +258,15 @@ if __name__ == "__main__":
|
|||||||
shutil.rmtree(folder)
|
shutil.rmtree(folder)
|
||||||
|
|
||||||
run_build()
|
run_build()
|
||||||
|
run_updater_build()
|
||||||
run_cleanup()
|
run_cleanup()
|
||||||
|
copy_icon_to_dist()
|
||||||
|
write_version_marker()
|
||||||
create_archive()
|
create_archive()
|
||||||
|
build_installer()
|
||||||
|
|
||||||
print("\n" + "=" * 30)
|
print("\n" + "=" * 30)
|
||||||
print("ПРОЦЕСС ЗАВЕРШЕН")
|
print("BUILD COMPLETED")
|
||||||
print(f"Файл для отправки: dist/{ARCHIVE_NAME}.zip")
|
print(f"Release archive: dist/{ARCHIVE_NAME}.zip")
|
||||||
|
print(f"Installer: dist/{INSTALLER_NAME}")
|
||||||
print("=" * 30)
|
print("=" * 30)
|
||||||
|
|||||||
42
installer/AnabasisManager.iss
Normal file
42
installer/AnabasisManager.iss
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
#define MyAppName "Anabasis Manager"
|
||||||
|
#ifndef MyAppVersion
|
||||||
|
#define MyAppVersion "0.0.0"
|
||||||
|
#endif
|
||||||
|
#ifndef MyIconFile
|
||||||
|
#define MyIconFile "..\icon.ico"
|
||||||
|
#endif
|
||||||
|
|
||||||
|
[Setup]
|
||||||
|
AppId={{6CD9D6F2-4B95-4E9C-A8D8-2A9C8F6AA741}
|
||||||
|
AppName={#MyAppName}
|
||||||
|
AppVersion={#MyAppVersion}
|
||||||
|
AppPublisher=Benya
|
||||||
|
DefaultDirName={localappdata}\Programs\Anabasis Manager
|
||||||
|
DefaultGroupName=Anabasis Manager
|
||||||
|
DisableProgramGroupPage=yes
|
||||||
|
PrivilegesRequired=lowest
|
||||||
|
OutputDir=..\dist
|
||||||
|
OutputBaseFilename=AnabasisManager-setup-{#MyAppVersion}
|
||||||
|
Compression=lzma2
|
||||||
|
SolidCompression=yes
|
||||||
|
WizardStyle=modern
|
||||||
|
ArchitecturesInstallIn64BitMode=x64compatible
|
||||||
|
UninstallDisplayIcon={app}\AnabasisManager.exe
|
||||||
|
SetupIconFile={#MyIconFile}
|
||||||
|
|
||||||
|
[Languages]
|
||||||
|
Name: "russian"; MessagesFile: "compiler:Languages\Russian.isl"
|
||||||
|
Name: "english"; MessagesFile: "compiler:Default.isl"
|
||||||
|
|
||||||
|
[Tasks]
|
||||||
|
Name: "desktopicon"; Description: "Создать ярлык на рабочем столе"; GroupDescription: "Дополнительные задачи:"
|
||||||
|
|
||||||
|
[Files]
|
||||||
|
Source: "..\dist\AnabasisManager\*"; DestDir: "{app}"; Flags: ignoreversion recursesubdirs createallsubdirs
|
||||||
|
|
||||||
|
[Icons]
|
||||||
|
Name: "{group}\Anabasis Manager"; Filename: "{app}\AnabasisManager.exe"
|
||||||
|
Name: "{autodesktop}\Anabasis Manager"; Filename: "{app}\AnabasisManager.exe"; Tasks: desktopicon
|
||||||
|
|
||||||
|
[Run]
|
||||||
|
Filename: "{app}\AnabasisManager.exe"; Description: "Запустить Anabasis Manager"; Flags: nowait postinstall skipifsilent
|
||||||
5
services/__init__.py
Normal file
5
services/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
from .auto_update_service import AutoUpdateService
|
||||||
|
from .chat_actions import load_chat_conversations, resolve_user_ids
|
||||||
|
from .token_store import load_token, save_token
|
||||||
|
from .update_service import UpdateChecker, detect_update_repository_url
|
||||||
|
from .vk_service import VkService
|
||||||
232
services/auto_update_service.py
Normal file
232
services/auto_update_service.py
Normal file
@@ -0,0 +1,232 @@
|
|||||||
|
import hashlib
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
import urllib.request
|
||||||
|
import zipfile
|
||||||
|
|
||||||
|
|
||||||
|
class AutoUpdateService:
|
||||||
|
@staticmethod
|
||||||
|
def _safe_extract_zip(archive, destination_dir):
|
||||||
|
destination_real = os.path.realpath(destination_dir)
|
||||||
|
for member in archive.infolist():
|
||||||
|
member_name = member.filename or ""
|
||||||
|
if not member_name:
|
||||||
|
continue
|
||||||
|
target_path = os.path.realpath(os.path.join(destination_dir, member_name))
|
||||||
|
if target_path != destination_real and not target_path.startswith(destination_real + os.sep):
|
||||||
|
raise RuntimeError(f"Unsafe path in update archive: {member_name}")
|
||||||
|
archive.extractall(destination_dir)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def download_update_archive(download_url, destination_path):
|
||||||
|
request = urllib.request.Request(
|
||||||
|
download_url,
|
||||||
|
headers={"User-Agent": "AnabasisManager-Updater"},
|
||||||
|
)
|
||||||
|
with urllib.request.urlopen(request, timeout=60) as response:
|
||||||
|
with open(destination_path, "wb") as f:
|
||||||
|
shutil.copyfileobj(response, f)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def download_update_text(url):
|
||||||
|
request = urllib.request.Request(
|
||||||
|
url,
|
||||||
|
headers={"User-Agent": "AnabasisManager-Updater"},
|
||||||
|
)
|
||||||
|
with urllib.request.urlopen(request, timeout=30) as response:
|
||||||
|
return response.read().decode("utf-8", errors="replace")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def sha256_file(path):
|
||||||
|
digest = hashlib.sha256()
|
||||||
|
with open(path, "rb") as f:
|
||||||
|
for chunk in iter(lambda: f.read(1024 * 1024), b""):
|
||||||
|
digest.update(chunk)
|
||||||
|
return digest.hexdigest().lower()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def extract_sha256_from_text(checksum_text, target_file_name):
|
||||||
|
target = (target_file_name or "").strip().lower()
|
||||||
|
for raw_line in checksum_text.splitlines():
|
||||||
|
line = raw_line.strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
match = re.search(r"\b([A-Fa-f0-9]{64})\b", line)
|
||||||
|
if not match:
|
||||||
|
continue
|
||||||
|
checksum = match.group(1).lower()
|
||||||
|
if not target:
|
||||||
|
return checksum
|
||||||
|
line_lower = line.lower()
|
||||||
|
if target in line_lower:
|
||||||
|
return checksum
|
||||||
|
if os.path.basename(target) in line_lower:
|
||||||
|
return checksum
|
||||||
|
return ""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def verify_update_checksum(cls, zip_path, checksum_url, download_name):
|
||||||
|
if not checksum_url:
|
||||||
|
raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
|
||||||
|
checksum_text = cls.download_update_text(checksum_url)
|
||||||
|
expected_hash = cls.extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path))
|
||||||
|
if not expected_hash:
|
||||||
|
raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
|
||||||
|
actual_hash = cls.sha256_file(zip_path)
|
||||||
|
if actual_hash != expected_hash:
|
||||||
|
raise RuntimeError("SHA256 не совпадает, обновление отменено.")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def locate_extracted_root(extracted_dir):
|
||||||
|
entries = []
|
||||||
|
for name in os.listdir(extracted_dir):
|
||||||
|
full_path = os.path.join(extracted_dir, name)
|
||||||
|
if os.path.isdir(full_path):
|
||||||
|
entries.append(full_path)
|
||||||
|
if len(entries) == 1:
|
||||||
|
candidate = entries[0]
|
||||||
|
if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")):
|
||||||
|
return candidate
|
||||||
|
return extracted_dir
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def build_update_script(app_dir, source_dir, exe_name, target_pid):
|
||||||
|
script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
|
||||||
|
script_lines = [
|
||||||
|
"@echo off",
|
||||||
|
"setlocal EnableExtensions",
|
||||||
|
f"set \"APP_DIR={app_dir}\"",
|
||||||
|
f"set \"SRC_DIR={source_dir}\"",
|
||||||
|
f"set \"EXE_NAME={exe_name}\"",
|
||||||
|
f"set \"TARGET_PID={target_pid}\"",
|
||||||
|
"set \"BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%\"",
|
||||||
|
"set \"UPDATE_LOG=%APP_DIR%\\update_error.log\"",
|
||||||
|
"echo [%DATE% %TIME%] Update start > \"%UPDATE_LOG%\"",
|
||||||
|
"if not exist \"%SRC_DIR%\\%EXE_NAME%\" (",
|
||||||
|
" echo Source executable not found: \"%SRC_DIR%\\%EXE_NAME%\" >> \"%UPDATE_LOG%\"",
|
||||||
|
" exit /b 3",
|
||||||
|
")",
|
||||||
|
"set /a WAIT_LOOPS=0",
|
||||||
|
":wait_for_exit",
|
||||||
|
"tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
|
||||||
|
"if %ERRORLEVEL% EQU 0 (",
|
||||||
|
" set /a WAIT_LOOPS+=1",
|
||||||
|
" if %WAIT_LOOPS% GEQ 180 (",
|
||||||
|
" echo Timeout waiting for process %TARGET_PID%, attempting force stop >> \"%UPDATE_LOG%\"",
|
||||||
|
" taskkill /PID %TARGET_PID% /T /F >nul 2>&1",
|
||||||
|
" timeout /t 2 /nobreak >nul",
|
||||||
|
" tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
|
||||||
|
" if %ERRORLEVEL% EQU 0 goto :pid_still_running",
|
||||||
|
" goto :wait_image_unlock",
|
||||||
|
" )",
|
||||||
|
" timeout /t 1 /nobreak >nul",
|
||||||
|
" goto :wait_for_exit",
|
||||||
|
")",
|
||||||
|
":wait_image_unlock",
|
||||||
|
"set /a IMG_LOOPS=0",
|
||||||
|
":check_image",
|
||||||
|
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
|
||||||
|
"if %ERRORLEVEL% EQU 0 (",
|
||||||
|
" set /a IMG_LOOPS+=1",
|
||||||
|
" if %IMG_LOOPS% GEQ 60 goto :image_still_running",
|
||||||
|
" timeout /t 1 /nobreak >nul",
|
||||||
|
" goto :check_image",
|
||||||
|
")",
|
||||||
|
":backup",
|
||||||
|
"timeout /t 1 /nobreak >nul",
|
||||||
|
"mkdir \"%BACKUP_DIR%\" >nul 2>&1",
|
||||||
|
"robocopy \"%APP_DIR%\" \"%BACKUP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
|
||||||
|
"set \"RC=%ERRORLEVEL%\"",
|
||||||
|
"if %RC% GEQ 8 goto :backup_error",
|
||||||
|
"robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul",
|
||||||
|
"set \"RC=%ERRORLEVEL%\"",
|
||||||
|
"if %RC% GEQ 8 goto :rollback",
|
||||||
|
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
|
||||||
|
"timeout /t 2 /nobreak >nul",
|
||||||
|
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
|
||||||
|
"if %ERRORLEVEL% NEQ 0 goto :rollback",
|
||||||
|
"echo Update success >> \"%UPDATE_LOG%\"",
|
||||||
|
"rmdir /S /Q \"%BACKUP_DIR%\" >nul 2>&1",
|
||||||
|
"exit /b 0",
|
||||||
|
":rollback",
|
||||||
|
"robocopy \"%BACKUP_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
|
||||||
|
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
|
||||||
|
"echo Auto-update failed. Rollback executed. >> \"%UPDATE_LOG%\"",
|
||||||
|
"exit /b 2",
|
||||||
|
":backup_error",
|
||||||
|
"echo Auto-update failed during backup. Code %RC% >> \"%UPDATE_LOG%\"",
|
||||||
|
"exit /b %RC%",
|
||||||
|
":pid_still_running",
|
||||||
|
"echo Auto-update aborted: process %TARGET_PID% is still running after force stop. >> \"%UPDATE_LOG%\"",
|
||||||
|
"exit /b 4",
|
||||||
|
":image_still_running",
|
||||||
|
"echo Auto-update aborted: %EXE_NAME% still running and file lock may remain. >> \"%UPDATE_LOG%\"",
|
||||||
|
"exit /b 5",
|
||||||
|
]
|
||||||
|
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
|
||||||
|
f.write("\r\n".join(script_lines) + "\r\n")
|
||||||
|
return script_path
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def launch_update_script(script_path, work_dir):
|
||||||
|
creation_flags = 0
|
||||||
|
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
|
||||||
|
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
|
||||||
|
if hasattr(subprocess, "DETACHED_PROCESS"):
|
||||||
|
creation_flags |= subprocess.DETACHED_PROCESS
|
||||||
|
subprocess.Popen(
|
||||||
|
["cmd.exe", "/c", script_path],
|
||||||
|
cwd=work_dir,
|
||||||
|
creationflags=creation_flags,
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def launch_gui_updater(app_exe, source_dir, work_dir, target_pid, version=""):
|
||||||
|
app_dir = os.path.dirname(app_exe)
|
||||||
|
exe_name = os.path.basename(app_exe)
|
||||||
|
updater_exe = os.path.join(app_dir, "AnabasisUpdater.exe")
|
||||||
|
if not os.path.exists(updater_exe):
|
||||||
|
raise RuntimeError("Файл AnabasisUpdater.exe не найден в папке приложения.")
|
||||||
|
|
||||||
|
creation_flags = 0
|
||||||
|
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
|
||||||
|
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
|
||||||
|
if hasattr(subprocess, "DETACHED_PROCESS"):
|
||||||
|
creation_flags |= subprocess.DETACHED_PROCESS
|
||||||
|
|
||||||
|
subprocess.Popen(
|
||||||
|
[
|
||||||
|
updater_exe,
|
||||||
|
"--app-dir",
|
||||||
|
app_dir,
|
||||||
|
"--source-dir",
|
||||||
|
source_dir,
|
||||||
|
"--exe-name",
|
||||||
|
exe_name,
|
||||||
|
"--target-pid",
|
||||||
|
str(target_pid),
|
||||||
|
"--version",
|
||||||
|
str(version or ""),
|
||||||
|
"--work-dir",
|
||||||
|
str(work_dir or ""),
|
||||||
|
],
|
||||||
|
cwd=work_dir,
|
||||||
|
creationflags=creation_flags,
|
||||||
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def prepare_update(cls, download_url, checksum_url, download_name):
|
||||||
|
work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
|
||||||
|
zip_path = os.path.join(work_dir, "update.zip")
|
||||||
|
unpack_dir = os.path.join(work_dir, "extracted")
|
||||||
|
cls.download_update_archive(download_url, zip_path)
|
||||||
|
cls.verify_update_checksum(zip_path, checksum_url, download_name)
|
||||||
|
os.makedirs(unpack_dir, exist_ok=True)
|
||||||
|
with zipfile.ZipFile(zip_path, "r") as archive:
|
||||||
|
cls._safe_extract_zip(archive, unpack_dir)
|
||||||
|
source_dir = cls.locate_extracted_root(unpack_dir)
|
||||||
|
return work_dir, source_dir
|
||||||
105
services/chat_actions.py
Normal file
105
services/chat_actions.py
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_log(log_func, context, message):
|
||||||
|
if not log_func:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
log_func(context, message)
|
||||||
|
except TypeError:
|
||||||
|
log_func(f"{context}: {message}")
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_user_ids(vk_call_with_retry, vk_api, links):
|
||||||
|
resolved_ids = []
|
||||||
|
failed_links = []
|
||||||
|
for link in links:
|
||||||
|
try:
|
||||||
|
path = urlparse(link).path
|
||||||
|
screen_name = path.split("/")[-1] if path else ""
|
||||||
|
if not screen_name and len(path.split("/")) > 1:
|
||||||
|
screen_name = path.split("/")[-2]
|
||||||
|
if not screen_name:
|
||||||
|
failed_links.append((link, None))
|
||||||
|
continue
|
||||||
|
resolved_object = vk_call_with_retry(vk_api.utils.resolveScreenName, screen_name=screen_name)
|
||||||
|
if resolved_object and resolved_object.get("type") == "user":
|
||||||
|
resolved_ids.append(resolved_object["object_id"])
|
||||||
|
else:
|
||||||
|
failed_links.append((link, None))
|
||||||
|
except Exception as e:
|
||||||
|
failed_links.append((link, e))
|
||||||
|
return resolved_ids, failed_links
|
||||||
|
|
||||||
|
|
||||||
|
def load_chat_conversations(vk_call_with_retry, vk_api, log_func=None):
|
||||||
|
conversations = []
|
||||||
|
start_from = None
|
||||||
|
seen_start_tokens = set()
|
||||||
|
total_count = None
|
||||||
|
page_num = 0
|
||||||
|
while True:
|
||||||
|
params = {"count": 200, "filter": "all"}
|
||||||
|
if start_from:
|
||||||
|
if start_from in seen_start_tokens:
|
||||||
|
_safe_log(log_func, "load_chats_page", f"stop duplicate next_from={start_from}")
|
||||||
|
break
|
||||||
|
params["start_from"] = start_from
|
||||||
|
seen_start_tokens.add(start_from)
|
||||||
|
response = vk_call_with_retry(vk_api.messages.getConversations, **params)
|
||||||
|
page_num += 1
|
||||||
|
if total_count is None:
|
||||||
|
total_count = response.get("count")
|
||||||
|
page_items = response.get("items", [])
|
||||||
|
_safe_log(
|
||||||
|
log_func,
|
||||||
|
"load_chats_page",
|
||||||
|
f"page={page_num} items={len(page_items)} next_from={response.get('next_from')} total={total_count}",
|
||||||
|
)
|
||||||
|
if not page_items:
|
||||||
|
break
|
||||||
|
conversations.extend(page_items)
|
||||||
|
start_from = response.get("next_from")
|
||||||
|
if not start_from:
|
||||||
|
break
|
||||||
|
|
||||||
|
if total_count is not None and total_count > len(conversations):
|
||||||
|
_safe_log(
|
||||||
|
log_func,
|
||||||
|
"load_chats_fallback",
|
||||||
|
f"start offset pagination total={total_count} current={len(conversations)}",
|
||||||
|
)
|
||||||
|
seen_keys = set()
|
||||||
|
for conv in conversations:
|
||||||
|
peer = (conv.get("conversation") or {}).get("peer", {})
|
||||||
|
key = (peer.get("type"), peer.get("id") or peer.get("local_id"))
|
||||||
|
seen_keys.add(key)
|
||||||
|
|
||||||
|
offset = len(conversations)
|
||||||
|
safety_pages = 0
|
||||||
|
while offset < total_count:
|
||||||
|
params = {"count": 200, "filter": "all", "offset": offset}
|
||||||
|
response = vk_call_with_retry(vk_api.messages.getConversations, **params)
|
||||||
|
page_items = response.get("items", [])
|
||||||
|
_safe_log(
|
||||||
|
log_func,
|
||||||
|
"load_chats_fallback",
|
||||||
|
f"offset={offset} items={len(page_items)} total={response.get('count')}",
|
||||||
|
)
|
||||||
|
if not page_items:
|
||||||
|
break
|
||||||
|
for item in page_items:
|
||||||
|
peer = (item.get("conversation") or {}).get("peer", {})
|
||||||
|
key = (peer.get("type"), peer.get("id") or peer.get("local_id"))
|
||||||
|
if key in seen_keys:
|
||||||
|
continue
|
||||||
|
seen_keys.add(key)
|
||||||
|
conversations.append(item)
|
||||||
|
offset += len(page_items)
|
||||||
|
safety_pages += 1
|
||||||
|
if safety_pages > 50:
|
||||||
|
_safe_log(log_func, "load_chats_fallback", "stop safety_pages>50")
|
||||||
|
break
|
||||||
|
|
||||||
|
return conversations
|
||||||
|
|
||||||
136
services/token_store.py
Normal file
136
services/token_store.py
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
import base64
|
||||||
|
import ctypes
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from ctypes import wintypes
|
||||||
|
|
||||||
|
|
||||||
|
class _DataBlob(ctypes.Structure):
|
||||||
|
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
|
||||||
|
|
||||||
|
|
||||||
|
_crypt32 = None
|
||||||
|
_kernel32 = None
|
||||||
|
if os.name == "nt":
|
||||||
|
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
|
||||||
|
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
|
||||||
|
_crypt32.CryptProtectData.argtypes = [
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
wintypes.LPCWSTR,
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
ctypes.c_void_p,
|
||||||
|
ctypes.c_void_p,
|
||||||
|
wintypes.DWORD,
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
]
|
||||||
|
_crypt32.CryptProtectData.restype = wintypes.BOOL
|
||||||
|
_crypt32.CryptUnprotectData.argtypes = [
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
ctypes.POINTER(wintypes.LPWSTR),
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
ctypes.c_void_p,
|
||||||
|
ctypes.c_void_p,
|
||||||
|
wintypes.DWORD,
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
]
|
||||||
|
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
|
||||||
|
|
||||||
|
|
||||||
|
def _crypt_protect_data(data, description=""):
|
||||||
|
buffer = ctypes.create_string_buffer(data)
|
||||||
|
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
|
||||||
|
data_out = _DataBlob()
|
||||||
|
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
|
||||||
|
raise ctypes.WinError(ctypes.get_last_error())
|
||||||
|
try:
|
||||||
|
return ctypes.string_at(data_out.pbData, data_out.cbData)
|
||||||
|
finally:
|
||||||
|
_kernel32.LocalFree(data_out.pbData)
|
||||||
|
|
||||||
|
|
||||||
|
def _crypt_unprotect_data(data):
|
||||||
|
buffer = ctypes.create_string_buffer(data)
|
||||||
|
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
|
||||||
|
data_out = _DataBlob()
|
||||||
|
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
|
||||||
|
raise ctypes.WinError(ctypes.get_last_error())
|
||||||
|
try:
|
||||||
|
return ctypes.string_at(data_out.pbData, data_out.cbData)
|
||||||
|
finally:
|
||||||
|
_kernel32.LocalFree(data_out.pbData)
|
||||||
|
|
||||||
|
|
||||||
|
def _encrypt_token(token):
|
||||||
|
if os.name != "nt":
|
||||||
|
raise RuntimeError("DPAPI is available only on Windows.")
|
||||||
|
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
|
||||||
|
return base64.b64encode(encrypted_bytes).decode("ascii")
|
||||||
|
|
||||||
|
|
||||||
|
def _decrypt_token(token_data):
|
||||||
|
if os.name != "nt":
|
||||||
|
raise RuntimeError("DPAPI is available only on Windows.")
|
||||||
|
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
|
||||||
|
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
|
||||||
|
return decrypted_bytes.decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def save_token(token, token_file, app_data_dir, expires_in=0):
|
||||||
|
try:
|
||||||
|
expires_in = int(expires_in)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
expires_in = 0
|
||||||
|
|
||||||
|
os.makedirs(app_data_dir, exist_ok=True)
|
||||||
|
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
|
||||||
|
|
||||||
|
stored_token = token
|
||||||
|
encrypted = False
|
||||||
|
if os.name == "nt":
|
||||||
|
try:
|
||||||
|
stored_token = _encrypt_token(token)
|
||||||
|
encrypted = True
|
||||||
|
except Exception as exc:
|
||||||
|
raise RuntimeError("Failed to securely store token with DPAPI.") from exc
|
||||||
|
|
||||||
|
data = {
|
||||||
|
"token": stored_token,
|
||||||
|
"expiration_time": expiration_time,
|
||||||
|
"encrypted": encrypted,
|
||||||
|
}
|
||||||
|
|
||||||
|
with open(token_file, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(data, f)
|
||||||
|
return expiration_time
|
||||||
|
|
||||||
|
|
||||||
|
def load_token(token_file):
|
||||||
|
if not os.path.exists(token_file):
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
with open(token_file, "r", encoding="utf-8") as f:
|
||||||
|
data = json.load(f)
|
||||||
|
|
||||||
|
token = data.get("token")
|
||||||
|
encrypted = data.get("encrypted", False)
|
||||||
|
if token and encrypted:
|
||||||
|
try:
|
||||||
|
token = _decrypt_token(token)
|
||||||
|
except Exception:
|
||||||
|
try:
|
||||||
|
os.remove(token_file)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
expiration_time = data.get("expiration_time")
|
||||||
|
if token and (expiration_time == 0 or expiration_time > time.time()):
|
||||||
|
return token, expiration_time
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.remove(token_file)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return None, None
|
||||||
|
|
||||||
264
services/update_service.py
Normal file
264
services/update_service.py
Normal file
@@ -0,0 +1,264 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import urllib.error
|
||||||
|
import urllib.request
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
try:
|
||||||
|
from PySide6.QtCore import QObject, Signal
|
||||||
|
except Exception:
|
||||||
|
class _FallbackBoundSignal:
|
||||||
|
def __init__(self):
|
||||||
|
self._callbacks = []
|
||||||
|
|
||||||
|
def connect(self, callback):
|
||||||
|
if callback is not None:
|
||||||
|
self._callbacks.append(callback)
|
||||||
|
|
||||||
|
def emit(self, *args, **kwargs):
|
||||||
|
for callback in list(self._callbacks):
|
||||||
|
callback(*args, **kwargs)
|
||||||
|
|
||||||
|
class _FallbackSignalDescriptor:
|
||||||
|
def __init__(self):
|
||||||
|
self._storage_name = ""
|
||||||
|
|
||||||
|
def __set_name__(self, owner, name):
|
||||||
|
self._storage_name = f"__fallback_signal_{name}"
|
||||||
|
|
||||||
|
def __get__(self, instance, owner):
|
||||||
|
if instance is None:
|
||||||
|
return self
|
||||||
|
signal = instance.__dict__.get(self._storage_name)
|
||||||
|
if signal is None:
|
||||||
|
signal = _FallbackBoundSignal()
|
||||||
|
instance.__dict__[self._storage_name] = signal
|
||||||
|
return signal
|
||||||
|
|
||||||
|
class QObject:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def Signal(*_args, **_kwargs):
|
||||||
|
return _FallbackSignalDescriptor()
|
||||||
|
|
||||||
|
|
||||||
|
def _version_key(version_text):
|
||||||
|
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
|
||||||
|
if not parts:
|
||||||
|
return (0, 0, 0)
|
||||||
|
while len(parts) < 3:
|
||||||
|
parts.append(0)
|
||||||
|
return tuple(parts[:3])
|
||||||
|
|
||||||
|
|
||||||
|
def _is_newer_version(latest_version, current_version):
|
||||||
|
latest_key = _version_key(latest_version)
|
||||||
|
current_key = _version_key(current_version)
|
||||||
|
return latest_key > current_key
|
||||||
|
|
||||||
|
|
||||||
|
def _sanitize_repo_url(value):
|
||||||
|
value = (value or "").strip()
|
||||||
|
if not value:
|
||||||
|
return ""
|
||||||
|
if "://" not in value and value.count("/") == 1:
|
||||||
|
return f"https://github.com/{value}"
|
||||||
|
parsed = urlparse(value)
|
||||||
|
if not parsed.scheme or not parsed.netloc:
|
||||||
|
return ""
|
||||||
|
clean_path = parsed.path.rstrip("/")
|
||||||
|
if clean_path.endswith(".git"):
|
||||||
|
clean_path = clean_path[:-4]
|
||||||
|
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_update_channel(value):
|
||||||
|
channel = (value or "").strip().lower()
|
||||||
|
if channel in ("beta", "betas", "pre", "prerelease", "pre-release"):
|
||||||
|
return "beta"
|
||||||
|
return "stable"
|
||||||
|
|
||||||
|
|
||||||
|
def _select_release_from_list(releases):
|
||||||
|
for item in releases:
|
||||||
|
if not isinstance(item, dict):
|
||||||
|
continue
|
||||||
|
if item.get("draft"):
|
||||||
|
continue
|
||||||
|
tag_name = (item.get("tag_name") or item.get("name") or "").strip()
|
||||||
|
if not tag_name:
|
||||||
|
continue
|
||||||
|
return item
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_release_payload(release_data, repository_url, current_version):
|
||||||
|
parsed = urlparse(repository_url)
|
||||||
|
base_url = f"{parsed.scheme}://{parsed.netloc}"
|
||||||
|
repo_path = parsed.path.strip("/")
|
||||||
|
releases_url = f"{base_url}/{repo_path}/releases"
|
||||||
|
|
||||||
|
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
|
||||||
|
latest_version = latest_tag.lstrip("vV").strip()
|
||||||
|
html_url = release_data.get("html_url") or releases_url
|
||||||
|
release_notes = (release_data.get("body") or "").strip()
|
||||||
|
assets = release_data.get("assets") or []
|
||||||
|
download_url = ""
|
||||||
|
download_name = ""
|
||||||
|
checksum_url = ""
|
||||||
|
installer_url = ""
|
||||||
|
installer_name = ""
|
||||||
|
for asset in assets:
|
||||||
|
url = asset.get("browser_download_url", "")
|
||||||
|
if url.lower().endswith(".zip"):
|
||||||
|
download_url = url
|
||||||
|
download_name = asset.get("name", "")
|
||||||
|
break
|
||||||
|
if not download_url and assets:
|
||||||
|
download_url = assets[0].get("browser_download_url", "")
|
||||||
|
download_name = assets[0].get("name", "")
|
||||||
|
|
||||||
|
for asset in assets:
|
||||||
|
url = asset.get("browser_download_url", "")
|
||||||
|
name = asset.get("name", "")
|
||||||
|
name_lower = name.lower()
|
||||||
|
if installer_url:
|
||||||
|
break
|
||||||
|
if url.lower().endswith(".exe") and ("setup" in name_lower or "installer" in name_lower):
|
||||||
|
installer_url = url
|
||||||
|
installer_name = name
|
||||||
|
|
||||||
|
for asset in assets:
|
||||||
|
name = asset.get("name", "").lower()
|
||||||
|
if not name:
|
||||||
|
continue
|
||||||
|
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
|
||||||
|
if not is_checksum_asset:
|
||||||
|
continue
|
||||||
|
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
|
||||||
|
checksum_url = asset.get("browser_download_url", "")
|
||||||
|
break
|
||||||
|
if not checksum_url:
|
||||||
|
checksum_url = asset.get("browser_download_url", "")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"repository_url": repository_url,
|
||||||
|
"latest_version": latest_version,
|
||||||
|
"current_version": current_version,
|
||||||
|
"latest_tag": latest_tag,
|
||||||
|
"release_url": html_url,
|
||||||
|
"release_notes": release_notes,
|
||||||
|
"download_url": download_url,
|
||||||
|
"download_name": download_name,
|
||||||
|
"installer_url": installer_url,
|
||||||
|
"installer_name": installer_name,
|
||||||
|
"checksum_url": checksum_url,
|
||||||
|
"has_update": _is_newer_version(latest_version, current_version),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def detect_update_repository_url(configured_url="", configured_repo=""):
    """Return the repository URL used for update checks, or "" if none found.

    Candidates are tried in priority order: the ANABASIS_UPDATE_URL and
    ANABASIS_UPDATE_REPOSITORY environment variables, the two configured
    values passed in, and finally the first remote URL recorded in the local
    ``.git/config``. Each candidate is run through _sanitize_repo_url before
    being accepted.
    """
    candidates = (
        os.getenv("ANABASIS_UPDATE_URL", ""),
        os.getenv("ANABASIS_UPDATE_REPOSITORY", ""),
        configured_url,
        configured_repo,
    )
    for candidate in candidates:
        sanitized = _sanitize_repo_url(candidate)
        if sanitized:
            return sanitized

    # Fall back to the remote of the git checkout in the current directory.
    git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
    if not os.path.exists(git_config_path):
        return ""
    try:
        with open(git_config_path, "r", encoding="utf-8") as f:
            content = f.read()
        match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
        if not match:
            return ""
        remote = match.group(1).strip()
        if remote.startswith("git@"):
            # Convert an SSH-style remote (git@host:owner/repo[.git]) to HTTPS.
            ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
            if ssh_match:
                host, repo = ssh_match.group(1), ssh_match.group(2)
                return _sanitize_repo_url(f"https://{host}/{repo}")
        return _sanitize_repo_url(remote)
    except Exception:
        # Best effort: an unreadable git config simply means "not detected".
        return ""
|
||||||
|
|
||||||
|
|
||||||
|
class UpdateChecker(QObject):
    """Worker object that queries a release API and emits the result.

    Designed to be moved to a QThread; ``run()`` performs one blocking HTTP
    request and always finishes by emitting exactly one of the two signals.
    """

    # Emitted with the release payload dict built by _extract_release_payload.
    check_finished = Signal(dict)
    # Emitted with a human-readable (Russian) error message on any failure.
    check_failed = Signal(str)

    def __init__(self, repository_url, current_version, request_timeout=8, channel="stable"):
        """Store the check parameters; *channel* is normalized to stable/beta."""
        super().__init__()
        self.repository_url = repository_url
        self.current_version = current_version
        self.request_timeout = request_timeout
        self.channel = _normalize_update_channel(channel)

    def run(self):
        """Fetch release metadata and emit check_finished or check_failed."""
        if not self.repository_url:
            self.check_failed.emit("Не задан URL репозитория обновлений.")
            return

        # The repository URL must contain at least "owner/repo" in its path.
        parsed = urlparse(self.repository_url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
        repo_path = parsed.path.strip("/")
        if not repo_path or repo_path.count("/") < 1:
            self.check_failed.emit("Некорректный URL репозитория обновлений.")
            return

        # Beta channel lists all releases (pre-releases included); stable asks
        # the server for the latest release only. Non-GitHub hosts are assumed
        # to expose a Gitea/Forgejo-style /api/v1 endpoint.
        use_beta_channel = self.channel == "beta"
        if parsed.netloc.lower().endswith("github.com"):
            if use_beta_channel:
                api_url = f"https://api.github.com/repos/{repo_path}/releases"
            else:
                api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
        else:
            if use_beta_channel:
                api_url = f"{base_url}/api/v1/repos/{repo_path}/releases"
            else:
                api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
        # Human-browsable releases page, attached to the payload for the UI.
        releases_url = f"{base_url}/{repo_path}/releases"
        request = urllib.request.Request(
            api_url,
            headers={
                "Accept": "application/vnd.github+json",
                "User-Agent": "AnabasisManager-Updater",
            },
        )
        try:
            with urllib.request.urlopen(request, timeout=self.request_timeout) as response:
                response_data = json.loads(response.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
            return
        except urllib.error.URLError as e:
            self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
            return
        except Exception as e:
            # Catch-all: JSON decode errors, timeouts surfacing as OSError, etc.
            self.check_failed.emit(f"Не удалось проверить обновления: {e}")
            return

        # Beta: pick one release out of the returned list; stable: the
        # response itself must already be a single release object.
        release_data = response_data
        if use_beta_channel:
            if not isinstance(response_data, list):
                self.check_failed.emit("Сервер вернул некорректный ответ списка релизов.")
                return
            release_data = _select_release_from_list(response_data)
            if not release_data:
                self.check_failed.emit("В канале beta не найдено доступных релизов.")
                return
        elif not isinstance(response_data, dict):
            self.check_failed.emit("Сервер вернул некорректный ответ релиза.")
            return

        payload = _extract_release_payload(release_data, self.repository_url, self.current_version)
        payload["release_channel"] = self.channel
        payload["releases_url"] = releases_url
        self.check_finished.emit(payload)
|
||||||
59
services/vk_service.py
Normal file
59
services/vk_service.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
from vk_api import VkApi
|
||||||
|
from vk_api.exceptions import VkApiError
|
||||||
|
|
||||||
|
|
||||||
|
class VkService:
    """Thin wrapper around vk_api: session handling plus retry/error helpers."""

    def __init__(self):
        # Both stay None until set_token() succeeds.
        self.session = None
        self.api = None

    def set_token(self, token):
        """Create a fresh VK session and API accessor for *token*."""
        session = VkApi(token=token)
        self.session = session
        self.api = session.get_api()

    def clear(self):
        """Drop the current session and API accessor (logout)."""
        self.api = None
        self.session = None

    @staticmethod
    def build_auth_command(auth_url, output_path, entry_script_path=None):
        """Return (executable, argv) that launches the OAuth helper process."""
        if getattr(sys, "frozen", False):
            # Frozen (PyInstaller) build: the bundled exe handles --auth itself.
            return sys.executable, ["--auth", auth_url, output_path]
        script = entry_script_path if entry_script_path else os.path.abspath(__file__)
        return sys.executable, [script, "--auth", auth_url, output_path]

    @staticmethod
    def vk_error_code(exc):
        """Best-effort extraction of the numeric VK error code from *exc*."""
        payload = getattr(exc, "error", None)
        if isinstance(payload, dict):
            return payload.get("error_code")
        # Fall back to a plain ``code`` attribute if present.
        return getattr(exc, "code", None)

    @classmethod
    def is_auth_error(cls, exc, formatted_message=None):
        """True when *exc* indicates an invalid or expired access token."""
        if cls.vk_error_code(exc) == 5:
            return True
        text = (formatted_message or str(exc)).lower()
        return "invalid_access_token" in text or "user authorization failed" in text

    @classmethod
    def is_retryable_error(cls, exc):
        """True for VK error codes 6, 9 and 10, which this service retries."""
        return cls.vk_error_code(exc) in (6, 9, 10)

    def call_with_retry(self, func, *args, **kwargs):
        """Invoke *func*, retrying retryable VK errors with exponential backoff.

        Up to 5 attempts; non-retryable errors and the final attempt re-raise.
        """
        max_attempts = 5
        attempt = 0
        while True:
            attempt += 1
            try:
                return func(*args, **kwargs)
            except VkApiError as exc:
                if not self.is_retryable_error(exc) or attempt == max_attempts:
                    raise
                backoff = min(2.0, 0.35 * (2 ** (attempt - 1)))
                if self.vk_error_code(exc) == 9:
                    # Code 9 backs off at least one full second before retrying.
                    backoff = max(backoff, 1.0)
                time.sleep(backoff)
|
||||||
51
tests/test_auto_update_service.py
Normal file
51
tests/test_auto_update_service.py
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
import hashlib
|
||||||
|
import importlib.util
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
_SPEC = importlib.util.spec_from_file_location(
|
||||||
|
"auto_update_service",
|
||||||
|
Path("services/auto_update_service.py"),
|
||||||
|
)
|
||||||
|
_MODULE = importlib.util.module_from_spec(_SPEC)
|
||||||
|
_SPEC.loader.exec_module(_MODULE)
|
||||||
|
AutoUpdateService = _MODULE.AutoUpdateService
|
||||||
|
|
||||||
|
|
||||||
|
class AutoUpdateServiceTests(unittest.TestCase):
    """Tests for the static helpers exposed by AutoUpdateService."""

    def test_extract_sha256_from_text(self):
        expected_digest = "a" * 64
        checksum_line = f"{expected_digest} AnabasisManager-1.0.0-win.zip\n"
        result = AutoUpdateService.extract_sha256_from_text(
            checksum_line,
            "AnabasisManager-1.0.0-win.zip",
        )
        self.assertEqual(result, expected_digest)

    def test_sha256_file(self):
        data = b"anabasis"
        with tempfile.TemporaryDirectory() as tmp_dir:
            target = Path(tmp_dir) / "payload.bin"
            target.write_bytes(data)
            self.assertEqual(
                AutoUpdateService.sha256_file(str(target)),
                hashlib.sha256(data).hexdigest(),
            )

    def test_build_update_script_contains_core_vars(self):
        script_path = AutoUpdateService.build_update_script(
            app_dir=r"C:\Apps\AnabasisManager",
            source_dir=r"C:\Temp\Extracted",
            exe_name="AnabasisManager.exe",
            target_pid=1234,
        )
        body = Path(script_path).read_text(encoding="utf-8")
        # The generated batch file must define these variables and sections.
        expected_markers = (
            "set \"APP_DIR=",
            "set \"SRC_DIR=",
            "set \"EXE_NAME=",
            "set \"TARGET_PID=",
            ":rollback",
            "if not exist \"%SRC_DIR%\\%EXE_NAME%\"",
        )
        for marker in expected_markers:
            self.assertIn(marker, body)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
65
tests/test_chat_actions.py
Normal file
65
tests/test_chat_actions.py
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
import unittest
|
||||||
|
import importlib.util
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
_SPEC = importlib.util.spec_from_file_location(
|
||||||
|
"chat_actions",
|
||||||
|
Path("services/chat_actions.py"),
|
||||||
|
)
|
||||||
|
_MODULE = importlib.util.module_from_spec(_SPEC)
|
||||||
|
_SPEC.loader.exec_module(_MODULE)
|
||||||
|
load_chat_conversations = _MODULE.load_chat_conversations
|
||||||
|
resolve_user_ids = _MODULE.resolve_user_ids
|
||||||
|
|
||||||
|
|
||||||
|
class ChatActionsTests(unittest.TestCase):
    """Tests for resolve_user_ids and load_chat_conversations."""

    def test_resolve_user_ids_mixed_results(self):
        screen_map = {
            "id1": {"type": "user", "object_id": 1},
            "id2": {"type": "group", "object_id": 2},
        }

        def passthrough(func, **kwargs):
            # Stand-in for call_with_retry that just forwards the call.
            return func(**kwargs)

        def fake_resolve(screen_name):
            if screen_name == "boom":
                raise RuntimeError("boom")
            return screen_map.get(screen_name)

        fake_api = SimpleNamespace(utils=SimpleNamespace(resolveScreenName=fake_resolve))
        page_links = [
            "https://vk.com/id1",
            "https://vk.com/id2",
            "https://vk.com/boom",
            "https://vk.com/",
        ]
        resolved, failed = resolve_user_ids(passthrough, fake_api, page_links)

        # Only the user link resolves; group, error and empty links all fail.
        self.assertEqual(resolved, [1])
        self.assertEqual(len(failed), 3)
        self.assertEqual(failed[0][0], "https://vk.com/id2")
        self.assertIsNone(failed[0][1])

    def test_load_chat_conversations_paginated(self):
        first_page = {"items": [{"id": 1}], "next_from": "page-2"}
        second_page = {"items": [{"id": 2}]}

        def fake_get_conversations(**kwargs):
            if kwargs.get("start_from") == "page-2":
                return second_page
            return first_page

        def passthrough(func, **kwargs):
            return func(**kwargs)

        fake_api = SimpleNamespace(messages=SimpleNamespace(getConversations=fake_get_conversations))
        collected = load_chat_conversations(passthrough, fake_api)
        self.assertEqual(collected, [{"id": 1}, {"id": 2}])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
114
tests/test_main_contracts.py
Normal file
114
tests/test_main_contracts.py
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
import ast
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
class MainContractsTests(unittest.TestCase):
    """Static (AST-level) contract checks over main.py.

    main.py is parsed once, never imported, so these tests run without Qt.
    """

    @classmethod
    def setUpClass(cls):
        # utf-8-sig tolerates a BOM at the start of main.py.
        cls.main_source = Path("main.py").read_text(encoding="utf-8-sig")
        cls.module = ast.parse(cls.main_source)
        cls.vk_chat_manager = cls._find_class("VkChatManager")

    @classmethod
    def _find_class(cls, class_name):
        """Return the top-level ClassDef named *class_name* or raise."""
        for node in cls.module.body:
            if isinstance(node, ast.ClassDef) and node.name == class_name:
                return node
        raise AssertionError(f"Class {class_name} not found")

    def _find_method(self, method_name):
        """Return the FunctionDef named *method_name* on VkChatManager."""
        for node in self.vk_chat_manager.body:
            if isinstance(node, ast.FunctionDef) and node.name == method_name:
                return node
        self.fail(f"Method {method_name} not found")

    def _iter_nodes(self, node):
        # Depth-first walk over *node* and all descendants.
        return ast.walk(node)

    def test_auth_error_contexts_contains_only_supported_contexts(self):
        """AUTH_ERROR_CONTEXTS must list exactly the three supported contexts."""
        expected_contexts = {"load_chats", "execute_user_action", "set_user_admin"}
        for node in self.module.body:
            if isinstance(node, ast.Assign):
                for target in node.targets:
                    if isinstance(target, ast.Name) and target.id == "AUTH_ERROR_CONTEXTS":
                        # literal_eval turns the source literal into a real value.
                        actual = set(ast.literal_eval(node.value))
                        self.assertSetEqual(actual, expected_contexts)
                        return
        self.fail("AUTH_ERROR_CONTEXTS assignment not found")

    def test_check_for_updates_has_reentry_guard(self):
        """check_for_updates must bail out early if an update is in progress."""
        method = self._find_method("check_for_updates")
        has_guard = False
        # Only top-level ifs count: the guard must be a direct statement,
        # shaped as `if self._update_in_progress: ... return`.
        for node in method.body:
            if not isinstance(node, ast.If):
                continue
            test = node.test
            if (
                isinstance(test, ast.Attribute)
                and isinstance(test.value, ast.Name)
                and test.value.id == "self"
                and test.attr == "_update_in_progress"
            ):
                has_guard = any(isinstance(stmt, ast.Return) for stmt in node.body)
                if has_guard:
                    break
        self.assertTrue(has_guard, "check_for_updates must return when update is already in progress")

    def test_check_for_updates_connects_thread_finish_handler(self):
        """There must be a self.update_thread.finished.connect(self._on_update_thread_finished) call."""
        method = self._find_method("check_for_updates")
        for node in self._iter_nodes(method):
            if not isinstance(node, ast.Call):
                continue
            func = node.func
            if not (isinstance(func, ast.Attribute) and func.attr == "connect"):
                continue
            # The receiver must be exactly self.update_thread.finished.
            value = func.value
            if not (
                isinstance(value, ast.Attribute)
                and value.attr == "finished"
                and isinstance(value.value, ast.Attribute)
                and value.value.attr == "update_thread"
                and isinstance(value.value.value, ast.Name)
                and value.value.value.id == "self"
            ):
                continue
            if len(node.args) != 1:
                continue
            # The single argument must be exactly self._on_update_thread_finished.
            arg = node.args[0]
            if (
                isinstance(arg, ast.Attribute)
                and arg.attr == "_on_update_thread_finished"
                and isinstance(arg.value, ast.Name)
                and arg.value.id == "self"
            ):
                return
        self.fail("update_thread.finished must be connected to _on_update_thread_finished")

    def test_on_update_thread_finished_clears_update_state(self):
        """The finish handler must reset the three update-state attributes."""
        method = self._find_method("_on_update_thread_finished")
        assignments = {}
        # Collect every direct `self.<attr> = <value>` statement in the body.
        for node in method.body:
            if not isinstance(node, ast.Assign) or len(node.targets) != 1:
                continue
            target = node.targets[0]
            if (
                isinstance(target, ast.Attribute)
                and isinstance(target.value, ast.Name)
                and target.value.id == "self"
            ):
                assignments[target.attr] = node.value

        # The handler must assign False / None / None respectively.
        self.assertIn("_update_in_progress", assignments)
        self.assertIn("update_checker", assignments)
        self.assertIn("update_thread", assignments)
        self.assertIsInstance(assignments["_update_in_progress"], ast.Constant)
        self.assertIs(assignments["_update_in_progress"].value, False)
        self.assertIsInstance(assignments["update_checker"], ast.Constant)
        self.assertIsNone(assignments["update_checker"].value)
        self.assertIsInstance(assignments["update_thread"], ast.Constant)
        self.assertIsNone(assignments["update_thread"].value)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
53
tests/test_token_store.py
Normal file
53
tests/test_token_store.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
import importlib.util
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
_SPEC = importlib.util.spec_from_file_location(
|
||||||
|
"token_store",
|
||||||
|
Path("services/token_store.py"),
|
||||||
|
)
|
||||||
|
_MODULE = importlib.util.module_from_spec(_SPEC)
|
||||||
|
_SPEC.loader.exec_module(_MODULE)
|
||||||
|
load_token = _MODULE.load_token
|
||||||
|
save_token = _MODULE.save_token
|
||||||
|
|
||||||
|
|
||||||
|
class TokenStoreTests(unittest.TestCase):
    """Round-trip tests for save_token/load_token on a POSIX-style store."""

    def test_save_and_load_non_expiring_token(self):
        with tempfile.TemporaryDirectory() as workdir:
            store_path = Path(workdir) / "token.json"
            # Force the POSIX code path regardless of the host OS.
            with patch.object(_MODULE.os, "name", "posix"):
                expiration = save_token(
                    token="abc123",
                    token_file=str(store_path),
                    app_data_dir=workdir,
                    expires_in=0,
                )
                loaded_token, loaded_expiration = load_token(str(store_path))

            # expires_in=0 means "never expires" and must round-trip as 0.
            self.assertEqual(expiration, 0)
            self.assertEqual(loaded_token, "abc123")
            self.assertEqual(loaded_expiration, 0)

    def test_expired_token_is_removed(self):
        with tempfile.TemporaryDirectory() as workdir:
            store_path = Path(workdir) / "token.json"
            with patch.object(_MODULE.os, "name", "posix"):
                # Save at t=1000 with a one-second lifetime...
                with patch.object(_MODULE.time, "time", return_value=1000):
                    save_token(
                        token="abc123",
                        token_file=str(store_path),
                        app_data_dir=workdir,
                        expires_in=1,
                    )
                # ...then load well past expiry at t=2000.
                with patch.object(_MODULE.time, "time", return_value=2000):
                    loaded_token, loaded_expiration = load_token(str(store_path))

            self.assertIsNone(loaded_token)
            self.assertIsNone(loaded_expiration)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
97
tests/test_update_reentry_runtime.py
Normal file
97
tests/test_update_reentry_runtime.py
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
import unittest
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
|
||||||
|
class _DummySignal:
|
||||||
|
def __init__(self):
|
||||||
|
self._callbacks = []
|
||||||
|
|
||||||
|
def connect(self, callback):
|
||||||
|
if callback is not None:
|
||||||
|
self._callbacks.append(callback)
|
||||||
|
|
||||||
|
def emit(self, *args, **kwargs):
|
||||||
|
for callback in list(self._callbacks):
|
||||||
|
callback(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class _DummyThread:
    """QThread stand-in that runs its 'started' slots synchronously."""

    # Class-wide counter of constructed instances, reset by tests.
    created = 0

    def __init__(self, _parent=None):
        klass = type(self)
        klass.created = klass.created + 1
        self.started = _DummySignal()
        self.finished = _DummySignal()

    def start(self):
        # No real thread: fire 'started' immediately on the caller's stack.
        self.started.emit()

    def quit(self):
        self.finished.emit()

    def deleteLater(self):
        return None
|
||||||
|
|
||||||
|
|
||||||
|
class _DummyChecker:
    """UpdateChecker stand-in exposing the same signals, doing no work."""

    # Class-wide counter of constructed instances, reset by tests.
    created = 0

    def __init__(self, *_args, **_kwargs):
        klass = type(self)
        klass.created = klass.created + 1
        self.check_finished = _DummySignal()
        self.check_failed = _DummySignal()

    def moveToThread(self, _thread):
        return None

    def run(self):
        return None

    def deleteLater(self):
        return None
|
||||||
|
|
||||||
|
|
||||||
|
class UpdateReentryRuntimeTests(unittest.TestCase):
    """Runtime test of the update-check re-entry guard in main.VkChatManager.

    main.py is imported lazily (it pulls in Qt); when that fails the whole
    class is skipped instead of erroring.
    """

    @classmethod
    def setUpClass(cls):
        try:
            import main  # noqa: PLC0415
        except Exception as exc:
            raise unittest.SkipTest(f"main import unavailable: {exc}") from exc
        cls.main = main

    def test_repeated_update_check_is_ignored_until_thread_finishes(self):
        """A second check_for_updates call is a no-op while one is running."""
        _DummyChecker.created = 0
        _DummyThread.created = 0
        # Bypass __init__ (which builds the Qt UI) and hand-wire only the
        # attributes check_for_updates actually touches.
        manager = self.main.VkChatManager.__new__(self.main.VkChatManager)
        manager._update_in_progress = False
        manager._update_check_silent = False
        manager.update_channel = "stable"
        manager.update_repository_url = "https://example.com/org/repo"
        manager.update_checker = None
        manager.update_thread = None
        manager.status_label = SimpleNamespace(setText=lambda *_args, **_kwargs: None)
        manager._log_event = lambda *_args, **_kwargs: None
        manager._set_update_action_state = lambda *_args, **_kwargs: None

        # Substitute the dummy checker/thread so no network or Qt is used.
        with mock.patch.object(self.main, "UpdateChecker", _DummyChecker), mock.patch.object(self.main, "QThread", _DummyThread):
            self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
            self.assertTrue(manager._update_in_progress)
            self.assertEqual(_DummyChecker.created, 1)
            self.assertEqual(_DummyThread.created, 1)
            first_thread = manager.update_thread

            # Second call while in progress must not create new workers.
            self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
            self.assertEqual(_DummyChecker.created, 1)
            self.assertEqual(_DummyThread.created, 1)
            self.assertIs(manager.update_thread, first_thread)

            # Completing the check must clear all update state.
            manager.update_checker.check_finished.emit({"has_update": False, "current_version": self.main.APP_VERSION})
            self.assertFalse(manager._update_in_progress)
            self.assertIsNone(manager.update_checker)
            self.assertIsNone(manager.update_thread)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
53
tests/test_update_service.py
Normal file
53
tests/test_update_service.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
import unittest
|
||||||
|
import importlib.util
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
MODULE_PATH = Path("services/update_service.py")
|
||||||
|
SPEC = importlib.util.spec_from_file_location("update_service_under_test", MODULE_PATH)
|
||||||
|
update_service = importlib.util.module_from_spec(SPEC)
|
||||||
|
SPEC.loader.exec_module(update_service)
|
||||||
|
|
||||||
|
|
||||||
|
class UpdateServiceTests(unittest.TestCase):
    """Tests for channel normalization and release-payload extraction."""

    def test_normalize_update_channel(self):
        cases = (
            ("stable", "stable"),
            ("beta", "beta"),
            ("pre-release", "beta"),
            ("unknown", "stable"),
            ("", "stable"),
        )
        for raw, normalized in cases:
            self.assertEqual(update_service._normalize_update_channel(raw), normalized)

    def test_select_release_from_list_skips_drafts(self):
        candidates = [
            {"tag_name": "v2.0.0", "draft": True},
            {"tag_name": "", "draft": False},
            {"tag_name": "v1.9.0-beta.1", "draft": False},
        ]
        chosen = update_service._select_release_from_list(candidates)
        # Drafts and untagged entries are skipped; the beta tag wins.
        self.assertIsNotNone(chosen)
        self.assertEqual(chosen["tag_name"], "v1.9.0-beta.1")

    def test_extract_release_payload_uses_zip_and_checksum(self):
        release = {
            "tag_name": "v1.7.2",
            "html_url": "https://example.com/release/v1.7.2",
            "assets": [
                {"name": "notes.txt", "browser_download_url": "https://example.com/notes.txt"},
                {"name": "AnabasisManager-win64.zip", "browser_download_url": "https://example.com/app.zip"},
                {"name": "AnabasisManager-setup-1.7.2.exe", "browser_download_url": "https://example.com/setup.exe"},
                {"name": "AnabasisManager-win64.zip.sha256", "browser_download_url": "https://example.com/app.zip.sha256"},
            ],
        }
        extracted = update_service._extract_release_payload(
            release_data=release,
            repository_url="https://git.daemonlord.ru/benya/AnabasisChatRemove",
            current_version="1.7.1",
        )
        self.assertEqual(extracted["latest_version"], "1.7.2")
        self.assertEqual(extracted["download_url"], "https://example.com/app.zip")
        self.assertEqual(extracted["installer_url"], "https://example.com/setup.exe")
        self.assertEqual(extracted["checksum_url"], "https://example.com/app.zip.sha256")
        self.assertTrue(extracted["has_update"])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
107
tests/test_updater_gui.py
Normal file
107
tests/test_updater_gui.py
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
import importlib.util
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
import types
|
||||||
|
|
||||||
|
|
||||||
|
def _install_pyside6_stubs():
    """Register minimal PySide6 stub modules in sys.modules.

    Lets updater_gui be imported and unit-tested on machines without Qt.
    Only the handful of names updater_gui actually imports are stubbed.
    """
    pyside6_module = types.ModuleType("PySide6")
    pyside6_module.__path__ = []  # treat as package
    qtcore_module = types.ModuleType("PySide6.QtCore")
    qtgui_module = types.ModuleType("PySide6.QtGui")
    qtwidgets_module = types.ModuleType("PySide6.QtWidgets")

    # No-op Signal: accepts any construction args, ignores connections.
    class _Signal:
        def __init__(self, *args, **kwargs):
            pass

        def connect(self, *args, **kwargs):
            pass

    class _QObject:
        pass

    class _QThread:
        def __init__(self, *args, **kwargs):
            pass

    class _QTimer:
        @staticmethod
        def singleShot(*args, **kwargs):
            pass

    # QUrl.fromLocalFile just echoes the path; no URL object is needed.
    class _QUrl:
        @staticmethod
        def fromLocalFile(path):
            return path

    class _QDesktopServices:
        @staticmethod
        def openUrl(*args, **kwargs):
            return True

    # One generic widget class stands in for every widget type used.
    class _Widget:
        def __init__(self, *args, **kwargs):
            pass

    qtcore_module.QObject = _QObject
    qtcore_module.Qt = type("Qt", (), {})
    qtcore_module.QThread = _QThread
    qtcore_module.Signal = _Signal
    qtcore_module.QTimer = _QTimer
    qtcore_module.QUrl = _QUrl
    qtgui_module.QDesktopServices = _QDesktopServices
    qtwidgets_module.QApplication = _Widget
    qtwidgets_module.QLabel = _Widget
    qtwidgets_module.QProgressBar = _Widget
    qtwidgets_module.QVBoxLayout = _Widget
    qtwidgets_module.QWidget = _Widget
    qtwidgets_module.QPushButton = _Widget
    qtwidgets_module.QHBoxLayout = _Widget

    # Force stubs even if real PySide6 was imported earlier in the process.
    for mod_name in list(sys.modules.keys()):
        if mod_name == "PySide6" or mod_name.startswith("PySide6."):
            del sys.modules[mod_name]

    sys.modules["PySide6"] = pyside6_module
    sys.modules["PySide6.QtCore"] = qtcore_module
    sys.modules["PySide6.QtGui"] = qtgui_module
    sys.modules["PySide6.QtWidgets"] = qtwidgets_module
|
||||||
|
|
||||||
|
|
||||||
|
MODULE_PATH = Path("updater_gui.py")
|
||||||
|
_install_pyside6_stubs()
|
||||||
|
SPEC = importlib.util.spec_from_file_location("updater_gui_under_test", MODULE_PATH)
|
||||||
|
updater_gui = importlib.util.module_from_spec(SPEC)
|
||||||
|
SPEC.loader.exec_module(updater_gui)
|
||||||
|
|
||||||
|
|
||||||
|
class UpdaterGuiTests(unittest.TestCase):
    """Tests for the file-level helpers in updater_gui."""

    def test_read_version_marker(self):
        with tempfile.TemporaryDirectory() as base_dir:
            version_file = Path(base_dir) / "version.txt"
            version_file.write_text("2.0.1\n", encoding="utf-8")
            # Trailing newline must be stripped on read.
            self.assertEqual(updater_gui._read_version_marker(base_dir), "2.0.1")

    def test_mirror_tree_skips_selected_file(self):
        with tempfile.TemporaryDirectory() as src_tmp, tempfile.TemporaryDirectory() as dst_tmp:
            source_root = Path(src_tmp)
            mirror_root = Path(dst_tmp)
            (source_root / "keep.txt").write_text("ok", encoding="utf-8")
            (source_root / "skip.bin").write_text("x", encoding="utf-8")
            (source_root / "sub").mkdir()
            (source_root / "sub" / "nested.txt").write_text("nested", encoding="utf-8")

            updater_gui._mirror_tree(str(source_root), str(mirror_root), skip_names={"skip.bin"})

            # The mirror keeps normal files (including nested) but not skips.
            self.assertTrue((mirror_root / "keep.txt").exists())
            self.assertTrue((mirror_root / "sub" / "nested.txt").exists())
            self.assertFalse((mirror_root / "skip.bin").exists())
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
25
ui/dialogs.py
Normal file
25
ui/dialogs.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
from PySide6.QtWidgets import QDialog, QDialogButtonBox, QLabel, QTextEdit, QVBoxLayout
|
||||||
|
|
||||||
|
|
||||||
|
class MultiLinkDialog(QDialog):
    """Modal dialog that collects multiple VK page links, one per line."""

    def __init__(self, parent=None):
        """Build the dialog UI: prompt label, multi-line editor, OK/Cancel."""
        super().__init__(parent)
        self.setWindowTitle("Ввод нескольких ссылок")
        self.setMinimumSize(400, 300)

        layout = QVBoxLayout(self)
        label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
        layout.addWidget(label)

        # Free-form text area; get_links() splits its contents line by line.
        self.links_text_edit = QTextEdit()
        layout.addWidget(self.links_text_edit)

        button_box = QDialogButtonBox()
        button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
        button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
        button_box.accepted.connect(self.accept)
        button_box.rejected.connect(self.reject)
        layout.addWidget(button_box)

    def get_links(self):
        """Return the entered links as a list of stripped, non-empty lines."""
        return [line.strip() for line in self.links_text_edit.toPlainText().strip().split("\n") if line.strip()]
|
||||||
9
ui/main_window.py
Normal file
9
ui/main_window.py
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
def instructions_text():
    """Return the Russian usage instructions shown in the main window."""
    return (
        "Инструкция:\n"
        "1. Авторизуйтесь через VK.\n"
        "2. Выберите чаты.\n"
        "3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n"
        "4. Для массовых операций нажмите кнопку 'Список' и вставьте ссылки в окне.\n"
        "5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
    )
|
||||||
276
updater_gui.py
Normal file
276
updater_gui.py
Normal file
@@ -0,0 +1,276 @@
|
|||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
|
||||||
|
from PySide6.QtCore import QObject, Qt, QThread, Signal, QTimer, QUrl
|
||||||
|
from PySide6.QtGui import QDesktopServices
|
||||||
|
from PySide6.QtWidgets import QApplication, QLabel, QProgressBar, QVBoxLayout, QWidget, QPushButton, QHBoxLayout
|
||||||
|
|
||||||
|
|
||||||
|
def _write_log(log_path, message):
|
||||||
|
try:
|
||||||
|
os.makedirs(os.path.dirname(log_path), exist_ok=True)
|
||||||
|
with open(log_path, "a", encoding="utf-8") as f:
|
||||||
|
ts = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
f.write(f"[{ts}] {message.rstrip()}\n")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _is_pid_running(pid):
|
||||||
|
if pid <= 0:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
completed = subprocess.run(
|
||||||
|
["tasklist", "/FI", f"PID eq {pid}"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=5,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
return str(pid) in (completed.stdout or "")
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _copy_file_with_retries(source_file, target_file, retries=20, delay=0.5):
|
||||||
|
last_error = None
|
||||||
|
for _ in range(max(1, retries)):
|
||||||
|
try:
|
||||||
|
os.makedirs(os.path.dirname(target_file), exist_ok=True)
|
||||||
|
shutil.copy2(source_file, target_file)
|
||||||
|
return
|
||||||
|
except Exception as exc:
|
||||||
|
last_error = exc
|
||||||
|
time.sleep(delay)
|
||||||
|
raise last_error if last_error else RuntimeError(f"Не удалось скопировать файл: {source_file}")
|
||||||
|
|
||||||
|
|
||||||
|
def _mirror_tree(src_dir, dst_dir, skip_names=None, retries=20, delay=0.5):
    """Copy every file under *src_dir* into *dst_dir*, preserving layout.

    File names listed in *skip_names* (compared case-insensitively) are not
    copied; each copy goes through _copy_file_with_retries with the given
    *retries*/*delay*. Files already present in *dst_dir* but absent from
    *src_dir* are left untouched (mirror-add, not mirror-delete).
    """
    excluded = {entry.lower() for entry in (skip_names or [])}
    os.makedirs(dst_dir, exist_ok=True)
    for current_root, _subdirs, file_names in os.walk(src_dir):
        relative = os.path.relpath(current_root, src_dir)
        destination_root = os.path.join(dst_dir, relative) if relative != "." else dst_dir
        os.makedirs(destination_root, exist_ok=True)
        for name in file_names:
            if name.lower() in excluded:
                continue
            _copy_file_with_retries(
                os.path.join(current_root, name),
                os.path.join(destination_root, name),
                retries=retries,
                delay=delay,
            )
|
def _read_version_marker(base_dir):
|
||||||
|
marker_path = os.path.join(base_dir, "version.txt")
|
||||||
|
if not os.path.exists(marker_path):
|
||||||
|
return ""
|
||||||
|
try:
|
||||||
|
with open(marker_path, "r", encoding="utf-8") as f:
|
||||||
|
return f.read().strip()
|
||||||
|
except Exception:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
class UpdateWorker(QObject):
    """Background worker that applies an extracted update package.

    Intended to run on a QThread: waits for the target process to exit,
    verifies the package contents and version, backs up the current
    install, mirrors the new files in, re-checks the installed version,
    restarts the application, and rolls back from the backup on failure.
    """

    # (step_index 1..7, human-readable status text) for the progress UI.
    status = Signal(int, str)
    # Emitted with the error text when the update fails (after rollback attempt).
    failed = Signal(str)
    # Emitted after a fully successful update.
    done = Signal()

    def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""):
        """Store update parameters.

        app_dir: installation directory being updated.
        source_dir: directory containing the extracted update files.
        exe_name: application executable file name inside app_dir.
        target_pid: PID of the running app instance to wait for (0 = none).
        version: expected version string; "" disables version checks.
        work_dir: temporary download/extract dir, removed after success.
        """
        super().__init__()
        self.app_dir = app_dir
        self.source_dir = source_dir
        self.exe_name = exe_name
        self.target_pid = int(target_pid or 0)
        self.version = version or ""
        self.work_dir = work_dir or ""
        # Errors are appended here so failures survive the updater exiting.
        self.log_path = os.path.join(app_dir, "update_error.log")

    def _start_app(self):
        """Launch the application executable detached from this process.

        Raises RuntimeError if the executable is missing from app_dir.
        """
        app_exe = os.path.join(self.app_dir, self.exe_name)
        if not os.path.exists(app_exe):
            raise RuntimeError(f"Не найден файл приложения: {app_exe}")
        creation_flags = 0
        # Windows-only flags; hasattr-guarded so the module still imports
        # on platforms where subprocess does not define them.
        if hasattr(subprocess, "DETACHED_PROCESS"):
            creation_flags |= subprocess.DETACHED_PROCESS
        if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
            creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
        subprocess.Popen([app_exe], cwd=self.app_dir, creationflags=creation_flags)

    def run(self):
        """Execute the full update sequence, reporting progress via signals."""
        backup_dir = os.path.join(tempfile.gettempdir(), f"anabasis_backup_{int(time.time())}")
        # Never overwrite the updater's own executable while it is running.
        skip_names = {"anabasisupdater.exe"}
        prev_version = _read_version_marker(self.app_dir)
        source_version = _read_version_marker(self.source_dir)
        expected_version = (self.version or "").strip()
        try:
            self.status.emit(1, "Ожидание завершения приложения...")
            wait_loops = 0
            while _is_pid_running(self.target_pid):
                time.sleep(1)
                wait_loops += 1
                # After ~3 minutes of waiting, force-kill the stuck process tree.
                if wait_loops >= 180:
                    self.status.emit(1, "Принудительное завершение зависшего процесса...")
                    subprocess.run(
                        ["taskkill", "/PID", str(self.target_pid), "/T", "/F"],
                        capture_output=True,
                        text=True,
                        timeout=10,
                        check=False,
                    )
                    time.sleep(2)
                    if _is_pid_running(self.target_pid):
                        raise RuntimeError(f"Процесс {self.target_pid} не завершился.")
                    break

            self.status.emit(2, "Проверка содержимого обновления...")
            source_app_exe = os.path.join(self.source_dir, self.exe_name)
            if not os.path.exists(source_app_exe):
                raise RuntimeError(f"В обновлении отсутствует {self.exe_name}")
            if expected_version and source_version and source_version != expected_version:
                raise RuntimeError(
                    f"Версия пакета ({source_version}) не совпадает с ожидаемой ({expected_version})."
                )

            self.status.emit(3, "Создание резервной копии...")
            _mirror_tree(self.app_dir, backup_dir, skip_names=skip_names)

            self.status.emit(4, "Применение обновления...")
            # Locked-file retries are more generous here than for the backup.
            _mirror_tree(self.source_dir, self.app_dir, skip_names=skip_names, retries=30, delay=0.6)

            self.status.emit(5, "Проверка установленной версии...")
            installed_version = _read_version_marker(self.app_dir)
            if expected_version and installed_version and installed_version != expected_version:
                raise RuntimeError(
                    f"После обновления версия {installed_version}, ожидалась {expected_version}."
                )
            # Already-current install is logged as a warning, not treated as an error.
            if expected_version and prev_version and prev_version == expected_version:
                _write_log(self.log_path, f"Предупреждение: версия до обновления уже была {expected_version}.")

            self.status.emit(6, "Запуск обновленного приложения...")
            self._start_app()

            _write_log(self.log_path, f"Update success to version {expected_version or source_version or 'unknown'}")
            self.status.emit(7, "Очистка временных файлов...")
            try:
                shutil.rmtree(backup_dir, ignore_errors=True)
                if self.work_dir and os.path.isdir(self.work_dir):
                    shutil.rmtree(self.work_dir, ignore_errors=True)
            except Exception:
                # Cleanup is best-effort; leftover temp dirs are harmless.
                pass
            self.done.emit()
        except Exception as exc:
            _write_log(self.log_path, f"Update failed: {exc}")
            try:
                # Attempt to restore the pre-update state from the backup.
                self.status.emit(6, "Восстановление из резервной копии...")
                if os.path.isdir(backup_dir):
                    _mirror_tree(backup_dir, self.app_dir, skip_names=skip_names, retries=20, delay=0.5)
                    _write_log(self.log_path, "Rollback completed.")
                    try:
                        self._start_app()
                        _write_log(self.log_path, "Restored app started after rollback.")
                    except Exception as start_exc:
                        _write_log(self.log_path, f"Failed to start app after rollback: {start_exc}")
            except Exception as rollback_exc:
                _write_log(self.log_path, f"Rollback failed: {rollback_exc}")
            self.failed.emit(str(exc))
|
class UpdaterWindow(QWidget):
    """Progress window shown while the update is applied.

    Owns a QThread running an UpdateWorker and reflects the worker's
    status/failed/done signals in a label and a progress bar.
    """

    def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""):
        """Build the UI and immediately start the update worker thread.

        Parameters mirror UpdateWorker and are forwarded to it unchanged.
        """
        super().__init__()
        self.setWindowTitle("Anabasis Updater")
        self.setMinimumWidth(480)
        self.log_path = os.path.join(app_dir, "update_error.log")

        self.label = QLabel("Подготовка обновления...")
        self.label.setWordWrap(True)
        self.progress = QProgressBar()
        # Range 0..7 matches the step indices emitted by UpdateWorker.status.
        self.progress.setRange(0, 7)
        self.progress.setValue(0)

        # Both buttons stay disabled until the worker finishes or fails.
        self.open_log_btn = QPushButton("Открыть лог")
        self.open_log_btn.setEnabled(False)
        self.open_log_btn.clicked.connect(self.open_log)
        self.close_btn = QPushButton("Закрыть")
        self.close_btn.setEnabled(False)
        self.close_btn.clicked.connect(self.close)

        layout = QVBoxLayout(self)
        layout.addWidget(self.label)
        layout.addWidget(self.progress)
        actions = QHBoxLayout()
        actions.addStretch(1)
        actions.addWidget(self.open_log_btn)
        actions.addWidget(self.close_btn)
        layout.addLayout(actions)

        # NOTE(review): self.thread shadows QObject.thread(); consider
        # renaming (e.g. self._thread) — verify no callers rely on it.
        self.thread = QThread(self)
        self.worker = UpdateWorker(app_dir, source_dir, exe_name, target_pid, version, work_dir=work_dir)
        self.worker.moveToThread(self.thread)
        self.thread.started.connect(self.worker.run)
        self.worker.status.connect(self.on_status)
        self.worker.failed.connect(self.on_failed)
        self.worker.done.connect(self.on_done)
        # Stop the thread's event loop once the worker finishes either way.
        self.worker.done.connect(self.thread.quit)
        self.worker.failed.connect(self.thread.quit)
        self.thread.start()

    def on_status(self, step, text):
        """Show *text* and clamp the progress step into [0, 7]."""
        self.label.setText(text)
        self.progress.setValue(max(0, min(7, int(step))))

    def on_done(self):
        """Show success, enable the log button, and auto-close shortly after."""
        self.label.setText("Обновление успешно применено. Приложение запущено.")
        self.progress.setValue(7)
        self.open_log_btn.setEnabled(True)
        QTimer.singleShot(900, self.close)

    def on_failed(self, error_text):
        """Show the failure reason and enable the log/close buttons."""
        self.label.setText(
            "Не удалось применить обновление.\n"
            f"Причина: {error_text}\n"
            "Подробности сохранены в update_error.log."
        )
        self.open_log_btn.setEnabled(True)
        self.close_btn.setEnabled(True)

    def open_log(self):
        """Open update_error.log with the system default handler, if it exists."""
        if os.path.exists(self.log_path):
            QDesktopServices.openUrl(QUrl.fromLocalFile(self.log_path))
|
def parse_args():
    """Parse the updater's command-line arguments.

    All values are supplied by the main application when it spawns the
    updater. Returns an argparse.Namespace with string attributes.
    """
    parser = argparse.ArgumentParser(description="Anabasis update applier")
    parser.add_argument("--app-dir", required=True, help="Installation directory to update")
    parser.add_argument("--source-dir", required=True, help="Directory with the extracted update files")
    parser.add_argument("--exe-name", required=True, help="Application executable name inside app-dir")
    parser.add_argument("--target-pid", required=True, help="PID of the running application to wait for")
    parser.add_argument("--version", default="", help="Expected version string (empty disables version checks)")
    parser.add_argument("--work-dir", default="", help="Temporary download dir removed after a successful update")
    return parser.parse_args()
||||||
|
def main():
    """Entry point: parse arguments, show the updater window, run the Qt loop."""
    options = parse_args()
    qt_app = QApplication(sys.argv)
    qt_app.setStyle("Fusion")
    updater_window = UpdaterWindow(
        app_dir=options.app_dir,
        source_dir=options.source_dir,
        exe_name=options.exe_name,
        target_pid=options.target_pid,
        version=options.version,
        work_dir=options.work_dir,
    )
    updater_window.show()
    return qt_app.exec()
||||||
|
if __name__ == "__main__":
    # Equivalent to sys.exit(main()): propagate the Qt exit code to the OS.
    raise SystemExit(main())
Reference in New Issue
Block a user