commit 36708967f741a4ea3b69d4a61a6996198c405f45 Author: benya Date: Thu Feb 19 19:06:21 2026 +0300 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fbc37c1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/BackupCopier.spec +/dist/ +/build/ +/.venv/ diff --git a/build.py b/build.py new file mode 100644 index 0000000..e215b20 --- /dev/null +++ b/build.py @@ -0,0 +1,77 @@ +import os +import shutil +import subprocess +import sys +import zipfile + +APP_NAME = "BackupCopier" +VERSION = "1.0.0" +MAIN_SCRIPT = "main.py" +ICON_PATH = "icon.ico" +EXE_PATH = os.path.join("dist", f"{APP_NAME}.exe") +ZIP_PATH = os.path.join("dist", f"{APP_NAME}-{VERSION}.zip") + + +def ensure_build_deps() -> None: + try: + __import__("PyInstaller") + except Exception: + print("[ERROR] Missing dependency: PyInstaller") + print(f"[ERROR] Install in this interpreter: {sys.executable} -m pip install pyinstaller") + sys.exit(1) + + +def run_build() -> None: + icon_abs = os.path.abspath(ICON_PATH) + has_icon = os.path.exists(icon_abs) + + cmd = [ + sys.executable, + "-m", + "PyInstaller", + "--noconfirm", + "--clean", + "--onefile", + "--windowed", + f"--name={APP_NAME}", + "--hidden-import=schedule", + "--collect-submodules=schedule", + "--hidden-import=pystray", + "--hidden-import=PIL", + "--collect-submodules=pystray", + "--collect-submodules=PIL", + f"--icon={icon_abs}" if has_icon else "", + f"--add-data={icon_abs}{os.pathsep}." if has_icon else "", + MAIN_SCRIPT, + ] + cmd = [x for x in cmd if x] + subprocess.check_call(cmd) + + +def write_version_file() -> str: + path = os.path.join("dist", "version.txt") + with open(path, "w", encoding="utf-8") as f: + f.write(VERSION + "\n") + return path + + +def create_zip(version_file: str) -> None: + with zipfile.ZipFile(ZIP_PATH, "w", zipfile.ZIP_DEFLATED) as z: + z.write(EXE_PATH, arcname=os.path.basename(EXE_PATH)) + z.write(version_file, arcname="version.txt") + if os.path.exists(ICON_PATH): + z.write(ICON_PATH, arcname="icon.ico") + + +if __name__ == "__main__": + ensure_build_deps() + + for folder in ("build", "dist"): + if os.path.exists(folder): + shutil.rmtree(folder) + + run_build() + vfile = write_version_file() + create_zip(vfile) + print(f"[OK] Built: {EXE_PATH}") + print(f"[OK] Archive: {ZIP_PATH}") diff --git a/icon.ico b/icon.ico new file mode 100644 index 0000000..57081bf Binary files /dev/null and b/icon.ico differ diff --git a/main.py b/main.py new file mode 100644 index 0000000..8346dc2 --- /dev/null +++ b/main.py @@ -0,0 +1,1185 @@ +import tkinter as tk +from tkinter import ttk, filedialog, messagebox, scrolledtext +import shutil +import os +from pathlib import Path +import threading +import queue +from datetime import datetime, time +import time as time_module +import schedule +import sys +import json +import tempfile +from typing import List, Dict, Optional +import traceback +import hashlib +import winreg +import contextlib +import logging +import logging.handlers +try: + import pystray + from PIL import Image, ImageDraw +except Exception: + pystray = None + Image = None + ImageDraw = None + + +DEFAULT_EXTENSIONS = ('*.bak', '*.BAK', '*.backup', '*.sql') +AUTOSTART_REG_PATH = r"Software\Microsoft\Windows\CurrentVersion\Run" +AUTOSTART_REG_NAME = "BackupCopier" +ICON_PATH = "icon.ico" +APP_NAME = "BackupCopier" +COPY_RETRIES = 2 +COPY_RETRY_DELAY = 2 + + +def find_latest_file_in_folder(folder_path: str, extensions=DEFAULT_EXTENSIONS) -> Optional[Path]: + """Возвращает самый новый файл из папки по времени модификации.""" + folder = Path(folder_path) + if not folder.exists(): + return None + + files: List[Path] = [] + for ext in extensions: + files.extend(folder.glob(ext)) + + if not files: + return None + + return max(files, key=lambda f: f.stat().st_mtime) + + +def should_copy_file(source: Path, target: Path) -> bool: + """Определяет, нужно ли копировать файл.""" + if not target.exists(): + return True + + return source.stat().st_mtime > target.stat().st_mtime + + +def compute_file_checksum(path: Path, chunk_size: int = 1024 * 1024) -> str: + """Считает SHA-256 контрольную сумму файла.""" + hasher = hashlib.sha256() + with path.open("rb") as f: + while True: + chunk = f.read(chunk_size) + if not chunk: + break + hasher.update(chunk) + return hasher.hexdigest() + + +def verify_copy(source: Path, target: Path) -> bool: + """Проверяет, что файл скопирован корректно, по контрольной сумме.""" + return compute_file_checksum(source) == compute_file_checksum(target) + + +class FileCopyScheduler: + """Класс для управления расписанием копирования""" + + def __init__(self, app): + self.app = app + self.scheduled_jobs = [] + self.running = False + self.thread = None + + def start(self): + """Запускает планировщик в отдельном потоке""" + if not self.running: + self.running = True + self.thread = threading.Thread(target=self._run_scheduler, daemon=True) + self.thread.start() + self.app.log_message("🕒 Планировщик запущен", "info") + + def stop(self): + """Останавливает планировщик""" + self.running = False + self.app.log_message("🕒 Планировщик остановлен", "info") + + def _run_scheduler(self): + """Основной цикл планировщика""" + while self.running: + schedule.run_pending() + time_module.sleep(1) + + def schedule_copy_job(self, job_id: str, time_str: str, pairs: List[tuple]): + """Добавляет задание в планировщик""" + # Очищаем предыдущие задания с таким же ID + schedule.clear(job_id) + + # Парсим время + try: + hour, minute = map(int, time_str.split(':')) + except: + hour, minute = 3, 0 # По умолчанию 3:00 + + # Добавляем ежедневное задание + schedule.every().day.at(f"{hour:02d}:{minute:02d}").do( + self._execute_copy_job, pairs=pairs + ).tag(job_id) + + self.app.log_message(f"📅 Запланировано копирование на {hour:02d}:{minute:02d} ежедневно", "info") + + def _execute_copy_job(self, pairs: List[tuple]): + """Выполняет задание копирования""" + self.app.log_message("⏰ Запуск запланированного копирования...", "info") + + # Запускаем копирование в отдельном потоке + copy_thread = threading.Thread( + target=self.app.copy_files_thread, + args=(pairs, True), # True означает фоновый режим + daemon=True + ) + copy_thread.start() + + +class BackgroundFileCopyApp: + def __init__(self, root): + self.root = root + self.root.title("Планировщик копирования бекапов") + self.root.geometry("900x700") + self.setup_window_icon() + + # Для работы с очередью сообщений из потоков + self.queue = queue.Queue() + + # Список пар для копирования + self.copy_pairs = [] + + # Планировщик + self.scheduler = FileCopyScheduler(self) + + # Флаг для отслеживания состояния + self.is_copying = False + self.copy_lock = threading.Lock() + + # Автозапуск + self.autostart_enabled = tk.BooleanVar(value=False) + + # Сворачивание в трей + self.minimize_to_tray_enabled = tk.BooleanVar(value=True) + self.tray_icon = None + self.tray_thread = None + + # Режим проверки без копирования + self.verify_only = tk.BooleanVar(value=False) + + # Последний успешный запуск + self.last_success_time: Optional[str] = None + + # Хеши последних копий + self.last_hashes: Dict[str, str] = {} + + # Логгер в файл + self.file_logger = self.setup_file_logger() + + # Создаем интерфейс + self.setup_ui() + + # Загружаем сохраненные настройки + self.load_settings() + + # Проверяем очередь каждые 100ms + self.process_queue() + + # Запускаем планировщик при старте, если есть сохраненные настройки + if self.scheduler_enabled.get(): + self.start_scheduler() + + def setup_ui(self): + # Основной фрейм + main_frame = ttk.Frame(self.root, padding="10") + main_frame.pack(fill=tk.BOTH, expand=True) + + # Заголовок + title_label = ttk.Label(main_frame, text="Планировщик копирования бекапов", + font=("Arial", 14, "bold")) + title_label.pack(pady=10) + + # Фрейм настроек расписания + schedule_frame = ttk.LabelFrame(main_frame, text="Настройки расписания", padding="10") + schedule_frame.pack(fill=tk.X, pady=10) + + # Время копирования + time_frame = ttk.Frame(schedule_frame) + time_frame.pack(fill=tk.X, pady=5) + + ttk.Label(time_frame, text="Время копирования:", width=15).pack(side=tk.LEFT) + + self.hour_var = tk.StringVar(value="03") + self.minute_var = tk.StringVar(value="00") + + hours = [f"{h:02d}" for h in range(24)] + minutes = [f"{m:02d}" for m in range(60)] + + ttk.Combobox(time_frame, textvariable=self.hour_var, values=hours, + width=5, state="readonly").pack(side=tk.LEFT, padx=2) + ttk.Label(time_frame, text=":").pack(side=tk.LEFT) + ttk.Combobox(time_frame, textvariable=self.minute_var, values=minutes, + width=5, state="readonly").pack(side=tk.LEFT, padx=2) + + ttk.Label(time_frame, text="(ежедневно)", font=("Arial", 9, "italic")).pack(side=tk.LEFT, padx=10) + + # Включение/выключение планировщика + scheduler_ctrl_frame = ttk.Frame(schedule_frame) + scheduler_ctrl_frame.pack(fill=tk.X, pady=5) + + self.scheduler_enabled = tk.BooleanVar(value=False) + self.scheduler_check = ttk.Checkbutton(scheduler_ctrl_frame, + text="Включить автоматическое копирование по расписанию", + variable=self.scheduler_enabled, + command=self.toggle_scheduler) + self.scheduler_check.pack(side=tk.LEFT) + + self.scheduler_status = ttk.Label(scheduler_ctrl_frame, text="(остановлен)", + font=("Arial", 9, "italic")) + self.scheduler_status.pack(side=tk.LEFT, padx=10) + + self.next_run_label = ttk.Label(scheduler_ctrl_frame, text="(следующий запуск: —)", + font=("Arial", 9, "italic")) + self.next_run_label.pack(side=tk.LEFT, padx=10) + + self.last_success_label = ttk.Label(scheduler_ctrl_frame, text="(последний успех: —)", + font=("Arial", 9, "italic")) + self.last_success_label.pack(side=tk.LEFT, padx=10) + + # Автозапуск + autostart_frame = ttk.Frame(schedule_frame) + autostart_frame.pack(fill=tk.X, pady=5) + + self.autostart_check = ttk.Checkbutton( + autostart_frame, + text="Добавить программу в автозапуск Windows", + variable=self.autostart_enabled, + command=self.toggle_autostart + ) + self.autostart_check.pack(side=tk.LEFT) + + minimize_frame = ttk.Frame(schedule_frame) + minimize_frame.pack(fill=tk.X, pady=5) + + self.minimize_check = ttk.Checkbutton( + minimize_frame, + text="Сворачивать в трей при закрытии", + variable=self.minimize_to_tray_enabled + ) + self.minimize_check.pack(side=tk.LEFT) + + verify_frame = ttk.Frame(schedule_frame) + verify_frame.pack(fill=tk.X, pady=5) + + self.verify_check = ttk.Checkbutton( + verify_frame, + text="Только проверка (без копирования)", + variable=self.verify_only + ) + self.verify_check.pack(side=tk.LEFT) + + # Кнопки управления + buttons_frame = ttk.Frame(schedule_frame) + buttons_frame.pack(fill=tk.X, pady=5) + + ttk.Button(buttons_frame, text="✅ Проверить и сохранить", + command=self.check_and_save).pack(side=tk.LEFT, padx=5) + ttk.Button(buttons_frame, text="▶ Запустить копирование сейчас", + command=self.start_manual_copy).pack(side=tk.LEFT, padx=5) + + # Фрейм для списка путей + paths_frame = ttk.LabelFrame(main_frame, text="Пути для копирования", padding="10") + paths_frame.pack(fill=tk.BOTH, expand=True, pady=10) + + # Заголовки колонок + headers_frame = ttk.Frame(paths_frame) + headers_frame.pack(fill=tk.X, pady=5) + + ttk.Label(headers_frame, text="Откуда копировать (папка с бекапами)", + font=("Arial", 10, "bold")).pack(side=tk.LEFT, padx=(0, 20)) + ttk.Label(headers_frame, text="Куда копировать (сетевая папка)", + font=("Arial", 10, "bold")).pack(side=tk.LEFT, padx=(20, 0)) + + # Canvas для прокрутки списка + canvas = tk.Canvas(paths_frame, borderwidth=0, highlightthickness=0) + scrollbar = ttk.Scrollbar(paths_frame, orient="vertical", command=canvas.yview) + self.scrollable_frame = ttk.Frame(canvas) + + self.scrollable_frame.bind( + "", + lambda e: canvas.configure(scrollregion=canvas.bbox("all")) + ) + + canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw") + canvas.configure(yscrollcommand=scrollbar.set) + + canvas.pack(side="left", fill="both", expand=True) + scrollbar.pack(side="right", fill="y") + + # Кнопки управления списком + list_buttons_frame = ttk.Frame(paths_frame) + list_buttons_frame.pack(fill=tk.X, pady=10) + + ttk.Button(list_buttons_frame, text="➕ Добавить пару папок", + command=self.add_path_pair).pack(side=tk.LEFT, padx=5) + ttk.Button(list_buttons_frame, text="❌ Удалить все", + command=self.remove_all_pairs).pack(side=tk.LEFT, padx=5) + ttk.Button(list_buttons_frame, text="🔍 Проверить пути", + command=self.check_paths).pack(side=tk.LEFT, padx=5) + + # Лог операций + log_frame = ttk.LabelFrame(main_frame, text="Лог операций", padding="5") + log_frame.pack(fill=tk.BOTH, expand=True, pady=5) + + # Кнопки управления логом + log_buttons = ttk.Frame(log_frame) + log_buttons.pack(fill=tk.X, pady=2) + + ttk.Button(log_buttons, text="📋 Очистить лог", + command=self.clear_log).pack(side=tk.RIGHT, padx=2) + ttk.Button(log_buttons, text="🔍 Отладка", + command=self.debug_settings).pack(side=tk.RIGHT, padx=2) + + # Текст лога + self.log_text = scrolledtext.ScrolledText(log_frame, height=8, wrap=tk.WORD) + self.log_text.pack(fill=tk.BOTH, expand=True) + + # Настройка тегов для цветного лога + self.log_text.tag_configure("success", foreground="green") + self.log_text.tag_configure("error", foreground="red") + self.log_text.tag_configure("warning", foreground="orange") + self.log_text.tag_configure("info", foreground="blue") + + # Статус бар + self.status_bar = ttk.Label(self.root, text="Готов к работе", + relief=tk.SUNKEN, anchor=tk.W) + self.status_bar.pack(side=tk.BOTTOM, fill=tk.X) + + self.progress = ttk.Progressbar(self.root, mode="indeterminate") + self.progress.pack(side=tk.BOTTOM, fill=tk.X) + + def setup_window_icon(self): + icon_path = os.path.abspath(ICON_PATH) + if os.path.exists(icon_path): + with contextlib.suppress(Exception): + self.root.iconbitmap(icon_path) + + def setup_file_logger(self): + settings_path = self.get_settings_path() + log_dir = os.path.dirname(settings_path) + os.makedirs(log_dir, exist_ok=True) + log_path = os.path.join(log_dir, "backup_copier.log") + + logger = logging.getLogger("backup_copier") + logger.setLevel(logging.INFO) + if not logger.handlers: + handler = logging.handlers.RotatingFileHandler( + log_path, maxBytes=2 * 1024 * 1024, backupCount=3, encoding="utf-8" + ) + formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger + + def get_settings_path(self): + """Возвращает путь для сохранения настроек""" + if getattr(sys, 'frozen', False): + # Для EXE используем папку с EXE файлом + base_path = os.path.dirname(sys.executable) + else: + # Для скрипта используем папку со скриптом + base_path = os.path.dirname(os.path.abspath(__file__)) + + # Пробуем создать папку, если есть права + settings_dir = base_path + try: + # Проверяем, можем ли мы писать в эту папку + test_file = os.path.join(settings_dir, 'test_write.tmp') + with open(test_file, 'w') as f: + f.write('test') + os.remove(test_file) + self.log_message(f"📁 Используется папка: {settings_dir}", "info") + except: + # Если не можем писать, используем AppData + settings_dir = os.path.join(os.environ.get('APPDATA', os.path.expanduser('~')), 'BackupCopier') + os.makedirs(settings_dir, exist_ok=True) + self.log_message(f"📁 Используется папка AppData: {settings_dir}", "info") + + return os.path.join(settings_dir, 'backup_copier_settings.json') + + def create_placeholder(self, entry, text): + """Добавляет подсказку в поле ввода.""" + placeholder_color = "gray" + normal_color = "black" + + def on_focus_in(_): + if entry.get() == text and entry.cget("foreground") == placeholder_color: + entry.delete(0, tk.END) + entry.config(foreground=normal_color) + + def on_focus_out(_): + if not entry.get(): + entry.insert(0, text) + entry.config(foreground=placeholder_color) + + entry.bind("", on_focus_in) + entry.bind("", on_focus_out) + on_focus_out(None) + + def get_autostart_command(self) -> str: + """Возвращает команду для автозапуска.""" + if getattr(sys, 'frozen', False): + return f"\"{sys.executable}\"" + + script_path = os.path.abspath(__file__) + return f"\"{sys.executable}\" \"{script_path}\"" + + def is_autostart_enabled(self) -> bool: + """Проверяет, включен ли автозапуск в реестре.""" + try: + with winreg.OpenKey(winreg.HKEY_CURRENT_USER, AUTOSTART_REG_PATH, 0, winreg.KEY_READ) as key: + value, _ = winreg.QueryValueEx(key, AUTOSTART_REG_NAME) + return bool(value) + except FileNotFoundError: + return False + except OSError: + return False + + def set_autostart_enabled(self, enabled: bool) -> None: + """Включает/выключает автозапуск в реестре.""" + try: + with winreg.OpenKey(winreg.HKEY_CURRENT_USER, AUTOSTART_REG_PATH, 0, winreg.KEY_SET_VALUE) as key: + if enabled: + winreg.SetValueEx(key, AUTOSTART_REG_NAME, 0, winreg.REG_SZ, self.get_autostart_command()) + self.log_message("✅ Автозапуск включен", "success") + else: + try: + winreg.DeleteValue(key, AUTOSTART_REG_NAME) + except FileNotFoundError: + pass + self.log_message("⏭️ Автозапуск выключен", "info") + except Exception as e: + self.log_message(f"❌ Ошибка автозапуска: {e}", "error") + + def toggle_autostart(self): + """Обработчик галочки автозапуска.""" + self.set_autostart_enabled(self.autostart_enabled.get()) + + def load_settings(self): + """Загружает настройки из файла и отображает их""" + try: + settings_path = self.get_settings_path() + self.log_message(f"🔍 Загрузка настроек из: {settings_path}", "info") + + if os.path.exists(settings_path): + with open(settings_path, 'r', encoding='utf-8') as f: + settings = json.load(f) + + # Загружаем время + self.hour_var.set(settings.get('hour', '03')) + self.minute_var.set(settings.get('minute', '00')) + self.scheduler_enabled.set(settings.get('enabled', False)) + desired_autostart = settings.get('autostart_enabled') + self.minimize_to_tray_enabled.set(settings.get('minimize_to_tray', True)) + + # Загружаем пары путей + pairs_data = settings.get('pairs', []) + self.log_message(f"📦 Найдено {len(pairs_data)} пар путей в файле", "info") + + # Очищаем текущие пары + self.remove_all_pairs(silent=True) + + # Добавляем загруженные пары + if pairs_data: + for item in pairs_data: + if isinstance(item, dict): + source = item.get('source', '') + dest = item.get('dest', '') + full_copy = bool(item.get('full_copy', False)) + else: + source, dest = item + full_copy = False + self.add_path_pair(source, dest) + self.copy_pairs[-1]['full_copy'].set(full_copy) + self.log_message(f"✅ Загружено {len(pairs_data)} пар путей", "success") + else: + # Если нет сохраненных пар, добавляем одну пустую + self.add_path_pair() + self.log_message("➕ Добавлена пустая пара для ввода", "info") + + # Настройки автозапуска + actual_autostart = self.is_autostart_enabled() + if desired_autostart is None: + self.autostart_enabled.set(actual_autostart) + else: + desired_autostart = bool(desired_autostart) + self.autostart_enabled.set(desired_autostart) + if desired_autostart != actual_autostart: + self.set_autostart_enabled(desired_autostart) + + self.verify_only.set(settings.get('verify_only', False)) + self.last_success_time = settings.get('last_success_time') + self.last_hashes = settings.get('last_hashes', {}) + self.update_last_success_label() + + self.log_message(f"📂 Настройки загружены из {settings_path}", "info") + else: + # Если файла нет, добавляем пустую пару + self.log_message("🆕 Файл настроек не найден, создаем новую конфигурацию", "info") + self.add_path_pair() + self.autostart_enabled.set(self.is_autostart_enabled()) + self.verify_only.set(False) + self.last_success_time = None + self.last_hashes = {} + self.update_last_success_label() + + except Exception as e: + self.log_message(f"❌ Ошибка при загрузке настроек: {e}", "error") + import traceback + traceback.print_exc() + # В случае ошибки добавляем пустую пару + self.add_path_pair() + self.autostart_enabled.set(self.is_autostart_enabled()) + self.verify_only.set(False) + self.last_success_time = None + self.last_hashes = {} + self.update_last_success_label() + + def save_settings(self): + """Сохраняет настройки в файл""" + try: + # Собираем данные из полей ввода + pairs_data = [] + for pair in self.copy_pairs: + source = pair['source'].get().strip() + dest = pair['dest'].get().strip() + if source.startswith("Например: "): + source = "" + if dest.startswith("Например: "): + dest = "" + if source or dest: # Сохраняем даже если одно поле пустое + pairs_data.append({ + "source": source, + "dest": dest, + "full_copy": pair['full_copy'].get() + }) + + settings = { + 'hour': self.hour_var.get(), + 'minute': self.minute_var.get(), + 'enabled': self.scheduler_enabled.get(), + 'autostart_enabled': self.autostart_enabled.get(), + 'minimize_to_tray': self.minimize_to_tray_enabled.get(), + 'verify_only': self.verify_only.get(), + 'last_success_time': self.last_success_time, + 'last_hashes': self.last_hashes, + 'pairs': pairs_data + } + + settings_path = self.get_settings_path() + + # Сохраняем с отступами для читаемости + with open(settings_path, 'w', encoding='utf-8') as f: + json.dump(settings, f, ensure_ascii=False, indent=2) + + self.log_message(f"💾 Настройки сохранены ({len(pairs_data)} пар путей)", "success") + self.log_message(f"📁 Файл: {settings_path}", "info") + + messagebox.showinfo("Успех", f"Настройки сохранены!\n\nФайл: {settings_path}") + if self.scheduler_enabled.get(): + # Перепланируем, чтобы учесть новые пути/время + self.start_scheduler() + + except Exception as e: + self.log_message(f"❌ Ошибка при сохранении настроек: {e}", "error") + messagebox.showerror("Ошибка", f"Не удалось сохранить настройки:\n{e}") + + def add_path_pair(self, source="", dest=""): + """Добавляет новую пару полей для ввода путей""" + pair_frame = ttk.Frame(self.scrollable_frame) + pair_frame.pack(fill=tk.X, pady=5) + + # Поле "Откуда" + source_frame = ttk.Frame(pair_frame) + source_frame.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5)) + + source_entry = ttk.Entry(source_frame) + source_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) + if source: + source_entry.insert(0, source) + else: + self.create_placeholder(source_entry, "Например: C:\\Backups") + + ttk.Button(source_frame, text="📁", width=3, + command=lambda: self.browse_folder(source_entry)).pack(side=tk.RIGHT, padx=2) + + # Поле "Куда" + dest_frame = ttk.Frame(pair_frame) + dest_frame.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(5, 0)) + + dest_entry = ttk.Entry(dest_frame) + dest_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) + if dest: + dest_entry.insert(0, dest) + else: + self.create_placeholder(dest_entry, "Например: D:\\BackupArchive") + + ttk.Button(dest_frame, text="📁", width=3, + command=lambda: self.browse_folder(dest_entry)).pack(side=tk.RIGHT, padx=2) + + # Опции пары + options_frame = ttk.Frame(pair_frame) + options_frame.pack(side=tk.LEFT, padx=5) + + full_copy_var = tk.BooleanVar(value=False) + full_copy_check = ttk.Checkbutton(options_frame, text="Вся папка", variable=full_copy_var) + full_copy_check.pack(side=tk.LEFT) + + ttk.Button(options_frame, text="📂", width=3, + command=lambda: self.open_destination(dest_entry.get())).pack(side=tk.LEFT, padx=2) + + # Кнопка удаления + ttk.Button(pair_frame, text="✖", width=3, + command=lambda: self.remove_path_pair(pair_frame)).pack(side=tk.RIGHT, padx=5) + + # Сохраняем ссылки на entry + self.copy_pairs.append({ + 'frame': pair_frame, + 'source': source_entry, + 'dest': dest_entry, + 'full_copy': full_copy_var + }) + + # Прокручиваем к новому элементу + self.scrollable_frame.update_idletasks() + canvas = self.scrollable_frame.master + canvas.yview_moveto(1.0) + + def remove_path_pair(self, frame): + """Удаляет пару полей""" + for pair in self.copy_pairs: + if pair['frame'] == frame: + self.copy_pairs.remove(pair) + break + frame.destroy() + + # Если не осталось пар, добавляем пустую + if not self.copy_pairs: + self.add_path_pair() + + def remove_all_pairs(self, silent=False): + """Удаляет все пары""" + if not silent: + if not messagebox.askyesno("Подтверждение", "Удалить все пути?"): + return + + for pair in self.copy_pairs[:]: + pair['frame'].destroy() + self.copy_pairs.clear() + + def browse_folder(self, entry): + """Открывает диалог выбора папки""" + folder = filedialog.askdirectory(title="Выберите папку") + if folder: + entry.delete(0, tk.END) + entry.insert(0, folder) + + def open_destination(self, path: str): + if not path or path.startswith("Например: "): + return + if os.path.exists(path): + with contextlib.suppress(Exception): + os.startfile(path) + + def get_valid_pairs(self) -> List[tuple]: + """Возвращает список валидных пар папок""" + valid_pairs = [] + for pair in self.copy_pairs: + source = pair['source'].get().strip() + dest = pair['dest'].get().strip() + if source.startswith("Например: "): + source = "" + if dest.startswith("Например: "): + dest = "" + if source and dest: + valid_pairs.append((source, dest, pair['full_copy'].get())) + return valid_pairs + + def check_paths(self): + """Проверяет доступность всех путей""" + self.log_message("🔍 Проверка путей:", "info") + valid_pairs = self.get_valid_pairs() + + if not valid_pairs: + self.log_message("❌ Нет заполненных пар путей", "error") + return + + for i, (source, dest, full_copy) in enumerate(valid_pairs, 1): + self.log_message(f"\nПара {i}:", "info") + mode_text = "вся папка" if full_copy else "последний файл" + self.log_message(f" Режим: {mode_text}", "info") + + # Проверяем исходную папку + if os.path.exists(source): + self.log_message(f" ✅ Исходная папка: {source}", "success") + # Считаем файлы .bak (без дублей из-за регистра) + bak_files = [p for p in Path(source).iterdir() if p.is_file() and p.suffix.lower() == '.bak'] + self.log_message(f" Найдено .bak файлов: {len(bak_files)}", "info") + else: + self.log_message(f" ❌ Исходная папка НЕ существует: {source}", "error") + + # Проверяем целевую папку + if os.path.exists(dest): + self.log_message(f" ✅ Целевая папка: {dest}", "success") + # Проверяем права на запись + test_file = os.path.join(dest, 'test_write.tmp') + try: + with open(test_file, 'w') as f: + f.write('test') + os.remove(test_file) + self.log_message(f" Права на запись: есть", "success") + except: + self.log_message(f" ❌ Права на запись: нет", "error") + else: + self.log_message(f" ❌ Целевая папка НЕ существует: {dest}", "error") + + def find_latest_file(self, folder_path: str) -> Optional[Path]: + """Находит самый последний файл в папке""" + try: + latest_file = find_latest_file_in_folder(folder_path) + if not latest_file: + if not Path(folder_path).exists(): + self.log_message(f"❌ Папка не существует: {folder_path}", "error") + else: + self.log_message(f"⚠️ Не найдено .bak файлов в {folder_path}", "warning") + return None + + file_time = datetime.fromtimestamp(latest_file.stat().st_mtime) + self.log_message( + f"📄 Последний файл: {latest_file.name} ({file_time.strftime('%Y-%m-%d %H:%M:%S')})", + "info" + ) + return latest_file + + except Exception as e: + self.log_message(f"❌ Ошибка при поиске файлов в {folder_path}: {e}", "error") + return None + + def run_on_ui(self, func): + """Планирует выполнение функции в UI-потоке.""" + self.queue.put({'type': 'ui', 'func': func}) + + def check_previous_hash(self, target_file: Path): + key = str(target_file) + if key in self.last_hashes: + try: + current_hash = compute_file_checksum(target_file) + if current_hash != self.last_hashes[key]: + self.log_message(f"⚠️ Несовпадение хеша прошлой копии: {target_file.name}", "warning") + except Exception as e: + self.log_message(f"⚠️ Не удалось проверить прошлую копию: {e}", "warning") + + def copy_file_with_retries(self, source: Path, target: Path) -> bool: + for attempt in range(COPY_RETRIES + 1): + try: + shutil.copy2(source, target) + return True + except Exception as e: + if attempt >= COPY_RETRIES: + self.log_message(f"❌ Ошибка при копировании {source.name}: {e}", "error") + return False + time_module.sleep(COPY_RETRY_DELAY) + return False + + def verify_only_mode(self, source: Path, target: Path) -> bool: + if not target.exists(): + self.log_message(f"❌ Нет целевого файла для проверки: {target.name}", "error") + return False + if verify_copy(source, target): + self.log_message(f"✅ Проверка OK: {target.name}", "success") + return True + self.log_message(f"❌ Проверка не пройдена: {target.name}", "error") + return False + + def copy_full_folder(self, source: Path, dest: Path) -> tuple: + copied = 0 + skipped = 0 + errors = 0 + for root, _dirs, files in os.walk(source): + rel = os.path.relpath(root, source) + dest_root = dest / rel if rel != "." else dest + dest_root.mkdir(parents=True, exist_ok=True) + for fname in files: + src_file = Path(root) / fname + dst_file = dest_root / fname + try: + self.check_previous_hash(dst_file) if dst_file.exists() else None + if self.verify_only.get(): + if self.verify_only_mode(src_file, dst_file): + skipped += 1 + else: + errors += 1 + continue + if should_copy_file(src_file, dst_file): + if self.copy_file_with_retries(src_file, dst_file) and verify_copy(src_file, dst_file): + copied += 1 + self.last_hashes[str(dst_file)] = compute_file_checksum(dst_file) + else: + errors += 1 + else: + skipped += 1 + except Exception as e: + errors += 1 + self.log_message(f"❌ Ошибка при копировании {src_file.name}: {e}", "error") + return copied, skipped, errors + + def copy_files_thread(self, pairs: List[tuple], background: bool = False): + """Поток для копирования последних файлов""" + if not self.copy_lock.acquire(blocking=False): + self.log_message("⚠️ Копирование уже выполняется", "warning") + return + + self.is_copying = True + self.root.after(0, lambda: self.status_bar.config(text="Идет копирование...")) + self.root.after(0, lambda: self.progress.start(10)) + + try: + copied_files = 0 + skipped_files = 0 + error_files = 0 + + if not background: + self.log_message("\n" + "=" * 50, "info") + self.log_message("🚀 Начало копирования последних файлов", "info") + + for source, dest, full_copy in pairs: + self.log_message(f"\n📁 Обработка папки: {source}", "info") + + src_path = Path(source) + dest_path = Path(dest) + if full_copy: + if not src_path.exists(): + self.log_message(f"❌ Папка не существует: {source}", "error") + error_files += 1 + continue + try: + dest_path.mkdir(parents=True, exist_ok=True) + except Exception as e: + self.log_message(f"❌ Не могу создать папку {dest}: {e}", "error") + error_files += 1 + continue + + c, s, e = self.copy_full_folder(src_path, dest_path) + copied_files += c + skipped_files += s + error_files += e + continue + + # Находим последний файл + latest_file = self.find_latest_file(source) + + if latest_file: + # Создаем целевую папку, если её нет + try: + dest_path.mkdir(parents=True, exist_ok=True) + except Exception as e: + self.log_message(f"❌ Не могу создать папку {dest}: {e}", "error") + error_files += 1 + continue + + target_file = dest_path / latest_file.name + + try: + if target_file.exists(): + self.check_previous_hash(target_file) + if self.verify_only.get(): + if self.verify_only_mode(latest_file, target_file): + skipped_files += 1 + else: + error_files += 1 + continue + target_existed = target_file.exists() + if should_copy_file(latest_file, target_file): + if self.copy_file_with_retries(latest_file, target_file) and verify_copy(latest_file, target_file): + copied_files += 1 + self.last_hashes[str(target_file)] = compute_file_checksum(target_file) + if target_existed: + self.log_message(f"✅ Обновлен: {latest_file.name} (новее)", "success") + else: + self.log_message(f"✅ Скопирован: {latest_file.name}", "success") + else: + error_files += 1 + self.log_message(f"❌ Контрольная сумма не совпала: {latest_file.name}", "error") + else: + skipped_files += 1 + self.log_message(f"⏭️ Пропущен: {latest_file.name} (уже актуален)", "warning") + + except Exception as e: + error_files += 1 + self.log_message(f"❌ Ошибка при копировании {latest_file.name}: {e}", "error") + else: + error_files += 1 + + # Итог + self.log_message("\n" + "=" * 50, "info") + self.log_message("📊 ИТОГ:", "info") + self.log_message(f" ✅ Скопировано/обновлено: {copied_files}", "success") + self.log_message(f" ⏭️ Пропущено: {skipped_files}", "warning") + self.log_message(f" ❌ Ошибок: {error_files}", "error") + + if background: + self.log_message("⏰ Копирование по расписанию завершено", "info") + else: + self.run_on_ui(lambda: messagebox.showinfo( + "Готово", + f"Копирование завершено!\n\n✅ Скопировано: {copied_files}\n⏭️ Пропущено: {skipped_files}\n❌ Ошибок: {error_files}" + )) + + if error_files == 0: + self.last_success_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + self.run_on_ui(self.update_last_success_label) + self.tray_notify("Копирование завершено", f"Успешно: {copied_files}, Пропущено: {skipped_files}") + else: + self.tray_notify("Копирование с ошибками", f"Ошибок: {error_files}") + + except Exception as e: + self.log_message(f"❌ Критическая ошибка: {e}\n{traceback.format_exc()}", "error") + self.run_on_ui(lambda: messagebox.showerror("Ошибка", f"Произошла ошибка:\n{e}")) + finally: + self.is_copying = False + self.copy_lock.release() + self.root.after(0, lambda: self.status_bar.config(text="Готов к работе")) + self.root.after(0, lambda: self.progress.stop()) + + def start_manual_copy(self): + """Запускает ручное копирование""" + valid_pairs = self.get_valid_pairs() + + if not valid_pairs: + messagebox.showwarning("Предупреждение", "Заполните хотя бы одну пару папок!") + return + + # Запускаем копирование в отдельном потоке + copy_thread = threading.Thread( + target=self.copy_files_thread, + args=(valid_pairs, False), + daemon=True + ) + copy_thread.start() + + def toggle_scheduler(self): + """Включает/выключает планировщик""" + if self.scheduler_enabled.get(): + self.start_scheduler() + else: + self.stop_scheduler() + + def start_scheduler(self): + """Запускает планировщик""" + valid_pairs = self.get_valid_pairs() + + if not valid_pairs: + messagebox.showwarning("Предупреждение", + "Нет настроенных путей! Планировщик не запущен.") + self.scheduler_enabled.set(False) + return + + time_str = f"{self.hour_var.get()}:{self.minute_var.get()}" + self.scheduler.schedule_copy_job("backup_job", time_str, valid_pairs) + self.scheduler.start() + + self.scheduler_status.config(text=f"(активен, копирование в {time_str})", foreground="green") + self.update_next_run_label() + self.update_last_success_label() + self.log_message(f"🕒 Планировщик запущен. Копирование ежедневно в {time_str}", "info") + + def stop_scheduler(self): + """Останавливает планировщик""" + self.scheduler.stop() + self.scheduler_status.config(text="(остановлен)", foreground="red") + self.next_run_label.config(text="(следующий запуск: —)") + self.log_message("🕒 Планировщик остановлен", "info") + + def update_next_run_label(self): + jobs = schedule.get_jobs("backup_job") + if jobs: + next_run = jobs[0].next_run + if next_run: + self.next_run_label.config(text=f"(следующий запуск: {next_run.strftime('%Y-%m-%d %H:%M')})") + return + self.next_run_label.config(text="(следующий запуск: —)") + + def update_last_success_label(self): + if self.last_success_time: + self.last_success_label.config(text=f"(последний успех: {self.last_success_time})") + else: + self.last_success_label.config(text="(последний успех: —)") + + def debug_settings(self): + """Отладочный метод для проверки загрузки настроек""" + self.log_message("\n" + "=" * 50, "info") + self.log_message("🔧 ОТЛАДКА", "info") + self.log_message("=" * 50, "info") + + settings_path = self.get_settings_path() + self.log_message(f"📁 Путь к настройкам: {settings_path}", "info") + self.log_message(f"📁 Файл существует: {os.path.exists(settings_path)}", "info") + + if os.path.exists(settings_path): + try: + with open(settings_path, 'r', encoding='utf-8') as f: + content = f.read() + self.log_message(f"📄 Содержимое файла:", "info") + for line in content.split('\n'): + self.log_message(f" {line}", "info") + except Exception as e: + self.log_message(f"❌ Не удалось прочитать файл: {e}", "error") + + self.log_message(f"\n🖥️ Текущее состояние:", "info") + self.log_message(f" Пар в интерфейсе: {len(self.copy_pairs)}", "info") + + for i, pair in enumerate(self.copy_pairs, 1): + source = pair['source'].get() + dest = pair['dest'].get() + self.log_message(f" Пара {i}:", "info") + self.log_message(f" Откуда: '{source}'", "info") + self.log_message(f" Куда: '{dest}'", "info") + + self.log_message("=" * 50, "info") + + def log_message(self, message, tag=None): + """Добавляет сообщение в лог (вызывается из любого потока)""" + # Используем queue для потокобезопасности + self.queue.put({'type': 'log', 'message': message, 'tag': tag}) + file_logger = getattr(self, "file_logger", None) + if file_logger: + try: + level = logging.INFO + if tag == "error": + level = logging.ERROR + elif tag == "warning": + level = logging.WARNING + file_logger.log(level, message) + except Exception: + pass + + def clear_log(self): + """Очищает лог""" + self.log_text.delete(1.0, tk.END) + + def process_queue(self): + """Обрабатывает очередь сообщений из потоков""" + try: + while True: + msg = self.queue.get_nowait() + if msg['type'] == 'log': + timestamp = datetime.now().strftime("%H:%M:%S") + log_entry = f"[{timestamp}] {msg['message']}\n" + self.log_text.insert(tk.END, log_entry, msg.get('tag')) + self.log_text.see(tk.END) + elif msg['type'] == 'ui': + try: + msg['func']() + except Exception as e: + self.log_message(f"❌ Ошибка UI: {e}", "error") + except queue.Empty: + pass + finally: + self.root.after(100, self.process_queue) + + def check_and_save(self): + self.check_paths() + self.save_settings() + + def create_tray_image(self): + if Image is None: + return None + icon_path = os.path.abspath(ICON_PATH) + if os.path.exists(icon_path): + with contextlib.suppress(Exception): + return Image.open(icon_path) + + image = Image.new("RGB", (64, 64), color=(40, 40, 40)) + draw = ImageDraw.Draw(image) + draw.rectangle((8, 8, 56, 56), fill=(30, 144, 255)) + draw.text((18, 20), "B", fill=(255, 255, 255)) + return image + + def setup_tray_icon(self): + if pystray is None: + self.log_message("⚠️ Модуль pystray не установлен, трей недоступен", "warning") + return + if self.tray_icon is not None: + return + + image = self.create_tray_image() + if image is None: + self.log_message("⚠️ Не удалось создать иконку для трея", "warning") + return + + def on_show(_icon, _item): + self.root.after(0, self.show_window) + + def on_copy(_icon, _item): + self.root.after(0, self.start_manual_copy) + + def on_exit(_icon, _item): + self.root.after(0, self.exit_app) + + menu = pystray.Menu( + pystray.MenuItem("Открыть", on_show, default=True), + pystray.MenuItem("Запустить копирование", on_copy), + pystray.MenuItem("Выход", on_exit), + ) + + self.tray_icon = pystray.Icon(APP_NAME, image, APP_NAME, menu) + self.tray_thread = threading.Thread(target=self.tray_icon.run, daemon=True) + self.tray_thread.start() + + def tray_notify(self, title: str, message: str): + if self.tray_icon is None: + return + with contextlib.suppress(Exception): + self.tray_icon.notify(message, title) + + def show_window(self): + self.root.deiconify() + self.root.lift() + self.root.focus_force() + + def hide_to_tray(self): + if not self.minimize_to_tray_enabled.get(): + self.exit_app() + return + if pystray is None: + self.root.iconify() + self.log_message("⚠️ pystray не установлен, окно свернуто в панель задач", "warning") + return + self.setup_tray_icon() + self.root.withdraw() + self.log_message("🧰 Приложение свернуто в трей", "info") + + def exit_app(self): + if self.scheduler_enabled.get(): + self.stop_scheduler() + if self.tray_icon is not None: + with contextlib.suppress(Exception): + self.tray_icon.stop() + self.tray_icon = None + self.root.destroy() + + +def main(): + root = tk.Tk() + app = BackgroundFileCopyApp(root) + + # Обработка закрытия окна + def on_closing(): + app.hide_to_tray() + + root.protocol("WM_DELETE_WINDOW", on_closing) + root.mainloop() + + +if __name__ == "__main__": + main() diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..039d26e --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1 @@ +pytest>=8.0 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5101389 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +schedule>=1.2 +pystray>=0.19 +pillow>=10.0 diff --git a/tests/test_copy_logic.py b/tests/test_copy_logic.py new file mode 100644 index 0000000..c3f1118 --- /dev/null +++ b/tests/test_copy_logic.py @@ -0,0 +1,95 @@ +import os +import time +from pathlib import Path + +import pytest + +from main import find_latest_file_in_folder, should_copy_file, compute_file_checksum, verify_copy + + + +def _touch(path: Path, mtime: float) -> None: + path.write_text('data', encoding='utf-8') + os.utime(path, (mtime, mtime)) + + + +def test_find_latest_file_returns_none_for_missing_folder(tmp_path): + missing = tmp_path / 'missing' + assert find_latest_file_in_folder(str(missing)) is None + + + +def test_find_latest_file_picks_newest(tmp_path): + folder = tmp_path / 'src' + folder.mkdir() + + now = time.time() + older = folder / 'older.bak' + newer = folder / 'newer.bak' + other = folder / 'other.sql' + + _touch(older, now - 10) + _touch(newer, now - 5) + _touch(other, now - 1) + + latest = find_latest_file_in_folder(str(folder)) + assert latest is not None + assert latest.name == 'other.sql' + + + +def test_should_copy_file_when_target_missing(tmp_path): + src = tmp_path / 'src.bak' + src.write_text('x', encoding='utf-8') + dst = tmp_path / 'dst.bak' + + assert should_copy_file(src, dst) is True + + + +def test_should_copy_file_when_source_newer(tmp_path): + now = time.time() + src = tmp_path / 'src.bak' + dst = tmp_path / 'dst.bak' + + _touch(dst, now - 10) + _touch(src, now) + + assert should_copy_file(src, dst) is True + + + +def test_should_copy_file_when_target_newer_or_equal(tmp_path): + now = time.time() + src = tmp_path / 'src.bak' + dst = tmp_path / 'dst.bak' + + _touch(src, now) + _touch(dst, now) + + assert should_copy_file(src, dst) is False + + +def test_compute_file_checksum_stable(tmp_path): + src = tmp_path / 'a.bak' + src.write_text('hello', encoding='utf-8') + c1 = compute_file_checksum(src) + c2 = compute_file_checksum(src) + assert c1 == c2 + + +def test_verify_copy_true_for_equal_files(tmp_path): + src = tmp_path / 'src.bak' + dst = tmp_path / 'dst.bak' + src.write_text('data', encoding='utf-8') + dst.write_text('data', encoding='utf-8') + assert verify_copy(src, dst) is True + + +def test_verify_copy_false_for_different_files(tmp_path): + src = tmp_path / 'src.bak' + dst = tmp_path / 'dst.bak' + src.write_text('data1', encoding='utf-8') + dst.write_text('data2', encoding='utf-8') + assert verify_copy(src, dst) is False