import tkinter as tk from tkinter import ttk, filedialog, messagebox, scrolledtext, simpledialog import shutil import os from pathlib import Path import threading import queue from datetime import datetime, time import time as time_module import schedule import sys import json import tempfile from typing import List, Dict, Optional import traceback import hashlib import winreg import contextlib import logging import logging.handlers import fnmatch import ctypes import calendar from concurrent.futures import ThreadPoolExecutor, as_completed try: import pystray from PIL import Image, ImageDraw except Exception: pystray = None Image = None ImageDraw = None DEFAULT_EXTENSIONS = ('*.bak', '*.BAK', '*.backup', '*.sql') AUTOSTART_REG_PATH = r"Software\Microsoft\Windows\CurrentVersion\Run" AUTOSTART_REG_NAME = "Copyrka" ICON_PATH = "icon.ico" APP_NAME = "Copyrka" COPY_RETRIES = 2 COPY_RETRY_DELAY = 2 DEFAULT_MIN_FREE_GB = 1 DEFAULT_MAX_WORKERS = 3 DEFAULT_KEEP_FILES = 1 DEFAULT_INCLUDE_MASKS = "*.bak;*.sql;*.backup" DEFAULT_EXCLUDE_MASKS = "*.tmp;*.temp" MUTEX_NAME = "Global\\BackupCopierMutex" def find_latest_file_in_folder(folder_path: str, extensions=DEFAULT_EXTENSIONS, include_masks: Optional[List[str]] = None, exclude_masks: Optional[List[str]] = None) -> Optional[Path]: """Возвращает самый новый файл из папки по времени модификации.""" folder = Path(folder_path) if not folder.exists(): return None files: List[Path] = [] include_masks = include_masks or [] exclude_masks = exclude_masks or [] if include_masks: for mask in include_masks: files.extend(folder.glob(mask)) else: for ext in extensions: files.extend(folder.glob(ext)) filtered = [] seen = set() for f in files: if not f.is_file(): continue if f.name in seen: continue if matches_masks(f.name, include_masks, exclude_masks): filtered.append(f) seen.add(f.name) if not filtered: return None return max(filtered, key=lambda f: f.stat().st_mtime) def should_copy_file(source: Path, target: Path) -> bool: """Определяет, нужно ли копировать файл.""" if not target.exists(): return True return source.stat().st_mtime > target.stat().st_mtime def compute_file_checksum(path: Path, chunk_size: int = 1024 * 1024) -> str: """Считает SHA-256 контрольную сумму файла.""" hasher = hashlib.sha256() with path.open("rb") as f: while True: chunk = f.read(chunk_size) if not chunk: break hasher.update(chunk) return hasher.hexdigest() def verify_copy(source: Path, target: Path) -> bool: """Проверяет, что файл скопирован корректно, по контрольной сумме.""" return compute_file_checksum(source) == compute_file_checksum(target) def compare_file_checksums(source: Path, target: Path) -> tuple[bool, str, str]: src_hash = compute_file_checksum(source) dst_hash = compute_file_checksum(target) return src_hash == dst_hash, src_hash, dst_hash def get_resource_path(relative_path: str) -> str: """Возвращает абсолютный путь к ресурсу (поддерживает PyInstaller).""" if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'): return os.path.join(sys._MEIPASS, relative_path) return os.path.abspath(relative_path) def parse_masks(mask_text: str) -> List[str]: parts = [p.strip() for p in (mask_text or "").split(";")] return [p for p in parts if p] def matches_masks(filename: str, include_masks: List[str], exclude_masks: List[str]) -> bool: if include_masks: if not any(fnmatch.fnmatch(filename, m) for m in include_masks): return False if exclude_masks: if any(fnmatch.fnmatch(filename, m) for m in exclude_masks): return False return True def get_free_space_gb(path: Path) -> float: try: usage = shutil.disk_usage(str(path)) return usage.free / (1024 ** 3) except Exception: return 0.0 def get_total_copy_size(source: Path, dest: Path) -> int: total = 0 for root, _dirs, files in os.walk(source): rel = os.path.relpath(root, source) dest_root = dest / rel if rel != "." else dest for fname in files: src_file = Path(root) / fname dst_file = dest_root / fname try: if should_copy_file(src_file, dst_file): total += src_file.stat().st_size except Exception: pass return total def ensure_single_instance() -> Optional[int]: """Создает системный mutex. Возвращает handle или None если уже запущено.""" mutex = ctypes.windll.kernel32.CreateMutexW(None, True, MUTEX_NAME) already_exists = ctypes.windll.kernel32.GetLastError() == 183 if already_exists: return None return mutex class FileCopyScheduler: """Класс для управления расписанием копирования""" def __init__(self, app): self.app = app self.scheduled_jobs = [] self.running = False self.thread = None def start(self): """Запускает планировщик в отдельном потоке""" if not self.running: self.running = True self.thread = threading.Thread(target=self._run_scheduler, daemon=True) self.thread.start() self.app.log_message("🕒 Планировщик запущен", "info") def stop(self): """Останавливает планировщик""" self.running = False self.app.log_message("🕒 Планировщик остановлен", "info") def _run_scheduler(self): """Основной цикл планировщика""" while self.running: schedule.run_pending() time_module.sleep(1) def schedule_copy_job(self, job_id: str, schedules: List[dict], pairs: List[tuple]): """Добавляет задания в планировщик""" # Очищаем предыдущие задания с таким же ID schedule.clear(job_id) if not schedules: return day_map = { "Mon": schedule.every().monday, "Tue": schedule.every().tuesday, "Wed": schedule.every().wednesday, "Thu": schedule.every().thursday, "Fri": schedule.every().friday, "Sat": schedule.every().saturday, "Sun": schedule.every().sunday, } for entry in schedules: time_str = entry.get("time", "03:00") sched_type = entry.get("type", "daily") days = entry.get("days", []) if sched_type == "monthly": day_num = int(entry.get("day", 1)) schedule.every().day.at(time_str).do(self._execute_copy_job_if_monthday, pairs=pairs, day=day_num).tag(job_id) self.app.log_message(f"📅 Запланировано копирование на {time_str} (ежемесячно, день {day_num})", "info") elif sched_type == "weekly" and days: for day in days: if day in day_map: day_map[day].at(time_str).do(self._execute_copy_job, pairs=pairs).tag(job_id) self.app.log_message(f"📅 Запланировано копирование на {time_str} ({', '.join(days)})", "info") else: schedule.every().day.at(time_str).do(self._execute_copy_job, pairs=pairs).tag(job_id) self.app.log_message(f"📅 Запланировано копирование на {time_str} ежедневно", "info") def _execute_copy_job(self, pairs: List[tuple]): """Выполняет задание копирования""" self.app.log_message("⏰ Запуск запланированного копирования...", "info") # Запускаем копирование в отдельном потоке copy_thread = threading.Thread( target=self.app.copy_files_thread, args=(pairs, True), # True означает фоновый режим daemon=True ) copy_thread.start() def _execute_copy_job_if_monthday(self, pairs: List[tuple], day: int): today = datetime.now() last_day = calendar.monthrange(today.year, today.month)[1] run_day = min(int(day), last_day) if today.day != run_day: return self._execute_copy_job(pairs) class BackgroundFileCopyApp: def __init__(self, root): self.root = root self.root.title("Копирка") self.root.geometry("900x800") self.root.minsize(900, 700) self.setup_window_icon() # Для работы с очередью сообщений из потоков self.queue = queue.Queue() # Список пар для копирования self.copy_pairs = [] self.pair_counter = 0 self.selected_pair_id = None self.inline_editor = None self.profile_combo = None self.cancel_event = threading.Event() self.autosave_job = None self.autosave_enabled = True self.skip_cleanup_confirm = False self.loading_settings = False # Планировщик self.scheduler = FileCopyScheduler(self) # Флаг для отслеживания состояния self.is_copying = False self.copy_lock = threading.Lock() self.active_files_lock = threading.Lock() self.active_files: Dict[str, int] = {} # Автозапуск self.autostart_enabled = tk.BooleanVar(value=False) # Сворачивание в трей self.minimize_to_tray_enabled = tk.BooleanVar(value=True) self.tray_icon = None self.tray_thread = None # Режим проверки без копирования self.verify_only = tk.BooleanVar(value=False) # Последний успешный запуск self.last_success_time: Optional[str] = None self.last_result_summary: Optional[str] = None self.last_error_detail: Optional[str] = None # Хеши последних копий self.last_hashes: Dict[str, str] = {} # Профили self.profiles: Dict[str, dict] = {} self.active_profile = tk.StringVar(value="Default") self.include_masks_var = tk.StringVar(value=DEFAULT_INCLUDE_MASKS) self.exclude_masks_var = tk.StringVar(value=DEFAULT_EXCLUDE_MASKS) self.min_free_gb_var = tk.DoubleVar(value=DEFAULT_MIN_FREE_GB) self.max_workers_var = tk.IntVar(value=DEFAULT_MAX_WORKERS) self.cleanup_old_var = tk.BooleanVar(value=False) self.keep_files_var = tk.IntVar(value=DEFAULT_KEEP_FILES) self.schedules: List[dict] = [] self.schedule_days_vars = { "Mon": tk.BooleanVar(value=False), "Tue": tk.BooleanVar(value=False), "Wed": tk.BooleanVar(value=False), "Thu": tk.BooleanVar(value=False), "Fri": tk.BooleanVar(value=False), "Sat": tk.BooleanVar(value=False), "Sun": tk.BooleanVar(value=False), } self.schedule_type_var = tk.StringVar(value="daily") self.schedule_monthday_var = tk.IntVar(value=1) # Логгер в файл self.file_logger = self.setup_file_logger() # Создаем интерфейс self.setup_ui() # Загружаем сохраненные настройки self.load_settings() # Проверяем очередь каждые 100ms self.process_queue() # Запускаем планировщик при старте, если есть сохраненные настройки if self.scheduler_enabled.get(): self.start_scheduler() def setup_ui(self): main_frame = ttk.Frame(self.root, padding="10") main_frame.pack(fill=tk.BOTH, expand=True) notebook = ttk.Notebook(main_frame) notebook.pack(fill=tk.BOTH, expand=True) settings_tab = ttk.Frame(notebook) log_tab = ttk.Frame(notebook) help_tab = ttk.Frame(notebook) notebook.add(settings_tab, text="Настройки") notebook.add(log_tab, text="Лог") notebook.add(help_tab, text="Справка") title_label = ttk.Label(settings_tab, text="Копирка", font=("Arial", 14, "bold")) title_label.pack(pady=8) toolbar = ttk.Frame(settings_tab) toolbar.pack(fill=tk.X, pady=2) toolbar_row1 = ttk.Frame(toolbar) toolbar_row1.pack(fill=tk.X, pady=1) toolbar_row2 = ttk.Frame(toolbar) toolbar_row2.pack(fill=tk.X, pady=1) ttk.Button(toolbar_row1, text="➕ Добавить", command=self.add_path_pair).pack(side=tk.LEFT, padx=4) ttk.Button(toolbar_row1, text="❌ Удалить", command=self.remove_selected_pair).pack(side=tk.LEFT, padx=4) ttk.Button(toolbar_row1, text="📂 Источник", command=lambda: self.pick_folder_for_item(self.selected_pair_id, "source", "Выберите папку источника")).pack(side=tk.LEFT, padx=4) ttk.Button(toolbar_row1, text="📂 Назначение", command=lambda: self.pick_folder_for_item(self.selected_pair_id, "dest", "Выберите папку назначения")).pack(side=tk.LEFT, padx=4) self.search_var = tk.StringVar(value="") self.filter_status_var = tk.StringVar(value="Все") self.compact_view_var = tk.BooleanVar(value=False) ttk.Label(toolbar_row2, text="Поиск:").pack(side=tk.LEFT, padx=6) search_entry = ttk.Entry(toolbar_row2, textvariable=self.search_var, width=20) search_entry.pack(side=tk.LEFT, padx=4) search_entry.bind("", lambda _e: self.render_pairs()) ttk.Label(toolbar_row2, text="Статус:").pack(side=tk.LEFT, padx=6) status_combo = ttk.Combobox(toolbar_row2, textvariable=self.filter_status_var, state="readonly", values=["Все", "OK", "Нет источника", "Нет назначения", "Нет доступа", "—"], width=14) status_combo.pack(side=tk.LEFT, padx=4) status_combo.bind("<>", lambda _e: self.render_pairs()) ttk.Checkbutton(toolbar_row2, text="Компактно", variable=self.compact_view_var, command=self.toggle_compact_view).pack(side=tk.LEFT, padx=6) # Профили скрыты по запросу пользователя status_frame = ttk.LabelFrame(settings_tab, text="Состояние", padding="10") status_frame.pack(fill=tk.X, pady=5) self.status_summary_label = ttk.Label(status_frame, text="—") self.status_summary_label.pack(side=tk.LEFT) self.next_run_label = ttk.Label(status_frame, text="(следующий запуск: —)", font=("Arial", 9, "italic")) self.next_run_label.pack(side=tk.LEFT, padx=10) self.last_success_label = ttk.Label(status_frame, text="(последний успех: —)", font=("Arial", 9, "italic")) self.last_success_label.pack(side=tk.LEFT, padx=10) self.last_result_label = ttk.Label(status_frame, text="(последний результат: —)", font=("Arial", 9, "italic")) self.last_result_label.pack(side=tk.LEFT, padx=10) schedule_frame = ttk.LabelFrame(settings_tab, text="Расписание и поведение", padding="10") schedule_frame.pack(fill=tk.X, pady=5) scheduler_box = ttk.LabelFrame(schedule_frame, text="Планировщик", padding="8") scheduler_box.pack(fill=tk.X, pady=(0, 6)) behavior_box = ttk.LabelFrame(schedule_frame, text="Параметры копирования", padding="8") behavior_box.pack(fill=tk.X) time_frame = ttk.Frame(scheduler_box) time_frame.pack(fill=tk.X, pady=5) self.hour_var = tk.StringVar(value="03") self.minute_var = tk.StringVar(value="00") self.scheduler_enabled = tk.BooleanVar(value=False) self.scheduler_check = ttk.Checkbutton(time_frame, text="Включить расписание", variable=self.scheduler_enabled, command=self.toggle_scheduler) self.scheduler_check.pack(side=tk.LEFT, padx=10) self.scheduler_status = ttk.Label(time_frame, text="(остановлен)", font=("Arial", 9, "italic")) self.scheduler_status.pack(side=tk.LEFT, padx=10) schedule_summary_frame = ttk.Frame(scheduler_box) schedule_summary_frame.pack(fill=tk.X, pady=4) self.schedule_summary_label = ttk.Label(schedule_summary_frame, text="Расписание: —") self.schedule_summary_label.pack(side=tk.LEFT) ttk.Button(schedule_summary_frame, text="Расписание...", command=self.open_schedule_dialog).pack(side=tk.LEFT, padx=8) options_frame = ttk.Frame(behavior_box) options_frame.pack(fill=tk.X, pady=5) options_row1 = ttk.Frame(options_frame) options_row1.pack(fill=tk.X, pady=3) options_row2 = ttk.Frame(options_frame) options_row2.pack(fill=tk.X, pady=3) self.autostart_check = ttk.Checkbutton( options_row1, text="Автозапуск Windows", variable=self.autostart_enabled, command=self.toggle_autostart ) autostart_help = ttk.Button( options_row1, text="?", width=2, command=lambda: self.show_hint("Автозапуск", "Добавляет программу в автозапуск Windows.") ) self.minimize_check = ttk.Checkbutton( options_row1, text="Сворачивать в трей при закрытии", variable=self.minimize_to_tray_enabled ) minimize_help = ttk.Button( options_row1, text="?", width=2, command=lambda: self.show_hint("Сворачивание", "По крестику окно скрывается в трей.") ) self.autostart_check.grid(row=0, column=0, sticky="w", padx=(0, 4)) autostart_help.grid(row=0, column=1, sticky="w", padx=(0, 12)) self.minimize_check.grid(row=0, column=2, sticky="w", padx=(0, 4)) minimize_help.grid(row=0, column=3, sticky="w") self.verify_check = ttk.Checkbutton( options_row2, text="Только проверка (без копирования)", variable=self.verify_only ) verify_help = ttk.Button( options_row2, text="?", width=2, command=lambda: self.show_hint("Проверка", "Проверяет совпадение контрольных сумм без копирования.") ) self.cleanup_check = ttk.Checkbutton( options_row2, text="Удалять старые файлы в назначении", variable=self.cleanup_old_var, command=self.on_cleanup_toggle ) cleanup_help = ttk.Button( options_row2, text="?", width=2, command=lambda: self.show_hint("Очистка", "В режиме 'последний файл' удаляет старые файлы в папке назначения.") ) self.verify_check.grid(row=0, column=0, sticky="w", padx=(0, 4)) verify_help.grid(row=0, column=1, sticky="w", padx=(0, 12)) self.cleanup_check.grid(row=0, column=2, sticky="w", padx=(0, 4)) cleanup_help.grid(row=0, column=3, sticky="w") perf_frame = ttk.Frame(behavior_box) perf_frame.pack(fill=tk.X, pady=4) ttk.Label(perf_frame, text="Мин. свободно, ГБ:", width=20, anchor=tk.W).pack(side=tk.LEFT) self.min_free_entry = ttk.Entry(perf_frame, textvariable=self.min_free_gb_var, width=6) self.min_free_entry.pack(side=tk.LEFT, padx=(2, 6)) self.min_free_entry.bind("", lambda _e: self.request_autosave()) ttk.Label(perf_frame, text="Параллельных потоков:", width=23, anchor=tk.W).pack(side=tk.LEFT, padx=(4, 0)) self.max_workers_entry = ttk.Entry(perf_frame, textvariable=self.max_workers_var, width=4) self.max_workers_entry.pack(side=tk.LEFT, padx=(2, 6)) self.max_workers_entry.bind("", lambda _e: self.request_autosave()) ttk.Label(perf_frame, text="Хранить последних:", width=19, anchor=tk.W).pack(side=tk.LEFT, padx=(4, 0)) self.keep_files_spin = ttk.Spinbox(perf_frame, from_=1, to=999, width=4, textvariable=self.keep_files_var) self.keep_files_spin.pack(side=tk.LEFT, padx=(2, 0)) self.keep_files_spin.bind("", lambda _e: self.request_autosave()) self.keep_files_spin.bind("", lambda _e: self.request_autosave()) masks_frame = ttk.Frame(behavior_box) masks_frame.pack(fill=tk.X, pady=4) ttk.Label(masks_frame, text="Включать маски:").pack(side=tk.LEFT) self.include_entry = ttk.Entry(masks_frame, textvariable=self.include_masks_var) self.include_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=4) self.include_entry.bind("", lambda _e: self.request_autosave()) ttk.Label(masks_frame, text="Исключать маски:").pack(side=tk.LEFT, padx=6) self.exclude_entry = ttk.Entry(masks_frame, textvariable=self.exclude_masks_var) self.exclude_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=4) self.exclude_entry.bind("", lambda _e: self.request_autosave()) ttk.Button(masks_frame, text="Шаблоны", command=self.show_mask_templates).pack(side=tk.LEFT, padx=6) buttons_frame = ttk.Frame(behavior_box) buttons_frame.pack(fill=tk.X, pady=5) ttk.Button(buttons_frame, text="🧭 Мастер настройки", command=self.open_wizard).pack(side=tk.LEFT, padx=5) ttk.Button(buttons_frame, text="▶ Запустить сейчас", command=self.start_manual_copy).pack(side=tk.LEFT, padx=5) ttk.Button(buttons_frame, text="⏹ Остановить", command=self.stop_copying).pack(side=tk.LEFT, padx=5) ttk.Button(buttons_frame, text="💾 Сохранить", command=self.check_and_save).pack(side=tk.LEFT, padx=5) paths_frame = ttk.LabelFrame(settings_tab, text="Пары копирования", padding="10") paths_frame.pack(fill=tk.BOTH, expand=True, pady=5) self.pairs_tree = ttk.Treeview( paths_frame, columns=("source", "dest", "mode", "last", "status"), show="headings", selectmode="browse" ) self.pairs_tree.heading("source", text="Источник") self.pairs_tree.heading("dest", text="Назначение") self.pairs_tree.heading("mode", text="Режим") self.pairs_tree.heading("last", text="Последний файл") self.pairs_tree.heading("status", text="Статус") self.pairs_tree.column("source", width=240) self.pairs_tree.column("dest", width=240) self.pairs_tree.column("mode", width=110, anchor=tk.CENTER) self.pairs_tree.column("last", width=160) self.pairs_tree.column("status", width=120, anchor=tk.CENTER) tree_scroll = ttk.Scrollbar(paths_frame, orient="vertical", command=self.pairs_tree.yview) self.pairs_tree.configure(yscrollcommand=tree_scroll.set) self.pairs_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) tree_scroll.pack(side=tk.RIGHT, fill=tk.Y) self.pairs_tree.tag_configure("ok", foreground="green") self.pairs_tree.tag_configure("warn", foreground="orange") self.pairs_tree.tag_configure("error", foreground="red") self.pairs_tree.tag_configure("idle", foreground="gray") self.pairs_tree.bind("", self.on_tree_double_click) self.pairs_tree.bind("", self.on_tree_f2_edit) self.pairs_tree.bind("", self.on_tree_right_click) self.pairs_tree.bind("", self.on_tree_motion) self.pairs_tree.bind("", lambda _e: self.remove_selected_pair()) self.pairs_tree.bind("", lambda _e: self.open_selected_destination()) self.pairs_tree.bind("<>", lambda _e: self.on_pair_select()) log_frame = ttk.LabelFrame(log_tab, text="Лог операций", padding="5") log_frame.pack(fill=tk.BOTH, expand=True, pady=5) log_buttons = ttk.Frame(log_frame) log_buttons.pack(fill=tk.X, pady=2) ttk.Button(log_buttons, text="📋 Очистить лог", command=self.clear_log).pack(side=tk.RIGHT, padx=2) ttk.Button(log_buttons, text="🔍 Отладка", command=self.debug_settings).pack(side=tk.RIGHT, padx=2) ttk.Button(log_buttons, text="📂 Открыть лог", command=self.open_log_file).pack(side=tk.LEFT, padx=2) ttk.Button(log_buttons, text="📁 Папка настроек", command=self.open_settings_folder).pack(side=tk.LEFT, padx=2) self.log_text = scrolledtext.ScrolledText(log_frame, height=8, wrap=tk.WORD) self.log_text.pack(fill=tk.BOTH, expand=True) self.log_text.tag_configure("success", foreground="green") self.log_text.tag_configure("error", foreground="red") self.log_text.tag_configure("warning", foreground="orange") self.log_text.tag_configure("info", foreground="blue") help_text = ( "Быстрый старт:\n" "1) Нажмите 'Добавить' и заполните источник/назначение.\n" "2) Выберите режим: последний файл или вся папка.\n" "3) Настройте расписание и нажмите 'Сохранить'.\n\n" "Подсказки доступны по кнопке '?'." ) ttk.Label(help_tab, text=help_text, justify=tk.LEFT).pack(anchor=tk.W, padx=10, pady=10) footer = ttk.Frame(self.root) footer.pack(side=tk.BOTTOM, fill=tk.X) self.status_bar = ttk.Label(footer, text="F2 — редактировать, двойной клик — выбрать папку, ПКМ — меню", relief=tk.SUNKEN, anchor=tk.W) self.status_bar.pack(side=tk.TOP, fill=tk.X) self.progress_var = tk.IntVar(value=0) self.progress = ttk.Progressbar(footer, mode="determinate", variable=self.progress_var) self.progress.pack(side=tk.TOP, fill=tk.X) def setup_window_icon(self): icon_path = get_resource_path(ICON_PATH) if os.path.exists(icon_path): with contextlib.suppress(Exception): self.root.iconbitmap(icon_path) def setup_file_logger(self): settings_path = self.get_settings_path() log_dir = os.path.dirname(settings_path) os.makedirs(log_dir, exist_ok=True) log_path = os.path.join(log_dir, "backup_copier.log") logger = logging.getLogger("backup_copier") logger.setLevel(logging.INFO) if not logger.handlers: handler = logging.handlers.RotatingFileHandler( log_path, maxBytes=2 * 1024 * 1024, backupCount=3, encoding="utf-8" ) formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") handler.setFormatter(formatter) logger.addHandler(handler) return logger def get_settings_path(self): """Возвращает путь для сохранения настроек""" if getattr(sys, 'frozen', False): # Для EXE используем папку с EXE файлом base_path = os.path.dirname(sys.executable) else: # Для скрипта используем папку со скриптом base_path = os.path.dirname(os.path.abspath(__file__)) # Пробуем создать папку, если есть права settings_dir = base_path try: # Проверяем, можем ли мы писать в эту папку test_file = os.path.join(settings_dir, 'test_write.tmp') with open(test_file, 'w') as f: f.write('test') os.remove(test_file) self.log_message(f"📁 Используется папка: {settings_dir}", "info") except: # Если не можем писать, используем AppData settings_dir = os.path.join(os.environ.get('APPDATA', os.path.expanduser('~')), 'BackupCopier') os.makedirs(settings_dir, exist_ok=True) self.log_message(f"📁 Используется папка AppData: {settings_dir}", "info") return os.path.join(settings_dir, 'backup_copier_settings.json') def create_placeholder(self, entry, text): """Добавляет подсказку в поле ввода.""" placeholder_color = "gray" normal_color = "black" def on_focus_in(_): if entry.get() == text and entry.cget("foreground") == placeholder_color: entry.delete(0, tk.END) entry.config(foreground=normal_color) def on_focus_out(_): if not entry.get(): entry.insert(0, text) entry.config(foreground=placeholder_color) entry.bind("", on_focus_in) entry.bind("", on_focus_out) on_focus_out(None) def get_autostart_command(self) -> str: """Возвращает команду для автозапуска.""" if getattr(sys, 'frozen', False): return f"\"{sys.executable}\"" script_path = os.path.abspath(__file__) return f"\"{sys.executable}\" \"{script_path}\"" def is_autostart_enabled(self) -> bool: """Проверяет, включен ли автозапуск в реестре.""" try: with winreg.OpenKey(winreg.HKEY_CURRENT_USER, AUTOSTART_REG_PATH, 0, winreg.KEY_READ) as key: value, _ = winreg.QueryValueEx(key, AUTOSTART_REG_NAME) return bool(value) except FileNotFoundError: return False except OSError: return False def set_autostart_enabled(self, enabled: bool) -> None: """Включает/выключает автозапуск в реестре.""" try: with winreg.OpenKey(winreg.HKEY_CURRENT_USER, AUTOSTART_REG_PATH, 0, winreg.KEY_SET_VALUE) as key: if enabled: winreg.SetValueEx(key, AUTOSTART_REG_NAME, 0, winreg.REG_SZ, self.get_autostart_command()) self.log_message("✅ Автозапуск включен", "success") else: try: winreg.DeleteValue(key, AUTOSTART_REG_NAME) except FileNotFoundError: pass self.log_message("⏭️ Автозапуск выключен", "info") except Exception as e: self.log_message(f"❌ Ошибка автозапуска: {e}", "error") def toggle_autostart(self): """Обработчик галочки автозапуска.""" self.set_autostart_enabled(self.autostart_enabled.get()) def load_settings(self): """Загружает настройки из файла и отображает их""" try: self.loading_settings = True self.autosave_enabled = False settings_path = self.get_settings_path() self.log_message(f"🔍 Загрузка настроек из: {settings_path}", "info") if os.path.exists(settings_path): with open(settings_path, 'r', encoding='utf-8') as f: settings = json.load(f) self.scheduler_enabled.set(settings.get('enabled', False)) desired_autostart = settings.get('autostart_enabled') self.minimize_to_tray_enabled.set(settings.get('minimize_to_tray', True)) # Профили profiles = settings.get("profiles") if not isinstance(profiles, dict): # Миграция со старого формата profiles = {"Default": self.default_profile()} profiles["Default"]["pairs"] = settings.get('pairs', []) profiles["Default"]["include_masks"] = settings.get('include_masks', DEFAULT_INCLUDE_MASKS) profiles["Default"]["exclude_masks"] = settings.get('exclude_masks', DEFAULT_EXCLUDE_MASKS) profiles["Default"]["min_free_gb"] = settings.get('min_free_gb', DEFAULT_MIN_FREE_GB) profiles["Default"]["max_workers"] = settings.get('max_workers', DEFAULT_MAX_WORKERS) profiles["Default"]["verify_only"] = settings.get('verify_only', False) profiles["Default"]["keep_files"] = settings.get('keep_files', DEFAULT_KEEP_FILES) profiles["Default"]["last_success_time"] = settings.get('last_success_time') profiles["Default"]["last_result_summary"] = settings.get('last_result_summary') profiles["Default"]["last_hashes"] = settings.get('last_hashes', {}) profiles["Default"]["schedules"] = settings.get('schedules', []) self.profiles = profiles self.refresh_profile_combo() active = settings.get("active_profile") or self.active_profile.get() self.load_profile(active, save_current=False) # Настройки автозапуска actual_autostart = self.is_autostart_enabled() if desired_autostart is None: self.autostart_enabled.set(actual_autostart) else: desired_autostart = bool(desired_autostart) self.autostart_enabled.set(desired_autostart) if desired_autostart != actual_autostart: self.set_autostart_enabled(desired_autostart) self.log_message(f"📂 Настройки загружены из {settings_path}", "info") else: # Если файла нет, добавляем пустую пару self.log_message("🆕 Файл настроек не найден, создаем новую конфигурацию", "info") self.profiles = {"Default": self.default_profile()} self.refresh_profile_combo() self.load_profile("Default", save_current=False) self.autostart_enabled.set(self.is_autostart_enabled()) except Exception as e: self.log_message(f"❌ Ошибка при загрузке настроек: {e}", "error") import traceback traceback.print_exc() self.profiles = {"Default": self.default_profile()} self.refresh_profile_combo() self.load_profile("Default", save_current=False) self.autostart_enabled.set(self.is_autostart_enabled()) finally: self.autosave_enabled = True self.loading_settings = False def save_settings(self, show_message: bool = True): """Сохраняет настройки в файл""" try: self.save_active_profile() settings = { 'enabled': self.scheduler_enabled.get(), 'autostart_enabled': self.autostart_enabled.get(), 'minimize_to_tray': self.minimize_to_tray_enabled.get(), 'active_profile': self.active_profile.get(), 'profiles': self.profiles } settings_path = self.get_settings_path() # Сохраняем с отступами для читаемости with open(settings_path, 'w', encoding='utf-8') as f: json.dump(settings, f, ensure_ascii=False, indent=2) if show_message: self.log_message(f"💾 Настройки сохранены (профилей: {len(self.profiles)})", "success") self.log_message(f"📁 Файл: {settings_path}", "info") messagebox.showinfo("Успех", f"Настройки сохранены!\n\nФайл: {settings_path}") if self.scheduler_enabled.get(): # Перепланируем, чтобы учесть новые пути/время self.start_scheduler() except Exception as e: self.log_message(f"❌ Ошибка при сохранении настроек: {e}", "error") messagebox.showerror("Ошибка", f"Не удалось сохранить настройки:\n{e}") def add_path_pair(self, source="", dest="", full_copy: bool = False): """Добавляет новую пару в таблицу.""" pair_id = f"pair_{self.pair_counter}" self.pair_counter += 1 pair = { "id": pair_id, "source": source, "dest": dest, "full_copy": bool(full_copy), "last_file": "", "include_masks": "", "exclude_masks": "", "status": "—", "status_tag": "idle" } if source or dest: status_text, status_tag = self.validate_pair(source, dest) pair["status"] = status_text pair["status_tag"] = status_tag if source or dest: status_text, status_tag = self.validate_pair(source, dest) pair["status"] = status_text pair["status_tag"] = status_tag self.copy_pairs.append(pair) self.render_pairs() self.pairs_tree.selection_set(pair_id) self.on_pair_select() self.request_autosave() def remove_selected_pair(self): pair_id = self.selected_pair_id if not pair_id: return self.copy_pairs = [p for p in self.copy_pairs if p["id"] != pair_id] with contextlib.suppress(Exception): self.pairs_tree.delete(pair_id) self.selected_pair_id = None self.request_autosave() def toggle_compact_view(self): compact = self.compact_view_var.get() style = ttk.Style() style.configure("Treeview", rowheight=18 if compact else 24) self.render_pairs() def remove_path_pair(self, pair_id): """Совместимость: удалить пару по id.""" if pair_id: self.selected_pair_id = pair_id self.remove_selected_pair() def remove_all_pairs(self, silent=False): """Удаляет все пары""" if not silent: if not messagebox.askyesno("Подтверждение", "Удалить все пути?"): return self.copy_pairs.clear() for item in self.pairs_tree.get_children(): self.pairs_tree.delete(item) self.selected_pair_id = None self.request_autosave() def get_pair_by_id(self, pair_id: str): for pair in self.copy_pairs: if pair["id"] == pair_id: return pair return None def find_pair_by_paths(self, source: str, dest: str, full_copy: bool): for pair in self.copy_pairs: if pair.get("source") == source and pair.get("dest") == dest and bool(pair.get("full_copy", False)) == bool(full_copy): return pair return None def update_pair_row(self, pair): # Treeview is not thread-safe; schedule redraw in UI thread. self.run_on_ui(self.render_pairs) def shorten_path(self, path: str, max_len: int = 40) -> str: if len(path) <= max_len: return path return path[:18] + "..." + path[-(max_len - 21):] def format_status(self, status_text: str) -> str: if status_text == "OK": return "✅ OK" if status_text in ("Нет источника", "Нет назначения", "Нет доступа"): return f"❌ {status_text}" if status_text == "—": return "—" return status_text def render_pairs(self): search = (self.search_var.get() or "").lower() status_filter = self.filter_status_var.get() for item in self.pairs_tree.get_children(): self.pairs_tree.delete(item) for pair in self.copy_pairs: status_text = pair.get("status", "—") if status_filter != "Все" and status_text != status_filter: continue if search: if search not in pair.get("source", "").lower() and search not in pair.get("dest", "").lower(): continue mode_text = "Вся папка" if pair.get("full_copy") else "Последний файл" last_file = pair.get("last_file", "") source_disp = self.shorten_path(pair.get("source", "")) dest_disp = self.shorten_path(pair.get("dest", "")) self.pairs_tree.insert( "", "end", iid=pair["id"], values=(source_disp, dest_disp, mode_text, last_file, self.format_status(status_text)), tags=(pair.get("status_tag", "idle"),) ) def on_pair_select(self): selection = self.pairs_tree.selection() if not selection: return pair_id = selection[0] self.selected_pair_id = pair_id def update_selected_pair_from_edit(self): pair_id = self.selected_pair_id if not pair_id: return pair = self.get_pair_by_id(pair_id) if not pair: return pair["source"] = pair.get("source", "").strip() pair["dest"] = pair.get("dest", "").strip() status_text, status_tag = self.validate_pair(pair["source"], pair["dest"]) pair["status"] = status_text pair["status_tag"] = status_tag self.update_pair_row(pair) def on_tree_double_click(self, event): item = self.pairs_tree.identify_row(event.y) column = self.pairs_tree.identify_column(event.x) if not item or not column: return self.selected_pair_id = item if column == "#3": self.toggle_selected_mode() return if column not in ("#1", "#2"): return if column == "#1": self.pick_folder_for_item(item, "source", "Выберите папку источника") return if column == "#2": self.pick_folder_for_item(item, "dest", "Выберите папку назначения") return def on_tree_f2_edit(self, event): selection = self.pairs_tree.selection() if not selection: return item = selection[0] column = self.pairs_tree.identify_column(event.x) if not column or column not in ("#1", "#2"): column = "#1" self.inline_edit_cell(item, column) def on_tree_right_click(self, event): item = self.pairs_tree.identify_row(event.y) if item: self.pairs_tree.selection_set(item) self.selected_pair_id = item menu = tk.Menu(self.root, tearoff=0) menu.add_command(label="Выбрать источник…", command=lambda: self.pick_folder_for_item(self.selected_pair_id, "source", "Выберите папку источника")) menu.add_command(label="Выбрать назначение…", command=lambda: self.pick_folder_for_item(self.selected_pair_id, "dest", "Выберите папку назначения")) menu.add_command(label="Переключить режим", command=self.toggle_selected_mode) menu.add_command(label="Открыть источник", command=self.open_selected_source) menu.add_command(label="Открыть назначение", command=self.open_selected_destination) menu.add_command(label="Показать лог пары", command=self.show_pair_log) menu.add_command(label="Маски пары…", command=self.edit_pair_masks) menu.add_separator() menu.add_command(label="Удалить", command=self.remove_selected_pair) menu.tk_popup(event.x_root, event.y_root) menu.grab_release() def on_tree_motion(self, event): item = self.pairs_tree.identify_row(event.y) column = self.pairs_tree.identify_column(event.x) if not item: return pair = self.get_pair_by_id(item) if not pair: return if column == "#1": self.set_status_text(pair.get("source", "") or "Готов к работе") elif column == "#2": self.set_status_text(pair.get("dest", "") or "Готов к работе") elif column == "#4": self.set_status_text(pair.get("last_file", "") or "Готов к работе") def pick_folder_for_item(self, item_id, field: str, title: str): if not item_id: return folder = filedialog.askdirectory(title=title) if folder: pair = self.get_pair_by_id(item_id) if pair: pair[field] = folder status_text, status_tag = self.validate_pair(pair["source"], pair["dest"]) pair["status"] = status_text pair["status_tag"] = status_tag self.update_pair_row(pair) self.request_autosave() def inline_edit_cell(self, item, column): bbox = self.pairs_tree.bbox(item, column) if not bbox: return x, y, w, h = bbox value = self.pairs_tree.set(item, column) if self.inline_editor is not None: self.inline_editor.destroy() self.inline_editor = None entry = ttk.Entry(self.pairs_tree) entry.insert(0, value) entry.select_range(0, tk.END) entry.place(x=x, y=y, width=w, height=h) entry.focus_set() self.inline_editor = entry def commit(_evt=None): new_val = entry.get().strip() entry.destroy() self.inline_editor = None pair = self.get_pair_by_id(item) if not pair: return if column == "#1": pair["source"] = new_val elif column == "#2": pair["dest"] = new_val status_text, status_tag = self.validate_pair(pair["source"], pair["dest"]) pair["status"] = status_text pair["status_tag"] = status_tag self.update_pair_row(pair) self.request_autosave() def cancel(_evt=None): entry.destroy() self.inline_editor = None entry.bind("", commit) entry.bind("", commit) entry.bind("", cancel) def validate_pair(self, source: str, dest: str): if not source or not dest: return "—", "idle" if not os.path.exists(source): return "Нет источника", "error" if not os.access(source, os.R_OK): return "Нет доступа", "error" if not os.path.exists(dest): return "Нет назначения", "error" if not os.access(dest, os.W_OK): return "Нет доступа", "error" return "OK", "ok" def open_selected_destination(self): pair_id = self.selected_pair_id if not pair_id: return pair = self.get_pair_by_id(pair_id) if not pair: return self.open_destination(pair["dest"]) def open_selected_source(self): pair_id = self.selected_pair_id if not pair_id: return pair = self.get_pair_by_id(pair_id) if not pair: return self.open_destination(pair["source"]) def show_pair_log(self): pair_id = self.selected_pair_id if not pair_id: return pair = self.get_pair_by_id(pair_id) if not pair: return src = pair.get("source", "") dst = pair.get("dest", "") content = self.log_text.get("1.0", tk.END).splitlines() filtered = [line for line in content if (src and src in line) or (dst and dst in line)] win = tk.Toplevel(self.root) win.title("Лог пары") win.geometry("700x400") text = scrolledtext.ScrolledText(win, wrap=tk.WORD) text.pack(fill=tk.BOTH, expand=True) text.insert(tk.END, "\n".join(filtered) if filtered else "Нет записей для этой пары.") def edit_pair_masks(self): pair_id = self.selected_pair_id if not pair_id: return pair = self.get_pair_by_id(pair_id) if not pair: return win = tk.Toplevel(self.root) win.title("Маски пары") win.geometry("420x180") win.transient(self.root) win.grab_set() include_var = tk.StringVar(value=pair.get("include_masks", "")) exclude_var = tk.StringVar(value=pair.get("exclude_masks", "")) ttk.Label(win, text="Включать маски (через ;):").pack(anchor=tk.W, padx=10, pady=4) include_entry = ttk.Entry(win, textvariable=include_var) include_entry.pack(fill=tk.X, padx=10, pady=2) ttk.Label(win, text="Исключать маски (через ;):").pack(anchor=tk.W, padx=10, pady=4) exclude_entry = ttk.Entry(win, textvariable=exclude_var) exclude_entry.pack(fill=tk.X, padx=10, pady=2) def apply(): pair["include_masks"] = include_var.get().strip() pair["exclude_masks"] = exclude_var.get().strip() self.request_autosave() win.destroy() btns = ttk.Frame(win) btns.pack(pady=8) ttk.Button(btns, text="Сохранить", command=apply).pack(side=tk.LEFT, padx=6) ttk.Button(btns, text="Отмена", command=win.destroy).pack(side=tk.LEFT, padx=6) def toggle_selected_mode(self): pair_id = self.selected_pair_id if not pair_id: return pair = self.get_pair_by_id(pair_id) if not pair: return pair["full_copy"] = not pair["full_copy"] self.update_pair_row(pair) self.request_autosave() def browse_folder(self, entry): """Открывает диалог выбора папки""" folder = filedialog.askdirectory(title="Выберите папку") if folder: entry.delete(0, tk.END) entry.insert(0, folder) self.update_selected_pair_from_edit() def open_destination(self, path: str): if not path or path.startswith("Например: "): return if os.path.exists(path): with contextlib.suppress(Exception): os.startfile(path) def open_log_file(self): log_path = os.path.join(os.path.dirname(self.get_settings_path()), "backup_copier.log") if os.path.exists(log_path): with contextlib.suppress(Exception): os.startfile(log_path) def open_settings_folder(self): folder = os.path.dirname(self.get_settings_path()) if os.path.exists(folder): with contextlib.suppress(Exception): os.startfile(folder) def open_schedule_dialog(self): dlg = tk.Toplevel(self.root) dlg.title("Расписание") dlg.geometry("520x360") dlg.transient(self.root) dlg.grab_set() time_frame = ttk.Frame(dlg) time_frame.pack(fill=tk.X, pady=5, padx=10) ttk.Label(time_frame, text="Время:").pack(side=tk.LEFT) ttk.Combobox(time_frame, textvariable=self.hour_var, values=[f"{h:02d}" for h in range(24)], width=5, state="readonly").pack(side=tk.LEFT, padx=2) ttk.Label(time_frame, text=":").pack(side=tk.LEFT) ttk.Combobox(time_frame, textvariable=self.minute_var, values=[f"{m:02d}" for m in range(60)], width=5, state="readonly").pack(side=tk.LEFT, padx=2) type_frame = ttk.Frame(dlg) type_frame.pack(fill=tk.X, pady=5, padx=10) ttk.Label(type_frame, text="Тип:").pack(side=tk.LEFT) ttk.Radiobutton(type_frame, text="Ежедневно", variable=self.schedule_type_var, value="daily").pack(side=tk.LEFT, padx=4) ttk.Radiobutton(type_frame, text="Еженедельно", variable=self.schedule_type_var, value="weekly").pack(side=tk.LEFT, padx=4) ttk.Radiobutton(type_frame, text="Ежемесячно", variable=self.schedule_type_var, value="monthly").pack(side=tk.LEFT, padx=4) days_frame = ttk.Frame(dlg) days_frame.pack(fill=tk.X, pady=5, padx=10) ttk.Label(days_frame, text="Дни недели:").pack(side=tk.LEFT) day_labels = [("Mon", "Пн"), ("Tue", "Вт"), ("Wed", "Ср"), ("Thu", "Чт"), ("Fri", "Пт"), ("Sat", "Сб"), ("Sun", "Вс")] day_checks = [] for key, label in day_labels: cb = ttk.Checkbutton(days_frame, text=label, variable=self.schedule_days_vars[key]) cb.pack(side=tk.LEFT, padx=2) day_checks.append(cb) month_frame = ttk.Frame(dlg) month_frame.pack(fill=tk.X, pady=5, padx=10) ttk.Label(month_frame, text="День месяца:").pack(side=tk.LEFT) month_spin = ttk.Spinbox(month_frame, from_=1, to=31, width=4, textvariable=self.schedule_monthday_var) month_spin.pack(side=tk.LEFT, padx=4) ttk.Label(month_frame, text="(Если день > кол-ва дней, запуск в последний день месяца)").pack(side=tk.LEFT, padx=6) def update_schedule_controls(): mode = self.schedule_type_var.get() if mode == "daily": for cb in day_checks: cb.config(state=tk.DISABLED) month_spin.config(state=tk.DISABLED) elif mode == "weekly": for cb in day_checks: cb.config(state=tk.NORMAL) month_spin.config(state=tk.DISABLED) else: for cb in day_checks: cb.config(state=tk.DISABLED) month_spin.config(state=tk.NORMAL) self.schedule_type_var.trace_add("write", lambda *_: update_schedule_controls()) update_schedule_controls() list_frame = ttk.Frame(dlg) list_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5) tree = ttk.Treeview(list_frame, columns=("time", "days"), show="headings", height=8) tree.heading("time", text="Время") tree.heading("days", text="Дни") tree.column("time", width=80, anchor=tk.CENTER) tree.column("days", width=220) scroll = ttk.Scrollbar(list_frame, orient="vertical", command=tree.yview) tree.configure(yscrollcommand=scroll.set) tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scroll.pack(side=tk.RIGHT, fill=tk.Y) def refresh_tree(): for item in tree.get_children(): tree.delete(item) for idx, entry in enumerate(self.schedules): sched_type = entry.get("type", "daily") if sched_type == "monthly": days_text = f"ежемесячно, день {entry.get('day', 1)}" elif sched_type == "weekly": days = entry.get("days", []) days_text = ",".join(days) if days else "еженедельно" else: days_text = "ежедневно" tree.insert("", "end", iid=f"s{idx}", values=(entry.get("time"), days_text)) def add_entry(): time_str = f"{self.hour_var.get()}:{self.minute_var.get()}" days = [k for k, v in self.schedule_days_vars.items() if v.get()] sched_type = self.schedule_type_var.get() entry = {"time": time_str, "type": sched_type} if sched_type == "weekly": entry["days"] = days elif sched_type == "monthly": entry["day"] = int(self.schedule_monthday_var.get() or 1) self.schedules.append(entry) refresh_tree() self.update_schedule_summary_label() def remove_entry(): sel = tree.selection() if not sel: return idx = int(sel[0].lstrip("s")) if 0 <= idx < len(self.schedules): self.schedules.pop(idx) refresh_tree() self.update_schedule_summary_label() btns = ttk.Frame(dlg) btns.pack(fill=tk.X, padx=10, pady=8) ttk.Button(btns, text="➕ Добавить", command=add_entry).pack(side=tk.LEFT, padx=4) ttk.Button(btns, text="❌ Удалить", command=remove_entry).pack(side=tk.LEFT, padx=4) ttk.Button(btns, text="Закрыть", command=dlg.destroy).pack(side=tk.RIGHT, padx=4) refresh_tree() def refresh_profile_combo(self): names = sorted(self.profiles.keys()) if self.profiles else ["Default"] if not self.profiles: self.profiles["Default"] = self.default_profile() if self.profile_combo is not None: self.profile_combo["values"] = names if self.active_profile.get() not in names: self.active_profile.set(names[0]) else: if self.active_profile.get() not in names: self.active_profile.set(names[0]) def default_profile(self) -> dict: return { "pairs": [], "schedules": [], "include_masks": DEFAULT_INCLUDE_MASKS, "exclude_masks": DEFAULT_EXCLUDE_MASKS, "min_free_gb": DEFAULT_MIN_FREE_GB, "max_workers": DEFAULT_MAX_WORKERS, "verify_only": False, "cleanup_old": False, "keep_files": DEFAULT_KEEP_FILES, "last_success_time": None, "last_result_summary": None, "last_hashes": {}, } def save_active_profile(self): name = self.active_profile.get() or "Default" profile = self.profiles.get(name, self.default_profile()) profile["pairs"] = [ { "source": p.get("source", ""), "dest": p.get("dest", ""), "full_copy": bool(p.get("full_copy", False)), "include_masks": p.get("include_masks", ""), "exclude_masks": p.get("exclude_masks", ""), } for p in self.copy_pairs ] profile["schedules"] = list(self.schedules) profile["include_masks"] = self.include_masks_var.get() profile["exclude_masks"] = self.exclude_masks_var.get() profile["min_free_gb"] = self.get_float_setting(self.min_free_gb_var, DEFAULT_MIN_FREE_GB) profile["max_workers"] = self.get_int_setting(self.max_workers_var, DEFAULT_MAX_WORKERS, minimum=1) profile["verify_only"] = bool(self.verify_only.get()) profile["cleanup_old"] = bool(self.cleanup_old_var.get()) try: profile["keep_files"] = max(1, int(self.keep_files_var.get() or DEFAULT_KEEP_FILES)) except Exception: profile["keep_files"] = DEFAULT_KEEP_FILES profile["skip_cleanup_confirm"] = bool(self.skip_cleanup_confirm) profile["last_success_time"] = self.last_success_time profile["last_result_summary"] = self.last_result_summary profile["last_hashes"] = self.last_hashes self.profiles[name] = profile def load_profile(self, name: str, save_current: bool = True): self.loading_settings = True self.autosave_enabled = False if save_current: self.save_active_profile() profile = self.profiles.get(name) or self.default_profile() self.active_profile.set(name) self.include_masks_var.set(profile.get("include_masks", DEFAULT_INCLUDE_MASKS)) self.exclude_masks_var.set(profile.get("exclude_masks", DEFAULT_EXCLUDE_MASKS)) self.min_free_gb_var.set(profile.get("min_free_gb", DEFAULT_MIN_FREE_GB)) self.max_workers_var.set(profile.get("max_workers", DEFAULT_MAX_WORKERS)) self.verify_only.set(profile.get("verify_only", False)) self.cleanup_old_var.set(profile.get("cleanup_old", False)) try: self.keep_files_var.set(max(1, int(profile.get("keep_files", DEFAULT_KEEP_FILES) or DEFAULT_KEEP_FILES))) except Exception: self.keep_files_var.set(DEFAULT_KEEP_FILES) self.skip_cleanup_confirm = bool(profile.get("skip_cleanup_confirm", False)) self.last_success_time = profile.get("last_success_time") self.last_result_summary = profile.get("last_result_summary") self.last_hashes = profile.get("last_hashes", {}) self.update_last_success_label() self.update_status_summary() self.remove_all_pairs(silent=True) for item in profile.get("pairs", []): self.add_path_pair(item.get("source", ""), item.get("dest", ""), item.get("full_copy", False)) last = self.copy_pairs[-1] last["include_masks"] = item.get("include_masks", "") last["exclude_masks"] = item.get("exclude_masks", "") self.schedules = profile.get("schedules", []) self.update_schedule_summary_label() self.render_pairs() if self.schedules: first_time = self.schedules[0].get("time", "03:00") if ":" in first_time: hour, minute = first_time.split(":") self.hour_var.set(hour) self.minute_var.set(minute) if self.scheduler_enabled.get(): self.start_scheduler() self.autosave_enabled = True self.loading_settings = False def add_profile(self): name = simpledialog.askstring("Новый профиль", "Введите имя профиля:") if not name: return if name in self.profiles: messagebox.showwarning("Профиль", "Профиль с таким именем уже существует.") return self.save_active_profile() self.profiles[name] = self.default_profile() self.active_profile.set(name) self.refresh_profile_combo() self.load_profile(name) def rename_profile(self): old = self.active_profile.get() if not old: return new = simpledialog.askstring("Переименовать профиль", "Новое имя профиля:", initialvalue=old) if not new or new == old: return if new in self.profiles: messagebox.showwarning("Профиль", "Профиль с таким именем уже существует.") return self.save_active_profile() self.profiles[new] = self.profiles.pop(old) self.active_profile.set(new) self.refresh_profile_combo() def delete_profile(self): name = self.active_profile.get() if not name: return if not messagebox.askyesno("Профили", f"Удалить профиль '{name}'?"): return self.profiles.pop(name, None) self.refresh_profile_combo() self.load_profile(self.active_profile.get()) def export_profiles(self): self.save_active_profile() path = filedialog.asksaveasfilename(defaultextension=".json", filetypes=[("JSON", "*.json")]) if not path: return data = { "active_profile": self.active_profile.get(), "profiles": self.profiles } with open(path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) messagebox.showinfo("Экспорт", "Профили экспортированы.") def import_profiles(self): path = filedialog.askopenfilename(filetypes=[("JSON", "*.json")]) if not path: return with open(path, "r", encoding="utf-8") as f: data = json.load(f) profiles = data.get("profiles") if isinstance(profiles, dict): self.profiles.update(profiles) self.refresh_profile_combo() self.load_profile(data.get("active_profile") or self.active_profile.get()) def refresh_schedules_tree(self): self.update_schedule_summary_label() def add_schedule_from_inputs(self): time_str = f"{self.hour_var.get()}:{self.minute_var.get()}" days = [k for k, v in self.schedule_days_vars.items() if v.get()] sched_type = self.schedule_type_var.get() entry = {"time": time_str, "type": sched_type} if sched_type == "weekly": entry["days"] = days elif sched_type == "monthly": entry["day"] = int(self.schedule_monthday_var.get() or 1) self.schedules.append(entry) self.update_schedule_summary_label() self.request_autosave() if self.scheduler_enabled.get(): self.start_scheduler() def remove_selected_schedule(self): if not self.schedules: return self.schedules.pop() self.update_schedule_summary_label() self.request_autosave() if self.scheduler_enabled.get(): self.start_scheduler() def show_hint(self, title: str, message: str): messagebox.showinfo(title, message) def request_autosave(self): if not self.autosave_enabled or self.loading_settings: return if self.autosave_job is not None: self.root.after_cancel(self.autosave_job) self.autosave_job = self.root.after(1000, lambda: self.save_settings(show_message=False)) def show_mask_templates(self): choices = [ ("SQL/BAK", "*.bak;*.sql;*.backup"), ("Все файлы", "*.*"), ("Только BAK", "*.bak"), ("Архивы", "*.zip;*.7z;*.rar"), ] win = tk.Toplevel(self.root) win.title("Шаблоны масок") win.geometry("320x220") win.transient(self.root) win.grab_set() for label, value in choices: def apply(val=value): self.include_masks_var.set(val) self.request_autosave() win.destroy() ttk.Button(win, text=f"{label}: {value}", command=apply).pack(fill=tk.X, padx=10, pady=4) def on_cleanup_toggle(self): if not self.cleanup_old_var.get(): self.request_autosave() return if self.skip_cleanup_confirm: self.request_autosave() return dlg = tk.Toplevel(self.root) dlg.title("Подтверждение") dlg.geometry("360x170") dlg.transient(self.root) dlg.grab_set() ttk.Label(dlg, text="Удалять старые файлы в назначении?\nЭто необратимое действие.").pack(pady=10) skip_var = tk.BooleanVar(value=False) ttk.Checkbutton(dlg, text="Больше не спрашивать", variable=skip_var).pack() def confirm(): self.skip_cleanup_confirm = skip_var.get() dlg.destroy() self.request_autosave() def cancel(): self.cleanup_old_var.set(False) dlg.destroy() btns = ttk.Frame(dlg) btns.pack(pady=10) ttk.Button(btns, text="Да", command=confirm).pack(side=tk.LEFT, padx=8) ttk.Button(btns, text="Нет", command=cancel).pack(side=tk.LEFT, padx=8) def stop_copying(self): if self.is_copying: if not messagebox.askyesno("Подтверждение", "Остановить текущее копирование?"): return self.cancel_event.set() self.log_message("⏹ Остановка копирования по запросу пользователя", "warning") def open_wizard(self): if getattr(self, "wizard_window", None) is not None: try: self.wizard_window.lift() return except Exception: pass wiz = tk.Toplevel(self.root) wiz.title("Мастер настройки") wiz.geometry("520x320") wiz.transient(self.root) wiz.grab_set() self.wizard_window = wiz step_var = tk.IntVar(value=0) source_var = tk.StringVar() dest_var = tk.StringVar() full_copy_var = tk.BooleanVar(value=False) schedule_enabled_var = tk.BooleanVar(value=self.scheduler_enabled.get()) hour_var = tk.StringVar(value=self.hour_var.get()) minute_var = tk.StringVar(value=self.minute_var.get()) autostart_var = tk.BooleanVar(value=self.autostart_enabled.get()) minimize_var = tk.BooleanVar(value=self.minimize_to_tray_enabled.get()) verify_var = tk.BooleanVar(value=self.verify_only.get()) def show_step(index: int): step_var.set(index) for i, frame in enumerate(steps): frame.pack_forget() if i == index: frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) back_btn.config(state=tk.NORMAL if index > 0 else tk.DISABLED) next_btn.config(state=tk.NORMAL if index < len(steps) - 1 else tk.DISABLED) finish_btn.config(state=tk.NORMAL if index == len(steps) - 1 else tk.DISABLED) steps = [] step1 = ttk.Frame(wiz) ttk.Label(step1, text="Шаг 1/3: Пути", font=("Arial", 11, "bold")).pack(anchor=tk.W, pady=5) ttk.Label(step1, text="Источник:").pack(anchor=tk.W) src_entry = ttk.Entry(step1, textvariable=source_var) src_entry.pack(fill=tk.X, pady=2) ttk.Button(step1, text="Выбрать...", command=lambda: self.browse_folder(src_entry)).pack(anchor=tk.W, pady=2) ttk.Label(step1, text="Назначение:").pack(anchor=tk.W, pady=5) dst_entry = ttk.Entry(step1, textvariable=dest_var) dst_entry.pack(fill=tk.X, pady=2) ttk.Button(step1, text="Выбрать...", command=lambda: self.browse_folder(dst_entry)).pack(anchor=tk.W, pady=2) steps.append(step1) step2 = ttk.Frame(wiz) ttk.Label(step2, text="Шаг 2/3: Режим копирования", font=("Arial", 11, "bold")).pack(anchor=tk.W, pady=5) ttk.Checkbutton(step2, text="Копировать всю папку с подпапками", variable=full_copy_var).pack(anchor=tk.W) steps.append(step2) step3 = ttk.Frame(wiz) ttk.Label(step3, text="Шаг 3/3: Расписание и поведение", font=("Arial", 11, "bold")).pack(anchor=tk.W, pady=5) time_frame = ttk.Frame(step3) time_frame.pack(fill=tk.X, pady=4) ttk.Label(time_frame, text="Время:").pack(side=tk.LEFT) ttk.Combobox(time_frame, textvariable=hour_var, values=[f"{h:02d}" for h in range(24)], width=5, state="readonly").pack(side=tk.LEFT, padx=2) ttk.Label(time_frame, text=":").pack(side=tk.LEFT) ttk.Combobox(time_frame, textvariable=minute_var, values=[f"{m:02d}" for m in range(60)], width=5, state="readonly").pack(side=tk.LEFT, padx=2) ttk.Checkbutton(step3, text="Включить расписание", variable=schedule_enabled_var).pack(anchor=tk.W, pady=4) ttk.Checkbutton(step3, text="Автозапуск Windows", variable=autostart_var).pack(anchor=tk.W) ttk.Checkbutton(step3, text="Сворачивать в трей при закрытии", variable=minimize_var).pack(anchor=tk.W) ttk.Checkbutton(step3, text="Только проверка (без копирования)", variable=verify_var).pack(anchor=tk.W) steps.append(step3) controls = ttk.Frame(wiz) controls.pack(fill=tk.X, pady=5) back_btn = ttk.Button(controls, text="Назад", command=lambda: show_step(step_var.get() - 1)) next_btn = ttk.Button(controls, text="Далее", command=lambda: show_step(step_var.get() + 1)) def finish(): if source_var.get().strip() and dest_var.get().strip(): self.add_path_pair(source_var.get().strip(), dest_var.get().strip(), full_copy_var.get()) self.hour_var.set(hour_var.get()) self.minute_var.set(minute_var.get()) self.scheduler_enabled.set(schedule_enabled_var.get()) self.autostart_enabled.set(autostart_var.get()) self.minimize_to_tray_enabled.set(minimize_var.get()) self.verify_only.set(verify_var.get()) if self.scheduler_enabled.get(): self.start_scheduler() else: self.stop_scheduler() if self.autostart_enabled.get() != self.is_autostart_enabled(): self.set_autostart_enabled(self.autostart_enabled.get()) wiz.destroy() self.wizard_window = None finish_btn = ttk.Button(controls, text="Готово", command=finish) back_btn.pack(side=tk.LEFT, padx=5) next_btn.pack(side=tk.LEFT, padx=5) finish_btn.pack(side=tk.RIGHT, padx=5) def on_close(): self.wizard_window = None wiz.destroy() wiz.protocol("WM_DELETE_WINDOW", on_close) show_step(0) def get_valid_pairs(self) -> List[tuple]: """Возвращает список валидных пар папок""" valid_pairs = [] for pair in self.copy_pairs: source = pair.get("source", "").strip() dest = pair.get("dest", "").strip() full_copy = bool(pair.get("full_copy", False)) if source and dest: valid_pairs.append((source, dest, full_copy)) return valid_pairs def check_paths(self): """Проверяет доступность всех путей""" self.log_message("🔍 Проверка путей:", "info") valid_pairs = self.get_valid_pairs() if not valid_pairs: self.log_message("❌ Нет заполненных пар путей", "error") return for i, (source, dest, full_copy) in enumerate(valid_pairs, 1): self.log_message(f"\nПара {i}:", "info") mode_text = "вся папка" if full_copy else "последний файл" self.log_message(f" Режим: {mode_text}", "info") status_text, status_tag = self.validate_pair(source, dest) if status_tag == "ok": self.log_message(f" ✅ Проверка: OK", "success") else: self.log_message(f" ❌ Проверка: {status_text}", "error") if os.path.exists(source): bak_files = [p for p in Path(source).iterdir() if p.is_file() and p.suffix.lower() == '.bak'] self.log_message(f" Найдено .bak файлов: {len(bak_files)}", "info") # Обновляем статусы в таблице for pair in self.copy_pairs: status_text, status_tag = self.validate_pair(pair.get("source", ""), pair.get("dest", "")) pair["status"] = status_text pair["status_tag"] = status_tag self.update_pair_row(pair) def find_latest_file(self, folder_path: str, include_masks: Optional[List[str]] = None, exclude_masks: Optional[List[str]] = None) -> Optional[Path]: """Находит самый последний файл в папке""" try: include_masks = include_masks or parse_masks(self.include_masks_var.get()) exclude_masks = exclude_masks or parse_masks(self.exclude_masks_var.get()) latest_file = find_latest_file_in_folder(folder_path, include_masks=include_masks, exclude_masks=exclude_masks) if not latest_file: if not Path(folder_path).exists(): self.log_message(f"❌ Папка не существует: {folder_path}", "error") else: self.log_message(f"⚠️ Не найдено .bak файлов в {folder_path}", "warning") return None file_time = datetime.fromtimestamp(latest_file.stat().st_mtime) self.log_message( f"📄 Последний файл: {latest_file.name} ({file_time.strftime('%Y-%m-%d %H:%M:%S')})", "info" ) return latest_file except Exception as e: self.log_message(f"❌ Ошибка при поиске файлов в {folder_path}: {e}", "error") return None def run_on_ui(self, func): """Планирует выполнение функции в UI-потоке.""" self.queue.put({'type': 'ui', 'func': func}) def get_float_setting(self, var, default: float) -> float: try: raw = var.get() except Exception: return float(default) if raw in ("", None): return float(default) try: return float(raw) except (TypeError, ValueError): return float(default) def get_int_setting(self, var, default: int, minimum: int = 1) -> int: try: raw = var.get() except Exception: return max(minimum, int(default)) if raw in ("", None): return max(minimum, int(default)) try: return max(minimum, int(raw)) except (TypeError, ValueError): return max(minimum, int(default)) def _format_active_file_label(self, file_path: str) -> str: p = Path(file_path) parts = p.parts if len(parts) >= 2: return str(Path(parts[-2]) / parts[-1]) return p.name or file_path def set_status_text(self, text: str): with self.active_files_lock: has_active = bool(self.active_files) if has_active: self.update_status_with_active_files() return self.root.after(0, lambda: self.status_bar.config(text=text)) def update_status_with_active_files(self): with self.active_files_lock: file_paths = sorted(self.active_files.keys()) if not file_paths: text = "Идет копирование..." else: labels = [self._format_active_file_label(p) for p in file_paths] if len(labels) <= 3: text = "Идет копирование: " + ", ".join(labels) else: text = f"Идет копирование: {', '.join(labels[:3])} ... (+{len(labels) - 3})" self.root.after(0, lambda t=text: self.status_bar.config(text=t)) def begin_active_file(self, file_path: str): with self.active_files_lock: self.active_files[file_path] = self.active_files.get(file_path, 0) + 1 self.update_status_with_active_files() def end_active_file(self, file_path: str): with self.active_files_lock: count = self.active_files.get(file_path, 0) if count <= 1: self.active_files.pop(file_path, None) else: self.active_files[file_path] = count - 1 self.update_status_with_active_files() def set_progress_total(self, total: int): def _set(): self.progress.config(maximum=max(1, total)) self.progress_var.set(0) self.root.after(0, _set) def step_progress(self, step: int = 1): def _step(): self.progress_var.set(self.progress_var.get() + step) self.root.after(0, _step) def check_previous_hash(self, target_file: Path): key = str(target_file) if key in self.last_hashes: try: current_hash = compute_file_checksum(target_file) if current_hash != self.last_hashes[key]: self.log_message(f"⚠️ Несовпадение хеша прошлой копии: {target_file.name}", "warning") except Exception as e: self.log_message(f"⚠️ Не удалось проверить прошлую копию: {e}", "warning") def copy_file_with_retries(self, source: Path, target: Path) -> bool: for attempt in range(COPY_RETRIES + 1): try: shutil.copy2(source, target) return True except Exception as e: if attempt >= COPY_RETRIES: self.log_message(f"❌ Ошибка при копировании {source.name}: {e}", "error") return False time_module.sleep(COPY_RETRY_DELAY) return False def verify_only_mode(self, source: Path, target: Path) -> bool: if not target.exists(): self.log_message(f"❌ Нет целевого файла для проверки: {target.name}", "error") return False if not should_copy_file(source, target): ok, src_hash, dst_hash = compare_file_checksums(source, target) self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={target.name}", "info") if not ok: self.log_message(f"⚠️ SHA256 отличается у пропущенного файла: {target.name}", "warning") self.log_message(f"⏭️ Проверка пропущена (не изменен): {target.name}", "warning") return True ok, src_hash, dst_hash = compare_file_checksums(source, target) self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={target.name}", "info") if ok: self.log_message(f"✅ Проверка OK: {target.name}", "success") return True self.log_message(f"❌ Проверка не пройдена: {target.name}", "error") return False def has_enough_space(self, dest_path: Path, size_bytes: int) -> bool: min_gb = self.get_float_setting(self.min_free_gb_var, DEFAULT_MIN_FREE_GB) free_gb = get_free_space_gb(dest_path) if free_gb < min_gb: self.log_message(f"❌ Недостаточно места на диске: свободно {free_gb:.2f} ГБ, минимум {min_gb} ГБ", "error") return False if size_bytes > 0 and (free_gb * 1024 ** 3) < size_bytes: self.log_message(f"❌ Недостаточно места для файла: {size_bytes} байт", "error") return False return True def cleanup_destination_files(self, dest_path: Path, include_masks: List[str], exclude_masks: List[str]) -> None: try: keep_count = max(1, int(self.keep_files_var.get() or DEFAULT_KEEP_FILES)) except Exception: keep_count = DEFAULT_KEEP_FILES candidates: List[Path] = [] for item in dest_path.iterdir(): if not item.is_file(): continue if include_masks or exclude_masks: if not matches_masks(item.name, include_masks, exclude_masks): continue candidates.append(item) if len(candidates) <= keep_count: return candidates.sort(key=lambda p: (p.stat().st_mtime, p.name), reverse=True) for item in candidates[keep_count:]: try: item.unlink() self.log_message(f"🧹 Удален старый файл: {item.name}", "info") except Exception as e: msg = f"❌ Не удалось удалить файл {item.name}: {e}" self.log_message(msg, "warning") self.record_error_detail(msg) def copy_full_folder(self, source: Path, dest: Path) -> tuple: copied = 0 skipped = 0 errors = 0 for root, _dirs, files in os.walk(source): if self.cancel_event.is_set(): break rel = os.path.relpath(root, source) dest_root = dest / rel if rel != "." else dest dest_root.mkdir(parents=True, exist_ok=True) for fname in files: if self.cancel_event.is_set(): break src_file = Path(root) / fname dst_file = dest_root / fname self.begin_active_file(str(src_file)) try: self.set_status_text(f"Копируется: {src_file.name}") self.check_previous_hash(dst_file) if dst_file.exists() else None if self.verify_only.get(): if self.verify_only_mode(src_file, dst_file): skipped += 1 else: errors += 1 self.step_progress() continue if should_copy_file(src_file, dst_file): if not self.has_enough_space(dest_root, src_file.stat().st_size): errors += 1 self.step_progress() continue if self.copy_file_with_retries(src_file, dst_file): ok, src_hash, dst_hash = compare_file_checksums(src_file, dst_file) self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={dst_file.name}", "info") if ok: copied += 1 self.last_hashes[str(dst_file)] = dst_hash else: errors += 1 else: errors += 1 self.step_progress() else: skipped += 1 if dst_file.exists(): ok, src_hash, dst_hash = compare_file_checksums(src_file, dst_file) self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={dst_file.name}", "info") if not ok: self.log_message(f"⚠️ SHA256 отличается у пропущенного файла: {dst_file.name}", "warning") self.step_progress() except Exception as e: errors += 1 self.step_progress() msg = f"❌ Ошибка при копировании {src_file.name}: {e}" self.log_message(msg, "error") self.record_error_detail(msg) finally: self.end_active_file(str(src_file)) return copied, skipped, errors def process_pair(self, source: str, dest: str, full_copy: bool, background: bool) -> tuple: copied_files = 0 skipped_files = 0 error_files = 0 pair_ref = self.find_pair_by_paths(source, dest, full_copy) if self.cancel_event.is_set(): return copied_files, skipped_files, error_files self.log_message(f"\n📁 Обработка папки: {source}", "info") src_path = Path(source) dest_path = Path(dest) if full_copy: if not src_path.exists(): msg = f"❌ Папка не существует: {source}" self.log_message(msg, "error") self.record_error_detail(msg) return copied_files, skipped_files, error_files + 1 try: dest_path.mkdir(parents=True, exist_ok=True) except Exception as e: msg = f"❌ Не могу создать папку {dest}: {e}" self.log_message(msg, "error") self.record_error_detail(msg) return copied_files, skipped_files, error_files + 1 if not self.verify_only.get(): required_size = get_total_copy_size(src_path, dest_path) if required_size > 0 and not self.has_enough_space(dest_path, required_size): return copied_files, skipped_files, error_files + 1 if pair_ref is not None: pair_ref["last_file"] = "—" self.update_pair_row(pair_ref) c, s, e = self.copy_full_folder(src_path, dest_path) if self.tray_icon is not None: self.tray_notify(f"{source}", f"Готово: ✅ {c} / ⏭️ {s} / ❌ {e}") return copied_files + c, skipped_files + s, error_files + e include_masks = parse_masks(pair_ref.get("include_masks", "")) if pair_ref else [] exclude_masks = parse_masks(pair_ref.get("exclude_masks", "")) if pair_ref else [] if not include_masks and not exclude_masks: include_masks = parse_masks(self.include_masks_var.get()) exclude_masks = parse_masks(self.exclude_masks_var.get()) latest_file = self.find_latest_file(source, include_masks=include_masks, exclude_masks=exclude_masks) if latest_file: if pair_ref is not None: pair_ref["last_file"] = latest_file.name self.update_pair_row(pair_ref) try: dest_path.mkdir(parents=True, exist_ok=True) except Exception as e: msg = f"❌ Не могу создать папку {dest}: {e}" self.log_message(msg, "error") self.record_error_detail(msg) return copied_files, skipped_files, error_files + 1 target_file = dest_path / latest_file.name self.begin_active_file(str(latest_file)) try: self.set_status_text(f"Копируется: {latest_file.name}") if target_file.exists(): self.check_previous_hash(target_file) if self.verify_only.get(): if self.verify_only_mode(latest_file, target_file): skipped_files += 1 else: error_files += 1 self.step_progress() return copied_files, skipped_files, error_files target_existed = target_file.exists() if should_copy_file(latest_file, target_file): if not self.has_enough_space(dest_path, latest_file.stat().st_size): self.step_progress() return copied_files, skipped_files, error_files + 1 if self.copy_file_with_retries(latest_file, target_file): copied_files += 1 src_hash = compute_file_checksum(latest_file) dst_hash = compute_file_checksum(target_file) self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={target_file.name}", "info") self.last_hashes[str(target_file)] = dst_hash if src_hash != dst_hash: error_files += 1 copied_files -= 1 msg = f"вќЊ Контрольная СЃСѓРјРјР° РЅРµ совпала: {latest_file.name}" self.log_message(msg, "error") self.record_error_detail(msg) if src_hash == dst_hash and target_existed: self.log_message(f"✅ Обновлен: {latest_file.name} (новее)", "success") elif src_hash == dst_hash: self.log_message(f"✅ Скопирован: {latest_file.name}", "success") if src_hash == dst_hash and self.cleanup_old_var.get(): self.cleanup_destination_files(dest_path, include_masks, exclude_masks) else: error_files += 1 msg = f"❌ Контрольная сумма не совпала: {latest_file.name}" self.log_message(msg, "error") self.record_error_detail(msg) self.step_progress() else: skipped_files += 1 if target_file.exists(): ok, src_hash, dst_hash = compare_file_checksums(latest_file, target_file) self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={target_file.name}", "info") if not ok: self.log_message(f"⚠️ SHA256 отличается у пропущенного файла: {target_file.name}", "warning") self.log_message(f"⏭️ Пропущен: {latest_file.name} (уже актуален)", "warning") self.step_progress() except Exception as e: error_files += 1 self.step_progress() msg = f"❌ Ошибка при копировании {latest_file.name}: {e}" self.log_message(msg, "error") self.record_error_detail(msg) finally: self.end_active_file(str(latest_file)) else: error_files += 1 self.step_progress() if self.tray_icon is not None and background: self.tray_notify(f"{source}", f"Готово: ✅ {copied_files} / ⏭️ {skipped_files} / ❌ {error_files}") return copied_files, skipped_files, error_files def copy_files_thread(self, pairs: List[tuple], background: bool = False): """Поток для копирования последних файлов""" if not self.copy_lock.acquire(blocking=False): self.log_message("⚠️ Копирование уже выполняется", "warning") return self.is_copying = True self.cancel_event.clear() self.root.after(0, lambda: self.status_bar.config(text="Идет копирование...")) total_units = 0 for source, dest, full_copy in pairs: if full_copy: if os.path.exists(source): for root, _dirs, files in os.walk(source): for fname in files: total_units += 1 else: include_masks = parse_masks(self.include_masks_var.get()) exclude_masks = parse_masks(self.exclude_masks_var.get()) latest = find_latest_file_in_folder(source, include_masks=include_masks, exclude_masks=exclude_masks) if latest: total_units += 1 self.set_progress_total(total_units or 1) try: copied_files = 0 skipped_files = 0 error_files = 0 if not background: self.log_message("\n" + "=" * 50, "info") self.log_message("🚀 Начало копирования последних файлов", "info") max_workers = self.get_int_setting(self.max_workers_var, DEFAULT_MAX_WORKERS, minimum=1) with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [executor.submit(self.process_pair, source, dest, full_copy, background) for source, dest, full_copy in pairs] for future in as_completed(futures): if self.cancel_event.is_set(): break try: c, s, e = future.result() copied_files += c skipped_files += s error_files += e except Exception as e: error_files += 1 msg = f"❌ Ошибка задачи копирования: {e}" self.log_message(msg, "error") self.record_error_detail(msg) # Итог self.log_message("\n" + "=" * 50, "info") self.log_message("📊 ИТОГ:", "info") self.log_message(f" ✅ Скопировано/обновлено: {copied_files}", "success") self.log_message(f" ⏭️ Пропущено: {skipped_files}", "warning") self.log_message(f" ❌ Ошибок: {error_files}", "error") if background: self.log_message("⏰ Копирование по расписанию завершено", "info") else: self.run_on_ui(lambda: messagebox.showinfo( "Готово", f"Копирование завершено!\n\n✅ Скопировано: {copied_files}\n⏭️ Пропущено: {skipped_files}\n❌ Ошибок: {error_files}" )) self.last_result_summary = f"✅ {copied_files} / ⏭️ {skipped_files} / ❌ {error_files}" self.run_on_ui(self.update_status_summary) if error_files == 0: self.last_success_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') self.run_on_ui(self.update_last_success_label) self.tray_notify("Копирование завершено", f"Успешно: {copied_files}, Пропущено: {skipped_files}") else: detail = self.last_error_detail or "Неизвестная ошибка" self.tray_notify("Копирование с ошибками", f"Ошибок: {error_files}. {detail}") except Exception as e: self.log_message(f"❌ Критическая ошибка: {e}\n{traceback.format_exc()}", "error") self.run_on_ui(lambda: messagebox.showerror("Ошибка", f"Произошла ошибка:\n{e}")) finally: self.is_copying = False with self.active_files_lock: self.active_files.clear() self.copy_lock.release() self.root.after(0, lambda: self.status_bar.config(text="Готов к работе")) self.root.after(0, lambda: self.progress_var.set(0)) def start_manual_copy(self): """Запускает ручное копирование""" valid_pairs = self.get_valid_pairs() if not valid_pairs: messagebox.showwarning("Предупреждение", "Заполните хотя бы одну пару папок!") return # Запускаем копирование в отдельном потоке copy_thread = threading.Thread( target=self.copy_files_thread, args=(valid_pairs, False), daemon=True ) copy_thread.start() def toggle_scheduler(self): """Включает/выключает планировщик""" if self.scheduler_enabled.get(): self.start_scheduler() else: self.stop_scheduler() def start_scheduler(self): """Запускает планировщик""" valid_pairs = self.get_valid_pairs() if not valid_pairs: messagebox.showwarning("Предупреждение", "Нет настроенных путей! Планировщик не запущен.") self.scheduler_enabled.set(False) return if not self.schedules: # создаем расписание по умолчанию из полей времени self.schedules = [{"time": f"{self.hour_var.get()}:{self.minute_var.get()}", "days": []}] self.refresh_schedules_tree() self.scheduler.schedule_copy_job("backup_job", self.schedules, valid_pairs) self.scheduler.start() self.scheduler_status.config(text=f"(активен, заданий: {len(self.schedules)})", foreground="green") self.update_next_run_label() self.update_last_success_label() self.log_message("🕒 Планировщик запущен.", "info") def stop_scheduler(self): """Останавливает планировщик""" self.scheduler.stop() self.scheduler_status.config(text="(остановлен)", foreground="red") self.next_run_label.config(text="(следующий запуск: —)") self.log_message("🕒 Планировщик остановлен", "info") def update_next_run_label(self): jobs = schedule.get_jobs("backup_job") if jobs: next_run = jobs[0].next_run if next_run: self.next_run_label.config(text=f"(следующий запуск: {next_run.strftime('%Y-%m-%d %H:%M')})") return self.next_run_label.config(text="(следующий запуск: —)") def update_last_success_label(self): if self.last_success_time: self.last_success_label.config(text=f"(последний успех: {self.last_success_time})") else: self.last_success_label.config(text="(последний успех: —)") def update_status_summary(self): summary = self.last_result_summary or "—" self.status_summary_label.config(text=summary) self.last_result_label.config(text=f"(последний результат: {summary})") def update_schedule_summary_label(self): if not self.schedules: self.schedule_summary_label.config(text="Расписание: —") return parts = [] for entry in self.schedules: time_str = entry.get("time", "—") sched_type = entry.get("type", "daily") if sched_type == "monthly": parts.append(f"{time_str} (ежемес., {entry.get('day', 1)})") elif sched_type == "weekly": days = entry.get("days", []) days_text = ",".join(days) if days else "еженед." parts.append(f"{time_str} ({days_text})") else: parts.append(f"{time_str} (ежедневно)") self.schedule_summary_label.config(text="Расписание: " + "; ".join(parts)) def debug_settings(self): """Отладочный метод для проверки загрузки настроек""" self.log_message("\n" + "=" * 50, "info") self.log_message("🔧 ОТЛАДКА", "info") self.log_message("=" * 50, "info") settings_path = self.get_settings_path() self.log_message(f"📁 Путь к настройкам: {settings_path}", "info") self.log_message(f"📁 Файл существует: {os.path.exists(settings_path)}", "info") if os.path.exists(settings_path): try: with open(settings_path, 'r', encoding='utf-8') as f: content = f.read() self.log_message(f"📄 Содержимое файла:", "info") for line in content.split('\n'): self.log_message(f" {line}", "info") except Exception as e: self.log_message(f"❌ Не удалось прочитать файл: {e}", "error") self.log_message(f"\n🖥️ Текущее состояние:", "info") self.log_message(f" Пар в интерфейсе: {len(self.copy_pairs)}", "info") for i, pair in enumerate(self.copy_pairs, 1): source = pair.get("source", "") dest = pair.get("dest", "") mode = "вся папка" if pair.get("full_copy", False) else "последний файл" self.log_message(f" Пара {i}:", "info") self.log_message(f" Откуда: '{source}'", "info") self.log_message(f" Куда: '{dest}'", "info") self.log_message(f" Режим: {mode}", "info") self.log_message("=" * 50, "info") def log_message(self, message, tag=None): """Добавляет сообщение в лог (вызывается из любого потока)""" # Используем queue для потокобезопасности self.queue.put({'type': 'log', 'message': message, 'tag': tag}) file_logger = getattr(self, "file_logger", None) if file_logger: try: level = logging.INFO if tag == "error": level = logging.ERROR elif tag == "warning": level = logging.WARNING file_logger.log(level, message) except Exception: pass def record_error_detail(self, message: str): self.last_error_detail = message def clear_log(self): """Очищает лог""" self.log_text.delete(1.0, tk.END) def process_queue(self): """Обрабатывает очередь сообщений из потоков""" try: while True: msg = self.queue.get_nowait() if msg['type'] == 'log': timestamp = datetime.now().strftime("%H:%M:%S") log_entry = f"[{timestamp}] {msg['message']}\n" self.log_text.insert(tk.END, log_entry, msg.get('tag')) self.log_text.see(tk.END) elif msg['type'] == 'ui': try: msg['func']() except Exception as e: self.log_message(f"❌ Ошибка UI: {e}", "error") except queue.Empty: pass finally: self.root.after(100, self.process_queue) def check_and_save(self): self.check_paths() self.save_settings() def create_tray_image(self): if Image is None: return None icon_path = get_resource_path(ICON_PATH) if os.path.exists(icon_path): with contextlib.suppress(Exception): return Image.open(icon_path) image = Image.new("RGB", (64, 64), color=(40, 40, 40)) draw = ImageDraw.Draw(image) draw.rectangle((8, 8, 56, 56), fill=(30, 144, 255)) draw.text((18, 20), "B", fill=(255, 255, 255)) return image def setup_tray_icon(self): if pystray is None: self.log_message("⚠️ Модуль pystray не установлен, трей недоступен", "warning") return if self.tray_icon is not None: return image = self.create_tray_image() if image is None: self.log_message("⚠️ Не удалось создать иконку для трея", "warning") return def on_show(_icon, _item): self.root.after(0, self.show_window) def on_copy(_icon, _item): self.root.after(0, self.start_manual_copy) def on_exit(_icon, _item): self.root.after(0, self.exit_app) menu = pystray.Menu( pystray.MenuItem("Открыть", on_show, default=True), pystray.MenuItem("Запустить копирование", on_copy), pystray.MenuItem("Выход", on_exit), ) self.tray_icon = pystray.Icon(APP_NAME, image, APP_NAME, menu) self.tray_thread = threading.Thread(target=self.tray_icon.run, daemon=True) self.tray_thread.start() def tray_notify(self, title: str, message: str): if self.tray_icon is None: return with contextlib.suppress(Exception): self.tray_icon.notify(message, title) def show_window(self): self.root.deiconify() self.root.lift() self.root.focus_force() def hide_to_tray(self): if not self.minimize_to_tray_enabled.get(): self.exit_app() return if pystray is None: self.root.iconify() self.log_message("⚠️ pystray не установлен, окно свернуто в панель задач", "warning") return self.setup_tray_icon() self.root.withdraw() self.log_message("🧰 Приложение свернуто в трей", "info") def exit_app(self): if self.scheduler_enabled.get(): self.stop_scheduler() if self.tray_icon is not None: with contextlib.suppress(Exception): self.tray_icon.stop() self.tray_icon = None self.root.destroy() def main(): mutex = ensure_single_instance() if mutex is None: ctypes.windll.user32.MessageBoxW(None, "Приложение уже запущено.", APP_NAME, 0x00000010) return root = tk.Tk() app = BackgroundFileCopyApp(root) # Обработка закрытия окна def on_closing(): app.hide_to_tray() root.protocol("WM_DELETE_WINDOW", on_closing) root.mainloop() if __name__ == "__main__": main()