|
https://wwvu.lanzouq.com/icus82tvlt9c
通过网盘分享的文件:电脑自动备份插入U盘数据.exe
链接: https://pan.baidu.com/s/1fI_QTf3F6DKw7cFzMJr2Hw?pwd=52pj 提取码: 52pj
- import os
- import shutil
- import time
- import string
- import win32file # 需要安装 pywin32
- import logging
- from datetime import datetime
- import threading
- from concurrent.futures import ThreadPoolExecutor, as_completed
- import tkinter as tk
- from tkinter import scrolledtext
- import queue
-
- # --- 配置 ---
- # 备份文件存储的基础路径 (请确保这个文件夹存在,或者脚本有权限创建它)
- BACKUP_DESTINATION_BASE = r"D:\USB_Backups"
- # 检测时间间隔(秒)
- CHECK_INTERVAL = 5
- # 日志文件路径
- LOG_FILE = os.path.join(BACKUP_DESTINATION_BASE, "usb_backup_log.txt")
- # --- 配置结束 ---
-
- # --- GUI 相关 ---
- class TextHandler(logging.Handler):
- """自定义日志处理器,将日志记录发送到 Text 控件"""
- def __init__(self, text_widget):
- logging.Handler.__init__(self)
- self.text_widget = text_widget
- self.queue = queue.Queue()
- # 启动**个线程来处理队列中的日志消息,避免阻塞主线程
- self.thread = threading.Thread(target=self.process_queue, daemon=True)
- self.thread.start()
-
- def emit(self, record):
- msg = self.format(record)
- self.queue.put(msg)
-
- def process_queue(self):
- while True:
- try:
- msg = self.queue.get()
- if msg is None: # Sentinel value to stop the thread
- break
- # Schedule GUI update on the main thread
- def update_widget():
- try:
- self.text_widget.configure(state='normal')
- self.text_widget.insert(tk.END, msg + '\n')
- self.text_widget.configure(state='disabled')
- self.text_widget.yview(tk.END)
- except tk.TclError: # Handle cases where the widget might be destroyed
- pass
- self.text_widget.after(0, update_widget)
- self.queue.task_done()
- except Exception:
- # 处理可能的窗口已销毁等异常
- import traceback
- traceback.print_exc()
- break
-
- def close(self):
- self.stop_processing() # Signal the thread to stop
- # Don't join here to avoid blocking the main thread
- logging.Handler.close(self)
-
- def stop_processing(self):
- """Signals the processing thread to stop without waiting for it."""
- self.queue.put(None) # Send sentinel to stop the processing thread
-
- class App(tk.Tk):
- def __init__(self):
- super().__init__()
- self.title("USB 自动备份")
- self.geometry("600x400")
-
- self.log_text = scrolledtext.ScrolledText(self, state='disabled', wrap=tk.WORD)
- self.log_text.pack(expand=True, fill='both', padx=10, pady=5)
-
- self.status_label = tk.Label(self, text="状态: 初始化中...", anchor='w')
- self.status_label.pack(fill='x', padx=10, pady=2)
-
- self.exit_button = tk.Button(self, text="退出", command=self.quit_app)
- self.exit_button.pack(pady=5)
-
- self.backup_thread = None
- self.running = True
- self.protocol("WM_DELETE_WINDOW", self.quit_app)
-
- self.configure_logging()
-
- def configure_logging(self):
- # 日志配置前先确保备份目录存在
- if not os.path.exists(BACKUP_DESTINATION_BASE):
- try:
- os.makedirs(BACKUP_DESTINATION_BASE)
- except Exception as e:
- # 如果无法创建目录,在GUI中显示错误
- self.update_status(f"错误: 无法创建备份目录 {BACKUP_DESTINATION_BASE}: {e}")
- self.log_text.configure(state='normal')
- self.log_text.insert(tk.END, f"错误: 无法创建备份目录 {BACKUP_DESTINATION_BASE}: {e}\n")
- self.log_text.configure(state='disabled')
- return
-
- log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
-
- # 文件处理器
- file_handler = logging.FileHandler(LOG_FILE)
- file_handler.setFormatter(log_formatter)
-
- # GUI 文本框处理器
- self.text_handler = TextHandler(self.log_text)
- self.text_handler.setFormatter(log_formatter)
-
- # 获取根 logger 并添加处理器
- root_logger = logging.getLogger()
- root_logger.setLevel(logging.INFO)
- # 清除可能存在的默认处理器(例如 basicConfig 创建的 StreamHandler)
- if root_logger.hasHandlers():
- root_logger.handlers.clear()
- root_logger.addHandler(file_handler)
- root_logger.addHandler(self.text_handler)
- # 添加**个 StreamHandler 以便在控制台也看到日志(调试用)
- # stream_handler = logging.StreamHandler()
- # stream_handler.setFormatter(log_formatter)
- # root_logger.addHandler(stream_handler)
-
- def update_status(self, message):
- # 使用 self.after 将 GUI 更新调度回主线程
- self.after(0, lambda: self.status_label.config(text=f"状态: {message}"))
-
- def start_backup_monitor(self):
- self.backup_thread = threading.Thread(target=run_backup_monitor, args=(self,), daemon=True)
- self.backup_thread.start()
-
- def quit_app(self):
- logging.info("收到退出信号,程序即将关闭。")
- self.running = False # Signal the backup thread to stop
-
- # Signal the logger thread to stop processing new messages
- if hasattr(self, 'text_handler'):
- self.text_handler.stop_processing()
-
- # Give the backup thread a short time to finish
- if self.backup_thread and self.backup_thread.is_alive():
- try:
- self.backup_thread.join(timeout=1.0) # Wait max 1 second
- if self.backup_thread.is_alive():
- logging.warning("备份线程未能在1秒内停止,将强制关闭窗口。")
- except Exception as e:
- logging.error(f"等待备份线程时出错: {e}")
-
- # Close the main window. Daemon threads will be terminated.
- self.destroy()
- # os._exit(0) # Avoid force exit, let the application close naturally
-
- # --- 核心备份逻辑 (从旧 main 函数提取) ---
- def get_available_drives():
- """获取当前所有可用的驱动器盘符"""
- drives = []
- bitmask = win32file.GetLogicalDrives()
- for letter in string.ascii_uppercase:
- if bitmask & 1:
- drives.append(letter)
- bitmask >>= 1
- return set(drives)
-
- def is_removable_drive(drive_letter):
- """判断指定盘符是否是可移动驱动器 (U盘通常是这个类型)"""
- drive_path = f"{drive_letter}:"
- try:
- # DRIVE_REMOVABLE 的类型代码是 2
- return win32file.GetDriveTypeW(drive_path) == win32file.DRIVE_REMOVABLE
- except Exception as e:
- # logging.error(f"检查驱动器 {drive_path} 类型时出错: {e}") # 可能在驱动器刚插入时发生
- return False
-
- def should_skip_file(src, dst):
- """判断是否需要跳过备份(增量备份逻辑)"""
- if not os.path.exists(dst):
- return False
- try:
- src_stat = os.stat(src)
- dst_stat = os.stat(dst)
- # 文件大小和修改时间都相同则跳过
- return src_stat.st_size == dst_stat.st_size and int(src_stat.st_mtime) == int(dst_stat.st_mtime)
- except Exception:
- return False
-
- def copy_file_with_log(src, dst):
- try:
- file_size = os.path.getsize(src)
- # 超过128MB的大文件采用分块复制
- if file_size > 128 * 1024 * 1024:
- chunk_size = 16 * 1024 * 1024 # 16MB
- with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
- while True:
- chunk = fsrc.read(chunk_size)
- if not chunk:
- break
- fdst.write(chunk)
- # 尝试复制亓数据,如果失败则记录但不中断
- try:
- shutil.copystat(src, dst)
- except Exception as e_stat:
- logging.warning(f"无法复制亓数据 {src} -> {dst}: {e_stat}")
- logging.info(f"分块复制大文件: {src} -> {dst}")
- else:
- shutil.copy2(src, dst)
- logging.info(f"已复制: {src} -> {dst}")
- except Exception as e:
- logging.error(f"复制文件 {src} 时出错: {e}")
-
- def threaded_copytree(src, dst, skip_exts=None, skip_dirs=None, max_workers=8):
- """线程池递归复制目录,支持增量备份和跳过指定类型,限制**大线程数"""
- if skip_exts is None:
- skip_exts = ['.tmp', '.log', '.sys']
- if skip_dirs is None:
- skip_dirs = ['$RECYCLE.BIN', 'System Volume Information']
- if not os.path.exists(dst):
- try:
- os.makedirs(dst)
- except Exception as e_mkdir:
- logging.error(f"创建目录 {dst} 失败: {e_mkdir}")
- return #无法创建目标目录,则无法继续复制
- tasks = []
- small_files = []
- try:
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- for item in os.listdir(src):
- s = os.path.join(src, item)
- d = os.path.join(dst, item)
- try:
- if os.path.isdir(s):
- if item in skip_dirs:
- logging.info(f"跳过目录: {s}")
- continue
- # 递归调用也放入线程池
- tasks.append(executor.submit(threaded_copytree, s, d, skip_exts, skip_dirs, max_workers))
- else:
- ext = os.path.splitext(item)[1].lower()
- if ext in skip_exts:
- logging.info(f"跳过文件: {s}")
- continue
- if should_skip_file(s, d):
- # logging.debug(f"跳过未变更文件: {s}") # 改为 debug 级别
- continue
- # 小于16MB的小文件批量处理
- if os.path.getsize(s) < 16 * 1024 * 1024:
- small_files.append((s, d))
- else:
- tasks.append(executor.submit(copy_file_with_log, s, d))
- except PermissionError:
- logging.warning(f"无权限访问: {s},跳过")
- except FileNotFoundError:
- logging.warning(f"文件或目录不存在(可能在扫描时被移除): {s},跳过")
- except Exception as e_item:
- logging.error(f"处理 {s} 时出错: {e_item}")
-
- # 批量提交小文件任务,减少线程调度开销
- batch_size = 16
- for i in range(0, len(small_files), batch_size):
- batch = small_files[i:i+batch_size]
- tasks.append(executor.submit(batch_copy_files, batch))
-
- # 等待所有任务完成
- for future in as_completed(tasks):
- try:
- future.result() # 获取结果以暴露异常
- except Exception as e_future:
- logging.error(f"线程池任务出错: {e_future}")
- except PermissionError:
- logging.error(f"无权限访问源目录: {src}")
- except FileNotFoundError:
- logging.error(f"源目录不存在: {src}")
- except Exception as e_pool:
- logging.error(f"处理目录 {src} 时线程池出错: {e_pool}")
-
- def batch_copy_files(file_pairs):
- for src, dst in file_pairs:
- copy_file_with_log(src, dst)
-
- def backup_usb_drive(drive_letter, app_instance):
- """执行U盘备份(多线程+增量),并更新GUI状态"""
- source_drive = f"{drive_letter}:"
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- destination_folder = os.path.join(BACKUP_DESTINATION_BASE, f"Backup_{drive_letter}_{timestamp}")
-
- logging.info(f"检测到U盘: {source_drive}")
- app_instance.update_status(f"检测到U盘: {drive_letter}:\\,准备备份...")
- logging.info(f"开始备份到: {destination_folder}")
- app_instance.update_status(f"开始备份 {drive_letter}:\\ 到 {destination_folder}")
-
- start_time = time.time()
- try:
- threaded_copytree(source_drive, destination_folder, max_workers=16)
- end_time = time.time()
- duration = end_time - start_time
- logging.info(f"成功完成备份: {source_drive} -> {destination_folder} (耗时: {duration:.2f} 秒)")
- app_instance.update_status(f"备份完成: {drive_letter}:\\ (耗时: {duration:.2f} 秒)")
- except FileNotFoundError:
- logging.error(f"错误:源驱动器 {source_drive} 不存在或无法访问。")
- app_instance.update_status(f"错误: 无法访问 {drive_letter}:")
- except PermissionError:
- logging.error(f"错误:没有权限读取 {source_drive} 或写入 {destination_folder}。请检查权限设置。")
- app_instance.update_status(f"错误: 权限不足 {drive_letter}:\\ 或目标文件夹")
- except Exception as e:
- logging.error(f"备份U盘 {source_drive} 时发生未知错误: {e}")
- app_instance.update_status(f"错误: 备份 {drive_letter}:\\ 时发生未知错误")
- finally:
- # 短暂显示完成/错误状态后,恢复到空闲状态
- app_instance.after(5000, lambda: app_instance.update_status("空闲,等待U盘插入..."))
-
- def run_backup_monitor(app_instance):
- """后台监控线程的主函数"""
- logging.info("U盘自动备份程序启动...")
- logging.info(f"备份将存储在: {BACKUP_DESTINATION_BASE}")
- app_instance.update_status("启动成功,等待U盘插入...")
-
- # 检查备份目录是否已成功创建(在 App 初始化时完成)
- if not os.path.exists(BACKUP_DESTINATION_BASE):
- logging.error(f"无法启动监控:备份目录 {BACKUP_DESTINATION_BASE} 不存在且无法创建。")
- app_instance.update_status(f"错误: 备份目录不存在且无法创建")
- return
-
- try:
- known_drives = get_available_drives()
- logging.info(f"当前已知驱动器: {sorted(list(known_drives))}")
- except Exception as e_init_drives:
- logging.error(f"初始化获取驱动器列表失败: {e_init_drives}")
- app_instance.update_status(f"错误: 获取驱动器列表失败")
- known_drives = set()
-
- while app_instance.running:
- try:
- app_instance.update_status("正在检测驱动器...")
- current_drives = get_available_drives()
- new_drives = current_drives - known_drives
- removed_drives = known_drives - current_drives
-
- if new_drives:
- logging.info(f"检测到新驱动器: {sorted(list(new_drives))}")
- for drive in new_drives:
- if not app_instance.running: break # Check flag before potentially long operation
- # 稍作等待,确保驱动器已准备好
- logging.info(f"等待驱动器 {drive}: 准备就绪...")
- app_instance.update_status(f"检测到新驱动器 {drive}:,等待准备就绪...")
- time.sleep(3) # 增加等待时间
- if not app_instance.running: break
- try:
- if is_removable_drive(drive):
- backup_usb_drive(drive, app_instance)
- else:
- logging.info(f"驱动器 {drive}: 不是可移动驱动器,跳过备份。")
- app_instance.update_status(f"驱动器 {drive}: 非U盘,跳过")
- # 短暂显示后恢复空闲
- app_instance.after(3000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
- except Exception as e_check:
- logging.error(f"检查或备份驱动器 {drive}: 时出错: {e_check}")
- app_instance.update_status(f"错误: 处理驱动器 {drive}: 时出错")
- app_instance.after(5000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
-
- if removed_drives:
- logging.info(f"检测到驱动器移除: {sorted(list(removed_drives))}")
- # Optionally update status for removed drives
- # app_instance.update_status(f"驱动器 {','.join(sorted(list(removed_drives)))} 已移除")
- # app_instance.after(3000, lambda: app_instance.update_status("空闲,等待U盘插入...") if app_instance.running else None)
-
- # 更新已知驱动器列表
- known_drives = current_drives
-
- # 在循环末尾更新状态为空闲(如果没有正在进行的草作)
- if not new_drives and app_instance.status_label.cget("text").startswith("状态: 正在检测驱动器"):
- app_instance.update_status("空闲,等待U盘插入...")
-
- # 等待指定间隔,并允许提前退出
- interval_counter = 0
- while app_instance.running and interval_counter < CHECK_INTERVAL:
- time.sleep(1)
- interval_counter += 1
- if not app_instance.running:
- break
-
- except Exception as e:
- logging.error(f"主循环发生错误: {e}")
- app_instance.update_status(f"错误: {e}")
- # 防止因临时错误导致程序崩溃,稍等后继续,并允许提前退出
- error_wait_counter = 0
- while app_instance.running and error_wait_counter < CHECK_INTERVAL * 2:
- time.sleep(1)
- error_wait_counter += 1
- if not app_instance.running:
- break
-
- logging.info("后台监控线程已停止。")
- app_instance.update_status("程序已停止")
-
- if __name__ == "__main__":
- app = App()
- app.start_backup_monitor()
- app.mainloop()
复制代码
|
|