import tkinter as tk from tkinter import ttk, filedialog, messagebox, scrolledtext import shutil import os from pathlib import Path import threading import queue from datetime import datetime, time import time as time_module import schedule import sys import json import tempfile from typing import List, Dict, Optional import traceback import hashlib import winreg import contextlib import logging import logging.handlers try: import pystray from PIL import Image, ImageDraw except Exception: pystray = None Image = None ImageDraw = None DEFAULT_EXTENSIONS = ('*.bak', '*.BAK', '*.backup', '*.sql') AUTOSTART_REG_PATH = r"Software\Microsoft\Windows\CurrentVersion\Run" AUTOSTART_REG_NAME = "BackupCopier" ICON_PATH = "icon.ico" APP_NAME = "BackupCopier" COPY_RETRIES = 2 COPY_RETRY_DELAY = 2 def find_latest_file_in_folder(folder_path: str, extensions=DEFAULT_EXTENSIONS) -> Optional[Path]: """Возвращает самый новый файл из папки по времени модификации.""" folder = Path(folder_path) if not folder.exists(): return None files: List[Path] = [] for ext in extensions: files.extend(folder.glob(ext)) if not files: return None return max(files, key=lambda f: f.stat().st_mtime) def should_copy_file(source: Path, target: Path) -> bool: """Определяет, нужно ли копировать файл.""" if not target.exists(): return True return source.stat().st_mtime > target.stat().st_mtime def compute_file_checksum(path: Path, chunk_size: int = 1024 * 1024) -> str: """Считает SHA-256 контрольную сумму файла.""" hasher = hashlib.sha256() with path.open("rb") as f: while True: chunk = f.read(chunk_size) if not chunk: break hasher.update(chunk) return hasher.hexdigest() def verify_copy(source: Path, target: Path) -> bool: """Проверяет, что файл скопирован корректно, по контрольной сумме.""" return compute_file_checksum(source) == compute_file_checksum(target) class FileCopyScheduler: """Класс для управления расписанием копирования""" def __init__(self, app): self.app = app self.scheduled_jobs = [] self.running = False self.thread = None def start(self): """Запускает планировщик в отдельном потоке""" if not self.running: self.running = True self.thread = threading.Thread(target=self._run_scheduler, daemon=True) self.thread.start() self.app.log_message("🕒 Планировщик запущен", "info") def stop(self): """Останавливает планировщик""" self.running = False self.app.log_message("🕒 Планировщик остановлен", "info") def _run_scheduler(self): """Основной цикл планировщика""" while self.running: schedule.run_pending() time_module.sleep(1) def schedule_copy_job(self, job_id: str, time_str: str, pairs: List[tuple]): """Добавляет задание в планировщик""" # Очищаем предыдущие задания с таким же ID schedule.clear(job_id) # Парсим время try: hour, minute = map(int, time_str.split(':')) except: hour, minute = 3, 0 # По умолчанию 3:00 # Добавляем ежедневное задание schedule.every().day.at(f"{hour:02d}:{minute:02d}").do( self._execute_copy_job, pairs=pairs ).tag(job_id) self.app.log_message(f"📅 Запланировано копирование на {hour:02d}:{minute:02d} ежедневно", "info") def _execute_copy_job(self, pairs: List[tuple]): """Выполняет задание копирования""" self.app.log_message("⏰ Запуск запланированного копирования...", "info") # Запускаем копирование в отдельном потоке copy_thread = threading.Thread( target=self.app.copy_files_thread, args=(pairs, True), # True означает фоновый режим daemon=True ) copy_thread.start() class BackgroundFileCopyApp: def __init__(self, root): self.root = root self.root.title("Планировщик копирования бекапов") self.root.geometry("900x700") self.setup_window_icon() # Для работы с очередью сообщений из потоков self.queue = queue.Queue() # Список пар для копирования self.copy_pairs = [] # Планировщик self.scheduler = FileCopyScheduler(self) # Флаг для отслеживания состояния self.is_copying = False self.copy_lock = threading.Lock() # Автозапуск self.autostart_enabled = tk.BooleanVar(value=False) # Сворачивание в трей self.minimize_to_tray_enabled = tk.BooleanVar(value=True) self.tray_icon = None self.tray_thread = None # Режим проверки без копирования self.verify_only = tk.BooleanVar(value=False) # Последний успешный запуск self.last_success_time: Optional[str] = None # Хеши последних копий self.last_hashes: Dict[str, str] = {} # Логгер в файл self.file_logger = self.setup_file_logger() # Создаем интерфейс self.setup_ui() # Загружаем сохраненные настройки self.load_settings() # Проверяем очередь каждые 100ms self.process_queue() # Запускаем планировщик при старте, если есть сохраненные настройки if self.scheduler_enabled.get(): self.start_scheduler() def setup_ui(self): # Основной фрейм main_frame = ttk.Frame(self.root, padding="10") main_frame.pack(fill=tk.BOTH, expand=True) # Заголовок title_label = ttk.Label(main_frame, text="Планировщик копирования бекапов", font=("Arial", 14, "bold")) title_label.pack(pady=10) # Фрейм настроек расписания schedule_frame = ttk.LabelFrame(main_frame, text="Настройки расписания", padding="10") schedule_frame.pack(fill=tk.X, pady=10) # Время копирования time_frame = ttk.Frame(schedule_frame) time_frame.pack(fill=tk.X, pady=5) ttk.Label(time_frame, text="Время копирования:", width=15).pack(side=tk.LEFT) self.hour_var = tk.StringVar(value="03") self.minute_var = tk.StringVar(value="00") hours = [f"{h:02d}" for h in range(24)] minutes = [f"{m:02d}" for m in range(60)] ttk.Combobox(time_frame, textvariable=self.hour_var, values=hours, width=5, state="readonly").pack(side=tk.LEFT, padx=2) ttk.Label(time_frame, text=":").pack(side=tk.LEFT) ttk.Combobox(time_frame, textvariable=self.minute_var, values=minutes, width=5, state="readonly").pack(side=tk.LEFT, padx=2) ttk.Label(time_frame, text="(ежедневно)", font=("Arial", 9, "italic")).pack(side=tk.LEFT, padx=10) # Включение/выключение планировщика scheduler_ctrl_frame = ttk.Frame(schedule_frame) scheduler_ctrl_frame.pack(fill=tk.X, pady=5) self.scheduler_enabled = tk.BooleanVar(value=False) self.scheduler_check = ttk.Checkbutton(scheduler_ctrl_frame, text="Включить автоматическое копирование по расписанию", variable=self.scheduler_enabled, command=self.toggle_scheduler) self.scheduler_check.pack(side=tk.LEFT) self.scheduler_status = ttk.Label(scheduler_ctrl_frame, text="(остановлен)", font=("Arial", 9, "italic")) self.scheduler_status.pack(side=tk.LEFT, padx=10) self.next_run_label = ttk.Label(scheduler_ctrl_frame, text="(следующий запуск: —)", font=("Arial", 9, "italic")) self.next_run_label.pack(side=tk.LEFT, padx=10) self.last_success_label = ttk.Label(scheduler_ctrl_frame, text="(последний успех: —)", font=("Arial", 9, "italic")) self.last_success_label.pack(side=tk.LEFT, padx=10) # Автозапуск autostart_frame = ttk.Frame(schedule_frame) autostart_frame.pack(fill=tk.X, pady=5) self.autostart_check = ttk.Checkbutton( autostart_frame, text="Добавить программу в автозапуск Windows", variable=self.autostart_enabled, command=self.toggle_autostart ) self.autostart_check.pack(side=tk.LEFT) minimize_frame = ttk.Frame(schedule_frame) minimize_frame.pack(fill=tk.X, pady=5) self.minimize_check = ttk.Checkbutton( minimize_frame, text="Сворачивать в трей при закрытии", variable=self.minimize_to_tray_enabled ) self.minimize_check.pack(side=tk.LEFT) verify_frame = ttk.Frame(schedule_frame) verify_frame.pack(fill=tk.X, pady=5) self.verify_check = ttk.Checkbutton( verify_frame, text="Только проверка (без копирования)", variable=self.verify_only ) self.verify_check.pack(side=tk.LEFT) # Кнопки управления buttons_frame = ttk.Frame(schedule_frame) buttons_frame.pack(fill=tk.X, pady=5) ttk.Button(buttons_frame, text="✅ Проверить и сохранить", command=self.check_and_save).pack(side=tk.LEFT, padx=5) ttk.Button(buttons_frame, text="▶ Запустить копирование сейчас", command=self.start_manual_copy).pack(side=tk.LEFT, padx=5) # Фрейм для списка путей paths_frame = ttk.LabelFrame(main_frame, text="Пути для копирования", padding="10") paths_frame.pack(fill=tk.BOTH, expand=True, pady=10) # Заголовки колонок headers_frame = ttk.Frame(paths_frame) headers_frame.pack(fill=tk.X, pady=5) ttk.Label(headers_frame, text="Откуда копировать (папка с бекапами)", font=("Arial", 10, "bold")).pack(side=tk.LEFT, padx=(0, 20)) ttk.Label(headers_frame, text="Куда копировать (сетевая папка)", font=("Arial", 10, "bold")).pack(side=tk.LEFT, padx=(20, 0)) # Canvas для прокрутки списка canvas = tk.Canvas(paths_frame, borderwidth=0, highlightthickness=0) scrollbar = ttk.Scrollbar(paths_frame, orient="vertical", command=canvas.yview) self.scrollable_frame = ttk.Frame(canvas) self.scrollable_frame.bind( "", lambda e: canvas.configure(scrollregion=canvas.bbox("all")) ) canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw") canvas.configure(yscrollcommand=scrollbar.set) canvas.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") # Кнопки управления списком list_buttons_frame = ttk.Frame(paths_frame) list_buttons_frame.pack(fill=tk.X, pady=10) ttk.Button(list_buttons_frame, text="➕ Добавить пару папок", command=self.add_path_pair).pack(side=tk.LEFT, padx=5) ttk.Button(list_buttons_frame, text="❌ Удалить все", command=self.remove_all_pairs).pack(side=tk.LEFT, padx=5) ttk.Button(list_buttons_frame, text="🔍 Проверить пути", command=self.check_paths).pack(side=tk.LEFT, padx=5) # Лог операций log_frame = ttk.LabelFrame(main_frame, text="Лог операций", padding="5") log_frame.pack(fill=tk.BOTH, expand=True, pady=5) # Кнопки управления логом log_buttons = ttk.Frame(log_frame) log_buttons.pack(fill=tk.X, pady=2) ttk.Button(log_buttons, text="📋 Очистить лог", command=self.clear_log).pack(side=tk.RIGHT, padx=2) ttk.Button(log_buttons, text="🔍 Отладка", command=self.debug_settings).pack(side=tk.RIGHT, padx=2) # Текст лога self.log_text = scrolledtext.ScrolledText(log_frame, height=8, wrap=tk.WORD) self.log_text.pack(fill=tk.BOTH, expand=True) # Настройка тегов для цветного лога self.log_text.tag_configure("success", foreground="green") self.log_text.tag_configure("error", foreground="red") self.log_text.tag_configure("warning", foreground="orange") self.log_text.tag_configure("info", foreground="blue") # Статус бар self.status_bar = ttk.Label(self.root, text="Готов к работе", relief=tk.SUNKEN, anchor=tk.W) self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) self.progress = ttk.Progressbar(self.root, mode="indeterminate") self.progress.pack(side=tk.BOTTOM, fill=tk.X) def setup_window_icon(self): icon_path = os.path.abspath(ICON_PATH) if os.path.exists(icon_path): with contextlib.suppress(Exception): self.root.iconbitmap(icon_path) def setup_file_logger(self): settings_path = self.get_settings_path() log_dir = os.path.dirname(settings_path) os.makedirs(log_dir, exist_ok=True) log_path = os.path.join(log_dir, "backup_copier.log") logger = logging.getLogger("backup_copier") logger.setLevel(logging.INFO) if not logger.handlers: handler = logging.handlers.RotatingFileHandler( log_path, maxBytes=2 * 1024 * 1024, backupCount=3, encoding="utf-8" ) formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") handler.setFormatter(formatter) logger.addHandler(handler) return logger def get_settings_path(self): """Возвращает путь для сохранения настроек""" if getattr(sys, 'frozen', False): # Для EXE используем папку с EXE файлом base_path = os.path.dirname(sys.executable) else: # Для скрипта используем папку со скриптом base_path = os.path.dirname(os.path.abspath(__file__)) # Пробуем создать папку, если есть права settings_dir = base_path try: # Проверяем, можем ли мы писать в эту папку test_file = os.path.join(settings_dir, 'test_write.tmp') with open(test_file, 'w') as f: f.write('test') os.remove(test_file) self.log_message(f"📁 Используется папка: {settings_dir}", "info") except: # Если не можем писать, используем AppData settings_dir = os.path.join(os.environ.get('APPDATA', os.path.expanduser('~')), 'BackupCopier') os.makedirs(settings_dir, exist_ok=True) self.log_message(f"📁 Используется папка AppData: {settings_dir}", "info") return os.path.join(settings_dir, 'backup_copier_settings.json') def create_placeholder(self, entry, text): """Добавляет подсказку в поле ввода.""" placeholder_color = "gray" normal_color = "black" def on_focus_in(_): if entry.get() == text and entry.cget("foreground") == placeholder_color: entry.delete(0, tk.END) entry.config(foreground=normal_color) def on_focus_out(_): if not entry.get(): entry.insert(0, text) entry.config(foreground=placeholder_color) entry.bind("", on_focus_in) entry.bind("", on_focus_out) on_focus_out(None) def get_autostart_command(self) -> str: """Возвращает команду для автозапуска.""" if getattr(sys, 'frozen', False): return f"\"{sys.executable}\"" script_path = os.path.abspath(__file__) return f"\"{sys.executable}\" \"{script_path}\"" def is_autostart_enabled(self) -> bool: """Проверяет, включен ли автозапуск в реестре.""" try: with winreg.OpenKey(winreg.HKEY_CURRENT_USER, AUTOSTART_REG_PATH, 0, winreg.KEY_READ) as key: value, _ = winreg.QueryValueEx(key, AUTOSTART_REG_NAME) return bool(value) except FileNotFoundError: return False except OSError: return False def set_autostart_enabled(self, enabled: bool) -> None: """Включает/выключает автозапуск в реестре.""" try: with winreg.OpenKey(winreg.HKEY_CURRENT_USER, AUTOSTART_REG_PATH, 0, winreg.KEY_SET_VALUE) as key: if enabled: winreg.SetValueEx(key, AUTOSTART_REG_NAME, 0, winreg.REG_SZ, self.get_autostart_command()) self.log_message("✅ Автозапуск включен", "success") else: try: winreg.DeleteValue(key, AUTOSTART_REG_NAME) except FileNotFoundError: pass self.log_message("⏭️ Автозапуск выключен", "info") except Exception as e: self.log_message(f"❌ Ошибка автозапуска: {e}", "error") def toggle_autostart(self): """Обработчик галочки автозапуска.""" self.set_autostart_enabled(self.autostart_enabled.get()) def load_settings(self): """Загружает настройки из файла и отображает их""" try: settings_path = self.get_settings_path() self.log_message(f"🔍 Загрузка настроек из: {settings_path}", "info") if os.path.exists(settings_path): with open(settings_path, 'r', encoding='utf-8') as f: settings = json.load(f) # Загружаем время self.hour_var.set(settings.get('hour', '03')) self.minute_var.set(settings.get('minute', '00')) self.scheduler_enabled.set(settings.get('enabled', False)) desired_autostart = settings.get('autostart_enabled') self.minimize_to_tray_enabled.set(settings.get('minimize_to_tray', True)) # Загружаем пары путей pairs_data = settings.get('pairs', []) self.log_message(f"📦 Найдено {len(pairs_data)} пар путей в файле", "info") # Очищаем текущие пары self.remove_all_pairs(silent=True) # Добавляем загруженные пары if pairs_data: for item in pairs_data: if isinstance(item, dict): source = item.get('source', '') dest = item.get('dest', '') full_copy = bool(item.get('full_copy', False)) else: source, dest = item full_copy = False self.add_path_pair(source, dest) self.copy_pairs[-1]['full_copy'].set(full_copy) self.log_message(f"✅ Загружено {len(pairs_data)} пар путей", "success") else: # Если нет сохраненных пар, добавляем одну пустую self.add_path_pair() self.log_message("➕ Добавлена пустая пара для ввода", "info") # Настройки автозапуска actual_autostart = self.is_autostart_enabled() if desired_autostart is None: self.autostart_enabled.set(actual_autostart) else: desired_autostart = bool(desired_autostart) self.autostart_enabled.set(desired_autostart) if desired_autostart != actual_autostart: self.set_autostart_enabled(desired_autostart) self.verify_only.set(settings.get('verify_only', False)) self.last_success_time = settings.get('last_success_time') self.last_hashes = settings.get('last_hashes', {}) self.update_last_success_label() self.log_message(f"📂 Настройки загружены из {settings_path}", "info") else: # Если файла нет, добавляем пустую пару self.log_message("🆕 Файл настроек не найден, создаем новую конфигурацию", "info") self.add_path_pair() self.autostart_enabled.set(self.is_autostart_enabled()) self.verify_only.set(False) self.last_success_time = None self.last_hashes = {} self.update_last_success_label() except Exception as e: self.log_message(f"❌ Ошибка при загрузке настроек: {e}", "error") import traceback traceback.print_exc() # В случае ошибки добавляем пустую пару self.add_path_pair() self.autostart_enabled.set(self.is_autostart_enabled()) self.verify_only.set(False) self.last_success_time = None self.last_hashes = {} self.update_last_success_label() def save_settings(self): """Сохраняет настройки в файл""" try: # Собираем данные из полей ввода pairs_data = [] for pair in self.copy_pairs: source = pair['source'].get().strip() dest = pair['dest'].get().strip() if source.startswith("Например: "): source = "" if dest.startswith("Например: "): dest = "" if source or dest: # Сохраняем даже если одно поле пустое pairs_data.append({ "source": source, "dest": dest, "full_copy": pair['full_copy'].get() }) settings = { 'hour': self.hour_var.get(), 'minute': self.minute_var.get(), 'enabled': self.scheduler_enabled.get(), 'autostart_enabled': self.autostart_enabled.get(), 'minimize_to_tray': self.minimize_to_tray_enabled.get(), 'verify_only': self.verify_only.get(), 'last_success_time': self.last_success_time, 'last_hashes': self.last_hashes, 'pairs': pairs_data } settings_path = self.get_settings_path() # Сохраняем с отступами для читаемости with open(settings_path, 'w', encoding='utf-8') as f: json.dump(settings, f, ensure_ascii=False, indent=2) self.log_message(f"💾 Настройки сохранены ({len(pairs_data)} пар путей)", "success") self.log_message(f"📁 Файл: {settings_path}", "info") messagebox.showinfo("Успех", f"Настройки сохранены!\n\nФайл: {settings_path}") if self.scheduler_enabled.get(): # Перепланируем, чтобы учесть новые пути/время self.start_scheduler() except Exception as e: self.log_message(f"❌ Ошибка при сохранении настроек: {e}", "error") messagebox.showerror("Ошибка", f"Не удалось сохранить настройки:\n{e}") def add_path_pair(self, source="", dest=""): """Добавляет новую пару полей для ввода путей""" pair_frame = ttk.Frame(self.scrollable_frame) pair_frame.pack(fill=tk.X, pady=5) # Поле "Откуда" source_frame = ttk.Frame(pair_frame) source_frame.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5)) source_entry = ttk.Entry(source_frame) source_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) if source: source_entry.insert(0, source) else: self.create_placeholder(source_entry, "Например: C:\\Backups") ttk.Button(source_frame, text="📁", width=3, command=lambda: self.browse_folder(source_entry)).pack(side=tk.RIGHT, padx=2) # Поле "Куда" dest_frame = ttk.Frame(pair_frame) dest_frame.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(5, 0)) dest_entry = ttk.Entry(dest_frame) dest_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) if dest: dest_entry.insert(0, dest) else: self.create_placeholder(dest_entry, "Например: D:\\BackupArchive") ttk.Button(dest_frame, text="📁", width=3, command=lambda: self.browse_folder(dest_entry)).pack(side=tk.RIGHT, padx=2) # Опции пары options_frame = ttk.Frame(pair_frame) options_frame.pack(side=tk.LEFT, padx=5) full_copy_var = tk.BooleanVar(value=False) full_copy_check = ttk.Checkbutton(options_frame, text="Вся папка", variable=full_copy_var) full_copy_check.pack(side=tk.LEFT) ttk.Button(options_frame, text="📂", width=3, command=lambda: self.open_destination(dest_entry.get())).pack(side=tk.LEFT, padx=2) # Кнопка удаления ttk.Button(pair_frame, text="✖", width=3, command=lambda: self.remove_path_pair(pair_frame)).pack(side=tk.RIGHT, padx=5) # Сохраняем ссылки на entry self.copy_pairs.append({ 'frame': pair_frame, 'source': source_entry, 'dest': dest_entry, 'full_copy': full_copy_var }) # Прокручиваем к новому элементу self.scrollable_frame.update_idletasks() canvas = self.scrollable_frame.master canvas.yview_moveto(1.0) def remove_path_pair(self, frame): """Удаляет пару полей""" for pair in self.copy_pairs: if pair['frame'] == frame: self.copy_pairs.remove(pair) break frame.destroy() # Если не осталось пар, добавляем пустую if not self.copy_pairs: self.add_path_pair() def remove_all_pairs(self, silent=False): """Удаляет все пары""" if not silent: if not messagebox.askyesno("Подтверждение", "Удалить все пути?"): return for pair in self.copy_pairs[:]: pair['frame'].destroy() self.copy_pairs.clear() def browse_folder(self, entry): """Открывает диалог выбора папки""" folder = filedialog.askdirectory(title="Выберите папку") if folder: entry.delete(0, tk.END) entry.insert(0, folder) def open_destination(self, path: str): if not path or path.startswith("Например: "): return if os.path.exists(path): with contextlib.suppress(Exception): os.startfile(path) def get_valid_pairs(self) -> List[tuple]: """Возвращает список валидных пар папок""" valid_pairs = [] for pair in self.copy_pairs: source = pair['source'].get().strip() dest = pair['dest'].get().strip() if source.startswith("Например: "): source = "" if dest.startswith("Например: "): dest = "" if source and dest: valid_pairs.append((source, dest, pair['full_copy'].get())) return valid_pairs def check_paths(self): """Проверяет доступность всех путей""" self.log_message("🔍 Проверка путей:", "info") valid_pairs = self.get_valid_pairs() if not valid_pairs: self.log_message("❌ Нет заполненных пар путей", "error") return for i, (source, dest, full_copy) in enumerate(valid_pairs, 1): self.log_message(f"\nПара {i}:", "info") mode_text = "вся папка" if full_copy else "последний файл" self.log_message(f" Режим: {mode_text}", "info") # Проверяем исходную папку if os.path.exists(source): self.log_message(f" ✅ Исходная папка: {source}", "success") # Считаем файлы .bak (без дублей из-за регистра) bak_files = [p for p in Path(source).iterdir() if p.is_file() and p.suffix.lower() == '.bak'] self.log_message(f" Найдено .bak файлов: {len(bak_files)}", "info") else: self.log_message(f" ❌ Исходная папка НЕ существует: {source}", "error") # Проверяем целевую папку if os.path.exists(dest): self.log_message(f" ✅ Целевая папка: {dest}", "success") # Проверяем права на запись test_file = os.path.join(dest, 'test_write.tmp') try: with open(test_file, 'w') as f: f.write('test') os.remove(test_file) self.log_message(f" Права на запись: есть", "success") except: self.log_message(f" ❌ Права на запись: нет", "error") else: self.log_message(f" ❌ Целевая папка НЕ существует: {dest}", "error") def find_latest_file(self, folder_path: str) -> Optional[Path]: """Находит самый последний файл в папке""" try: latest_file = find_latest_file_in_folder(folder_path) if not latest_file: if not Path(folder_path).exists(): self.log_message(f"❌ Папка не существует: {folder_path}", "error") else: self.log_message(f"⚠️ Не найдено .bak файлов в {folder_path}", "warning") return None file_time = datetime.fromtimestamp(latest_file.stat().st_mtime) self.log_message( f"📄 Последний файл: {latest_file.name} ({file_time.strftime('%Y-%m-%d %H:%M:%S')})", "info" ) return latest_file except Exception as e: self.log_message(f"❌ Ошибка при поиске файлов в {folder_path}: {e}", "error") return None def run_on_ui(self, func): """Планирует выполнение функции в UI-потоке.""" self.queue.put({'type': 'ui', 'func': func}) def check_previous_hash(self, target_file: Path): key = str(target_file) if key in self.last_hashes: try: current_hash = compute_file_checksum(target_file) if current_hash != self.last_hashes[key]: self.log_message(f"⚠️ Несовпадение хеша прошлой копии: {target_file.name}", "warning") except Exception as e: self.log_message(f"⚠️ Не удалось проверить прошлую копию: {e}", "warning") def copy_file_with_retries(self, source: Path, target: Path) -> bool: for attempt in range(COPY_RETRIES + 1): try: shutil.copy2(source, target) return True except Exception as e: if attempt >= COPY_RETRIES: self.log_message(f"❌ Ошибка при копировании {source.name}: {e}", "error") return False time_module.sleep(COPY_RETRY_DELAY) return False def verify_only_mode(self, source: Path, target: Path) -> bool: if not target.exists(): self.log_message(f"❌ Нет целевого файла для проверки: {target.name}", "error") return False if verify_copy(source, target): self.log_message(f"✅ Проверка OK: {target.name}", "success") return True self.log_message(f"❌ Проверка не пройдена: {target.name}", "error") return False def copy_full_folder(self, source: Path, dest: Path) -> tuple: copied = 0 skipped = 0 errors = 0 for root, _dirs, files in os.walk(source): rel = os.path.relpath(root, source) dest_root = dest / rel if rel != "." else dest dest_root.mkdir(parents=True, exist_ok=True) for fname in files: src_file = Path(root) / fname dst_file = dest_root / fname try: self.check_previous_hash(dst_file) if dst_file.exists() else None if self.verify_only.get(): if self.verify_only_mode(src_file, dst_file): skipped += 1 else: errors += 1 continue if should_copy_file(src_file, dst_file): if self.copy_file_with_retries(src_file, dst_file) and verify_copy(src_file, dst_file): copied += 1 self.last_hashes[str(dst_file)] = compute_file_checksum(dst_file) else: errors += 1 else: skipped += 1 except Exception as e: errors += 1 self.log_message(f"❌ Ошибка при копировании {src_file.name}: {e}", "error") return copied, skipped, errors def copy_files_thread(self, pairs: List[tuple], background: bool = False): """Поток для копирования последних файлов""" if not self.copy_lock.acquire(blocking=False): self.log_message("⚠️ Копирование уже выполняется", "warning") return self.is_copying = True self.root.after(0, lambda: self.status_bar.config(text="Идет копирование...")) self.root.after(0, lambda: self.progress.start(10)) try: copied_files = 0 skipped_files = 0 error_files = 0 if not background: self.log_message("\n" + "=" * 50, "info") self.log_message("🚀 Начало копирования последних файлов", "info") for source, dest, full_copy in pairs: self.log_message(f"\n📁 Обработка папки: {source}", "info") src_path = Path(source) dest_path = Path(dest) if full_copy: if not src_path.exists(): self.log_message(f"❌ Папка не существует: {source}", "error") error_files += 1 continue try: dest_path.mkdir(parents=True, exist_ok=True) except Exception as e: self.log_message(f"❌ Не могу создать папку {dest}: {e}", "error") error_files += 1 continue c, s, e = self.copy_full_folder(src_path, dest_path) copied_files += c skipped_files += s error_files += e continue # Находим последний файл latest_file = self.find_latest_file(source) if latest_file: # Создаем целевую папку, если её нет try: dest_path.mkdir(parents=True, exist_ok=True) except Exception as e: self.log_message(f"❌ Не могу создать папку {dest}: {e}", "error") error_files += 1 continue target_file = dest_path / latest_file.name try: if target_file.exists(): self.check_previous_hash(target_file) if self.verify_only.get(): if self.verify_only_mode(latest_file, target_file): skipped_files += 1 else: error_files += 1 continue target_existed = target_file.exists() if should_copy_file(latest_file, target_file): if self.copy_file_with_retries(latest_file, target_file) and verify_copy(latest_file, target_file): copied_files += 1 self.last_hashes[str(target_file)] = compute_file_checksum(target_file) if target_existed: self.log_message(f"✅ Обновлен: {latest_file.name} (новее)", "success") else: self.log_message(f"✅ Скопирован: {latest_file.name}", "success") else: error_files += 1 self.log_message(f"❌ Контрольная сумма не совпала: {latest_file.name}", "error") else: skipped_files += 1 self.log_message(f"⏭️ Пропущен: {latest_file.name} (уже актуален)", "warning") except Exception as e: error_files += 1 self.log_message(f"❌ Ошибка при копировании {latest_file.name}: {e}", "error") else: error_files += 1 # Итог self.log_message("\n" + "=" * 50, "info") self.log_message("📊 ИТОГ:", "info") self.log_message(f" ✅ Скопировано/обновлено: {copied_files}", "success") self.log_message(f" ⏭️ Пропущено: {skipped_files}", "warning") self.log_message(f" ❌ Ошибок: {error_files}", "error") if background: self.log_message("⏰ Копирование по расписанию завершено", "info") else: self.run_on_ui(lambda: messagebox.showinfo( "Готово", f"Копирование завершено!\n\n✅ Скопировано: {copied_files}\n⏭️ Пропущено: {skipped_files}\n❌ Ошибок: {error_files}" )) if error_files == 0: self.last_success_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') self.run_on_ui(self.update_last_success_label) self.tray_notify("Копирование завершено", f"Успешно: {copied_files}, Пропущено: {skipped_files}") else: self.tray_notify("Копирование с ошибками", f"Ошибок: {error_files}") except Exception as e: self.log_message(f"❌ Критическая ошибка: {e}\n{traceback.format_exc()}", "error") self.run_on_ui(lambda: messagebox.showerror("Ошибка", f"Произошла ошибка:\n{e}")) finally: self.is_copying = False self.copy_lock.release() self.root.after(0, lambda: self.status_bar.config(text="Готов к работе")) self.root.after(0, lambda: self.progress.stop()) def start_manual_copy(self): """Запускает ручное копирование""" valid_pairs = self.get_valid_pairs() if not valid_pairs: messagebox.showwarning("Предупреждение", "Заполните хотя бы одну пару папок!") return # Запускаем копирование в отдельном потоке copy_thread = threading.Thread( target=self.copy_files_thread, args=(valid_pairs, False), daemon=True ) copy_thread.start() def toggle_scheduler(self): """Включает/выключает планировщик""" if self.scheduler_enabled.get(): self.start_scheduler() else: self.stop_scheduler() def start_scheduler(self): """Запускает планировщик""" valid_pairs = self.get_valid_pairs() if not valid_pairs: messagebox.showwarning("Предупреждение", "Нет настроенных путей! Планировщик не запущен.") self.scheduler_enabled.set(False) return time_str = f"{self.hour_var.get()}:{self.minute_var.get()}" self.scheduler.schedule_copy_job("backup_job", time_str, valid_pairs) self.scheduler.start() self.scheduler_status.config(text=f"(активен, копирование в {time_str})", foreground="green") self.update_next_run_label() self.update_last_success_label() self.log_message(f"🕒 Планировщик запущен. Копирование ежедневно в {time_str}", "info") def stop_scheduler(self): """Останавливает планировщик""" self.scheduler.stop() self.scheduler_status.config(text="(остановлен)", foreground="red") self.next_run_label.config(text="(следующий запуск: —)") self.log_message("🕒 Планировщик остановлен", "info") def update_next_run_label(self): jobs = schedule.get_jobs("backup_job") if jobs: next_run = jobs[0].next_run if next_run: self.next_run_label.config(text=f"(следующий запуск: {next_run.strftime('%Y-%m-%d %H:%M')})") return self.next_run_label.config(text="(следующий запуск: —)") def update_last_success_label(self): if self.last_success_time: self.last_success_label.config(text=f"(последний успех: {self.last_success_time})") else: self.last_success_label.config(text="(последний успех: —)") def debug_settings(self): """Отладочный метод для проверки загрузки настроек""" self.log_message("\n" + "=" * 50, "info") self.log_message("🔧 ОТЛАДКА", "info") self.log_message("=" * 50, "info") settings_path = self.get_settings_path() self.log_message(f"📁 Путь к настройкам: {settings_path}", "info") self.log_message(f"📁 Файл существует: {os.path.exists(settings_path)}", "info") if os.path.exists(settings_path): try: with open(settings_path, 'r', encoding='utf-8') as f: content = f.read() self.log_message(f"📄 Содержимое файла:", "info") for line in content.split('\n'): self.log_message(f" {line}", "info") except Exception as e: self.log_message(f"❌ Не удалось прочитать файл: {e}", "error") self.log_message(f"\n🖥️ Текущее состояние:", "info") self.log_message(f" Пар в интерфейсе: {len(self.copy_pairs)}", "info") for i, pair in enumerate(self.copy_pairs, 1): source = pair['source'].get() dest = pair['dest'].get() self.log_message(f" Пара {i}:", "info") self.log_message(f" Откуда: '{source}'", "info") self.log_message(f" Куда: '{dest}'", "info") self.log_message("=" * 50, "info") def log_message(self, message, tag=None): """Добавляет сообщение в лог (вызывается из любого потока)""" # Используем queue для потокобезопасности self.queue.put({'type': 'log', 'message': message, 'tag': tag}) file_logger = getattr(self, "file_logger", None) if file_logger: try: level = logging.INFO if tag == "error": level = logging.ERROR elif tag == "warning": level = logging.WARNING file_logger.log(level, message) except Exception: pass def clear_log(self): """Очищает лог""" self.log_text.delete(1.0, tk.END) def process_queue(self): """Обрабатывает очередь сообщений из потоков""" try: while True: msg = self.queue.get_nowait() if msg['type'] == 'log': timestamp = datetime.now().strftime("%H:%M:%S") log_entry = f"[{timestamp}] {msg['message']}\n" self.log_text.insert(tk.END, log_entry, msg.get('tag')) self.log_text.see(tk.END) elif msg['type'] == 'ui': try: msg['func']() except Exception as e: self.log_message(f"❌ Ошибка UI: {e}", "error") except queue.Empty: pass finally: self.root.after(100, self.process_queue) def check_and_save(self): self.check_paths() self.save_settings() def create_tray_image(self): if Image is None: return None icon_path = os.path.abspath(ICON_PATH) if os.path.exists(icon_path): with contextlib.suppress(Exception): return Image.open(icon_path) image = Image.new("RGB", (64, 64), color=(40, 40, 40)) draw = ImageDraw.Draw(image) draw.rectangle((8, 8, 56, 56), fill=(30, 144, 255)) draw.text((18, 20), "B", fill=(255, 255, 255)) return image def setup_tray_icon(self): if pystray is None: self.log_message("⚠️ Модуль pystray не установлен, трей недоступен", "warning") return if self.tray_icon is not None: return image = self.create_tray_image() if image is None: self.log_message("⚠️ Не удалось создать иконку для трея", "warning") return def on_show(_icon, _item): self.root.after(0, self.show_window) def on_copy(_icon, _item): self.root.after(0, self.start_manual_copy) def on_exit(_icon, _item): self.root.after(0, self.exit_app) menu = pystray.Menu( pystray.MenuItem("Открыть", on_show, default=True), pystray.MenuItem("Запустить копирование", on_copy), pystray.MenuItem("Выход", on_exit), ) self.tray_icon = pystray.Icon(APP_NAME, image, APP_NAME, menu) self.tray_thread = threading.Thread(target=self.tray_icon.run, daemon=True) self.tray_thread.start() def tray_notify(self, title: str, message: str): if self.tray_icon is None: return with contextlib.suppress(Exception): self.tray_icon.notify(message, title) def show_window(self): self.root.deiconify() self.root.lift() self.root.focus_force() def hide_to_tray(self): if not self.minimize_to_tray_enabled.get(): self.exit_app() return if pystray is None: self.root.iconify() self.log_message("⚠️ pystray не установлен, окно свернуто в панель задач", "warning") return self.setup_tray_icon() self.root.withdraw() self.log_message("🧰 Приложение свернуто в трей", "info") def exit_app(self): if self.scheduler_enabled.get(): self.stop_scheduler() if self.tray_icon is not None: with contextlib.suppress(Exception): self.tray_icon.stop() self.tray_icon = None self.root.destroy() def main(): root = tk.Tk() app = BackgroundFileCopyApp(root) # Обработка закрытия окна def on_closing(): app.hide_to_tray() root.protocol("WM_DELETE_WINDOW", on_closing) root.mainloop() if __name__ == "__main__": main()