Files
Copyrka/main.py

1845 lines
81 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, scrolledtext, simpledialog
import shutil
import os
from pathlib import Path
import threading
import queue
from datetime import datetime, time
import time as time_module
import schedule
import sys
import json
import tempfile
from typing import List, Dict, Optional
import traceback
import hashlib
import winreg
import contextlib
import logging
import logging.handlers
import fnmatch
import ctypes
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
import pystray
from PIL import Image, ImageDraw
except Exception:
pystray = None
Image = None
ImageDraw = None
DEFAULT_EXTENSIONS = ('*.bak', '*.BAK', '*.backup', '*.sql')
AUTOSTART_REG_PATH = r"Software\Microsoft\Windows\CurrentVersion\Run"
AUTOSTART_REG_NAME = "BackupCopier"
ICON_PATH = "icon.ico"
APP_NAME = "BackupCopier"
COPY_RETRIES = 2
COPY_RETRY_DELAY = 2
DEFAULT_MIN_FREE_GB = 1
DEFAULT_MAX_WORKERS = 3
DEFAULT_INCLUDE_MASKS = "*.bak;*.sql;*.backup"
DEFAULT_EXCLUDE_MASKS = "*.tmp;*.temp"
MUTEX_NAME = "Global\\BackupCopierMutex"
def find_latest_file_in_folder(folder_path: str, extensions=DEFAULT_EXTENSIONS,
include_masks: Optional[List[str]] = None,
exclude_masks: Optional[List[str]] = None) -> Optional[Path]:
"""Возвращает самый новый файл из папки по времени модификации."""
folder = Path(folder_path)
if not folder.exists():
return None
files: List[Path] = []
include_masks = include_masks or []
exclude_masks = exclude_masks or []
if include_masks:
for mask in include_masks:
files.extend(folder.glob(mask))
else:
for ext in extensions:
files.extend(folder.glob(ext))
filtered = []
seen = set()
for f in files:
if not f.is_file():
continue
if f.name in seen:
continue
if matches_masks(f.name, include_masks, exclude_masks):
filtered.append(f)
seen.add(f.name)
if not filtered:
return None
return max(filtered, key=lambda f: f.stat().st_mtime)
def should_copy_file(source: Path, target: Path) -> bool:
"""Определяет, нужно ли копировать файл."""
if not target.exists():
return True
return source.stat().st_mtime > target.stat().st_mtime
def compute_file_checksum(path: Path, chunk_size: int = 1024 * 1024) -> str:
"""Считает SHA-256 контрольную сумму файла."""
hasher = hashlib.sha256()
with path.open("rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
hasher.update(chunk)
return hasher.hexdigest()
def verify_copy(source: Path, target: Path) -> bool:
"""Проверяет, что файл скопирован корректно, по контрольной сумме."""
return compute_file_checksum(source) == compute_file_checksum(target)
def get_resource_path(relative_path: str) -> str:
"""Возвращает абсолютный путь к ресурсу (поддерживает PyInstaller)."""
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
return os.path.join(sys._MEIPASS, relative_path)
return os.path.abspath(relative_path)
def parse_masks(mask_text: str) -> List[str]:
parts = [p.strip() for p in (mask_text or "").split(";")]
return [p for p in parts if p]
def matches_masks(filename: str, include_masks: List[str], exclude_masks: List[str]) -> bool:
if include_masks:
if not any(fnmatch.fnmatch(filename, m) for m in include_masks):
return False
if exclude_masks:
if any(fnmatch.fnmatch(filename, m) for m in exclude_masks):
return False
return True
def get_free_space_gb(path: Path) -> float:
try:
usage = shutil.disk_usage(str(path))
return usage.free / (1024 ** 3)
except Exception:
return 0.0
def ensure_single_instance() -> Optional[int]:
"""Создает системный mutex. Возвращает handle или None если уже запущено."""
mutex = ctypes.windll.kernel32.CreateMutexW(None, True, MUTEX_NAME)
already_exists = ctypes.windll.kernel32.GetLastError() == 183
if already_exists:
return None
return mutex
class FileCopyScheduler:
"""Класс для управления расписанием копирования"""
def __init__(self, app):
self.app = app
self.scheduled_jobs = []
self.running = False
self.thread = None
def start(self):
"""Запускает планировщик в отдельном потоке"""
if not self.running:
self.running = True
self.thread = threading.Thread(target=self._run_scheduler, daemon=True)
self.thread.start()
self.app.log_message("🕒 Планировщик запущен", "info")
def stop(self):
"""Останавливает планировщик"""
self.running = False
self.app.log_message("🕒 Планировщик остановлен", "info")
def _run_scheduler(self):
"""Основной цикл планировщика"""
while self.running:
schedule.run_pending()
time_module.sleep(1)
def schedule_copy_job(self, job_id: str, schedules: List[dict], pairs: List[tuple]):
"""Добавляет задания в планировщик"""
# Очищаем предыдущие задания с таким же ID
schedule.clear(job_id)
if not schedules:
return
day_map = {
"Mon": schedule.every().monday,
"Tue": schedule.every().tuesday,
"Wed": schedule.every().wednesday,
"Thu": schedule.every().thursday,
"Fri": schedule.every().friday,
"Sat": schedule.every().saturday,
"Sun": schedule.every().sunday,
}
for entry in schedules:
time_str = entry.get("time", "03:00")
days = entry.get("days", [])
if not days:
schedule.every().day.at(time_str).do(self._execute_copy_job, pairs=pairs).tag(job_id)
self.app.log_message(f"📅 Запланировано копирование на {time_str} ежедневно", "info")
else:
for day in days:
if day in day_map:
day_map[day].at(time_str).do(self._execute_copy_job, pairs=pairs).tag(job_id)
self.app.log_message(f"📅 Запланировано копирование на {time_str} ({', '.join(days)})", "info")
def _execute_copy_job(self, pairs: List[tuple]):
"""Выполняет задание копирования"""
self.app.log_message("⏰ Запуск запланированного копирования...", "info")
# Запускаем копирование в отдельном потоке
copy_thread = threading.Thread(
target=self.app.copy_files_thread,
args=(pairs, True), # True означает фоновый режим
daemon=True
)
copy_thread.start()
class BackgroundFileCopyApp:
def __init__(self, root):
self.root = root
self.root.title("Планировщик копирования бекапов")
self.root.geometry("900x800")
self.setup_window_icon()
# Для работы с очередью сообщений из потоков
self.queue = queue.Queue()
# Список пар для копирования
self.copy_pairs = []
self.pair_counter = 0
self.selected_pair_id = None
# Планировщик
self.scheduler = FileCopyScheduler(self)
# Флаг для отслеживания состояния
self.is_copying = False
self.copy_lock = threading.Lock()
# Автозапуск
self.autostart_enabled = tk.BooleanVar(value=False)
# Сворачивание в трей
self.minimize_to_tray_enabled = tk.BooleanVar(value=True)
self.tray_icon = None
self.tray_thread = None
# Режим проверки без копирования
self.verify_only = tk.BooleanVar(value=False)
# Последний успешный запуск
self.last_success_time: Optional[str] = None
self.last_result_summary: Optional[str] = None
self.last_error_detail: Optional[str] = None
# Хеши последних копий
self.last_hashes: Dict[str, str] = {}
# Профили
self.profiles: Dict[str, dict] = {}
self.active_profile = tk.StringVar(value="Default")
self.include_masks_var = tk.StringVar(value=DEFAULT_INCLUDE_MASKS)
self.exclude_masks_var = tk.StringVar(value=DEFAULT_EXCLUDE_MASKS)
self.min_free_gb_var = tk.DoubleVar(value=DEFAULT_MIN_FREE_GB)
self.max_workers_var = tk.IntVar(value=DEFAULT_MAX_WORKERS)
self.schedules: List[dict] = []
self.schedule_days_vars = {
"Mon": tk.BooleanVar(value=False),
"Tue": tk.BooleanVar(value=False),
"Wed": tk.BooleanVar(value=False),
"Thu": tk.BooleanVar(value=False),
"Fri": tk.BooleanVar(value=False),
"Sat": tk.BooleanVar(value=False),
"Sun": tk.BooleanVar(value=False),
}
# Логгер в файл
self.file_logger = self.setup_file_logger()
# Создаем интерфейс
self.setup_ui()
# Загружаем сохраненные настройки
self.load_settings()
# Проверяем очередь каждые 100ms
self.process_queue()
# Запускаем планировщик при старте, если есть сохраненные настройки
if self.scheduler_enabled.get():
self.start_scheduler()
def setup_ui(self):
main_frame = ttk.Frame(self.root, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
notebook = ttk.Notebook(main_frame)
notebook.pack(fill=tk.BOTH, expand=True)
settings_tab = ttk.Frame(notebook)
log_tab = ttk.Frame(notebook)
help_tab = ttk.Frame(notebook)
notebook.add(settings_tab, text="Настройки")
notebook.add(log_tab, text="Лог")
notebook.add(help_tab, text="Справка")
title_label = ttk.Label(settings_tab, text="Планировщик копирования бекапов",
font=("Arial", 14, "bold"))
title_label.pack(pady=8)
profile_frame = ttk.Frame(settings_tab)
profile_frame.pack(fill=tk.X, pady=4)
ttk.Label(profile_frame, text="Профиль:").pack(side=tk.LEFT)
self.profile_combo = ttk.Combobox(profile_frame, textvariable=self.active_profile, state="readonly", width=20)
self.profile_combo.pack(side=tk.LEFT, padx=5)
self.profile_combo.bind("<<ComboboxSelected>>", lambda _e: self.load_profile(self.active_profile.get()))
ttk.Button(profile_frame, text="", width=3, command=self.add_profile).pack(side=tk.LEFT, padx=2)
ttk.Button(profile_frame, text="", width=3, command=self.rename_profile).pack(side=tk.LEFT, padx=2)
ttk.Button(profile_frame, text="🗑", width=3, command=self.delete_profile).pack(side=tk.LEFT, padx=2)
ttk.Button(profile_frame, text="Импорт", command=self.import_profiles).pack(side=tk.LEFT, padx=6)
ttk.Button(profile_frame, text="Экспорт", command=self.export_profiles).pack(side=tk.LEFT, padx=2)
status_frame = ttk.LabelFrame(settings_tab, text="Состояние", padding="10")
status_frame.pack(fill=tk.X, pady=5)
self.status_summary_label = ttk.Label(status_frame, text="")
self.status_summary_label.pack(side=tk.LEFT)
self.next_run_label = ttk.Label(status_frame, text="(следующий запуск: —)",
font=("Arial", 9, "italic"))
self.next_run_label.pack(side=tk.LEFT, padx=10)
self.last_success_label = ttk.Label(status_frame, text="(последний успех: —)",
font=("Arial", 9, "italic"))
self.last_success_label.pack(side=tk.LEFT, padx=10)
self.last_result_label = ttk.Label(status_frame, text="(последний результат: —)",
font=("Arial", 9, "italic"))
self.last_result_label.pack(side=tk.LEFT, padx=10)
schedule_frame = ttk.LabelFrame(settings_tab, text="Расписание и поведение", padding="10")
schedule_frame.pack(fill=tk.X, pady=5)
time_frame = ttk.Frame(schedule_frame)
time_frame.pack(fill=tk.X, pady=5)
self.hour_var = tk.StringVar(value="03")
self.minute_var = tk.StringVar(value="00")
self.scheduler_enabled = tk.BooleanVar(value=False)
self.scheduler_check = ttk.Checkbutton(time_frame,
text="Включить расписание",
variable=self.scheduler_enabled,
command=self.toggle_scheduler)
self.scheduler_check.pack(side=tk.LEFT, padx=10)
self.scheduler_status = ttk.Label(time_frame, text="(остановлен)",
font=("Arial", 9, "italic"))
self.scheduler_status.pack(side=tk.LEFT, padx=10)
schedule_summary_frame = ttk.Frame(schedule_frame)
schedule_summary_frame.pack(fill=tk.X, pady=4)
self.schedule_summary_label = ttk.Label(schedule_summary_frame, text="Расписание: —")
self.schedule_summary_label.pack(side=tk.LEFT)
ttk.Button(schedule_summary_frame, text="Расписание...", command=self.open_schedule_dialog).pack(side=tk.LEFT, padx=8)
options_frame = ttk.Frame(schedule_frame)
options_frame.pack(fill=tk.X, pady=5)
self.autostart_check = ttk.Checkbutton(
options_frame,
text="Автозапуск Windows",
variable=self.autostart_enabled,
command=self.toggle_autostart
)
self.autostart_check.pack(side=tk.LEFT)
ttk.Button(options_frame, text="?", width=2,
command=lambda: self.show_hint("Автозапуск",
"Добавляет программу в автозапуск Windows.")).pack(side=tk.LEFT, padx=4)
self.minimize_check = ttk.Checkbutton(
options_frame,
text="Сворачивать в трей при закрытии",
variable=self.minimize_to_tray_enabled
)
self.minimize_check.pack(side=tk.LEFT, padx=10)
ttk.Button(options_frame, text="?", width=2,
command=lambda: self.show_hint("Сворачивание",
"По крестику окно скрывается в трей.")).pack(side=tk.LEFT, padx=4)
self.verify_check = ttk.Checkbutton(
options_frame,
text="Только проверка (без копирования)",
variable=self.verify_only
)
self.verify_check.pack(side=tk.LEFT, padx=10)
ttk.Button(options_frame, text="?", width=2,
command=lambda: self.show_hint("Проверка",
"Проверяет совпадение контрольных сумм без копирования.")).pack(side=tk.LEFT, padx=4)
perf_frame = ttk.Frame(schedule_frame)
perf_frame.pack(fill=tk.X, pady=4)
ttk.Label(perf_frame, text="Мин. свободно, ГБ:").pack(side=tk.LEFT)
self.min_free_entry = ttk.Entry(perf_frame, textvariable=self.min_free_gb_var, width=6)
self.min_free_entry.pack(side=tk.LEFT, padx=4)
ttk.Label(perf_frame, text="Параллельных потоков:").pack(side=tk.LEFT, padx=8)
self.max_workers_entry = ttk.Entry(perf_frame, textvariable=self.max_workers_var, width=4)
self.max_workers_entry.pack(side=tk.LEFT, padx=4)
masks_frame = ttk.Frame(schedule_frame)
masks_frame.pack(fill=tk.X, pady=4)
ttk.Label(masks_frame, text="Включать маски:").pack(side=tk.LEFT)
self.include_entry = ttk.Entry(masks_frame, textvariable=self.include_masks_var)
self.include_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=4)
ttk.Label(masks_frame, text="Исключать маски:").pack(side=tk.LEFT, padx=6)
self.exclude_entry = ttk.Entry(masks_frame, textvariable=self.exclude_masks_var)
self.exclude_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=4)
buttons_frame = ttk.Frame(schedule_frame)
buttons_frame.pack(fill=tk.X, pady=5)
ttk.Button(buttons_frame, text="🧭 Мастер настройки",
command=self.open_wizard).pack(side=tk.LEFT, padx=5)
ttk.Button(buttons_frame, text="💾 Сохранить",
command=self.check_and_save).pack(side=tk.LEFT, padx=5)
ttk.Button(buttons_frame, text="▶ Запустить сейчас",
command=self.start_manual_copy).pack(side=tk.LEFT, padx=5)
paths_frame = ttk.LabelFrame(settings_tab, text="Пары копирования", padding="10")
paths_frame.pack(fill=tk.BOTH, expand=True, pady=5)
self.pairs_tree = ttk.Treeview(
paths_frame,
columns=("source", "dest", "mode", "status"),
show="headings",
selectmode="browse"
)
self.pairs_tree.heading("source", text="Источник")
self.pairs_tree.heading("dest", text="Назначение")
self.pairs_tree.heading("mode", text="Режим")
self.pairs_tree.heading("status", text="Статус")
self.pairs_tree.column("source", width=280)
self.pairs_tree.column("dest", width=280)
self.pairs_tree.column("mode", width=110, anchor=tk.CENTER)
self.pairs_tree.column("status", width=120, anchor=tk.CENTER)
tree_scroll = ttk.Scrollbar(paths_frame, orient="vertical", command=self.pairs_tree.yview)
self.pairs_tree.configure(yscrollcommand=tree_scroll.set)
self.pairs_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
tree_scroll.pack(side=tk.RIGHT, fill=tk.Y)
self.pairs_tree.tag_configure("ok", foreground="green")
self.pairs_tree.tag_configure("warn", foreground="orange")
self.pairs_tree.tag_configure("error", foreground="red")
self.pairs_tree.tag_configure("idle", foreground="gray")
list_buttons_frame = ttk.Frame(settings_tab)
list_buttons_frame.pack(fill=tk.X, pady=5)
ttk.Button(list_buttons_frame, text=" Добавить",
command=self.add_path_pair).pack(side=tk.LEFT, padx=5)
ttk.Button(list_buttons_frame, text="❌ Удалить",
command=self.remove_selected_pair).pack(side=tk.LEFT, padx=5)
ttk.Button(list_buttons_frame, text="📂 Открыть назначение",
command=self.open_selected_destination).pack(side=tk.LEFT, padx=5)
ttk.Button(list_buttons_frame, text="🔁 Переключить режим",
command=self.toggle_selected_mode).pack(side=tk.LEFT, padx=5)
edit_frame = ttk.LabelFrame(settings_tab, text="Редактирование выбранной пары", padding="10")
edit_frame.pack(fill=tk.X, pady=5)
ttk.Label(edit_frame, text="Источник:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=2)
ttk.Label(edit_frame, text="Назначение:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=2)
self.edit_source = ttk.Entry(edit_frame)
self.edit_dest = ttk.Entry(edit_frame)
self.edit_source.grid(row=0, column=1, sticky=tk.EW, padx=5, pady=2)
self.edit_dest.grid(row=1, column=1, sticky=tk.EW, padx=5, pady=2)
ttk.Button(edit_frame, text="📁", width=3,
command=lambda: self.browse_folder(self.edit_source)).grid(row=0, column=2, padx=3)
ttk.Button(edit_frame, text="📁", width=3,
command=lambda: self.browse_folder(self.edit_dest)).grid(row=1, column=2, padx=3)
self.edit_full_copy = tk.BooleanVar(value=False)
ttk.Checkbutton(edit_frame, text="Вся папка", variable=self.edit_full_copy).grid(
row=0, column=3, rowspan=2, padx=10
)
ttk.Button(edit_frame, text="?", width=2,
command=lambda: self.show_hint("Вся папка",
"Копирует всю папку и подпапки, а не только последний файл.")).grid(row=0, column=4, rowspan=2, padx=3)
edit_frame.columnconfigure(1, weight=1)
self.edit_source.bind("<KeyRelease>", lambda _e: self.update_selected_pair_from_edit())
self.edit_dest.bind("<KeyRelease>", lambda _e: self.update_selected_pair_from_edit())
self.edit_full_copy.trace_add("write", lambda *_: self.update_selected_pair_from_edit())
self.pairs_tree.bind("<<TreeviewSelect>>", lambda _e: self.on_pair_select())
log_frame = ttk.LabelFrame(log_tab, text="Лог операций", padding="5")
log_frame.pack(fill=tk.BOTH, expand=True, pady=5)
log_buttons = ttk.Frame(log_frame)
log_buttons.pack(fill=tk.X, pady=2)
ttk.Button(log_buttons, text="📋 Очистить лог",
command=self.clear_log).pack(side=tk.RIGHT, padx=2)
ttk.Button(log_buttons, text="🔍 Отладка",
command=self.debug_settings).pack(side=tk.RIGHT, padx=2)
ttk.Button(log_buttons, text="📂 Открыть лог",
command=self.open_log_file).pack(side=tk.LEFT, padx=2)
ttk.Button(log_buttons, text="📁 Папка настроек",
command=self.open_settings_folder).pack(side=tk.LEFT, padx=2)
self.log_text = scrolledtext.ScrolledText(log_frame, height=8, wrap=tk.WORD)
self.log_text.pack(fill=tk.BOTH, expand=True)
self.log_text.tag_configure("success", foreground="green")
self.log_text.tag_configure("error", foreground="red")
self.log_text.tag_configure("warning", foreground="orange")
self.log_text.tag_configure("info", foreground="blue")
help_text = (
"Быстрый старт:\n"
"1) Нажмите 'Добавить' и заполните источник/назначение.\n"
"2) Выберите режим: последний файл или вся папка.\n"
"3) Настройте расписание и нажмите 'Сохранить'.\n\n"
"Подсказки доступны по кнопке '?'."
)
ttk.Label(help_tab, text=help_text, justify=tk.LEFT).pack(anchor=tk.W, padx=10, pady=10)
self.status_bar = ttk.Label(self.root, text="Готов к работе",
relief=tk.SUNKEN, anchor=tk.W)
self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)
self.progress_var = tk.IntVar(value=0)
self.progress = ttk.Progressbar(self.root, mode="determinate", variable=self.progress_var)
self.progress.pack(side=tk.BOTTOM, fill=tk.X)
def setup_window_icon(self):
icon_path = get_resource_path(ICON_PATH)
if os.path.exists(icon_path):
with contextlib.suppress(Exception):
self.root.iconbitmap(icon_path)
def setup_file_logger(self):
settings_path = self.get_settings_path()
log_dir = os.path.dirname(settings_path)
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, "backup_copier.log")
logger = logging.getLogger("backup_copier")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.handlers.RotatingFileHandler(
log_path, maxBytes=2 * 1024 * 1024, backupCount=3, encoding="utf-8"
)
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_settings_path(self):
"""Возвращает путь для сохранения настроек"""
if getattr(sys, 'frozen', False):
# Для EXE используем папку с EXE файлом
base_path = os.path.dirname(sys.executable)
else:
# Для скрипта используем папку со скриптом
base_path = os.path.dirname(os.path.abspath(__file__))
# Пробуем создать папку, если есть права
settings_dir = base_path
try:
# Проверяем, можем ли мы писать в эту папку
test_file = os.path.join(settings_dir, 'test_write.tmp')
with open(test_file, 'w') as f:
f.write('test')
os.remove(test_file)
self.log_message(f"📁 Используется папка: {settings_dir}", "info")
except:
# Если не можем писать, используем AppData
settings_dir = os.path.join(os.environ.get('APPDATA', os.path.expanduser('~')), 'BackupCopier')
os.makedirs(settings_dir, exist_ok=True)
self.log_message(f"📁 Используется папка AppData: {settings_dir}", "info")
return os.path.join(settings_dir, 'backup_copier_settings.json')
def create_placeholder(self, entry, text):
"""Добавляет подсказку в поле ввода."""
placeholder_color = "gray"
normal_color = "black"
def on_focus_in(_):
if entry.get() == text and entry.cget("foreground") == placeholder_color:
entry.delete(0, tk.END)
entry.config(foreground=normal_color)
def on_focus_out(_):
if not entry.get():
entry.insert(0, text)
entry.config(foreground=placeholder_color)
entry.bind("<FocusIn>", on_focus_in)
entry.bind("<FocusOut>", on_focus_out)
on_focus_out(None)
def get_autostart_command(self) -> str:
"""Возвращает команду для автозапуска."""
if getattr(sys, 'frozen', False):
return f"\"{sys.executable}\""
script_path = os.path.abspath(__file__)
return f"\"{sys.executable}\" \"{script_path}\""
def is_autostart_enabled(self) -> bool:
"""Проверяет, включен ли автозапуск в реестре."""
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, AUTOSTART_REG_PATH, 0, winreg.KEY_READ) as key:
value, _ = winreg.QueryValueEx(key, AUTOSTART_REG_NAME)
return bool(value)
except FileNotFoundError:
return False
except OSError:
return False
def set_autostart_enabled(self, enabled: bool) -> None:
"""Включает/выключает автозапуск в реестре."""
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, AUTOSTART_REG_PATH, 0, winreg.KEY_SET_VALUE) as key:
if enabled:
winreg.SetValueEx(key, AUTOSTART_REG_NAME, 0, winreg.REG_SZ, self.get_autostart_command())
self.log_message("✅ Автозапуск включен", "success")
else:
try:
winreg.DeleteValue(key, AUTOSTART_REG_NAME)
except FileNotFoundError:
pass
self.log_message("⏭️ Автозапуск выключен", "info")
except Exception as e:
self.log_message(f"❌ Ошибка автозапуска: {e}", "error")
def toggle_autostart(self):
"""Обработчик галочки автозапуска."""
self.set_autostart_enabled(self.autostart_enabled.get())
def load_settings(self):
"""Загружает настройки из файла и отображает их"""
try:
settings_path = self.get_settings_path()
self.log_message(f"🔍 Загрузка настроек из: {settings_path}", "info")
if os.path.exists(settings_path):
with open(settings_path, 'r', encoding='utf-8') as f:
settings = json.load(f)
self.scheduler_enabled.set(settings.get('enabled', False))
desired_autostart = settings.get('autostart_enabled')
self.minimize_to_tray_enabled.set(settings.get('minimize_to_tray', True))
# Профили
profiles = settings.get("profiles")
if not isinstance(profiles, dict):
# Миграция со старого формата
profiles = {"Default": self.default_profile()}
profiles["Default"]["pairs"] = settings.get('pairs', [])
profiles["Default"]["include_masks"] = settings.get('include_masks', DEFAULT_INCLUDE_MASKS)
profiles["Default"]["exclude_masks"] = settings.get('exclude_masks', DEFAULT_EXCLUDE_MASKS)
profiles["Default"]["min_free_gb"] = settings.get('min_free_gb', DEFAULT_MIN_FREE_GB)
profiles["Default"]["max_workers"] = settings.get('max_workers', DEFAULT_MAX_WORKERS)
profiles["Default"]["verify_only"] = settings.get('verify_only', False)
profiles["Default"]["last_success_time"] = settings.get('last_success_time')
profiles["Default"]["last_result_summary"] = settings.get('last_result_summary')
profiles["Default"]["last_hashes"] = settings.get('last_hashes', {})
profiles["Default"]["schedules"] = settings.get('schedules', [])
self.profiles = profiles
self.refresh_profile_combo()
active = settings.get("active_profile") or self.active_profile.get()
self.load_profile(active)
# Настройки автозапуска
actual_autostart = self.is_autostart_enabled()
if desired_autostart is None:
self.autostart_enabled.set(actual_autostart)
else:
desired_autostart = bool(desired_autostart)
self.autostart_enabled.set(desired_autostart)
if desired_autostart != actual_autostart:
self.set_autostart_enabled(desired_autostart)
self.log_message(f"📂 Настройки загружены из {settings_path}", "info")
else:
# Если файла нет, добавляем пустую пару
self.log_message("🆕 Файл настроек не найден, создаем новую конфигурацию", "info")
self.profiles = {"Default": self.default_profile()}
self.refresh_profile_combo()
self.load_profile("Default")
self.autostart_enabled.set(self.is_autostart_enabled())
except Exception as e:
self.log_message(f"❌ Ошибка при загрузке настроек: {e}", "error")
import traceback
traceback.print_exc()
self.profiles = {"Default": self.default_profile()}
self.refresh_profile_combo()
self.load_profile("Default")
self.autostart_enabled.set(self.is_autostart_enabled())
def save_settings(self):
"""Сохраняет настройки в файл"""
try:
self.save_active_profile()
settings = {
'enabled': self.scheduler_enabled.get(),
'autostart_enabled': self.autostart_enabled.get(),
'minimize_to_tray': self.minimize_to_tray_enabled.get(),
'active_profile': self.active_profile.get(),
'profiles': self.profiles
}
settings_path = self.get_settings_path()
# Сохраняем с отступами для читаемости
with open(settings_path, 'w', encoding='utf-8') as f:
json.dump(settings, f, ensure_ascii=False, indent=2)
self.log_message(f"💾 Настройки сохранены (профилей: {len(self.profiles)})", "success")
self.log_message(f"📁 Файл: {settings_path}", "info")
messagebox.showinfo("Успех", f"Настройки сохранены!\n\nФайл: {settings_path}")
if self.scheduler_enabled.get():
# Перепланируем, чтобы учесть новые пути/время
self.start_scheduler()
except Exception as e:
self.log_message(f"❌ Ошибка при сохранении настроек: {e}", "error")
messagebox.showerror("Ошибка", f"Не удалось сохранить настройки:\n{e}")
def add_path_pair(self, source="", dest="", full_copy: bool = False):
"""Добавляет новую пару в таблицу."""
pair_id = f"pair_{self.pair_counter}"
self.pair_counter += 1
pair = {
"id": pair_id,
"source": source,
"dest": dest,
"full_copy": bool(full_copy),
"status": "",
"status_tag": "idle"
}
if source or dest:
status_text, status_tag = self.validate_pair(source, dest)
pair["status"] = status_text
pair["status_tag"] = status_tag
if source or dest:
status_text, status_tag = self.validate_pair(source, dest)
pair["status"] = status_text
pair["status_tag"] = status_tag
self.copy_pairs.append(pair)
mode_text = "Вся папка" if pair["full_copy"] else "Последний файл"
self.pairs_tree.insert("", "end", iid=pair_id,
values=(source, dest, mode_text, pair["status"]),
tags=(pair["status_tag"],))
self.pairs_tree.selection_set(pair_id)
self.on_pair_select()
def remove_selected_pair(self):
pair_id = self.selected_pair_id
if not pair_id:
return
self.copy_pairs = [p for p in self.copy_pairs if p["id"] != pair_id]
with contextlib.suppress(Exception):
self.pairs_tree.delete(pair_id)
self.selected_pair_id = None
self.edit_source.delete(0, tk.END)
self.edit_dest.delete(0, tk.END)
self.edit_full_copy.set(False)
def remove_path_pair(self, pair_id):
"""Совместимость: удалить пару по id."""
if pair_id:
self.selected_pair_id = pair_id
self.remove_selected_pair()
def remove_all_pairs(self, silent=False):
"""Удаляет все пары"""
if not silent:
if not messagebox.askyesno("Подтверждение", "Удалить все пути?"):
return
self.copy_pairs.clear()
for item in self.pairs_tree.get_children():
self.pairs_tree.delete(item)
self.selected_pair_id = None
def get_pair_by_id(self, pair_id: str):
for pair in self.copy_pairs:
if pair["id"] == pair_id:
return pair
return None
def update_pair_row(self, pair):
mode_text = "Вся папка" if pair["full_copy"] else "Последний файл"
self.pairs_tree.item(pair["id"], values=(pair["source"], pair["dest"], mode_text, pair["status"]),
tags=(pair["status_tag"],))
def on_pair_select(self):
selection = self.pairs_tree.selection()
if not selection:
return
pair_id = selection[0]
self.selected_pair_id = pair_id
pair = self.get_pair_by_id(pair_id)
if not pair:
return
self.edit_source.delete(0, tk.END)
self.edit_dest.delete(0, tk.END)
self.edit_source.insert(0, pair["source"])
self.edit_dest.insert(0, pair["dest"])
self.edit_full_copy.set(pair["full_copy"])
def update_selected_pair_from_edit(self):
pair_id = self.selected_pair_id
if not pair_id:
return
pair = self.get_pair_by_id(pair_id)
if not pair:
return
pair["source"] = self.edit_source.get().strip()
pair["dest"] = self.edit_dest.get().strip()
pair["full_copy"] = bool(self.edit_full_copy.get())
status_text, status_tag = self.validate_pair(pair["source"], pair["dest"])
pair["status"] = status_text
pair["status_tag"] = status_tag
self.update_pair_row(pair)
def validate_pair(self, source: str, dest: str):
if not source or not dest:
return "?", "idle"
if not os.path.exists(source):
return "Нет источника", "error"
if not os.path.exists(dest):
return "Нет назначения", "error"
test_file = os.path.join(dest, 'test_write.tmp')
try:
with open(test_file, 'w') as f:
f.write('test')
os.remove(test_file)
except Exception:
return "Нет доступа", "error"
return "OK", "ok"
def open_selected_destination(self):
pair_id = self.selected_pair_id
if not pair_id:
return
pair = self.get_pair_by_id(pair_id)
if not pair:
return
self.open_destination(pair["dest"])
def toggle_selected_mode(self):
pair_id = self.selected_pair_id
if not pair_id:
return
pair = self.get_pair_by_id(pair_id)
if not pair:
return
pair["full_copy"] = not pair["full_copy"]
self.edit_full_copy.set(pair["full_copy"])
self.update_pair_row(pair)
def browse_folder(self, entry):
"""Открывает диалог выбора папки"""
folder = filedialog.askdirectory(title="Выберите папку")
if folder:
entry.delete(0, tk.END)
entry.insert(0, folder)
def open_destination(self, path: str):
if not path or path.startswith("Например: "):
return
if os.path.exists(path):
with contextlib.suppress(Exception):
os.startfile(path)
def open_log_file(self):
log_path = os.path.join(os.path.dirname(self.get_settings_path()), "backup_copier.log")
if os.path.exists(log_path):
with contextlib.suppress(Exception):
os.startfile(log_path)
def open_settings_folder(self):
folder = os.path.dirname(self.get_settings_path())
if os.path.exists(folder):
with contextlib.suppress(Exception):
os.startfile(folder)
def open_schedule_dialog(self):
dlg = tk.Toplevel(self.root)
dlg.title("Расписание")
dlg.geometry("420x360")
dlg.transient(self.root)
dlg.grab_set()
time_frame = ttk.Frame(dlg)
time_frame.pack(fill=tk.X, pady=5, padx=10)
ttk.Label(time_frame, text="Время:").pack(side=tk.LEFT)
ttk.Combobox(time_frame, textvariable=self.hour_var, values=[f"{h:02d}" for h in range(24)], width=5, state="readonly").pack(side=tk.LEFT, padx=2)
ttk.Label(time_frame, text=":").pack(side=tk.LEFT)
ttk.Combobox(time_frame, textvariable=self.minute_var, values=[f"{m:02d}" for m in range(60)], width=5, state="readonly").pack(side=tk.LEFT, padx=2)
days_frame = ttk.Frame(dlg)
days_frame.pack(fill=tk.X, pady=5, padx=10)
ttk.Label(days_frame, text="Дни:").pack(side=tk.LEFT)
day_labels = [("Mon", "Пн"), ("Tue", "Вт"), ("Wed", "Ср"), ("Thu", "Чт"), ("Fri", "Пт"), ("Sat", "Сб"), ("Sun", "Вс")]
for key, label in day_labels:
ttk.Checkbutton(days_frame, text=label, variable=self.schedule_days_vars[key]).pack(side=tk.LEFT, padx=2)
list_frame = ttk.Frame(dlg)
list_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
tree = ttk.Treeview(list_frame, columns=("time", "days"), show="headings", height=8)
tree.heading("time", text="Время")
tree.heading("days", text="Дни")
tree.column("time", width=80, anchor=tk.CENTER)
tree.column("days", width=220)
scroll = ttk.Scrollbar(list_frame, orient="vertical", command=tree.yview)
tree.configure(yscrollcommand=scroll.set)
tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scroll.pack(side=tk.RIGHT, fill=tk.Y)
def refresh_tree():
for item in tree.get_children():
tree.delete(item)
for idx, entry in enumerate(self.schedules):
days = entry.get("days", [])
days_text = ",".join(days) if days else "Ежедневно"
tree.insert("", "end", iid=f"s{idx}", values=(entry.get("time"), days_text))
def add_entry():
time_str = f"{self.hour_var.get()}:{self.minute_var.get()}"
days = [k for k, v in self.schedule_days_vars.items() if v.get()]
self.schedules.append({"time": time_str, "days": days})
refresh_tree()
self.update_schedule_summary_label()
def remove_entry():
sel = tree.selection()
if not sel:
return
idx = int(sel[0].lstrip("s"))
if 0 <= idx < len(self.schedules):
self.schedules.pop(idx)
refresh_tree()
self.update_schedule_summary_label()
btns = ttk.Frame(dlg)
btns.pack(fill=tk.X, padx=10, pady=8)
ttk.Button(btns, text=" Добавить", command=add_entry).pack(side=tk.LEFT, padx=4)
ttk.Button(btns, text="❌ Удалить", command=remove_entry).pack(side=tk.LEFT, padx=4)
ttk.Button(btns, text="Закрыть", command=dlg.destroy).pack(side=tk.RIGHT, padx=4)
refresh_tree()
def refresh_profile_combo(self):
names = sorted(self.profiles.keys()) if self.profiles else ["Default"]
if not self.profiles:
self.profiles["Default"] = self.default_profile()
self.profile_combo["values"] = names
if self.active_profile.get() not in names:
self.active_profile.set(names[0])
def default_profile(self) -> dict:
return {
"pairs": [],
"schedules": [],
"include_masks": DEFAULT_INCLUDE_MASKS,
"exclude_masks": DEFAULT_EXCLUDE_MASKS,
"min_free_gb": DEFAULT_MIN_FREE_GB,
"max_workers": DEFAULT_MAX_WORKERS,
"verify_only": False,
"last_success_time": None,
"last_result_summary": None,
"last_hashes": {},
}
def save_active_profile(self):
name = self.active_profile.get() or "Default"
profile = self.profiles.get(name, self.default_profile())
profile["pairs"] = [
{
"source": p.get("source", ""),
"dest": p.get("dest", ""),
"full_copy": bool(p.get("full_copy", False)),
}
for p in self.copy_pairs
]
profile["schedules"] = list(self.schedules)
profile["include_masks"] = self.include_masks_var.get()
profile["exclude_masks"] = self.exclude_masks_var.get()
profile["min_free_gb"] = float(self.min_free_gb_var.get() or DEFAULT_MIN_FREE_GB)
profile["max_workers"] = int(self.max_workers_var.get() or DEFAULT_MAX_WORKERS)
profile["verify_only"] = bool(self.verify_only.get())
profile["last_success_time"] = self.last_success_time
profile["last_result_summary"] = self.last_result_summary
profile["last_hashes"] = self.last_hashes
self.profiles[name] = profile
def load_profile(self, name: str):
self.save_active_profile()
profile = self.profiles.get(name) or self.default_profile()
self.active_profile.set(name)
self.include_masks_var.set(profile.get("include_masks", DEFAULT_INCLUDE_MASKS))
self.exclude_masks_var.set(profile.get("exclude_masks", DEFAULT_EXCLUDE_MASKS))
self.min_free_gb_var.set(profile.get("min_free_gb", DEFAULT_MIN_FREE_GB))
self.max_workers_var.set(profile.get("max_workers", DEFAULT_MAX_WORKERS))
self.verify_only.set(profile.get("verify_only", False))
self.last_success_time = profile.get("last_success_time")
self.last_result_summary = profile.get("last_result_summary")
self.last_hashes = profile.get("last_hashes", {})
self.update_last_success_label()
self.update_status_summary()
self.remove_all_pairs(silent=True)
for item in profile.get("pairs", []):
self.add_path_pair(item.get("source", ""), item.get("dest", ""), item.get("full_copy", False))
self.schedules = profile.get("schedules", [])
self.update_schedule_summary_label()
if self.schedules:
first_time = self.schedules[0].get("time", "03:00")
if ":" in first_time:
hour, minute = first_time.split(":")
self.hour_var.set(hour)
self.minute_var.set(minute)
if self.scheduler_enabled.get():
self.start_scheduler()
def add_profile(self):
name = simpledialog.askstring("Новый профиль", "Введите имя профиля:")
if not name:
return
if name in self.profiles:
messagebox.showwarning("Профиль", "Профиль с таким именем уже существует.")
return
self.save_active_profile()
self.profiles[name] = self.default_profile()
self.active_profile.set(name)
self.refresh_profile_combo()
self.load_profile(name)
def rename_profile(self):
old = self.active_profile.get()
if not old:
return
new = simpledialog.askstring("Переименовать профиль", "Новое имя профиля:", initialvalue=old)
if not new or new == old:
return
if new in self.profiles:
messagebox.showwarning("Профиль", "Профиль с таким именем уже существует.")
return
self.save_active_profile()
self.profiles[new] = self.profiles.pop(old)
self.active_profile.set(new)
self.refresh_profile_combo()
def delete_profile(self):
name = self.active_profile.get()
if not name:
return
if not messagebox.askyesno("Профили", f"Удалить профиль '{name}'?"):
return
self.profiles.pop(name, None)
self.refresh_profile_combo()
self.load_profile(self.active_profile.get())
def export_profiles(self):
self.save_active_profile()
path = filedialog.asksaveasfilename(defaultextension=".json", filetypes=[("JSON", "*.json")])
if not path:
return
data = {
"active_profile": self.active_profile.get(),
"profiles": self.profiles
}
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
messagebox.showinfo("Экспорт", "Профили экспортированы.")
def import_profiles(self):
path = filedialog.askopenfilename(filetypes=[("JSON", "*.json")])
if not path:
return
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
profiles = data.get("profiles")
if isinstance(profiles, dict):
self.profiles.update(profiles)
self.refresh_profile_combo()
self.load_profile(data.get("active_profile") or self.active_profile.get())
def refresh_schedules_tree(self):
self.update_schedule_summary_label()
def add_schedule_from_inputs(self):
time_str = f"{self.hour_var.get()}:{self.minute_var.get()}"
days = [k for k, v in self.schedule_days_vars.items() if v.get()]
entry = {"time": time_str, "days": days}
self.schedules.append(entry)
self.update_schedule_summary_label()
if self.scheduler_enabled.get():
self.start_scheduler()
def remove_selected_schedule(self):
if not self.schedules:
return
self.schedules.pop()
self.update_schedule_summary_label()
if self.scheduler_enabled.get():
self.start_scheduler()
def show_hint(self, title: str, message: str):
messagebox.showinfo(title, message)
def open_wizard(self):
if getattr(self, "wizard_window", None) is not None:
try:
self.wizard_window.lift()
return
except Exception:
pass
wiz = tk.Toplevel(self.root)
wiz.title("Мастер настройки")
wiz.geometry("520x320")
wiz.transient(self.root)
wiz.grab_set()
self.wizard_window = wiz
step_var = tk.IntVar(value=0)
source_var = tk.StringVar()
dest_var = tk.StringVar()
full_copy_var = tk.BooleanVar(value=False)
schedule_enabled_var = tk.BooleanVar(value=self.scheduler_enabled.get())
hour_var = tk.StringVar(value=self.hour_var.get())
minute_var = tk.StringVar(value=self.minute_var.get())
autostart_var = tk.BooleanVar(value=self.autostart_enabled.get())
minimize_var = tk.BooleanVar(value=self.minimize_to_tray_enabled.get())
verify_var = tk.BooleanVar(value=self.verify_only.get())
def show_step(index: int):
step_var.set(index)
for i, frame in enumerate(steps):
frame.pack_forget()
if i == index:
frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
back_btn.config(state=tk.NORMAL if index > 0 else tk.DISABLED)
next_btn.config(state=tk.NORMAL if index < len(steps) - 1 else tk.DISABLED)
finish_btn.config(state=tk.NORMAL if index == len(steps) - 1 else tk.DISABLED)
steps = []
step1 = ttk.Frame(wiz)
ttk.Label(step1, text="Шаг 1/3: Пути", font=("Arial", 11, "bold")).pack(anchor=tk.W, pady=5)
ttk.Label(step1, text="Источник:").pack(anchor=tk.W)
src_entry = ttk.Entry(step1, textvariable=source_var)
src_entry.pack(fill=tk.X, pady=2)
ttk.Button(step1, text="Выбрать...", command=lambda: self.browse_folder(src_entry)).pack(anchor=tk.W, pady=2)
ttk.Label(step1, text="Назначение:").pack(anchor=tk.W, pady=5)
dst_entry = ttk.Entry(step1, textvariable=dest_var)
dst_entry.pack(fill=tk.X, pady=2)
ttk.Button(step1, text="Выбрать...", command=lambda: self.browse_folder(dst_entry)).pack(anchor=tk.W, pady=2)
steps.append(step1)
step2 = ttk.Frame(wiz)
ttk.Label(step2, text="Шаг 2/3: Режим копирования", font=("Arial", 11, "bold")).pack(anchor=tk.W, pady=5)
ttk.Checkbutton(step2, text="Копировать всю папку с подпапками", variable=full_copy_var).pack(anchor=tk.W)
steps.append(step2)
step3 = ttk.Frame(wiz)
ttk.Label(step3, text="Шаг 3/3: Расписание и поведение", font=("Arial", 11, "bold")).pack(anchor=tk.W, pady=5)
time_frame = ttk.Frame(step3)
time_frame.pack(fill=tk.X, pady=4)
ttk.Label(time_frame, text="Время:").pack(side=tk.LEFT)
ttk.Combobox(time_frame, textvariable=hour_var, values=[f"{h:02d}" for h in range(24)], width=5, state="readonly").pack(side=tk.LEFT, padx=2)
ttk.Label(time_frame, text=":").pack(side=tk.LEFT)
ttk.Combobox(time_frame, textvariable=minute_var, values=[f"{m:02d}" for m in range(60)], width=5, state="readonly").pack(side=tk.LEFT, padx=2)
ttk.Checkbutton(step3, text="Ежедневно", variable=schedule_enabled_var).pack(anchor=tk.W, pady=4)
ttk.Checkbutton(step3, text="Автозапуск Windows", variable=autostart_var).pack(anchor=tk.W)
ttk.Checkbutton(step3, text="Сворачивать в трей при закрытии", variable=minimize_var).pack(anchor=tk.W)
ttk.Checkbutton(step3, text="Только проверка (без копирования)", variable=verify_var).pack(anchor=tk.W)
steps.append(step3)
controls = ttk.Frame(wiz)
controls.pack(fill=tk.X, pady=5)
back_btn = ttk.Button(controls, text="Назад", command=lambda: show_step(step_var.get() - 1))
next_btn = ttk.Button(controls, text="Далее", command=lambda: show_step(step_var.get() + 1))
def finish():
if source_var.get().strip() and dest_var.get().strip():
self.add_path_pair(source_var.get().strip(), dest_var.get().strip(), full_copy_var.get())
self.hour_var.set(hour_var.get())
self.minute_var.set(minute_var.get())
self.scheduler_enabled.set(schedule_enabled_var.get())
self.autostart_enabled.set(autostart_var.get())
self.minimize_to_tray_enabled.set(minimize_var.get())
self.verify_only.set(verify_var.get())
if self.scheduler_enabled.get():
self.start_scheduler()
else:
self.stop_scheduler()
if self.autostart_enabled.get() != self.is_autostart_enabled():
self.set_autostart_enabled(self.autostart_enabled.get())
wiz.destroy()
self.wizard_window = None
finish_btn = ttk.Button(controls, text="Готово", command=finish)
back_btn.pack(side=tk.LEFT, padx=5)
next_btn.pack(side=tk.LEFT, padx=5)
finish_btn.pack(side=tk.RIGHT, padx=5)
def on_close():
self.wizard_window = None
wiz.destroy()
wiz.protocol("WM_DELETE_WINDOW", on_close)
show_step(0)
def get_valid_pairs(self) -> List[tuple]:
"""Возвращает список валидных пар папок"""
valid_pairs = []
for pair in self.copy_pairs:
source = pair.get("source", "").strip()
dest = pair.get("dest", "").strip()
full_copy = bool(pair.get("full_copy", False))
if source and dest:
valid_pairs.append((source, dest, full_copy))
return valid_pairs
def check_paths(self):
"""Проверяет доступность всех путей"""
self.log_message("🔍 Проверка путей:", "info")
valid_pairs = self.get_valid_pairs()
if not valid_pairs:
self.log_message("❌ Нет заполненных пар путей", "error")
return
for i, (source, dest, full_copy) in enumerate(valid_pairs, 1):
self.log_message(f"\nПара {i}:", "info")
mode_text = "вся папка" if full_copy else "последний файл"
self.log_message(f" Режим: {mode_text}", "info")
status_text, status_tag = self.validate_pair(source, dest)
if status_tag == "ok":
self.log_message(f" ✅ Проверка: OK", "success")
else:
self.log_message(f" ❌ Проверка: {status_text}", "error")
if os.path.exists(source):
bak_files = [p for p in Path(source).iterdir() if p.is_file() and p.suffix.lower() == '.bak']
self.log_message(f" Найдено .bak файлов: {len(bak_files)}", "info")
# Обновляем статусы в таблице
for pair in self.copy_pairs:
status_text, status_tag = self.validate_pair(pair.get("source", ""), pair.get("dest", ""))
pair["status"] = status_text
pair["status_tag"] = status_tag
self.update_pair_row(pair)
def find_latest_file(self, folder_path: str) -> Optional[Path]:
"""Находит самый последний файл в папке"""
try:
include_masks = parse_masks(self.include_masks_var.get())
exclude_masks = parse_masks(self.exclude_masks_var.get())
latest_file = find_latest_file_in_folder(folder_path, include_masks=include_masks, exclude_masks=exclude_masks)
if not latest_file:
if not Path(folder_path).exists():
self.log_message(f"❌ Папка не существует: {folder_path}", "error")
else:
self.log_message(f"⚠️ Не найдено .bak файлов в {folder_path}", "warning")
return None
file_time = datetime.fromtimestamp(latest_file.stat().st_mtime)
self.log_message(
f"📄 Последний файл: {latest_file.name} ({file_time.strftime('%Y-%m-%d %H:%M:%S')})",
"info"
)
return latest_file
except Exception as e:
self.log_message(f"❌ Ошибка при поиске файлов в {folder_path}: {e}", "error")
return None
def run_on_ui(self, func):
"""Планирует выполнение функции в UI-потоке."""
self.queue.put({'type': 'ui', 'func': func})
def set_status_text(self, text: str):
self.root.after(0, lambda: self.status_bar.config(text=text))
def set_progress_total(self, total: int):
def _set():
self.progress.config(maximum=max(1, total))
self.progress_var.set(0)
self.root.after(0, _set)
def step_progress(self, step: int = 1):
def _step():
self.progress_var.set(self.progress_var.get() + step)
self.root.after(0, _step)
def check_previous_hash(self, target_file: Path):
key = str(target_file)
if key in self.last_hashes:
try:
current_hash = compute_file_checksum(target_file)
if current_hash != self.last_hashes[key]:
self.log_message(f"⚠️ Несовпадение хеша прошлой копии: {target_file.name}", "warning")
except Exception as e:
self.log_message(f"⚠️ Не удалось проверить прошлую копию: {e}", "warning")
def copy_file_with_retries(self, source: Path, target: Path) -> bool:
for attempt in range(COPY_RETRIES + 1):
try:
shutil.copy2(source, target)
return True
except Exception as e:
if attempt >= COPY_RETRIES:
self.log_message(f"❌ Ошибка при копировании {source.name}: {e}", "error")
return False
time_module.sleep(COPY_RETRY_DELAY)
return False
def verify_only_mode(self, source: Path, target: Path) -> bool:
if not target.exists():
self.log_message(f"❌ Нет целевого файла для проверки: {target.name}", "error")
return False
if not should_copy_file(source, target):
self.log_message(f"⏭️ Проверка пропущена (не изменен): {target.name}", "warning")
return True
if verify_copy(source, target):
self.log_message(f"✅ Проверка OK: {target.name}", "success")
return True
self.log_message(f"❌ Проверка не пройдена: {target.name}", "error")
return False
def has_enough_space(self, dest_path: Path, size_bytes: int) -> bool:
min_gb = float(self.min_free_gb_var.get() or DEFAULT_MIN_FREE_GB)
free_gb = get_free_space_gb(dest_path)
if free_gb < min_gb:
self.log_message(f"❌ Недостаточно места на диске: свободно {free_gb:.2f} ГБ, минимум {min_gb} ГБ", "error")
return False
if size_bytes > 0 and (free_gb * 1024 ** 3) < size_bytes:
self.log_message(f"❌ Недостаточно места для файла: {size_bytes} байт", "error")
return False
return True
def copy_full_folder(self, source: Path, dest: Path) -> tuple:
copied = 0
skipped = 0
errors = 0
for root, _dirs, files in os.walk(source):
rel = os.path.relpath(root, source)
dest_root = dest / rel if rel != "." else dest
dest_root.mkdir(parents=True, exist_ok=True)
for fname in files:
src_file = Path(root) / fname
dst_file = dest_root / fname
try:
self.set_status_text(f"Копируется: {src_file.name}")
self.check_previous_hash(dst_file) if dst_file.exists() else None
if self.verify_only.get():
if self.verify_only_mode(src_file, dst_file):
skipped += 1
else:
errors += 1
self.step_progress()
continue
if should_copy_file(src_file, dst_file):
if not self.has_enough_space(dest_root, src_file.stat().st_size):
errors += 1
self.step_progress()
continue
if self.copy_file_with_retries(src_file, dst_file) and verify_copy(src_file, dst_file):
copied += 1
self.last_hashes[str(dst_file)] = compute_file_checksum(dst_file)
else:
errors += 1
self.step_progress()
else:
skipped += 1
self.step_progress()
except Exception as e:
errors += 1
self.step_progress()
msg = f"❌ Ошибка при копировании {src_file.name}: {e}"
self.log_message(msg, "error")
self.record_error_detail(msg)
return copied, skipped, errors
def process_pair(self, source: str, dest: str, full_copy: bool) -> tuple:
copied_files = 0
skipped_files = 0
error_files = 0
self.log_message(f"\n📁 Обработка папки: {source}", "info")
src_path = Path(source)
dest_path = Path(dest)
if full_copy:
if not src_path.exists():
msg = f"❌ Папка не существует: {source}"
self.log_message(msg, "error")
self.record_error_detail(msg)
return copied_files, skipped_files, error_files + 1
try:
dest_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
msg = f"Не могу создать папку {dest}: {e}"
self.log_message(msg, "error")
self.record_error_detail(msg)
return copied_files, skipped_files, error_files + 1
c, s, e = self.copy_full_folder(src_path, dest_path)
return copied_files + c, skipped_files + s, error_files + e
latest_file = self.find_latest_file(source)
if latest_file:
try:
dest_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
msg = f"Не могу создать папку {dest}: {e}"
self.log_message(msg, "error")
self.record_error_detail(msg)
return copied_files, skipped_files, error_files + 1
target_file = dest_path / latest_file.name
try:
self.set_status_text(f"Копируется: {latest_file.name}")
if target_file.exists():
self.check_previous_hash(target_file)
if self.verify_only.get():
if self.verify_only_mode(latest_file, target_file):
skipped_files += 1
else:
error_files += 1
self.step_progress()
return copied_files, skipped_files, error_files
target_existed = target_file.exists()
if should_copy_file(latest_file, target_file):
if not self.has_enough_space(dest_path, latest_file.stat().st_size):
self.step_progress()
return copied_files, skipped_files, error_files + 1
if self.copy_file_with_retries(latest_file, target_file) and verify_copy(latest_file, target_file):
copied_files += 1
self.last_hashes[str(target_file)] = compute_file_checksum(target_file)
if target_existed:
self.log_message(f"✅ Обновлен: {latest_file.name} (новее)", "success")
else:
self.log_message(f"✅ Скопирован: {latest_file.name}", "success")
else:
error_files += 1
msg = f"❌ Контрольная сумма не совпала: {latest_file.name}"
self.log_message(msg, "error")
self.record_error_detail(msg)
self.step_progress()
else:
skipped_files += 1
self.log_message(f"⏭️ Пропущен: {latest_file.name} (уже актуален)", "warning")
self.step_progress()
except Exception as e:
error_files += 1
self.step_progress()
msg = f"❌ Ошибка при копировании {latest_file.name}: {e}"
self.log_message(msg, "error")
self.record_error_detail(msg)
else:
error_files += 1
self.step_progress()
return copied_files, skipped_files, error_files
def copy_files_thread(self, pairs: List[tuple], background: bool = False):
"""Поток для копирования последних файлов"""
if not self.copy_lock.acquire(blocking=False):
self.log_message("⚠️ Копирование уже выполняется", "warning")
return
self.is_copying = True
self.root.after(0, lambda: self.status_bar.config(text="Идет копирование..."))
total_units = 0
for source, dest, full_copy in pairs:
if full_copy:
if os.path.exists(source):
for root, _dirs, files in os.walk(source):
for fname in files:
total_units += 1
else:
include_masks = parse_masks(self.include_masks_var.get())
exclude_masks = parse_masks(self.exclude_masks_var.get())
latest = find_latest_file_in_folder(source, include_masks=include_masks, exclude_masks=exclude_masks)
if latest:
total_units += 1
self.set_progress_total(total_units or 1)
try:
copied_files = 0
skipped_files = 0
error_files = 0
if not background:
self.log_message("\n" + "=" * 50, "info")
self.log_message("🚀 Начало копирования последних файлов", "info")
max_workers = max(1, int(self.max_workers_var.get() or DEFAULT_MAX_WORKERS))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(self.process_pair, source, dest, full_copy) for source, dest, full_copy in pairs]
for future in as_completed(futures):
try:
c, s, e = future.result()
copied_files += c
skipped_files += s
error_files += e
except Exception as e:
error_files += 1
msg = f"❌ Ошибка задачи копирования: {e}"
self.log_message(msg, "error")
self.record_error_detail(msg)
# Итог
self.log_message("\n" + "=" * 50, "info")
self.log_message("📊 ИТОГ:", "info")
self.log_message(f" ✅ Скопировано/обновлено: {copied_files}", "success")
self.log_message(f" ⏭️ Пропущено: {skipped_files}", "warning")
self.log_message(f" ❌ Ошибок: {error_files}", "error")
if background:
self.log_message("⏰ Копирование по расписанию завершено", "info")
else:
self.run_on_ui(lambda: messagebox.showinfo(
"Готово",
f"Копирование завершено!\n\n✅ Скопировано: {copied_files}\n⏭️ Пропущено: {skipped_files}\n❌ Ошибок: {error_files}"
))
self.last_result_summary = f"{copied_files} / ⏭️ {skipped_files} / ❌ {error_files}"
self.run_on_ui(self.update_status_summary)
if error_files == 0:
self.last_success_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.run_on_ui(self.update_last_success_label)
self.tray_notify("Копирование завершено", f"Успешно: {copied_files}, Пропущено: {skipped_files}")
else:
detail = self.last_error_detail or "Неизвестная ошибка"
self.tray_notify("Копирование с ошибками", f"Ошибок: {error_files}. {detail}")
except Exception as e:
self.log_message(f"❌ Критическая ошибка: {e}\n{traceback.format_exc()}", "error")
self.run_on_ui(lambda: messagebox.showerror("Ошибка", f"Произошла ошибка:\n{e}"))
finally:
self.is_copying = False
self.copy_lock.release()
self.root.after(0, lambda: self.status_bar.config(text="Готов к работе"))
self.root.after(0, lambda: self.progress_var.set(0))
def start_manual_copy(self):
"""Запускает ручное копирование"""
valid_pairs = self.get_valid_pairs()
if not valid_pairs:
messagebox.showwarning("Предупреждение", "Заполните хотя бы одну пару папок!")
return
# Запускаем копирование в отдельном потоке
copy_thread = threading.Thread(
target=self.copy_files_thread,
args=(valid_pairs, False),
daemon=True
)
copy_thread.start()
def toggle_scheduler(self):
"""Включает/выключает планировщик"""
if self.scheduler_enabled.get():
self.start_scheduler()
else:
self.stop_scheduler()
def start_scheduler(self):
"""Запускает планировщик"""
valid_pairs = self.get_valid_pairs()
if not valid_pairs:
messagebox.showwarning("Предупреждение",
"Нет настроенных путей! Планировщик не запущен.")
self.scheduler_enabled.set(False)
return
if not self.schedules:
# создаем расписание по умолчанию из полей времени
self.schedules = [{"time": f"{self.hour_var.get()}:{self.minute_var.get()}", "days": []}]
self.refresh_schedules_tree()
self.scheduler.schedule_copy_job("backup_job", self.schedules, valid_pairs)
self.scheduler.start()
self.scheduler_status.config(text=f"(активен, заданий: {len(self.schedules)})", foreground="green")
self.update_next_run_label()
self.update_last_success_label()
self.log_message("🕒 Планировщик запущен.", "info")
def stop_scheduler(self):
"""Останавливает планировщик"""
self.scheduler.stop()
self.scheduler_status.config(text="(остановлен)", foreground="red")
self.next_run_label.config(text="(следующий запуск: —)")
self.log_message("🕒 Планировщик остановлен", "info")
def update_next_run_label(self):
jobs = schedule.get_jobs("backup_job")
if jobs:
next_run = jobs[0].next_run
if next_run:
self.next_run_label.config(text=f"(следующий запуск: {next_run.strftime('%Y-%m-%d %H:%M')})")
return
self.next_run_label.config(text="(следующий запуск: —)")
def update_last_success_label(self):
if self.last_success_time:
self.last_success_label.config(text=f"(последний успех: {self.last_success_time})")
else:
self.last_success_label.config(text="(последний успех: —)")
def update_status_summary(self):
summary = self.last_result_summary or ""
self.status_summary_label.config(text=summary)
self.last_result_label.config(text=f"(последний результат: {summary})")
def update_schedule_summary_label(self):
if not self.schedules:
self.schedule_summary_label.config(text="Расписание: —")
return
parts = []
for entry in self.schedules:
time_str = entry.get("time", "")
days = entry.get("days", [])
if days:
days_text = ",".join(days)
parts.append(f"{time_str} ({days_text})")
else:
parts.append(f"{time_str} (ежедневно)")
self.schedule_summary_label.config(text="Расписание: " + "; ".join(parts))
def debug_settings(self):
"""Отладочный метод для проверки загрузки настроек"""
self.log_message("\n" + "=" * 50, "info")
self.log_message("🔧 ОТЛАДКА", "info")
self.log_message("=" * 50, "info")
settings_path = self.get_settings_path()
self.log_message(f"📁 Путь к настройкам: {settings_path}", "info")
self.log_message(f"📁 Файл существует: {os.path.exists(settings_path)}", "info")
if os.path.exists(settings_path):
try:
with open(settings_path, 'r', encoding='utf-8') as f:
content = f.read()
self.log_message(f"📄 Содержимое файла:", "info")
for line in content.split('\n'):
self.log_message(f" {line}", "info")
except Exception as e:
self.log_message(f"Не удалось прочитать файл: {e}", "error")
self.log_message(f"\n🖥️ Текущее состояние:", "info")
self.log_message(f" Пар в интерфейсе: {len(self.copy_pairs)}", "info")
for i, pair in enumerate(self.copy_pairs, 1):
source = pair.get("source", "")
dest = pair.get("dest", "")
mode = "вся папка" if pair.get("full_copy", False) else "последний файл"
self.log_message(f" Пара {i}:", "info")
self.log_message(f" Откуда: '{source}'", "info")
self.log_message(f" Куда: '{dest}'", "info")
self.log_message(f" Режим: {mode}", "info")
self.log_message("=" * 50, "info")
def log_message(self, message, tag=None):
"""Добавляет сообщение в лог (вызывается из любого потока)"""
# Используем queue для потокобезопасности
self.queue.put({'type': 'log', 'message': message, 'tag': tag})
file_logger = getattr(self, "file_logger", None)
if file_logger:
try:
level = logging.INFO
if tag == "error":
level = logging.ERROR
elif tag == "warning":
level = logging.WARNING
file_logger.log(level, message)
except Exception:
pass
def record_error_detail(self, message: str):
self.last_error_detail = message
def clear_log(self):
"""Очищает лог"""
self.log_text.delete(1.0, tk.END)
def process_queue(self):
"""Обрабатывает очередь сообщений из потоков"""
try:
while True:
msg = self.queue.get_nowait()
if msg['type'] == 'log':
timestamp = datetime.now().strftime("%H:%M:%S")
log_entry = f"[{timestamp}] {msg['message']}\n"
self.log_text.insert(tk.END, log_entry, msg.get('tag'))
self.log_text.see(tk.END)
elif msg['type'] == 'ui':
try:
msg['func']()
except Exception as e:
self.log_message(f"❌ Ошибка UI: {e}", "error")
except queue.Empty:
pass
finally:
self.root.after(100, self.process_queue)
def check_and_save(self):
self.check_paths()
self.save_settings()
def create_tray_image(self):
if Image is None:
return None
icon_path = get_resource_path(ICON_PATH)
if os.path.exists(icon_path):
with contextlib.suppress(Exception):
return Image.open(icon_path)
image = Image.new("RGB", (64, 64), color=(40, 40, 40))
draw = ImageDraw.Draw(image)
draw.rectangle((8, 8, 56, 56), fill=(30, 144, 255))
draw.text((18, 20), "B", fill=(255, 255, 255))
return image
def setup_tray_icon(self):
if pystray is None:
self.log_message("⚠️ Модуль pystray не установлен, трей недоступен", "warning")
return
if self.tray_icon is not None:
return
image = self.create_tray_image()
if image is None:
self.log_message("⚠️ Не удалось создать иконку для трея", "warning")
return
def on_show(_icon, _item):
self.root.after(0, self.show_window)
def on_copy(_icon, _item):
self.root.after(0, self.start_manual_copy)
def on_exit(_icon, _item):
self.root.after(0, self.exit_app)
menu = pystray.Menu(
pystray.MenuItem("Открыть", on_show, default=True),
pystray.MenuItem("Запустить копирование", on_copy),
pystray.MenuItem("Выход", on_exit),
)
self.tray_icon = pystray.Icon(APP_NAME, image, APP_NAME, menu)
self.tray_thread = threading.Thread(target=self.tray_icon.run, daemon=True)
self.tray_thread.start()
def tray_notify(self, title: str, message: str):
if self.tray_icon is None:
return
with contextlib.suppress(Exception):
self.tray_icon.notify(message, title)
def show_window(self):
self.root.deiconify()
self.root.lift()
self.root.focus_force()
def hide_to_tray(self):
if not self.minimize_to_tray_enabled.get():
self.exit_app()
return
if pystray is None:
self.root.iconify()
self.log_message("⚠️ pystray не установлен, окно свернуто в панель задач", "warning")
return
self.setup_tray_icon()
self.root.withdraw()
self.log_message("🧰 Приложение свернуто в трей", "info")
def exit_app(self):
if self.scheduler_enabled.get():
self.stop_scheduler()
if self.tray_icon is not None:
with contextlib.suppress(Exception):
self.tray_icon.stop()
self.tray_icon = None
self.root.destroy()
def main():
mutex = ensure_single_instance()
if mutex is None:
ctypes.windll.user32.MessageBoxW(None, "Приложение уже запущено.", APP_NAME, 0x00000010)
return
root = tk.Tk()
app = BackgroundFileCopyApp(root)
# Обработка закрытия окна
def on_closing():
app.hide_to_tray()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
if __name__ == "__main__":
main()