Files
Copyrka/main.py

2465 lines
109 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, scrolledtext, simpledialog
import shutil
import os
from pathlib import Path
import threading
import queue
from datetime import datetime, time
import time as time_module
import schedule
import sys
import json
import tempfile
from typing import List, Dict, Optional
import traceback
import hashlib
import winreg
import contextlib
import logging
import logging.handlers
import fnmatch
import ctypes
import calendar
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
import pystray
from PIL import Image, ImageDraw
except Exception:
pystray = None
Image = None
ImageDraw = None
DEFAULT_EXTENSIONS = ('*.bak', '*.BAK', '*.backup', '*.sql')
AUTOSTART_REG_PATH = r"Software\Microsoft\Windows\CurrentVersion\Run"
AUTOSTART_REG_NAME = "Copyrka"
ICON_PATH = "icon.ico"
APP_NAME = "Copyrka"
COPY_RETRIES = 2
COPY_RETRY_DELAY = 2
DEFAULT_MIN_FREE_GB = 1
DEFAULT_MAX_WORKERS = 3
DEFAULT_KEEP_FILES = 1
DEFAULT_INCLUDE_MASKS = "*.bak;*.sql;*.backup"
DEFAULT_EXCLUDE_MASKS = "*.tmp;*.temp"
MUTEX_NAME = "Global\\BackupCopierMutex"
def find_latest_file_in_folder(folder_path: str, extensions=DEFAULT_EXTENSIONS,
include_masks: Optional[List[str]] = None,
exclude_masks: Optional[List[str]] = None) -> Optional[Path]:
"""Возвращает самый новый файл из папки по времени модификации."""
folder = Path(folder_path)
if not folder.exists():
return None
files: List[Path] = []
include_masks = include_masks or []
exclude_masks = exclude_masks or []
if include_masks:
for mask in include_masks:
files.extend(folder.glob(mask))
else:
for ext in extensions:
files.extend(folder.glob(ext))
filtered = []
seen = set()
for f in files:
if not f.is_file():
continue
if f.name in seen:
continue
if matches_masks(f.name, include_masks, exclude_masks):
filtered.append(f)
seen.add(f.name)
if not filtered:
return None
return max(filtered, key=lambda f: f.stat().st_mtime)
def should_copy_file(source: Path, target: Path) -> bool:
"""Определяет, нужно ли копировать файл."""
if not target.exists():
return True
return source.stat().st_mtime > target.stat().st_mtime
def compute_file_checksum(path: Path, chunk_size: int = 1024 * 1024) -> str:
"""Считает SHA-256 контрольную сумму файла."""
hasher = hashlib.sha256()
with path.open("rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
hasher.update(chunk)
return hasher.hexdigest()
def verify_copy(source: Path, target: Path) -> bool:
"""Проверяет, что файл скопирован корректно, по контрольной сумме."""
return compute_file_checksum(source) == compute_file_checksum(target)
def compare_file_checksums(source: Path, target: Path) -> tuple[bool, str, str]:
src_hash = compute_file_checksum(source)
dst_hash = compute_file_checksum(target)
return src_hash == dst_hash, src_hash, dst_hash
def get_resource_path(relative_path: str) -> str:
"""Возвращает абсолютный путь к ресурсу (поддерживает PyInstaller)."""
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
return os.path.join(sys._MEIPASS, relative_path)
return os.path.abspath(relative_path)
def parse_masks(mask_text: str) -> List[str]:
parts = [p.strip() for p in (mask_text or "").split(";")]
return [p for p in parts if p]
def matches_masks(filename: str, include_masks: List[str], exclude_masks: List[str]) -> bool:
if include_masks:
if not any(fnmatch.fnmatch(filename, m) for m in include_masks):
return False
if exclude_masks:
if any(fnmatch.fnmatch(filename, m) for m in exclude_masks):
return False
return True
def get_free_space_gb(path: Path) -> float:
try:
usage = shutil.disk_usage(str(path))
return usage.free / (1024 ** 3)
except Exception:
return 0.0
def get_total_copy_size(source: Path, dest: Path) -> int:
total = 0
for root, _dirs, files in os.walk(source):
rel = os.path.relpath(root, source)
dest_root = dest / rel if rel != "." else dest
for fname in files:
src_file = Path(root) / fname
dst_file = dest_root / fname
try:
if should_copy_file(src_file, dst_file):
total += src_file.stat().st_size
except Exception:
pass
return total
def ensure_single_instance() -> Optional[int]:
"""Создает системный mutex. Возвращает handle или None если уже запущено."""
mutex = ctypes.windll.kernel32.CreateMutexW(None, True, MUTEX_NAME)
already_exists = ctypes.windll.kernel32.GetLastError() == 183
if already_exists:
return None
return mutex
class FileCopyScheduler:
"""Класс для управления расписанием копирования"""
def __init__(self, app):
self.app = app
self.scheduled_jobs = []
self.running = False
self.thread = None
def start(self):
"""Запускает планировщик в отдельном потоке"""
if not self.running:
self.running = True
self.thread = threading.Thread(target=self._run_scheduler, daemon=True)
self.thread.start()
self.app.log_message("🕒 Планировщик запущен", "info")
def stop(self):
"""Останавливает планировщик"""
self.running = False
self.app.log_message("🕒 Планировщик остановлен", "info")
def _run_scheduler(self):
"""Основной цикл планировщика"""
while self.running:
schedule.run_pending()
time_module.sleep(1)
def schedule_copy_job(self, job_id: str, schedules: List[dict], pairs: List[tuple]):
"""Добавляет задания в планировщик"""
# Очищаем предыдущие задания с таким же ID
schedule.clear(job_id)
if not schedules:
return
day_map = {
"Mon": schedule.every().monday,
"Tue": schedule.every().tuesday,
"Wed": schedule.every().wednesday,
"Thu": schedule.every().thursday,
"Fri": schedule.every().friday,
"Sat": schedule.every().saturday,
"Sun": schedule.every().sunday,
}
for entry in schedules:
time_str = entry.get("time", "03:00")
sched_type = entry.get("type", "daily")
days = entry.get("days", [])
if sched_type == "monthly":
day_num = int(entry.get("day", 1))
schedule.every().day.at(time_str).do(self._execute_copy_job_if_monthday, pairs=pairs, day=day_num).tag(job_id)
self.app.log_message(f"📅 Запланировано копирование на {time_str} (ежемесячно, день {day_num})", "info")
elif sched_type == "weekly" and days:
for day in days:
if day in day_map:
day_map[day].at(time_str).do(self._execute_copy_job, pairs=pairs).tag(job_id)
self.app.log_message(f"📅 Запланировано копирование на {time_str} ({', '.join(days)})", "info")
else:
schedule.every().day.at(time_str).do(self._execute_copy_job, pairs=pairs).tag(job_id)
self.app.log_message(f"📅 Запланировано копирование на {time_str} ежедневно", "info")
def _execute_copy_job(self, pairs: List[tuple]):
"""Выполняет задание копирования"""
self.app.log_message("⏰ Запуск запланированного копирования...", "info")
# Запускаем копирование в отдельном потоке
copy_thread = threading.Thread(
target=self.app.copy_files_thread,
args=(pairs, True), # True означает фоновый режим
daemon=True
)
copy_thread.start()
def _execute_copy_job_if_monthday(self, pairs: List[tuple], day: int):
today = datetime.now()
last_day = calendar.monthrange(today.year, today.month)[1]
run_day = min(int(day), last_day)
if today.day != run_day:
return
self._execute_copy_job(pairs)
class BackgroundFileCopyApp:
def __init__(self, root):
self.root = root
self.root.title("Копирка")
self.root.geometry("900x800")
self.root.minsize(900, 700)
self.setup_window_icon()
# Для работы с очередью сообщений из потоков
self.queue = queue.Queue()
# Список пар для копирования
self.copy_pairs = []
self.pair_counter = 0
self.selected_pair_id = None
self.inline_editor = None
self.profile_combo = None
self.cancel_event = threading.Event()
self.autosave_job = None
self.autosave_enabled = True
self.skip_cleanup_confirm = False
self.loading_settings = False
# Планировщик
self.scheduler = FileCopyScheduler(self)
# Флаг для отслеживания состояния
self.is_copying = False
self.copy_lock = threading.Lock()
self.active_files_lock = threading.Lock()
self.active_files: Dict[str, int] = {}
# Автозапуск
self.autostart_enabled = tk.BooleanVar(value=False)
# Сворачивание в трей
self.minimize_to_tray_enabled = tk.BooleanVar(value=True)
self.tray_icon = None
self.tray_thread = None
# Режим проверки без копирования
self.verify_only = tk.BooleanVar(value=False)
# Последний успешный запуск
self.last_success_time: Optional[str] = None
self.last_result_summary: Optional[str] = None
self.last_error_detail: Optional[str] = None
# Хеши последних копий
self.last_hashes: Dict[str, str] = {}
# Профили
self.profiles: Dict[str, dict] = {}
self.active_profile = tk.StringVar(value="Default")
self.include_masks_var = tk.StringVar(value=DEFAULT_INCLUDE_MASKS)
self.exclude_masks_var = tk.StringVar(value=DEFAULT_EXCLUDE_MASKS)
self.min_free_gb_var = tk.DoubleVar(value=DEFAULT_MIN_FREE_GB)
self.max_workers_var = tk.IntVar(value=DEFAULT_MAX_WORKERS)
self.cleanup_old_var = tk.BooleanVar(value=False)
self.keep_files_var = tk.IntVar(value=DEFAULT_KEEP_FILES)
self.schedules: List[dict] = []
self.schedule_days_vars = {
"Mon": tk.BooleanVar(value=False),
"Tue": tk.BooleanVar(value=False),
"Wed": tk.BooleanVar(value=False),
"Thu": tk.BooleanVar(value=False),
"Fri": tk.BooleanVar(value=False),
"Sat": tk.BooleanVar(value=False),
"Sun": tk.BooleanVar(value=False),
}
self.schedule_type_var = tk.StringVar(value="daily")
self.schedule_monthday_var = tk.IntVar(value=1)
# Логгер в файл
self.file_logger = self.setup_file_logger()
# Создаем интерфейс
self.setup_ui()
# Загружаем сохраненные настройки
self.load_settings()
# Проверяем очередь каждые 100ms
self.process_queue()
# Запускаем планировщик при старте, если есть сохраненные настройки
if self.scheduler_enabled.get():
self.start_scheduler()
def setup_ui(self):
main_frame = ttk.Frame(self.root, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
notebook = ttk.Notebook(main_frame)
notebook.pack(fill=tk.BOTH, expand=True)
settings_tab = ttk.Frame(notebook)
log_tab = ttk.Frame(notebook)
help_tab = ttk.Frame(notebook)
notebook.add(settings_tab, text="Настройки")
notebook.add(log_tab, text="Лог")
notebook.add(help_tab, text="Справка")
title_label = ttk.Label(settings_tab, text="Копирка",
font=("Arial", 14, "bold"))
title_label.pack(pady=8)
toolbar = ttk.Frame(settings_tab)
toolbar.pack(fill=tk.X, pady=2)
toolbar_row1 = ttk.Frame(toolbar)
toolbar_row1.pack(fill=tk.X, pady=1)
toolbar_row2 = ttk.Frame(toolbar)
toolbar_row2.pack(fill=tk.X, pady=1)
ttk.Button(toolbar_row1, text=" Добавить", command=self.add_path_pair).pack(side=tk.LEFT, padx=4)
ttk.Button(toolbar_row1, text="❌ Удалить", command=self.remove_selected_pair).pack(side=tk.LEFT, padx=4)
ttk.Button(toolbar_row1, text="📂 Источник", command=lambda: self.pick_folder_for_item(self.selected_pair_id, "source", "Выберите папку источника")).pack(side=tk.LEFT, padx=4)
ttk.Button(toolbar_row1, text="📂 Назначение", command=lambda: self.pick_folder_for_item(self.selected_pair_id, "dest", "Выберите папку назначения")).pack(side=tk.LEFT, padx=4)
self.search_var = tk.StringVar(value="")
self.filter_status_var = tk.StringVar(value="Все")
self.compact_view_var = tk.BooleanVar(value=False)
ttk.Label(toolbar_row2, text="Поиск:").pack(side=tk.LEFT, padx=6)
search_entry = ttk.Entry(toolbar_row2, textvariable=self.search_var, width=20)
search_entry.pack(side=tk.LEFT, padx=4)
search_entry.bind("<KeyRelease>", lambda _e: self.render_pairs())
ttk.Label(toolbar_row2, text="Статус:").pack(side=tk.LEFT, padx=6)
status_combo = ttk.Combobox(toolbar_row2, textvariable=self.filter_status_var, state="readonly",
values=["Все", "OK", "Нет источника", "Нет назначения", "Нет доступа", ""], width=14)
status_combo.pack(side=tk.LEFT, padx=4)
status_combo.bind("<<ComboboxSelected>>", lambda _e: self.render_pairs())
ttk.Checkbutton(toolbar_row2, text="Компактно", variable=self.compact_view_var, command=self.toggle_compact_view).pack(side=tk.LEFT, padx=6)
# Профили скрыты по запросу пользователя
status_frame = ttk.LabelFrame(settings_tab, text="Состояние", padding="10")
status_frame.pack(fill=tk.X, pady=5)
self.status_summary_label = ttk.Label(status_frame, text="")
self.status_summary_label.pack(side=tk.LEFT)
self.next_run_label = ttk.Label(status_frame, text="(следующий запуск: —)",
font=("Arial", 9, "italic"))
self.next_run_label.pack(side=tk.LEFT, padx=10)
self.last_success_label = ttk.Label(status_frame, text="(последний успех: —)",
font=("Arial", 9, "italic"))
self.last_success_label.pack(side=tk.LEFT, padx=10)
self.last_result_label = ttk.Label(status_frame, text="(последний результат: —)",
font=("Arial", 9, "italic"))
self.last_result_label.pack(side=tk.LEFT, padx=10)
schedule_frame = ttk.LabelFrame(settings_tab, text="Расписание и поведение", padding="10")
schedule_frame.pack(fill=tk.X, pady=5)
scheduler_box = ttk.LabelFrame(schedule_frame, text="Планировщик", padding="8")
scheduler_box.pack(fill=tk.X, pady=(0, 6))
behavior_box = ttk.LabelFrame(schedule_frame, text="Параметры копирования", padding="8")
behavior_box.pack(fill=tk.X)
time_frame = ttk.Frame(scheduler_box)
time_frame.pack(fill=tk.X, pady=5)
self.hour_var = tk.StringVar(value="03")
self.minute_var = tk.StringVar(value="00")
self.scheduler_enabled = tk.BooleanVar(value=False)
self.scheduler_check = ttk.Checkbutton(time_frame,
text="Включить расписание",
variable=self.scheduler_enabled,
command=self.toggle_scheduler)
self.scheduler_check.pack(side=tk.LEFT, padx=10)
self.scheduler_status = ttk.Label(time_frame, text="(остановлен)",
font=("Arial", 9, "italic"))
self.scheduler_status.pack(side=tk.LEFT, padx=10)
schedule_summary_frame = ttk.Frame(scheduler_box)
schedule_summary_frame.pack(fill=tk.X, pady=4)
self.schedule_summary_label = ttk.Label(schedule_summary_frame, text="Расписание: —")
self.schedule_summary_label.pack(side=tk.LEFT)
ttk.Button(schedule_summary_frame, text="Расписание...", command=self.open_schedule_dialog).pack(side=tk.LEFT, padx=8)
options_frame = ttk.Frame(behavior_box)
options_frame.pack(fill=tk.X, pady=5)
options_row1 = ttk.Frame(options_frame)
options_row1.pack(fill=tk.X, pady=3)
options_row2 = ttk.Frame(options_frame)
options_row2.pack(fill=tk.X, pady=3)
self.autostart_check = ttk.Checkbutton(
options_row1,
text="Автозапуск Windows",
variable=self.autostart_enabled,
command=self.toggle_autostart
)
autostart_help = ttk.Button(
options_row1,
text="?",
width=2,
command=lambda: self.show_hint("Автозапуск", "Добавляет программу в автозапуск Windows.")
)
self.minimize_check = ttk.Checkbutton(
options_row1,
text="Сворачивать в трей при закрытии",
variable=self.minimize_to_tray_enabled
)
minimize_help = ttk.Button(
options_row1,
text="?",
width=2,
command=lambda: self.show_hint("Сворачивание", "По крестику окно скрывается в трей.")
)
self.autostart_check.grid(row=0, column=0, sticky="w", padx=(0, 4))
autostart_help.grid(row=0, column=1, sticky="w", padx=(0, 12))
self.minimize_check.grid(row=0, column=2, sticky="w", padx=(0, 4))
minimize_help.grid(row=0, column=3, sticky="w")
self.verify_check = ttk.Checkbutton(
options_row2,
text="Только проверка (без копирования)",
variable=self.verify_only
)
verify_help = ttk.Button(
options_row2,
text="?",
width=2,
command=lambda: self.show_hint("Проверка", "Проверяет совпадение контрольных сумм без копирования.")
)
self.cleanup_check = ttk.Checkbutton(
options_row2,
text="Удалять старые файлы в назначении",
variable=self.cleanup_old_var,
command=self.on_cleanup_toggle
)
cleanup_help = ttk.Button(
options_row2,
text="?",
width=2,
command=lambda: self.show_hint("Очистка", "В режиме 'последний файл' удаляет старые файлы в папке назначения.")
)
self.verify_check.grid(row=0, column=0, sticky="w", padx=(0, 4))
verify_help.grid(row=0, column=1, sticky="w", padx=(0, 12))
self.cleanup_check.grid(row=0, column=2, sticky="w", padx=(0, 4))
cleanup_help.grid(row=0, column=3, sticky="w")
perf_frame = ttk.Frame(behavior_box)
perf_frame.pack(fill=tk.X, pady=4)
ttk.Label(perf_frame, text="Мин. свободно, ГБ:", width=20, anchor=tk.W).pack(side=tk.LEFT)
self.min_free_entry = ttk.Entry(perf_frame, textvariable=self.min_free_gb_var, width=6)
self.min_free_entry.pack(side=tk.LEFT, padx=(2, 6))
self.min_free_entry.bind("<KeyRelease>", lambda _e: self.request_autosave())
ttk.Label(perf_frame, text="Параллельных потоков:", width=23, anchor=tk.W).pack(side=tk.LEFT, padx=(4, 0))
self.max_workers_entry = ttk.Entry(perf_frame, textvariable=self.max_workers_var, width=4)
self.max_workers_entry.pack(side=tk.LEFT, padx=(2, 6))
self.max_workers_entry.bind("<KeyRelease>", lambda _e: self.request_autosave())
ttk.Label(perf_frame, text="Хранить последних:", width=19, anchor=tk.W).pack(side=tk.LEFT, padx=(4, 0))
self.keep_files_spin = ttk.Spinbox(perf_frame, from_=1, to=999, width=4, textvariable=self.keep_files_var)
self.keep_files_spin.pack(side=tk.LEFT, padx=(2, 0))
self.keep_files_spin.bind("<KeyRelease>", lambda _e: self.request_autosave())
self.keep_files_spin.bind("<FocusOut>", lambda _e: self.request_autosave())
masks_frame = ttk.Frame(behavior_box)
masks_frame.pack(fill=tk.X, pady=4)
ttk.Label(masks_frame, text="Включать маски:").pack(side=tk.LEFT)
self.include_entry = ttk.Entry(masks_frame, textvariable=self.include_masks_var)
self.include_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=4)
self.include_entry.bind("<KeyRelease>", lambda _e: self.request_autosave())
ttk.Label(masks_frame, text="Исключать маски:").pack(side=tk.LEFT, padx=6)
self.exclude_entry = ttk.Entry(masks_frame, textvariable=self.exclude_masks_var)
self.exclude_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=4)
self.exclude_entry.bind("<KeyRelease>", lambda _e: self.request_autosave())
ttk.Button(masks_frame, text="Шаблоны", command=self.show_mask_templates).pack(side=tk.LEFT, padx=6)
buttons_frame = ttk.Frame(behavior_box)
buttons_frame.pack(fill=tk.X, pady=5)
ttk.Button(buttons_frame, text="🧭 Мастер настройки",
command=self.open_wizard).pack(side=tk.LEFT, padx=5)
ttk.Button(buttons_frame, text="▶ Запустить сейчас",
command=self.start_manual_copy).pack(side=tk.LEFT, padx=5)
ttk.Button(buttons_frame, text="⏹ Остановить",
command=self.stop_copying).pack(side=tk.LEFT, padx=5)
ttk.Button(buttons_frame, text="💾 Сохранить",
command=self.check_and_save).pack(side=tk.LEFT, padx=5)
paths_frame = ttk.LabelFrame(settings_tab, text="Пары копирования", padding="10")
paths_frame.pack(fill=tk.BOTH, expand=True, pady=5)
self.pairs_tree = ttk.Treeview(
paths_frame,
columns=("source", "dest", "mode", "last", "status"),
show="headings",
selectmode="browse"
)
self.pairs_tree.heading("source", text="Источник")
self.pairs_tree.heading("dest", text="Назначение")
self.pairs_tree.heading("mode", text="Режим")
self.pairs_tree.heading("last", text="Последний файл")
self.pairs_tree.heading("status", text="Статус")
self.pairs_tree.column("source", width=240)
self.pairs_tree.column("dest", width=240)
self.pairs_tree.column("mode", width=110, anchor=tk.CENTER)
self.pairs_tree.column("last", width=160)
self.pairs_tree.column("status", width=120, anchor=tk.CENTER)
tree_scroll = ttk.Scrollbar(paths_frame, orient="vertical", command=self.pairs_tree.yview)
self.pairs_tree.configure(yscrollcommand=tree_scroll.set)
self.pairs_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
tree_scroll.pack(side=tk.RIGHT, fill=tk.Y)
self.pairs_tree.tag_configure("ok", foreground="green")
self.pairs_tree.tag_configure("warn", foreground="orange")
self.pairs_tree.tag_configure("error", foreground="red")
self.pairs_tree.tag_configure("idle", foreground="gray")
self.pairs_tree.bind("<Double-1>", self.on_tree_double_click)
self.pairs_tree.bind("<F2>", self.on_tree_f2_edit)
self.pairs_tree.bind("<Button-3>", self.on_tree_right_click)
self.pairs_tree.bind("<Motion>", self.on_tree_motion)
self.pairs_tree.bind("<Delete>", lambda _e: self.remove_selected_pair())
self.pairs_tree.bind("<Return>", lambda _e: self.open_selected_destination())
self.pairs_tree.bind("<<TreeviewSelect>>", lambda _e: self.on_pair_select())
log_frame = ttk.LabelFrame(log_tab, text="Лог операций", padding="5")
log_frame.pack(fill=tk.BOTH, expand=True, pady=5)
log_buttons = ttk.Frame(log_frame)
log_buttons.pack(fill=tk.X, pady=2)
ttk.Button(log_buttons, text="📋 Очистить лог",
command=self.clear_log).pack(side=tk.RIGHT, padx=2)
ttk.Button(log_buttons, text="🔍 Отладка",
command=self.debug_settings).pack(side=tk.RIGHT, padx=2)
ttk.Button(log_buttons, text="📂 Открыть лог",
command=self.open_log_file).pack(side=tk.LEFT, padx=2)
ttk.Button(log_buttons, text="📁 Папка настроек",
command=self.open_settings_folder).pack(side=tk.LEFT, padx=2)
self.log_text = scrolledtext.ScrolledText(log_frame, height=8, wrap=tk.WORD)
self.log_text.pack(fill=tk.BOTH, expand=True)
self.log_text.tag_configure("success", foreground="green")
self.log_text.tag_configure("error", foreground="red")
self.log_text.tag_configure("warning", foreground="orange")
self.log_text.tag_configure("info", foreground="blue")
help_text = (
"Быстрый старт:\n"
"1) Нажмите 'Добавить' и заполните источник/назначение.\n"
"2) Выберите режим: последний файл или вся папка.\n"
"3) Настройте расписание и нажмите 'Сохранить'.\n\n"
"Подсказки доступны по кнопке '?'."
)
ttk.Label(help_tab, text=help_text, justify=tk.LEFT).pack(anchor=tk.W, padx=10, pady=10)
footer = ttk.Frame(self.root)
footer.pack(side=tk.BOTTOM, fill=tk.X)
self.status_bar = ttk.Label(footer, text="F2 — редактировать, двойной клик — выбрать папку, ПКМ — меню",
relief=tk.SUNKEN, anchor=tk.W)
self.status_bar.pack(side=tk.TOP, fill=tk.X)
self.progress_var = tk.IntVar(value=0)
self.progress = ttk.Progressbar(footer, mode="determinate", variable=self.progress_var)
self.progress.pack(side=tk.TOP, fill=tk.X)
def setup_window_icon(self):
icon_path = get_resource_path(ICON_PATH)
if os.path.exists(icon_path):
with contextlib.suppress(Exception):
self.root.iconbitmap(icon_path)
def setup_file_logger(self):
settings_path = self.get_settings_path()
log_dir = os.path.dirname(settings_path)
os.makedirs(log_dir, exist_ok=True)
log_path = os.path.join(log_dir, "backup_copier.log")
logger = logging.getLogger("backup_copier")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.handlers.RotatingFileHandler(
log_path, maxBytes=2 * 1024 * 1024, backupCount=3, encoding="utf-8"
)
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def get_settings_path(self):
"""Возвращает путь для сохранения настроек"""
if getattr(sys, 'frozen', False):
# Для EXE используем папку с EXE файлом
base_path = os.path.dirname(sys.executable)
else:
# Для скрипта используем папку со скриптом
base_path = os.path.dirname(os.path.abspath(__file__))
# Пробуем создать папку, если есть права
settings_dir = base_path
try:
# Проверяем, можем ли мы писать в эту папку
test_file = os.path.join(settings_dir, 'test_write.tmp')
with open(test_file, 'w') as f:
f.write('test')
os.remove(test_file)
self.log_message(f"📁 Используется папка: {settings_dir}", "info")
except:
# Если не можем писать, используем AppData
settings_dir = os.path.join(os.environ.get('APPDATA', os.path.expanduser('~')), 'BackupCopier')
os.makedirs(settings_dir, exist_ok=True)
self.log_message(f"📁 Используется папка AppData: {settings_dir}", "info")
return os.path.join(settings_dir, 'backup_copier_settings.json')
def create_placeholder(self, entry, text):
"""Добавляет подсказку в поле ввода."""
placeholder_color = "gray"
normal_color = "black"
def on_focus_in(_):
if entry.get() == text and entry.cget("foreground") == placeholder_color:
entry.delete(0, tk.END)
entry.config(foreground=normal_color)
def on_focus_out(_):
if not entry.get():
entry.insert(0, text)
entry.config(foreground=placeholder_color)
entry.bind("<FocusIn>", on_focus_in)
entry.bind("<FocusOut>", on_focus_out)
on_focus_out(None)
def get_autostart_command(self) -> str:
"""Возвращает команду для автозапуска."""
if getattr(sys, 'frozen', False):
return f"\"{sys.executable}\""
script_path = os.path.abspath(__file__)
return f"\"{sys.executable}\" \"{script_path}\""
def is_autostart_enabled(self) -> bool:
"""Проверяет, включен ли автозапуск в реестре."""
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, AUTOSTART_REG_PATH, 0, winreg.KEY_READ) as key:
value, _ = winreg.QueryValueEx(key, AUTOSTART_REG_NAME)
return bool(value)
except FileNotFoundError:
return False
except OSError:
return False
def set_autostart_enabled(self, enabled: bool) -> None:
"""Включает/выключает автозапуск в реестре."""
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, AUTOSTART_REG_PATH, 0, winreg.KEY_SET_VALUE) as key:
if enabled:
winreg.SetValueEx(key, AUTOSTART_REG_NAME, 0, winreg.REG_SZ, self.get_autostart_command())
self.log_message("✅ Автозапуск включен", "success")
else:
try:
winreg.DeleteValue(key, AUTOSTART_REG_NAME)
except FileNotFoundError:
pass
self.log_message("⏭️ Автозапуск выключен", "info")
except Exception as e:
self.log_message(f"❌ Ошибка автозапуска: {e}", "error")
def toggle_autostart(self):
"""Обработчик галочки автозапуска."""
self.set_autostart_enabled(self.autostart_enabled.get())
def load_settings(self):
"""Загружает настройки из файла и отображает их"""
try:
self.loading_settings = True
self.autosave_enabled = False
settings_path = self.get_settings_path()
self.log_message(f"🔍 Загрузка настроек из: {settings_path}", "info")
if os.path.exists(settings_path):
with open(settings_path, 'r', encoding='utf-8') as f:
settings = json.load(f)
self.scheduler_enabled.set(settings.get('enabled', False))
desired_autostart = settings.get('autostart_enabled')
self.minimize_to_tray_enabled.set(settings.get('minimize_to_tray', True))
# Профили
profiles = settings.get("profiles")
if not isinstance(profiles, dict):
# Миграция со старого формата
profiles = {"Default": self.default_profile()}
profiles["Default"]["pairs"] = settings.get('pairs', [])
profiles["Default"]["include_masks"] = settings.get('include_masks', DEFAULT_INCLUDE_MASKS)
profiles["Default"]["exclude_masks"] = settings.get('exclude_masks', DEFAULT_EXCLUDE_MASKS)
profiles["Default"]["min_free_gb"] = settings.get('min_free_gb', DEFAULT_MIN_FREE_GB)
profiles["Default"]["max_workers"] = settings.get('max_workers', DEFAULT_MAX_WORKERS)
profiles["Default"]["verify_only"] = settings.get('verify_only', False)
profiles["Default"]["keep_files"] = settings.get('keep_files', DEFAULT_KEEP_FILES)
profiles["Default"]["last_success_time"] = settings.get('last_success_time')
profiles["Default"]["last_result_summary"] = settings.get('last_result_summary')
profiles["Default"]["last_hashes"] = settings.get('last_hashes', {})
profiles["Default"]["schedules"] = settings.get('schedules', [])
self.profiles = profiles
self.refresh_profile_combo()
active = settings.get("active_profile") or self.active_profile.get()
self.load_profile(active, save_current=False)
# Настройки автозапуска
actual_autostart = self.is_autostart_enabled()
if desired_autostart is None:
self.autostart_enabled.set(actual_autostart)
else:
desired_autostart = bool(desired_autostart)
self.autostart_enabled.set(desired_autostart)
if desired_autostart != actual_autostart:
self.set_autostart_enabled(desired_autostart)
self.log_message(f"📂 Настройки загружены из {settings_path}", "info")
else:
# Если файла нет, добавляем пустую пару
self.log_message("🆕 Файл настроек не найден, создаем новую конфигурацию", "info")
self.profiles = {"Default": self.default_profile()}
self.refresh_profile_combo()
self.load_profile("Default", save_current=False)
self.autostart_enabled.set(self.is_autostart_enabled())
except Exception as e:
self.log_message(f"❌ Ошибка при загрузке настроек: {e}", "error")
import traceback
traceback.print_exc()
self.profiles = {"Default": self.default_profile()}
self.refresh_profile_combo()
self.load_profile("Default", save_current=False)
self.autostart_enabled.set(self.is_autostart_enabled())
finally:
self.autosave_enabled = True
self.loading_settings = False
def save_settings(self, show_message: bool = True):
"""Сохраняет настройки в файл"""
try:
self.save_active_profile()
settings = {
'enabled': self.scheduler_enabled.get(),
'autostart_enabled': self.autostart_enabled.get(),
'minimize_to_tray': self.minimize_to_tray_enabled.get(),
'active_profile': self.active_profile.get(),
'profiles': self.profiles
}
settings_path = self.get_settings_path()
# Сохраняем с отступами для читаемости
with open(settings_path, 'w', encoding='utf-8') as f:
json.dump(settings, f, ensure_ascii=False, indent=2)
if show_message:
self.log_message(f"💾 Настройки сохранены (профилей: {len(self.profiles)})", "success")
self.log_message(f"📁 Файл: {settings_path}", "info")
messagebox.showinfo("Успех", f"Настройки сохранены!\n\nФайл: {settings_path}")
if self.scheduler_enabled.get():
# Перепланируем, чтобы учесть новые пути/время
self.start_scheduler()
except Exception as e:
self.log_message(f"❌ Ошибка при сохранении настроек: {e}", "error")
messagebox.showerror("Ошибка", f"Не удалось сохранить настройки:\n{e}")
def add_path_pair(self, source="", dest="", full_copy: bool = False):
"""Добавляет новую пару в таблицу."""
pair_id = f"pair_{self.pair_counter}"
self.pair_counter += 1
pair = {
"id": pair_id,
"source": source,
"dest": dest,
"full_copy": bool(full_copy),
"last_file": "",
"include_masks": "",
"exclude_masks": "",
"status": "",
"status_tag": "idle"
}
if source or dest:
status_text, status_tag = self.validate_pair(source, dest)
pair["status"] = status_text
pair["status_tag"] = status_tag
if source or dest:
status_text, status_tag = self.validate_pair(source, dest)
pair["status"] = status_text
pair["status_tag"] = status_tag
self.copy_pairs.append(pair)
self.render_pairs()
self.pairs_tree.selection_set(pair_id)
self.on_pair_select()
self.request_autosave()
def remove_selected_pair(self):
pair_id = self.selected_pair_id
if not pair_id:
return
self.copy_pairs = [p for p in self.copy_pairs if p["id"] != pair_id]
with contextlib.suppress(Exception):
self.pairs_tree.delete(pair_id)
self.selected_pair_id = None
self.request_autosave()
def toggle_compact_view(self):
compact = self.compact_view_var.get()
style = ttk.Style()
style.configure("Treeview", rowheight=18 if compact else 24)
self.render_pairs()
def remove_path_pair(self, pair_id):
"""Совместимость: удалить пару по id."""
if pair_id:
self.selected_pair_id = pair_id
self.remove_selected_pair()
def remove_all_pairs(self, silent=False):
"""Удаляет все пары"""
if not silent:
if not messagebox.askyesno("Подтверждение", "Удалить все пути?"):
return
self.copy_pairs.clear()
for item in self.pairs_tree.get_children():
self.pairs_tree.delete(item)
self.selected_pair_id = None
self.request_autosave()
def get_pair_by_id(self, pair_id: str):
for pair in self.copy_pairs:
if pair["id"] == pair_id:
return pair
return None
def find_pair_by_paths(self, source: str, dest: str, full_copy: bool):
for pair in self.copy_pairs:
if pair.get("source") == source and pair.get("dest") == dest and bool(pair.get("full_copy", False)) == bool(full_copy):
return pair
return None
def update_pair_row(self, pair):
# Treeview is not thread-safe; schedule redraw in UI thread.
self.run_on_ui(self.render_pairs)
def shorten_path(self, path: str, max_len: int = 40) -> str:
if len(path) <= max_len:
return path
return path[:18] + "..." + path[-(max_len - 21):]
def format_status(self, status_text: str) -> str:
if status_text == "OK":
return "✅ OK"
if status_text in ("Нет источника", "Нет назначения", "Нет доступа"):
return f"{status_text}"
if status_text == "":
return ""
return status_text
def render_pairs(self):
search = (self.search_var.get() or "").lower()
status_filter = self.filter_status_var.get()
for item in self.pairs_tree.get_children():
self.pairs_tree.delete(item)
for pair in self.copy_pairs:
status_text = pair.get("status", "")
if status_filter != "Все" and status_text != status_filter:
continue
if search:
if search not in pair.get("source", "").lower() and search not in pair.get("dest", "").lower():
continue
mode_text = "Вся папка" if pair.get("full_copy") else "Последний файл"
last_file = pair.get("last_file", "")
source_disp = self.shorten_path(pair.get("source", ""))
dest_disp = self.shorten_path(pair.get("dest", ""))
self.pairs_tree.insert(
"",
"end",
iid=pair["id"],
values=(source_disp, dest_disp, mode_text, last_file, self.format_status(status_text)),
tags=(pair.get("status_tag", "idle"),)
)
def on_pair_select(self):
selection = self.pairs_tree.selection()
if not selection:
return
pair_id = selection[0]
self.selected_pair_id = pair_id
def update_selected_pair_from_edit(self):
pair_id = self.selected_pair_id
if not pair_id:
return
pair = self.get_pair_by_id(pair_id)
if not pair:
return
pair["source"] = pair.get("source", "").strip()
pair["dest"] = pair.get("dest", "").strip()
status_text, status_tag = self.validate_pair(pair["source"], pair["dest"])
pair["status"] = status_text
pair["status_tag"] = status_tag
self.update_pair_row(pair)
def on_tree_double_click(self, event):
item = self.pairs_tree.identify_row(event.y)
column = self.pairs_tree.identify_column(event.x)
if not item or not column:
return
self.selected_pair_id = item
if column == "#3":
self.toggle_selected_mode()
return
if column not in ("#1", "#2"):
return
if column == "#1":
self.pick_folder_for_item(item, "source", "Выберите папку источника")
return
if column == "#2":
self.pick_folder_for_item(item, "dest", "Выберите папку назначения")
return
def on_tree_f2_edit(self, event):
selection = self.pairs_tree.selection()
if not selection:
return
item = selection[0]
column = self.pairs_tree.identify_column(event.x)
if not column or column not in ("#1", "#2"):
column = "#1"
self.inline_edit_cell(item, column)
def on_tree_right_click(self, event):
item = self.pairs_tree.identify_row(event.y)
if item:
self.pairs_tree.selection_set(item)
self.selected_pair_id = item
menu = tk.Menu(self.root, tearoff=0)
menu.add_command(label="Выбрать источник…", command=lambda: self.pick_folder_for_item(self.selected_pair_id, "source", "Выберите папку источника"))
menu.add_command(label="Выбрать назначение…", command=lambda: self.pick_folder_for_item(self.selected_pair_id, "dest", "Выберите папку назначения"))
menu.add_command(label="Переключить режим", command=self.toggle_selected_mode)
menu.add_command(label="Открыть источник", command=self.open_selected_source)
menu.add_command(label="Открыть назначение", command=self.open_selected_destination)
menu.add_command(label="Показать лог пары", command=self.show_pair_log)
menu.add_command(label="Маски пары…", command=self.edit_pair_masks)
menu.add_separator()
menu.add_command(label="Удалить", command=self.remove_selected_pair)
menu.tk_popup(event.x_root, event.y_root)
menu.grab_release()
def on_tree_motion(self, event):
item = self.pairs_tree.identify_row(event.y)
column = self.pairs_tree.identify_column(event.x)
if not item:
return
pair = self.get_pair_by_id(item)
if not pair:
return
if column == "#1":
self.set_status_text(pair.get("source", "") or "Готов к работе")
elif column == "#2":
self.set_status_text(pair.get("dest", "") or "Готов к работе")
elif column == "#4":
self.set_status_text(pair.get("last_file", "") or "Готов к работе")
def pick_folder_for_item(self, item_id, field: str, title: str):
if not item_id:
return
folder = filedialog.askdirectory(title=title)
if folder:
pair = self.get_pair_by_id(item_id)
if pair:
pair[field] = folder
status_text, status_tag = self.validate_pair(pair["source"], pair["dest"])
pair["status"] = status_text
pair["status_tag"] = status_tag
self.update_pair_row(pair)
self.request_autosave()
def inline_edit_cell(self, item, column):
bbox = self.pairs_tree.bbox(item, column)
if not bbox:
return
x, y, w, h = bbox
value = self.pairs_tree.set(item, column)
if self.inline_editor is not None:
self.inline_editor.destroy()
self.inline_editor = None
entry = ttk.Entry(self.pairs_tree)
entry.insert(0, value)
entry.select_range(0, tk.END)
entry.place(x=x, y=y, width=w, height=h)
entry.focus_set()
self.inline_editor = entry
def commit(_evt=None):
new_val = entry.get().strip()
entry.destroy()
self.inline_editor = None
pair = self.get_pair_by_id(item)
if not pair:
return
if column == "#1":
pair["source"] = new_val
elif column == "#2":
pair["dest"] = new_val
status_text, status_tag = self.validate_pair(pair["source"], pair["dest"])
pair["status"] = status_text
pair["status_tag"] = status_tag
self.update_pair_row(pair)
self.request_autosave()
def cancel(_evt=None):
entry.destroy()
self.inline_editor = None
entry.bind("<Return>", commit)
entry.bind("<FocusOut>", commit)
entry.bind("<Escape>", cancel)
def validate_pair(self, source: str, dest: str):
if not source or not dest:
return "", "idle"
if not os.path.exists(source):
return "Нет источника", "error"
if not os.access(source, os.R_OK):
return "Нет доступа", "error"
if not os.path.exists(dest):
return "Нет назначения", "error"
if not os.access(dest, os.W_OK):
return "Нет доступа", "error"
return "OK", "ok"
def open_selected_destination(self):
pair_id = self.selected_pair_id
if not pair_id:
return
pair = self.get_pair_by_id(pair_id)
if not pair:
return
self.open_destination(pair["dest"])
def open_selected_source(self):
pair_id = self.selected_pair_id
if not pair_id:
return
pair = self.get_pair_by_id(pair_id)
if not pair:
return
self.open_destination(pair["source"])
def show_pair_log(self):
pair_id = self.selected_pair_id
if not pair_id:
return
pair = self.get_pair_by_id(pair_id)
if not pair:
return
src = pair.get("source", "")
dst = pair.get("dest", "")
content = self.log_text.get("1.0", tk.END).splitlines()
filtered = [line for line in content if (src and src in line) or (dst and dst in line)]
win = tk.Toplevel(self.root)
win.title("Лог пары")
win.geometry("700x400")
text = scrolledtext.ScrolledText(win, wrap=tk.WORD)
text.pack(fill=tk.BOTH, expand=True)
text.insert(tk.END, "\n".join(filtered) if filtered else "Нет записей для этой пары.")
def edit_pair_masks(self):
pair_id = self.selected_pair_id
if not pair_id:
return
pair = self.get_pair_by_id(pair_id)
if not pair:
return
win = tk.Toplevel(self.root)
win.title("Маски пары")
win.geometry("420x180")
win.transient(self.root)
win.grab_set()
include_var = tk.StringVar(value=pair.get("include_masks", ""))
exclude_var = tk.StringVar(value=pair.get("exclude_masks", ""))
ttk.Label(win, text="Включать маски (через ;):").pack(anchor=tk.W, padx=10, pady=4)
include_entry = ttk.Entry(win, textvariable=include_var)
include_entry.pack(fill=tk.X, padx=10, pady=2)
ttk.Label(win, text="Исключать маски (через ;):").pack(anchor=tk.W, padx=10, pady=4)
exclude_entry = ttk.Entry(win, textvariable=exclude_var)
exclude_entry.pack(fill=tk.X, padx=10, pady=2)
def apply():
pair["include_masks"] = include_var.get().strip()
pair["exclude_masks"] = exclude_var.get().strip()
self.request_autosave()
win.destroy()
btns = ttk.Frame(win)
btns.pack(pady=8)
ttk.Button(btns, text="Сохранить", command=apply).pack(side=tk.LEFT, padx=6)
ttk.Button(btns, text="Отмена", command=win.destroy).pack(side=tk.LEFT, padx=6)
def toggle_selected_mode(self):
pair_id = self.selected_pair_id
if not pair_id:
return
pair = self.get_pair_by_id(pair_id)
if not pair:
return
pair["full_copy"] = not pair["full_copy"]
self.update_pair_row(pair)
self.request_autosave()
def browse_folder(self, entry):
"""Открывает диалог выбора папки"""
folder = filedialog.askdirectory(title="Выберите папку")
if folder:
entry.delete(0, tk.END)
entry.insert(0, folder)
self.update_selected_pair_from_edit()
def open_destination(self, path: str):
if not path or path.startswith("Например: "):
return
if os.path.exists(path):
with contextlib.suppress(Exception):
os.startfile(path)
def open_log_file(self):
log_path = os.path.join(os.path.dirname(self.get_settings_path()), "backup_copier.log")
if os.path.exists(log_path):
with contextlib.suppress(Exception):
os.startfile(log_path)
def open_settings_folder(self):
folder = os.path.dirname(self.get_settings_path())
if os.path.exists(folder):
with contextlib.suppress(Exception):
os.startfile(folder)
def open_schedule_dialog(self):
dlg = tk.Toplevel(self.root)
dlg.title("Расписание")
dlg.geometry("520x360")
dlg.transient(self.root)
dlg.grab_set()
time_frame = ttk.Frame(dlg)
time_frame.pack(fill=tk.X, pady=5, padx=10)
ttk.Label(time_frame, text="Время:").pack(side=tk.LEFT)
ttk.Combobox(time_frame, textvariable=self.hour_var, values=[f"{h:02d}" for h in range(24)], width=5, state="readonly").pack(side=tk.LEFT, padx=2)
ttk.Label(time_frame, text=":").pack(side=tk.LEFT)
ttk.Combobox(time_frame, textvariable=self.minute_var, values=[f"{m:02d}" for m in range(60)], width=5, state="readonly").pack(side=tk.LEFT, padx=2)
type_frame = ttk.Frame(dlg)
type_frame.pack(fill=tk.X, pady=5, padx=10)
ttk.Label(type_frame, text="Тип:").pack(side=tk.LEFT)
ttk.Radiobutton(type_frame, text="Ежедневно", variable=self.schedule_type_var, value="daily").pack(side=tk.LEFT, padx=4)
ttk.Radiobutton(type_frame, text="Еженедельно", variable=self.schedule_type_var, value="weekly").pack(side=tk.LEFT, padx=4)
ttk.Radiobutton(type_frame, text="Ежемесячно", variable=self.schedule_type_var, value="monthly").pack(side=tk.LEFT, padx=4)
days_frame = ttk.Frame(dlg)
days_frame.pack(fill=tk.X, pady=5, padx=10)
ttk.Label(days_frame, text="Дни недели:").pack(side=tk.LEFT)
day_labels = [("Mon", "Пн"), ("Tue", "Вт"), ("Wed", "Ср"), ("Thu", "Чт"), ("Fri", "Пт"), ("Sat", "Сб"), ("Sun", "Вс")]
day_checks = []
for key, label in day_labels:
cb = ttk.Checkbutton(days_frame, text=label, variable=self.schedule_days_vars[key])
cb.pack(side=tk.LEFT, padx=2)
day_checks.append(cb)
month_frame = ttk.Frame(dlg)
month_frame.pack(fill=tk.X, pady=5, padx=10)
ttk.Label(month_frame, text="День месяца:").pack(side=tk.LEFT)
month_spin = ttk.Spinbox(month_frame, from_=1, to=31, width=4, textvariable=self.schedule_monthday_var)
month_spin.pack(side=tk.LEFT, padx=4)
ttk.Label(month_frame, text="(Если день > кол-ва дней, запуск в последний день месяца)").pack(side=tk.LEFT, padx=6)
def update_schedule_controls():
mode = self.schedule_type_var.get()
if mode == "daily":
for cb in day_checks:
cb.config(state=tk.DISABLED)
month_spin.config(state=tk.DISABLED)
elif mode == "weekly":
for cb in day_checks:
cb.config(state=tk.NORMAL)
month_spin.config(state=tk.DISABLED)
else:
for cb in day_checks:
cb.config(state=tk.DISABLED)
month_spin.config(state=tk.NORMAL)
self.schedule_type_var.trace_add("write", lambda *_: update_schedule_controls())
update_schedule_controls()
list_frame = ttk.Frame(dlg)
list_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
tree = ttk.Treeview(list_frame, columns=("time", "days"), show="headings", height=8)
tree.heading("time", text="Время")
tree.heading("days", text="Дни")
tree.column("time", width=80, anchor=tk.CENTER)
tree.column("days", width=220)
scroll = ttk.Scrollbar(list_frame, orient="vertical", command=tree.yview)
tree.configure(yscrollcommand=scroll.set)
tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scroll.pack(side=tk.RIGHT, fill=tk.Y)
def refresh_tree():
for item in tree.get_children():
tree.delete(item)
for idx, entry in enumerate(self.schedules):
sched_type = entry.get("type", "daily")
if sched_type == "monthly":
days_text = f"ежемесячно, день {entry.get('day', 1)}"
elif sched_type == "weekly":
days = entry.get("days", [])
days_text = ",".join(days) if days else "еженедельно"
else:
days_text = "ежедневно"
tree.insert("", "end", iid=f"s{idx}", values=(entry.get("time"), days_text))
def add_entry():
time_str = f"{self.hour_var.get()}:{self.minute_var.get()}"
days = [k for k, v in self.schedule_days_vars.items() if v.get()]
sched_type = self.schedule_type_var.get()
entry = {"time": time_str, "type": sched_type}
if sched_type == "weekly":
entry["days"] = days
elif sched_type == "monthly":
entry["day"] = int(self.schedule_monthday_var.get() or 1)
self.schedules.append(entry)
refresh_tree()
self.update_schedule_summary_label()
def remove_entry():
sel = tree.selection()
if not sel:
return
idx = int(sel[0].lstrip("s"))
if 0 <= idx < len(self.schedules):
self.schedules.pop(idx)
refresh_tree()
self.update_schedule_summary_label()
btns = ttk.Frame(dlg)
btns.pack(fill=tk.X, padx=10, pady=8)
ttk.Button(btns, text=" Добавить", command=add_entry).pack(side=tk.LEFT, padx=4)
ttk.Button(btns, text="❌ Удалить", command=remove_entry).pack(side=tk.LEFT, padx=4)
ttk.Button(btns, text="Закрыть", command=dlg.destroy).pack(side=tk.RIGHT, padx=4)
refresh_tree()
def refresh_profile_combo(self):
names = sorted(self.profiles.keys()) if self.profiles else ["Default"]
if not self.profiles:
self.profiles["Default"] = self.default_profile()
if self.profile_combo is not None:
self.profile_combo["values"] = names
if self.active_profile.get() not in names:
self.active_profile.set(names[0])
else:
if self.active_profile.get() not in names:
self.active_profile.set(names[0])
def default_profile(self) -> dict:
return {
"pairs": [],
"schedules": [],
"include_masks": DEFAULT_INCLUDE_MASKS,
"exclude_masks": DEFAULT_EXCLUDE_MASKS,
"min_free_gb": DEFAULT_MIN_FREE_GB,
"max_workers": DEFAULT_MAX_WORKERS,
"verify_only": False,
"cleanup_old": False,
"keep_files": DEFAULT_KEEP_FILES,
"last_success_time": None,
"last_result_summary": None,
"last_hashes": {},
}
def save_active_profile(self):
name = self.active_profile.get() or "Default"
profile = self.profiles.get(name, self.default_profile())
profile["pairs"] = [
{
"source": p.get("source", ""),
"dest": p.get("dest", ""),
"full_copy": bool(p.get("full_copy", False)),
"include_masks": p.get("include_masks", ""),
"exclude_masks": p.get("exclude_masks", ""),
}
for p in self.copy_pairs
]
profile["schedules"] = list(self.schedules)
profile["include_masks"] = self.include_masks_var.get()
profile["exclude_masks"] = self.exclude_masks_var.get()
profile["min_free_gb"] = self.get_float_setting(self.min_free_gb_var, DEFAULT_MIN_FREE_GB)
profile["max_workers"] = self.get_int_setting(self.max_workers_var, DEFAULT_MAX_WORKERS, minimum=1)
profile["verify_only"] = bool(self.verify_only.get())
profile["cleanup_old"] = bool(self.cleanup_old_var.get())
try:
profile["keep_files"] = max(1, int(self.keep_files_var.get() or DEFAULT_KEEP_FILES))
except Exception:
profile["keep_files"] = DEFAULT_KEEP_FILES
profile["skip_cleanup_confirm"] = bool(self.skip_cleanup_confirm)
profile["last_success_time"] = self.last_success_time
profile["last_result_summary"] = self.last_result_summary
profile["last_hashes"] = self.last_hashes
self.profiles[name] = profile
def load_profile(self, name: str, save_current: bool = True):
self.loading_settings = True
self.autosave_enabled = False
if save_current:
self.save_active_profile()
profile = self.profiles.get(name) or self.default_profile()
self.active_profile.set(name)
self.include_masks_var.set(profile.get("include_masks", DEFAULT_INCLUDE_MASKS))
self.exclude_masks_var.set(profile.get("exclude_masks", DEFAULT_EXCLUDE_MASKS))
self.min_free_gb_var.set(profile.get("min_free_gb", DEFAULT_MIN_FREE_GB))
self.max_workers_var.set(profile.get("max_workers", DEFAULT_MAX_WORKERS))
self.verify_only.set(profile.get("verify_only", False))
self.cleanup_old_var.set(profile.get("cleanup_old", False))
try:
self.keep_files_var.set(max(1, int(profile.get("keep_files", DEFAULT_KEEP_FILES) or DEFAULT_KEEP_FILES)))
except Exception:
self.keep_files_var.set(DEFAULT_KEEP_FILES)
self.skip_cleanup_confirm = bool(profile.get("skip_cleanup_confirm", False))
self.last_success_time = profile.get("last_success_time")
self.last_result_summary = profile.get("last_result_summary")
self.last_hashes = profile.get("last_hashes", {})
self.update_last_success_label()
self.update_status_summary()
self.remove_all_pairs(silent=True)
for item in profile.get("pairs", []):
self.add_path_pair(item.get("source", ""), item.get("dest", ""), item.get("full_copy", False))
last = self.copy_pairs[-1]
last["include_masks"] = item.get("include_masks", "")
last["exclude_masks"] = item.get("exclude_masks", "")
self.schedules = profile.get("schedules", [])
self.update_schedule_summary_label()
self.render_pairs()
if self.schedules:
first_time = self.schedules[0].get("time", "03:00")
if ":" in first_time:
hour, minute = first_time.split(":")
self.hour_var.set(hour)
self.minute_var.set(minute)
if self.scheduler_enabled.get():
self.start_scheduler()
self.autosave_enabled = True
self.loading_settings = False
def add_profile(self):
name = simpledialog.askstring("Новый профиль", "Введите имя профиля:")
if not name:
return
if name in self.profiles:
messagebox.showwarning("Профиль", "Профиль с таким именем уже существует.")
return
self.save_active_profile()
self.profiles[name] = self.default_profile()
self.active_profile.set(name)
self.refresh_profile_combo()
self.load_profile(name)
def rename_profile(self):
old = self.active_profile.get()
if not old:
return
new = simpledialog.askstring("Переименовать профиль", "Новое имя профиля:", initialvalue=old)
if not new or new == old:
return
if new in self.profiles:
messagebox.showwarning("Профиль", "Профиль с таким именем уже существует.")
return
self.save_active_profile()
self.profiles[new] = self.profiles.pop(old)
self.active_profile.set(new)
self.refresh_profile_combo()
def delete_profile(self):
name = self.active_profile.get()
if not name:
return
if not messagebox.askyesno("Профили", f"Удалить профиль '{name}'?"):
return
self.profiles.pop(name, None)
self.refresh_profile_combo()
self.load_profile(self.active_profile.get())
def export_profiles(self):
self.save_active_profile()
path = filedialog.asksaveasfilename(defaultextension=".json", filetypes=[("JSON", "*.json")])
if not path:
return
data = {
"active_profile": self.active_profile.get(),
"profiles": self.profiles
}
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
messagebox.showinfo("Экспорт", "Профили экспортированы.")
def import_profiles(self):
path = filedialog.askopenfilename(filetypes=[("JSON", "*.json")])
if not path:
return
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
profiles = data.get("profiles")
if isinstance(profiles, dict):
self.profiles.update(profiles)
self.refresh_profile_combo()
self.load_profile(data.get("active_profile") or self.active_profile.get())
def refresh_schedules_tree(self):
self.update_schedule_summary_label()
def add_schedule_from_inputs(self):
time_str = f"{self.hour_var.get()}:{self.minute_var.get()}"
days = [k for k, v in self.schedule_days_vars.items() if v.get()]
sched_type = self.schedule_type_var.get()
entry = {"time": time_str, "type": sched_type}
if sched_type == "weekly":
entry["days"] = days
elif sched_type == "monthly":
entry["day"] = int(self.schedule_monthday_var.get() or 1)
self.schedules.append(entry)
self.update_schedule_summary_label()
self.request_autosave()
if self.scheduler_enabled.get():
self.start_scheduler()
def remove_selected_schedule(self):
if not self.schedules:
return
self.schedules.pop()
self.update_schedule_summary_label()
self.request_autosave()
if self.scheduler_enabled.get():
self.start_scheduler()
def show_hint(self, title: str, message: str):
messagebox.showinfo(title, message)
def request_autosave(self):
if not self.autosave_enabled or self.loading_settings:
return
if self.autosave_job is not None:
self.root.after_cancel(self.autosave_job)
self.autosave_job = self.root.after(1000, lambda: self.save_settings(show_message=False))
def show_mask_templates(self):
choices = [
("SQL/BAK", "*.bak;*.sql;*.backup"),
("Все файлы", "*.*"),
("Только BAK", "*.bak"),
("Архивы", "*.zip;*.7z;*.rar"),
]
win = tk.Toplevel(self.root)
win.title("Шаблоны масок")
win.geometry("320x220")
win.transient(self.root)
win.grab_set()
for label, value in choices:
def apply(val=value):
self.include_masks_var.set(val)
self.request_autosave()
win.destroy()
ttk.Button(win, text=f"{label}: {value}", command=apply).pack(fill=tk.X, padx=10, pady=4)
def on_cleanup_toggle(self):
if not self.cleanup_old_var.get():
self.request_autosave()
return
if self.skip_cleanup_confirm:
self.request_autosave()
return
dlg = tk.Toplevel(self.root)
dlg.title("Подтверждение")
dlg.geometry("360x170")
dlg.transient(self.root)
dlg.grab_set()
ttk.Label(dlg, text="Удалять старые файлы в назначении?\nЭто необратимое действие.").pack(pady=10)
skip_var = tk.BooleanVar(value=False)
ttk.Checkbutton(dlg, text="Больше не спрашивать", variable=skip_var).pack()
def confirm():
self.skip_cleanup_confirm = skip_var.get()
dlg.destroy()
self.request_autosave()
def cancel():
self.cleanup_old_var.set(False)
dlg.destroy()
btns = ttk.Frame(dlg)
btns.pack(pady=10)
ttk.Button(btns, text="Да", command=confirm).pack(side=tk.LEFT, padx=8)
ttk.Button(btns, text="Нет", command=cancel).pack(side=tk.LEFT, padx=8)
def stop_copying(self):
if self.is_copying:
if not messagebox.askyesno("Подтверждение", "Остановить текущее копирование?"):
return
self.cancel_event.set()
self.log_message("⏹ Остановка копирования по запросу пользователя", "warning")
def open_wizard(self):
if getattr(self, "wizard_window", None) is not None:
try:
self.wizard_window.lift()
return
except Exception:
pass
wiz = tk.Toplevel(self.root)
wiz.title("Мастер настройки")
wiz.geometry("520x320")
wiz.transient(self.root)
wiz.grab_set()
self.wizard_window = wiz
step_var = tk.IntVar(value=0)
source_var = tk.StringVar()
dest_var = tk.StringVar()
full_copy_var = tk.BooleanVar(value=False)
schedule_enabled_var = tk.BooleanVar(value=self.scheduler_enabled.get())
hour_var = tk.StringVar(value=self.hour_var.get())
minute_var = tk.StringVar(value=self.minute_var.get())
autostart_var = tk.BooleanVar(value=self.autostart_enabled.get())
minimize_var = tk.BooleanVar(value=self.minimize_to_tray_enabled.get())
verify_var = tk.BooleanVar(value=self.verify_only.get())
def show_step(index: int):
step_var.set(index)
for i, frame in enumerate(steps):
frame.pack_forget()
if i == index:
frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
back_btn.config(state=tk.NORMAL if index > 0 else tk.DISABLED)
next_btn.config(state=tk.NORMAL if index < len(steps) - 1 else tk.DISABLED)
finish_btn.config(state=tk.NORMAL if index == len(steps) - 1 else tk.DISABLED)
steps = []
step1 = ttk.Frame(wiz)
ttk.Label(step1, text="Шаг 1/3: Пути", font=("Arial", 11, "bold")).pack(anchor=tk.W, pady=5)
ttk.Label(step1, text="Источник:").pack(anchor=tk.W)
src_entry = ttk.Entry(step1, textvariable=source_var)
src_entry.pack(fill=tk.X, pady=2)
ttk.Button(step1, text="Выбрать...", command=lambda: self.browse_folder(src_entry)).pack(anchor=tk.W, pady=2)
ttk.Label(step1, text="Назначение:").pack(anchor=tk.W, pady=5)
dst_entry = ttk.Entry(step1, textvariable=dest_var)
dst_entry.pack(fill=tk.X, pady=2)
ttk.Button(step1, text="Выбрать...", command=lambda: self.browse_folder(dst_entry)).pack(anchor=tk.W, pady=2)
steps.append(step1)
step2 = ttk.Frame(wiz)
ttk.Label(step2, text="Шаг 2/3: Режим копирования", font=("Arial", 11, "bold")).pack(anchor=tk.W, pady=5)
ttk.Checkbutton(step2, text="Копировать всю папку с подпапками", variable=full_copy_var).pack(anchor=tk.W)
steps.append(step2)
step3 = ttk.Frame(wiz)
ttk.Label(step3, text="Шаг 3/3: Расписание и поведение", font=("Arial", 11, "bold")).pack(anchor=tk.W, pady=5)
time_frame = ttk.Frame(step3)
time_frame.pack(fill=tk.X, pady=4)
ttk.Label(time_frame, text="Время:").pack(side=tk.LEFT)
ttk.Combobox(time_frame, textvariable=hour_var, values=[f"{h:02d}" for h in range(24)], width=5, state="readonly").pack(side=tk.LEFT, padx=2)
ttk.Label(time_frame, text=":").pack(side=tk.LEFT)
ttk.Combobox(time_frame, textvariable=minute_var, values=[f"{m:02d}" for m in range(60)], width=5, state="readonly").pack(side=tk.LEFT, padx=2)
ttk.Checkbutton(step3, text="Включить расписание", variable=schedule_enabled_var).pack(anchor=tk.W, pady=4)
ttk.Checkbutton(step3, text="Автозапуск Windows", variable=autostart_var).pack(anchor=tk.W)
ttk.Checkbutton(step3, text="Сворачивать в трей при закрытии", variable=minimize_var).pack(anchor=tk.W)
ttk.Checkbutton(step3, text="Только проверка (без копирования)", variable=verify_var).pack(anchor=tk.W)
steps.append(step3)
controls = ttk.Frame(wiz)
controls.pack(fill=tk.X, pady=5)
back_btn = ttk.Button(controls, text="Назад", command=lambda: show_step(step_var.get() - 1))
next_btn = ttk.Button(controls, text="Далее", command=lambda: show_step(step_var.get() + 1))
def finish():
if source_var.get().strip() and dest_var.get().strip():
self.add_path_pair(source_var.get().strip(), dest_var.get().strip(), full_copy_var.get())
self.hour_var.set(hour_var.get())
self.minute_var.set(minute_var.get())
self.scheduler_enabled.set(schedule_enabled_var.get())
self.autostart_enabled.set(autostart_var.get())
self.minimize_to_tray_enabled.set(minimize_var.get())
self.verify_only.set(verify_var.get())
if self.scheduler_enabled.get():
self.start_scheduler()
else:
self.stop_scheduler()
if self.autostart_enabled.get() != self.is_autostart_enabled():
self.set_autostart_enabled(self.autostart_enabled.get())
wiz.destroy()
self.wizard_window = None
finish_btn = ttk.Button(controls, text="Готово", command=finish)
back_btn.pack(side=tk.LEFT, padx=5)
next_btn.pack(side=tk.LEFT, padx=5)
finish_btn.pack(side=tk.RIGHT, padx=5)
def on_close():
self.wizard_window = None
wiz.destroy()
wiz.protocol("WM_DELETE_WINDOW", on_close)
show_step(0)
def get_valid_pairs(self) -> List[tuple]:
"""Возвращает список валидных пар папок"""
valid_pairs = []
for pair in self.copy_pairs:
source = pair.get("source", "").strip()
dest = pair.get("dest", "").strip()
full_copy = bool(pair.get("full_copy", False))
if source and dest:
valid_pairs.append((source, dest, full_copy))
return valid_pairs
def check_paths(self):
"""Проверяет доступность всех путей"""
self.log_message("🔍 Проверка путей:", "info")
valid_pairs = self.get_valid_pairs()
if not valid_pairs:
self.log_message("❌ Нет заполненных пар путей", "error")
return
for i, (source, dest, full_copy) in enumerate(valid_pairs, 1):
self.log_message(f"\nПара {i}:", "info")
mode_text = "вся папка" if full_copy else "последний файл"
self.log_message(f" Режим: {mode_text}", "info")
status_text, status_tag = self.validate_pair(source, dest)
if status_tag == "ok":
self.log_message(f" ✅ Проверка: OK", "success")
else:
self.log_message(f" ❌ Проверка: {status_text}", "error")
if os.path.exists(source):
bak_files = [p for p in Path(source).iterdir() if p.is_file() and p.suffix.lower() == '.bak']
self.log_message(f" Найдено .bak файлов: {len(bak_files)}", "info")
# Обновляем статусы в таблице
for pair in self.copy_pairs:
status_text, status_tag = self.validate_pair(pair.get("source", ""), pair.get("dest", ""))
pair["status"] = status_text
pair["status_tag"] = status_tag
self.update_pair_row(pair)
def find_latest_file(self, folder_path: str, include_masks: Optional[List[str]] = None, exclude_masks: Optional[List[str]] = None) -> Optional[Path]:
"""Находит самый последний файл в папке"""
try:
include_masks = include_masks or parse_masks(self.include_masks_var.get())
exclude_masks = exclude_masks or parse_masks(self.exclude_masks_var.get())
latest_file = find_latest_file_in_folder(folder_path, include_masks=include_masks, exclude_masks=exclude_masks)
if not latest_file:
if not Path(folder_path).exists():
self.log_message(f"❌ Папка не существует: {folder_path}", "error")
else:
self.log_message(f"⚠️ Не найдено .bak файлов в {folder_path}", "warning")
return None
file_time = datetime.fromtimestamp(latest_file.stat().st_mtime)
self.log_message(
f"📄 Последний файл: {latest_file.name} ({file_time.strftime('%Y-%m-%d %H:%M:%S')})",
"info"
)
return latest_file
except Exception as e:
self.log_message(f"❌ Ошибка при поиске файлов в {folder_path}: {e}", "error")
return None
def run_on_ui(self, func):
"""Планирует выполнение функции в UI-потоке."""
self.queue.put({'type': 'ui', 'func': func})
def get_float_setting(self, var, default: float) -> float:
try:
raw = var.get()
except Exception:
return float(default)
if raw in ("", None):
return float(default)
try:
return float(raw)
except (TypeError, ValueError):
return float(default)
def get_int_setting(self, var, default: int, minimum: int = 1) -> int:
try:
raw = var.get()
except Exception:
return max(minimum, int(default))
if raw in ("", None):
return max(minimum, int(default))
try:
return max(minimum, int(raw))
except (TypeError, ValueError):
return max(minimum, int(default))
def _format_active_file_label(self, file_path: str) -> str:
p = Path(file_path)
parts = p.parts
if len(parts) >= 2:
return str(Path(parts[-2]) / parts[-1])
return p.name or file_path
def set_status_text(self, text: str):
with self.active_files_lock:
has_active = bool(self.active_files)
if has_active:
self.update_status_with_active_files()
return
self.root.after(0, lambda: self.status_bar.config(text=text))
def update_status_with_active_files(self):
with self.active_files_lock:
file_paths = sorted(self.active_files.keys())
if not file_paths:
text = "Идет копирование..."
else:
labels = [self._format_active_file_label(p) for p in file_paths]
if len(labels) <= 3:
text = "Идет копирование: " + ", ".join(labels)
else:
text = f"Идет копирование: {', '.join(labels[:3])} ... (+{len(labels) - 3})"
self.root.after(0, lambda t=text: self.status_bar.config(text=t))
def begin_active_file(self, file_path: str):
with self.active_files_lock:
self.active_files[file_path] = self.active_files.get(file_path, 0) + 1
self.update_status_with_active_files()
def end_active_file(self, file_path: str):
with self.active_files_lock:
count = self.active_files.get(file_path, 0)
if count <= 1:
self.active_files.pop(file_path, None)
else:
self.active_files[file_path] = count - 1
self.update_status_with_active_files()
def set_progress_total(self, total: int):
def _set():
self.progress.config(maximum=max(1, total))
self.progress_var.set(0)
self.root.after(0, _set)
def step_progress(self, step: int = 1):
def _step():
self.progress_var.set(self.progress_var.get() + step)
self.root.after(0, _step)
def check_previous_hash(self, target_file: Path):
key = str(target_file)
if key in self.last_hashes:
try:
current_hash = compute_file_checksum(target_file)
if current_hash != self.last_hashes[key]:
self.log_message(f"⚠️ Несовпадение хеша прошлой копии: {target_file.name}", "warning")
except Exception as e:
self.log_message(f"⚠️ Не удалось проверить прошлую копию: {e}", "warning")
def copy_file_with_retries(self, source: Path, target: Path) -> bool:
for attempt in range(COPY_RETRIES + 1):
try:
shutil.copy2(source, target)
return True
except Exception as e:
if attempt >= COPY_RETRIES:
self.log_message(f"❌ Ошибка при копировании {source.name}: {e}", "error")
return False
time_module.sleep(COPY_RETRY_DELAY)
return False
def verify_only_mode(self, source: Path, target: Path) -> bool:
if not target.exists():
self.log_message(f"❌ Нет целевого файла для проверки: {target.name}", "error")
return False
if not should_copy_file(source, target):
ok, src_hash, dst_hash = compare_file_checksums(source, target)
self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={target.name}", "info")
if not ok:
self.log_message(f"⚠️ SHA256 отличается у пропущенного файла: {target.name}", "warning")
self.log_message(f"⏭️ Проверка пропущена (не изменен): {target.name}", "warning")
return True
ok, src_hash, dst_hash = compare_file_checksums(source, target)
self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={target.name}", "info")
if ok:
self.log_message(f"✅ Проверка OK: {target.name}", "success")
return True
self.log_message(f"❌ Проверка не пройдена: {target.name}", "error")
return False
def has_enough_space(self, dest_path: Path, size_bytes: int) -> bool:
min_gb = self.get_float_setting(self.min_free_gb_var, DEFAULT_MIN_FREE_GB)
free_gb = get_free_space_gb(dest_path)
if free_gb < min_gb:
self.log_message(f"❌ Недостаточно места на диске: свободно {free_gb:.2f} ГБ, минимум {min_gb} ГБ", "error")
return False
if size_bytes > 0 and (free_gb * 1024 ** 3) < size_bytes:
self.log_message(f"❌ Недостаточно места для файла: {size_bytes} байт", "error")
return False
return True
def cleanup_destination_files(self, dest_path: Path, include_masks: List[str], exclude_masks: List[str]) -> None:
try:
keep_count = max(1, int(self.keep_files_var.get() or DEFAULT_KEEP_FILES))
except Exception:
keep_count = DEFAULT_KEEP_FILES
candidates: List[Path] = []
for item in dest_path.iterdir():
if not item.is_file():
continue
if include_masks or exclude_masks:
if not matches_masks(item.name, include_masks, exclude_masks):
continue
candidates.append(item)
if len(candidates) <= keep_count:
return
candidates.sort(key=lambda p: (p.stat().st_mtime, p.name), reverse=True)
for item in candidates[keep_count:]:
try:
item.unlink()
self.log_message(f"🧹 Удален старый файл: {item.name}", "info")
except Exception as e:
msg = f"Не удалось удалить файл {item.name}: {e}"
self.log_message(msg, "warning")
self.record_error_detail(msg)
def copy_full_folder(self, source: Path, dest: Path) -> tuple:
copied = 0
skipped = 0
errors = 0
for root, _dirs, files in os.walk(source):
if self.cancel_event.is_set():
break
rel = os.path.relpath(root, source)
dest_root = dest / rel if rel != "." else dest
dest_root.mkdir(parents=True, exist_ok=True)
for fname in files:
if self.cancel_event.is_set():
break
src_file = Path(root) / fname
dst_file = dest_root / fname
self.begin_active_file(str(src_file))
try:
self.set_status_text(f"Копируется: {src_file.name}")
self.check_previous_hash(dst_file) if dst_file.exists() else None
if self.verify_only.get():
if self.verify_only_mode(src_file, dst_file):
skipped += 1
else:
errors += 1
self.step_progress()
continue
if should_copy_file(src_file, dst_file):
if not self.has_enough_space(dest_root, src_file.stat().st_size):
errors += 1
self.step_progress()
continue
if self.copy_file_with_retries(src_file, dst_file):
ok, src_hash, dst_hash = compare_file_checksums(src_file, dst_file)
self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={dst_file.name}", "info")
if ok:
copied += 1
self.last_hashes[str(dst_file)] = dst_hash
else:
errors += 1
else:
errors += 1
self.step_progress()
else:
skipped += 1
if dst_file.exists():
ok, src_hash, dst_hash = compare_file_checksums(src_file, dst_file)
self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={dst_file.name}", "info")
if not ok:
self.log_message(f"⚠️ SHA256 отличается у пропущенного файла: {dst_file.name}", "warning")
self.step_progress()
except Exception as e:
errors += 1
self.step_progress()
msg = f"❌ Ошибка при копировании {src_file.name}: {e}"
self.log_message(msg, "error")
self.record_error_detail(msg)
finally:
self.end_active_file(str(src_file))
return copied, skipped, errors
def process_pair(self, source: str, dest: str, full_copy: bool, background: bool) -> tuple:
copied_files = 0
skipped_files = 0
error_files = 0
pair_ref = self.find_pair_by_paths(source, dest, full_copy)
if self.cancel_event.is_set():
return copied_files, skipped_files, error_files
self.log_message(f"\n📁 Обработка папки: {source}", "info")
src_path = Path(source)
dest_path = Path(dest)
if full_copy:
if not src_path.exists():
msg = f"❌ Папка не существует: {source}"
self.log_message(msg, "error")
self.record_error_detail(msg)
return copied_files, skipped_files, error_files + 1
try:
dest_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
msg = f"Не могу создать папку {dest}: {e}"
self.log_message(msg, "error")
self.record_error_detail(msg)
return copied_files, skipped_files, error_files + 1
if not self.verify_only.get():
required_size = get_total_copy_size(src_path, dest_path)
if required_size > 0 and not self.has_enough_space(dest_path, required_size):
return copied_files, skipped_files, error_files + 1
if pair_ref is not None:
pair_ref["last_file"] = ""
self.update_pair_row(pair_ref)
c, s, e = self.copy_full_folder(src_path, dest_path)
if self.tray_icon is not None:
self.tray_notify(f"{source}", f"Готово: ✅ {c} / ⏭️ {s} / ❌ {e}")
return copied_files + c, skipped_files + s, error_files + e
include_masks = parse_masks(pair_ref.get("include_masks", "")) if pair_ref else []
exclude_masks = parse_masks(pair_ref.get("exclude_masks", "")) if pair_ref else []
if not include_masks and not exclude_masks:
include_masks = parse_masks(self.include_masks_var.get())
exclude_masks = parse_masks(self.exclude_masks_var.get())
latest_file = self.find_latest_file(source, include_masks=include_masks, exclude_masks=exclude_masks)
if latest_file:
if pair_ref is not None:
pair_ref["last_file"] = latest_file.name
self.update_pair_row(pair_ref)
try:
dest_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
msg = f"Не могу создать папку {dest}: {e}"
self.log_message(msg, "error")
self.record_error_detail(msg)
return copied_files, skipped_files, error_files + 1
target_file = dest_path / latest_file.name
self.begin_active_file(str(latest_file))
try:
self.set_status_text(f"Копируется: {latest_file.name}")
if target_file.exists():
self.check_previous_hash(target_file)
if self.verify_only.get():
if self.verify_only_mode(latest_file, target_file):
skipped_files += 1
else:
error_files += 1
self.step_progress()
return copied_files, skipped_files, error_files
target_existed = target_file.exists()
if should_copy_file(latest_file, target_file):
if not self.has_enough_space(dest_path, latest_file.stat().st_size):
self.step_progress()
return copied_files, skipped_files, error_files + 1
if self.copy_file_with_retries(latest_file, target_file):
copied_files += 1
src_hash = compute_file_checksum(latest_file)
dst_hash = compute_file_checksum(target_file)
self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={target_file.name}", "info")
self.last_hashes[str(target_file)] = dst_hash
if src_hash != dst_hash:
error_files += 1
copied_files -= 1
msg = f"❌ Контрольная сумма не совпала: {latest_file.name}"
self.log_message(msg, "error")
self.record_error_detail(msg)
if src_hash == dst_hash and target_existed:
self.log_message(f"✅ Обновлен: {latest_file.name} (новее)", "success")
elif src_hash == dst_hash:
self.log_message(f"✅ Скопирован: {latest_file.name}", "success")
if src_hash == dst_hash and self.cleanup_old_var.get():
self.cleanup_destination_files(dest_path, include_masks, exclude_masks)
else:
error_files += 1
msg = f"❌ Контрольная сумма не совпала: {latest_file.name}"
self.log_message(msg, "error")
self.record_error_detail(msg)
self.step_progress()
else:
skipped_files += 1
if target_file.exists():
ok, src_hash, dst_hash = compare_file_checksums(latest_file, target_file)
self.log_message(f"🔐 SHA256 src={src_hash} dest={dst_hash} file={target_file.name}", "info")
if not ok:
self.log_message(f"⚠️ SHA256 отличается у пропущенного файла: {target_file.name}", "warning")
self.log_message(f"⏭️ Пропущен: {latest_file.name} (уже актуален)", "warning")
self.step_progress()
except Exception as e:
error_files += 1
self.step_progress()
msg = f"❌ Ошибка при копировании {latest_file.name}: {e}"
self.log_message(msg, "error")
self.record_error_detail(msg)
finally:
self.end_active_file(str(latest_file))
else:
error_files += 1
self.step_progress()
if self.tray_icon is not None and background:
self.tray_notify(f"{source}", f"Готово: ✅ {copied_files} / ⏭️ {skipped_files} / ❌ {error_files}")
return copied_files, skipped_files, error_files
def copy_files_thread(self, pairs: List[tuple], background: bool = False):
"""Поток для копирования последних файлов"""
if not self.copy_lock.acquire(blocking=False):
self.log_message("⚠️ Копирование уже выполняется", "warning")
return
self.is_copying = True
self.cancel_event.clear()
self.root.after(0, lambda: self.status_bar.config(text="Идет копирование..."))
total_units = 0
for source, dest, full_copy in pairs:
if full_copy:
if os.path.exists(source):
for root, _dirs, files in os.walk(source):
for fname in files:
total_units += 1
else:
include_masks = parse_masks(self.include_masks_var.get())
exclude_masks = parse_masks(self.exclude_masks_var.get())
latest = find_latest_file_in_folder(source, include_masks=include_masks, exclude_masks=exclude_masks)
if latest:
total_units += 1
self.set_progress_total(total_units or 1)
try:
copied_files = 0
skipped_files = 0
error_files = 0
if not background:
self.log_message("\n" + "=" * 50, "info")
self.log_message("🚀 Начало копирования последних файлов", "info")
max_workers = self.get_int_setting(self.max_workers_var, DEFAULT_MAX_WORKERS, minimum=1)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(self.process_pair, source, dest, full_copy, background) for source, dest, full_copy in pairs]
for future in as_completed(futures):
if self.cancel_event.is_set():
break
try:
c, s, e = future.result()
copied_files += c
skipped_files += s
error_files += e
except Exception as e:
error_files += 1
msg = f"❌ Ошибка задачи копирования: {e}"
self.log_message(msg, "error")
self.record_error_detail(msg)
# Итог
self.log_message("\n" + "=" * 50, "info")
self.log_message("📊 ИТОГ:", "info")
self.log_message(f" ✅ Скопировано/обновлено: {copied_files}", "success")
self.log_message(f" ⏭️ Пропущено: {skipped_files}", "warning")
self.log_message(f" ❌ Ошибок: {error_files}", "error")
if background:
self.log_message("⏰ Копирование по расписанию завершено", "info")
else:
self.run_on_ui(lambda: messagebox.showinfo(
"Готово",
f"Копирование завершено!\n\n✅ Скопировано: {copied_files}\n⏭️ Пропущено: {skipped_files}\n❌ Ошибок: {error_files}"
))
self.last_result_summary = f"{copied_files} / ⏭️ {skipped_files} / ❌ {error_files}"
self.run_on_ui(self.update_status_summary)
if error_files == 0:
self.last_success_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.run_on_ui(self.update_last_success_label)
self.tray_notify("Копирование завершено", f"Успешно: {copied_files}, Пропущено: {skipped_files}")
else:
detail = self.last_error_detail or "Неизвестная ошибка"
self.tray_notify("Копирование с ошибками", f"Ошибок: {error_files}. {detail}")
except Exception as e:
self.log_message(f"❌ Критическая ошибка: {e}\n{traceback.format_exc()}", "error")
self.run_on_ui(lambda: messagebox.showerror("Ошибка", f"Произошла ошибка:\n{e}"))
finally:
self.is_copying = False
with self.active_files_lock:
self.active_files.clear()
self.copy_lock.release()
self.root.after(0, lambda: self.status_bar.config(text="Готов к работе"))
self.root.after(0, lambda: self.progress_var.set(0))
def start_manual_copy(self):
"""Запускает ручное копирование"""
valid_pairs = self.get_valid_pairs()
if not valid_pairs:
messagebox.showwarning("Предупреждение", "Заполните хотя бы одну пару папок!")
return
# Запускаем копирование в отдельном потоке
copy_thread = threading.Thread(
target=self.copy_files_thread,
args=(valid_pairs, False),
daemon=True
)
copy_thread.start()
def toggle_scheduler(self):
"""Включает/выключает планировщик"""
if self.scheduler_enabled.get():
self.start_scheduler()
else:
self.stop_scheduler()
def start_scheduler(self):
"""Запускает планировщик"""
valid_pairs = self.get_valid_pairs()
if not valid_pairs:
messagebox.showwarning("Предупреждение",
"Нет настроенных путей! Планировщик не запущен.")
self.scheduler_enabled.set(False)
return
if not self.schedules:
# создаем расписание по умолчанию из полей времени
self.schedules = [{"time": f"{self.hour_var.get()}:{self.minute_var.get()}", "days": []}]
self.refresh_schedules_tree()
self.scheduler.schedule_copy_job("backup_job", self.schedules, valid_pairs)
self.scheduler.start()
self.scheduler_status.config(text=f"(активен, заданий: {len(self.schedules)})", foreground="green")
self.update_next_run_label()
self.update_last_success_label()
self.log_message("🕒 Планировщик запущен.", "info")
def stop_scheduler(self):
"""Останавливает планировщик"""
self.scheduler.stop()
self.scheduler_status.config(text="(остановлен)", foreground="red")
self.next_run_label.config(text="(следующий запуск: —)")
self.log_message("🕒 Планировщик остановлен", "info")
def update_next_run_label(self):
jobs = schedule.get_jobs("backup_job")
if jobs:
next_run = jobs[0].next_run
if next_run:
self.next_run_label.config(text=f"(следующий запуск: {next_run.strftime('%Y-%m-%d %H:%M')})")
return
self.next_run_label.config(text="(следующий запуск: —)")
def update_last_success_label(self):
if self.last_success_time:
self.last_success_label.config(text=f"(последний успех: {self.last_success_time})")
else:
self.last_success_label.config(text="(последний успех: —)")
def update_status_summary(self):
summary = self.last_result_summary or ""
self.status_summary_label.config(text=summary)
self.last_result_label.config(text=f"(последний результат: {summary})")
def update_schedule_summary_label(self):
if not self.schedules:
self.schedule_summary_label.config(text="Расписание: —")
return
parts = []
for entry in self.schedules:
time_str = entry.get("time", "")
sched_type = entry.get("type", "daily")
if sched_type == "monthly":
parts.append(f"{time_str} (ежемес., {entry.get('day', 1)})")
elif sched_type == "weekly":
days = entry.get("days", [])
days_text = ",".join(days) if days else "еженед."
parts.append(f"{time_str} ({days_text})")
else:
parts.append(f"{time_str} (ежедневно)")
self.schedule_summary_label.config(text="Расписание: " + "; ".join(parts))
def debug_settings(self):
"""Отладочный метод для проверки загрузки настроек"""
self.log_message("\n" + "=" * 50, "info")
self.log_message("🔧 ОТЛАДКА", "info")
self.log_message("=" * 50, "info")
settings_path = self.get_settings_path()
self.log_message(f"📁 Путь к настройкам: {settings_path}", "info")
self.log_message(f"📁 Файл существует: {os.path.exists(settings_path)}", "info")
if os.path.exists(settings_path):
try:
with open(settings_path, 'r', encoding='utf-8') as f:
content = f.read()
self.log_message(f"📄 Содержимое файла:", "info")
for line in content.split('\n'):
self.log_message(f" {line}", "info")
except Exception as e:
self.log_message(f"Не удалось прочитать файл: {e}", "error")
self.log_message(f"\n🖥️ Текущее состояние:", "info")
self.log_message(f" Пар в интерфейсе: {len(self.copy_pairs)}", "info")
for i, pair in enumerate(self.copy_pairs, 1):
source = pair.get("source", "")
dest = pair.get("dest", "")
mode = "вся папка" if pair.get("full_copy", False) else "последний файл"
self.log_message(f" Пара {i}:", "info")
self.log_message(f" Откуда: '{source}'", "info")
self.log_message(f" Куда: '{dest}'", "info")
self.log_message(f" Режим: {mode}", "info")
self.log_message("=" * 50, "info")
def log_message(self, message, tag=None):
"""Добавляет сообщение в лог (вызывается из любого потока)"""
# Используем queue для потокобезопасности
self.queue.put({'type': 'log', 'message': message, 'tag': tag})
file_logger = getattr(self, "file_logger", None)
if file_logger:
try:
level = logging.INFO
if tag == "error":
level = logging.ERROR
elif tag == "warning":
level = logging.WARNING
file_logger.log(level, message)
except Exception:
pass
def record_error_detail(self, message: str):
self.last_error_detail = message
def clear_log(self):
"""Очищает лог"""
self.log_text.delete(1.0, tk.END)
def process_queue(self):
"""Обрабатывает очередь сообщений из потоков"""
try:
while True:
msg = self.queue.get_nowait()
if msg['type'] == 'log':
timestamp = datetime.now().strftime("%H:%M:%S")
log_entry = f"[{timestamp}] {msg['message']}\n"
self.log_text.insert(tk.END, log_entry, msg.get('tag'))
self.log_text.see(tk.END)
elif msg['type'] == 'ui':
try:
msg['func']()
except Exception as e:
self.log_message(f"❌ Ошибка UI: {e}", "error")
except queue.Empty:
pass
finally:
self.root.after(100, self.process_queue)
def check_and_save(self):
self.check_paths()
self.save_settings()
def create_tray_image(self):
if Image is None:
return None
icon_path = get_resource_path(ICON_PATH)
if os.path.exists(icon_path):
with contextlib.suppress(Exception):
return Image.open(icon_path)
image = Image.new("RGB", (64, 64), color=(40, 40, 40))
draw = ImageDraw.Draw(image)
draw.rectangle((8, 8, 56, 56), fill=(30, 144, 255))
draw.text((18, 20), "B", fill=(255, 255, 255))
return image
def setup_tray_icon(self):
if pystray is None:
self.log_message("⚠️ Модуль pystray не установлен, трей недоступен", "warning")
return
if self.tray_icon is not None:
return
image = self.create_tray_image()
if image is None:
self.log_message("⚠️ Не удалось создать иконку для трея", "warning")
return
def on_show(_icon, _item):
self.root.after(0, self.show_window)
def on_copy(_icon, _item):
self.root.after(0, self.start_manual_copy)
def on_exit(_icon, _item):
self.root.after(0, self.exit_app)
menu = pystray.Menu(
pystray.MenuItem("Открыть", on_show, default=True),
pystray.MenuItem("Запустить копирование", on_copy),
pystray.MenuItem("Выход", on_exit),
)
self.tray_icon = pystray.Icon(APP_NAME, image, APP_NAME, menu)
self.tray_thread = threading.Thread(target=self.tray_icon.run, daemon=True)
self.tray_thread.start()
def tray_notify(self, title: str, message: str):
if self.tray_icon is None:
return
with contextlib.suppress(Exception):
self.tray_icon.notify(message, title)
def show_window(self):
self.root.deiconify()
self.root.lift()
self.root.focus_force()
def hide_to_tray(self):
if not self.minimize_to_tray_enabled.get():
self.exit_app()
return
if pystray is None:
self.root.iconify()
self.log_message("⚠️ pystray не установлен, окно свернуто в панель задач", "warning")
return
self.setup_tray_icon()
self.root.withdraw()
self.log_message("🧰 Приложение свернуто в трей", "info")
def exit_app(self):
if self.scheduler_enabled.get():
self.stop_scheduler()
if self.tray_icon is not None:
with contextlib.suppress(Exception):
self.tray_icon.stop()
self.tray_icon = None
self.root.destroy()
def main():
mutex = ensure_single_instance()
if mutex is None:
ctypes.windll.user32.MessageBoxW(None, "Приложение уже запущено.", APP_NAME, 0x00000010)
return
root = tk.Tk()
app = BackgroundFileCopyApp(root)
# Обработка закрытия окна
def on_closing():
app.hide_to_tray()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
if __name__ == "__main__":
main()