"""
Duplicate Cache and Detection Module for Bagheera.

This module provides the core logic for detecting duplicate images using
perceptual hashing (dHash) and managing a persistent cache of these hashes
and their relationships using LMDB.

Classes:
    BKTree: In-memory metric tree used for similarity queries.
    HashWorker: Thread-pool task that hashes a single image.
    DuplicateCache: Manages the LMDB database for hashes and exceptions.
    DuplicateDetector: Background thread that performs the duplicate analysis.
"""
import collections
import logging
import os
import shutil
import struct
import time
from pathlib import Path

import imagehash  # For perceptual hashing
import lmdb
import PIL.Image
from PySide6.QtCore import (
    QObject, QThread, Signal, QMutex, QSemaphore, QReadWriteLock,
    QMutexLocker, QReadLocker, QWriteLocker, QRunnable
)

from constants import (
    DUPLICATE_CACHE_PATH, DUPLICATE_HASH_DB_NAME,
    DUPLICATE_EXCEPTIONS_DB_NAME, DUPLICATE_PENDING_DB_NAME,
    MAX_DHASH_DISTANCE, UITexts
)

logger = logging.getLogger(__name__)

# Result structure for duplicate detection
DuplicateResult = collections.namedtuple(
    'DuplicateResult',
    ['path1', 'path2', 'hash_value', 'is_exception', 'similarity', 'timestamp'])


class BKTree:
    """A Burkhard-Keller tree for similarity searching over a metric.

    Each node is a tuple ``(item, children)`` where ``children`` maps an
    integer distance to the child node stored at that distance.
    """

    def __init__(self, distance_func):
        """Create an empty tree.

        Args:
            distance_func: Callable ``(a, b) -> int`` returning the distance
                between two items.
        """
        self.distance_func = distance_func
        self.tree = None

    def add(self, item):
        """Insert ``item``; items at distance 0 from an existing node are ignored."""
        if self.tree is None:
            self.tree = (item, {})
            return
        node = self.tree
        while True:
            val, children = node
            dist = self.distance_func(item, val)
            if dist == 0:
                # Already present (identical under the metric).
                return
            if dist in children:
                node = children[dist]
            else:
                children[dist] = (item, {})
                break

    def query(self, item, max_dist):
        """Return ``[(stored_item, distance)]`` for all items within ``max_dist``."""
        if self.tree is None:
            return []
        results = []
        candidates = [self.tree]
        while candidates:
            val, children = candidates.pop()
            dist = self.distance_func(item, val)
            if dist <= max_dist:
                results.append((val, dist))
            # Only subtrees whose edge distance lies in
            # [dist - max_dist, dist + max_dist] can contain matches.
            for d in range(max(0, dist - max_dist), dist + max_dist + 1):
                if d in children:
                    candidates.append(children[d])
        return results


class HashWorker(QRunnable):
    """Worker to calculate image hash in a thread pool."""

    def __init__(self, path, detector, result_dict, mutex, semaphore):
        """Store the job parameters.

        Args:
            path: Image file to hash.
            detector: Owner object; its ``_is_running`` flag is checked
                before doing any work.
            result_dict: Shared dict receiving ``path -> hash string``.
            mutex: QMutex guarding ``result_dict``.
            semaphore: QSemaphore released once when the job is done.
        """
        super().__init__()
        self.path = path
        self.detector = detector
        self.result_dict = result_dict
        self.mutex = mutex
        self.semaphore = semaphore

    def run(self):
        """Hash the image (if the detector is still running) and signal completion."""
        try:
            if self.detector._is_running:
                # imagehash requires a PIL/Pillow image object.
                with PIL.Image.open(self.path) as pil_img:
                    # Using dHash from imagehash library as default
                    h = str(imagehash.dhash(pil_img))
                with QMutexLocker(self.mutex):
                    self.result_dict[self.path] = h
        except Exception as e:
            logger.warning(f"HashWorker failed for {self.path}: {e}")
        finally:
            # Always hand back exactly one token so the waiting detector
            # thread never stalls on a job that failed or was skipped.
            self.semaphore.release()


class DuplicateCache(QObject):
    """
    Manages a persistent LMDB cache for perceptual hashes and duplicate
    relationships.

    Uses (device_id, inode) as primary keys for robustness against file
    renames/moves.

    Storage layout, as written by this class:
        hash DB:       key ``"<dev>-<inode hex>"``, value ``"hash|mtime|path"``
        exceptions DB: key ``"<id1>-<id2>"`` (sorted ids),
                       value ``"path1|path2|similarity|timestamp"``
        pending DB:    same key/value layout as the exceptions DB
    """

    def __init__(self):
        super().__init__()
        self._lmdb_env = None
        self._hash_db = None
        self._exceptions_db = None
        self._pending_db = None
        self._db_lock = QMutex()  # Protects LMDB transactions

        # In-memory cache for hashes: (dev, inode) -> (hash_value, mtime, path)
        self._hash_cache = {}
        self._hash_cache_lock = QReadWriteLock()

        self.lmdb_open()

    def lmdb_open(self):
        """Open (creating if needed) the LMDB environment and its three databases.

        On failure the error is logged and the cache is left disabled
        (``_lmdb_env`` is None); every public method then degrades to a no-op.
        """
        cache_dir = Path(DUPLICATE_CACHE_PATH)
        cache_dir.mkdir(parents=True, exist_ok=True)
        try:
            self._lmdb_env = lmdb.open(
                DUPLICATE_CACHE_PATH,
                map_size=10 * 1024 * 1024 * 1024,  # 10GB default
                max_dbs=3,  # For hashes, exceptions and pending
                readonly=False,
                create=True
            )
            self._hash_db = self._lmdb_env.open_db(DUPLICATE_HASH_DB_NAME)
            self._exceptions_db = self._lmdb_env.open_db(
                DUPLICATE_EXCEPTIONS_DB_NAME)
            self._pending_db = self._lmdb_env.open_db(DUPLICATE_PENDING_DB_NAME)
            logger.info(f"Duplicate LMDB cache opened: {DUPLICATE_CACHE_PATH}")
        except Exception as e:
            logger.error(f"Failed to open duplicate LMDB cache: {e}")
            # Release a half-opened environment and drop any stale handles.
            self.lmdb_close()

    def lmdb_close(self):
        """Close the LMDB environment (if open) and forget all DB handles."""
        if self._lmdb_env:
            self._lmdb_env.close()
        self._lmdb_env = None
        self._hash_db = None
        self._exceptions_db = None
        self._pending_db = None

    def get_hash_stats(self):
        """Returns (count, size_bytes) for the hash database.

        ``size_bytes`` is the on-disk size of the environment's ``data.mdb``
        file, which is shared by all three databases.
        """
        if not self._lmdb_env:
            return 0, 0

        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                count = txn.stat(db=self._hash_db)['entries']

        size = 0
        disk_path = os.path.join(DUPLICATE_CACHE_PATH, "data.mdb")
        if os.path.exists(disk_path):
            size = os.path.getsize(disk_path)
        return count, size

    def clear_hashes(self):
        """Clears all hashes from the database by recreating the environment.

        NOTE: the whole cache directory is deleted, so the exceptions and
        pending databases stored in the same environment are wiped as well.
        """
        with QWriteLocker(self._hash_cache_lock):
            self._hash_cache.clear()

        # Hold the DB lock so no other thread is inside a transaction while
        # the environment is closed and recreated.
        with QMutexLocker(self._db_lock):
            self.lmdb_close()
            try:
                if os.path.exists(DUPLICATE_CACHE_PATH):
                    shutil.rmtree(DUPLICATE_CACHE_PATH)
                self.lmdb_open()
                logger.info("Duplicate hash cache cleared.")
            except Exception as e:
                logger.error(f"Error clearing duplicate LMDB: {e}")

    def __del__(self):
        self.lmdb_close()

    @staticmethod
    def _get_inode_info(path):
        """Return ``(st_dev, packed st_ino)`` for ``path`` or ``(0, None)`` on error."""
        try:
            stat_info = os.stat(path)
            return stat_info.st_dev, struct.pack('Q', stat_info.st_ino)
        except OSError:
            return 0, None

    def _get_lmdb_key(self, dev_id, inode_key_bytes):
        """Build the hash-DB key ``b"<dev>-<inode hex>"``."""
        return f"{dev_id}-{inode_key_bytes.hex()}".encode('utf-8')

    def get_hash_and_path(self, dev_id, inode_key_bytes):
        """Retrieves hash, mtime and path for a given (dev_id, inode_key_bytes).

        Returns:
            ``(hash_value, mtime, path)``; ``(None, 0, None)`` when there is
            no usable entry. Entries in the old ``"hash|path"`` format are
            returned with ``mtime == 0.0`` so callers re-hash the file.
        """
        # Check in-memory cache first
        with QReadLocker(self._hash_cache_lock):
            cached_data = self._hash_cache.get((dev_id, inode_key_bytes))
        if cached_data:
            return cached_data  # (hash_value, mtime, path)

        # Check LMDB
        if not self._lmdb_env:
            return None, 0, None

        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                lmdb_key = self._get_lmdb_key(dev_id, inode_key_bytes)
                value_bytes = txn.get(lmdb_key, db=self._hash_db)

        if not value_bytes:
            return None, 0, None

        # Handle format "hash_value_str|mtime|path_str" or old "hash|path"
        parts = value_bytes.decode('utf-8').split('|', 2)
        if len(parts) == 3:
            hash_str, mtime_str, path_str = parts
            try:
                mtime = float(mtime_str)
            except ValueError:
                # Corrupted record: treat as a miss instead of raising into
                # the caller (typically the detector thread).
                return None, 0, None
        elif len(parts) == 2:
            hash_str, path_str = parts
            mtime = 0.0  # Force re-hash
        else:
            return None, 0, None

        with QWriteLocker(self._hash_cache_lock):
            self._hash_cache[(dev_id, inode_key_bytes)] = (
                hash_str, mtime, path_str)
        return hash_str, mtime, path_str

    def get_hash_for_path(self, path, current_mtime, dev_id=None,
                          inode_key_bytes=None):
        """Return the cached hash for ``path`` if it is still fresh, else None.

        Args:
            path: File path (only stat'ed when the ids are not supplied).
            current_mtime: Current modification time of the file.
            dev_id, inode_key_bytes: Optional pre-computed identity of the file.
        """
        if dev_id is None or inode_key_bytes is None:
            dev_id, inode_key_bytes = self._get_inode_info(path)
        if not inode_key_bytes:
            return None

        hash_value, cached_mtime, _ = self.get_hash_and_path(
            dev_id, inode_key_bytes)

        # Return hash only if mtime matches (with small float tolerance)
        if hash_value and abs(cached_mtime - current_mtime) < 0.001:
            return hash_value
        return None

    def add_hash_for_path(self, path, hash_value, mtime, dev_id=None,
                          inode_key_bytes=None):
        """Store ``hash_value`` for ``path`` in LMDB and the in-memory cache.

        Returns:
            True when stored, False when the file cannot be identified or the
            database is unavailable.
        """
        if dev_id is None or inode_key_bytes is None:
            dev_id, inode_key_bytes = self._get_inode_info(path)
        if not inode_key_bytes or not self._lmdb_env:
            return False

        value_str = f"{hash_value}|{mtime}|{path}"
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                lmdb_key = self._get_lmdb_key(dev_id, inode_key_bytes)
                txn.put(lmdb_key, value_str.encode('utf-8'), db=self._hash_db)

        with QWriteLocker(self._hash_cache_lock):
            self._hash_cache[(dev_id, inode_key_bytes)] = (
                hash_value, mtime, path)
        return True

    def remove_hash_for_path(self, path, clear_relationships=True):
        """
        Removes the hash entry for a path.

        Args:
            path: File path. It must still exist, because the entry is
                located through the file's (device, inode) identity.
            clear_relationships: If True, also wipes all entries in pending
                and exceptions DBs involving this file.

        Returns:
            True when the removal was attempted, False when the file cannot
            be identified or the database is unavailable.
        """
        dev_id, inode_key_bytes = self._get_inode_info(path)
        if not inode_key_bytes or not self._lmdb_env:
            return False

        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                lmdb_key = self._get_lmdb_key(dev_id, inode_key_bytes)
                txn.delete(lmdb_key, db=self._hash_db)

        with QWriteLocker(self._hash_cache_lock):
            self._hash_cache.pop((dev_id, inode_key_bytes), None)

        # Also remove any exceptions involving this path. This must happen
        # after the DB lock above is released: the helper takes the same
        # (non-recursive) lock itself.
        if clear_relationships:
            self._remove_pair_entries_for_path(
                dev_id, inode_key_bytes, self._exceptions_db)
            self._remove_pair_entries_for_path(
                dev_id, inode_key_bytes, self._pending_db)
        return True

    def _get_pair_lmdb_key_from_ids(self, dev1, inode1, dev2, inode2):
        """Build the order-independent key for a pair of file identities."""
        # Ensure canonical order for exception keys
        key_parts = sorted([f"{dev1}-{inode1.hex()}", f"{dev2}-{inode2.hex()}"])
        return f"{key_parts[0]}-{key_parts[1]}".encode('utf-8')

    def _get_pair_lmdb_key(self, path1, path2):
        """Build the pair key from two paths; None if either cannot be stat'ed."""
        dev1, inode1 = self._get_inode_info(path1)
        dev2, inode2 = self._get_inode_info(path2)
        if not inode1 or not inode2:
            return None
        return self._get_pair_lmdb_key_from_ids(dev1, inode1, dev2, inode2)

    def mark_as_exception(self, path1, path2, is_exception=True,
                          similarity=None, timestamp=None):
        """Add (or, with ``is_exception=False``, remove) a "not a duplicate" mark.

        Returns:
            True when the database was updated, False otherwise.
        """
        if not self._lmdb_env:
            return False

        exception_key = self._get_pair_lmdb_key(path1, path2)
        if not exception_key:
            return False

        # Store paths in value to make exception recovery independent of hash DB
        ts = timestamp if timestamp is not None else int(time.time())
        val_str = f"{path1}|{path2}|{similarity if similarity is not None else ''}|{ts}"
        value = val_str.encode('utf-8')

        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                if is_exception:
                    txn.put(exception_key, value, db=self._exceptions_db)
                else:
                    txn.delete(exception_key, db=self._exceptions_db)
        return True

    def is_exception(self, path1, path2):
        """Return True if the pair is stored in the exceptions database."""
        if not self._lmdb_env:
            return False

        exception_key = self._get_pair_lmdb_key(path1, path2)
        if not exception_key:
            return False

        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                return txn.get(exception_key, db=self._exceptions_db) is not None

    def _remove_pair_entries_for_path(self, target_dev, target_inode, db_handle,
                                      txn=None):
        """Removes all entries involving a specific (dev, inode) pair from a
        pair-based DB.

        Args:
            target_dev: Device id of the file.
            target_inode: Packed inode bytes of the file.
            db_handle: The pair database to scan.
            txn: Optional open write transaction. When given, the caller is
                responsible for holding the DB lock; otherwise the lock is
                taken and a new write transaction is used.
        """
        if not self._lmdb_env:
            return

        target_inode_hex = target_inode.hex()

        def do_remove(t):
            keys_to_delete = []
            for key_bytes, _ in t.cursor(db=db_handle):
                parts = key_bytes.decode('utf-8').split('-')
                if len(parts) < 4:
                    continue
                try:
                    dev1, dev2 = int(parts[0]), int(parts[2])
                except ValueError:
                    continue  # Malformed key, not ours to interpret
                inode1_hex, inode2_hex = parts[1], parts[3]
                if (dev1 == target_dev and inode1_hex == target_inode_hex) or \
                        (dev2 == target_dev and inode2_hex == target_inode_hex):
                    keys_to_delete.append(key_bytes)

            # Delete after the scan so the cursor is never invalidated.
            for key in keys_to_delete:
                t.delete(key, db=db_handle)

        if txn:
            do_remove(txn)
        else:
            with QMutexLocker(self._db_lock):
                with self._lmdb_env.begin(write=True) as t:
                    do_remove(t)

    def mark_as_pending(self, path1, path2, is_pending=True, similarity=None,
                        timestamp=None):
        """Marks a pair as pending review (or clears the mark).

        Returns:
            True when the database was updated, False otherwise.
        """
        if not self._lmdb_env or self._pending_db is None:
            return False

        key = self._get_pair_lmdb_key(path1, path2)
        if not key:
            return False

        # Store paths in value to allow reconstruction without scanning
        ts = timestamp if timestamp is not None else int(time.time())
        val_str = f"{path1}|{path2}|{similarity if similarity is not None else ''}|{ts}"
        value = val_str.encode('utf-8')

        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                if is_pending:
                    txn.put(key, value, db=self._pending_db)
                else:
                    # delete() simply reports False for a missing key, so no
                    # existence check is needed first.
                    txn.delete(key, db=self._pending_db)
        return True

    def mark_as_pending_batch(self, pairs_data):
        """
        Marks multiple pairs as pending review in a single transaction.

        Args:
            pairs_data: list of (path1, path2, similarity, timestamp).
                Pairs whose files cannot be stat'ed are skipped.

        Returns:
            True when the transaction ran, False when there was nothing to do
            or the database is unavailable.
        """
        if not self._lmdb_env or self._pending_db is None or not pairs_data:
            return False

        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                for p1, p2, similarity, timestamp in pairs_data:
                    key = self._get_pair_lmdb_key(p1, p2)
                    if not key:
                        continue

                    ts = timestamp if timestamp is not None else int(time.time())
                    sim_str = str(similarity) if similarity is not None else ""
                    val_str = f"{p1}|{p2}|{sim_str}|{ts}"
                    txn.put(key, val_str.encode('utf-8'), db=self._pending_db)
        return True

    def get_all_exceptions_set(self):
        """Returns a set of canonical pairs (frozenset) marked as exceptions."""
        exceptions = set()
        if not self._lmdb_env or self._exceptions_db is None:
            return exceptions

        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                for _, value_bytes in txn.cursor(db=self._exceptions_db):
                    try:
                        parts = value_bytes.decode('utf-8').split('|')
                    except UnicodeDecodeError:
                        continue
                    if len(parts) >= 2:
                        exceptions.add(frozenset((parts[0], parts[1])))
        return exceptions

    def get_all_pending_duplicates(self):
        """Retrieves all pending duplicate pairs from the database.

        Entries that cannot be parsed, or whose files no longer exist, are
        deleted from the pending database as a side effect.

        Returns:
            list of DuplicateResult with ``hash_value=None`` and
            ``is_exception=False``.
        """
        results = []
        if not self._lmdb_env or self._pending_db is None:
            return results

        keys_to_delete = []
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                for key, value_bytes in txn.cursor(db=self._pending_db):
                    try:
                        parts = value_bytes.decode('utf-8').split('|')
                        p1, p2 = parts[0], parts[1]
                        sim = int(parts[2]) if len(parts) > 2 and parts[2] else None
                        ts = int(parts[3]) if len(parts) > 3 else 0
                    except Exception:
                        keys_to_delete.append(key)
                        continue

                    if os.path.exists(p1) and os.path.exists(p2):
                        results.append(
                            DuplicateResult(p1, p2, None, False, sim, ts))
                    else:
                        keys_to_delete.append(key)

            # Clean up after the read transaction has ended, still under the
            # DB lock.
            if keys_to_delete:
                try:
                    with self._lmdb_env.begin(write=True) as txn:
                        for k in keys_to_delete:
                            txn.delete(k, db=self._pending_db)
                    logger.info(f"Cleaned up {len(keys_to_delete)} invalid "
                                "pending duplicates (files deleted externally)")
                except Exception as e:
                    logger.error(
                        f"Error cleaning up pending duplicates from DB: {e}")

        return results

    def get_all_exceptions(self):
        """Retrieves all duplicate pairs marked as exceptions from the database.

        Only pairs whose two files still exist are returned. Unreadable
        entries are skipped.

        Returns:
            list of DuplicateResult with ``hash_value=None`` and
            ``is_exception=True``.
        """
        results = []
        if not self._lmdb_env or self._exceptions_db is None:
            return results

        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                for key_bytes, value_bytes in txn.cursor(db=self._exceptions_db):
                    try:
                        p1, p2 = None, None
                        sim = None
                        ts = 0

                        # New format: paths are stored in the value
                        parts = value_bytes.decode('utf-8').split('|')
                        if len(parts) >= 2:
                            p1, p2 = parts[0], parts[1]
                            if len(parts) > 2 and parts[2]:
                                sim = int(parts[2])
                            if len(parts) > 3:
                                ts = int(parts[3])
                            else:
                                # No stored timestamp: fall back to the
                                # first file's mtime.
                                ts = int(os.path.getmtime(p1)) \
                                    if os.path.exists(p1) else 0

                        if not p1 or not p2:
                            # Legacy format fallback: lookup paths in hash db
                            kp = key_bytes.decode('utf-8').split('-')
                            if len(kp) == 4:
                                k1 = f"{kp[0]}-{kp[1]}".encode()
                                k2 = f"{kp[2]}-{kp[3]}".encode()
                                v1 = txn.get(k1, db=self._hash_db)
                                v2 = txn.get(k2, db=self._hash_db)
                                if v1 and v2:
                                    # Format is hash|mtime|path|dist... path
                                    # is always index 2
                                    p1 = v1.decode('utf-8').split('|')[2]
                                    p2 = v2.decode('utf-8').split('|')[2]

                        if p1 and p2 and os.path.exists(p1) \
                                and os.path.exists(p2):
                            results.append(
                                DuplicateResult(p1, p2, None, True, sim, ts))
                    except Exception as e:
                        # Best effort: one bad record must not hide the rest.
                        logger.debug(f"Skipping unreadable exception entry: {e}")
                        continue
        return results

    def clean_stale_hashes(self):
        """
        Removes hash entries from the database for files that no longer exist
        on disk.

        Returns:
            Number of entries removed.
        """
        if not self._lmdb_env or self._hash_db is None:
            return 0

        keys_to_delete = []
        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                for key, value_bytes in txn.cursor(db=self._hash_db):
                    try:
                        # value_bytes is "hash|mtime|path|last_dist"
                        parts = value_bytes.decode('utf-8').split('|')
                    except Exception:
                        keys_to_delete.append(key)  # Corrupted entry
                        continue
                    if len(parts) >= 3 and not os.path.exists(parts[2]):
                        keys_to_delete.append(key)

            if keys_to_delete:
                with self._lmdb_env.begin(write=True) as txn:
                    for k in keys_to_delete:
                        txn.delete(k, db=self._hash_db)
                logger.info(f"Cleaned up {len(keys_to_delete)} stale hash "
                            "entries (files deleted externally)")

        return len(keys_to_delete)

    def get_all_hashes_with_paths(self):
        """Retrieves all hashes from the database along with their associated
        paths and inode info.

        Returns:
            defaultdict mapping ``hash_value`` to a list of
            ``(path, dev_id, inode_key_bytes)`` tuples. Entries without a
            stored path (old two-field format) are skipped.
        """
        # hash_value -> [(path, dev_id, inode_key_bytes)]
        all_hashes = collections.defaultdict(list)
        if not self._lmdb_env:
            return all_hashes

        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=False) as txn:
                for key_bytes, value_bytes in txn.cursor(db=self._hash_db):
                    # value_bytes is "hash|mtime|path|last_dist"
                    parts_val = value_bytes.decode('utf-8').split('|')
                    if len(parts_val) < 3:
                        continue

                    # key_bytes is like "dev_id-inode_hex"
                    key_parts = key_bytes.decode('utf-8').split('-')
                    dev_id = int(key_parts[0])
                    inode_key_bytes = bytes.fromhex(key_parts[1])

                    all_hashes[parts_val[0]].append(
                        (parts_val[2], dev_id, inode_key_bytes))
        return all_hashes

    def rename_entry(self, old_path, new_path):
        """
        Updates the cache entry for a file that has been renamed or moved.

        If the (dev, inode) identity is unchanged only the stored path is
        rewritten; otherwise the old entry is removed and a new one is added
        under the new identity, preserving the hash value. Paths stored in
        the pending DB are updated as well.

        NOTE(review): both paths are stat'ed here, so this only succeeds
        while ``old_path`` can still be resolved - confirm callers invoke it
        at a point where that holds.

        Returns:
            True if an entry existed and was updated, False otherwise.
        """
        old_dev, old_inode_key_bytes = self._get_inode_info(old_path)
        new_dev, new_inode_key_bytes = self._get_inode_info(new_path)

        if not old_inode_key_bytes or not new_inode_key_bytes \
                or not self._lmdb_env:
            return False

        hash_value, mtime, _ = self.get_hash_and_path(
            old_dev, old_inode_key_bytes)
        if not hash_value:
            return False

        if (old_dev, old_inode_key_bytes) != (new_dev, new_inode_key_bytes):
            # Identity changed (e.g. cross-filesystem move): drop the entry
            # stored under the old (dev, inode) first.
            self.remove_hash_for_path(old_path)

        # Writes (or overwrites) the entry under the new identity and path.
        self.add_hash_for_path(new_path, hash_value, mtime)
        self._update_pair_paths(old_path, new_path, self._pending_db)
        return True

    def _update_pair_paths(self, old_path, new_path, db_handle):
        """Updates stored paths in a pair-based DB value when a file is renamed.

        Values have the form ``"path1|path2|similarity|timestamp"``; only the
        two path fields are rewritten, every other field is preserved.
        """
        if not self._lmdb_env or db_handle is None:
            return

        with QMutexLocker(self._db_lock):
            with self._lmdb_env.begin(write=True) as txn:
                updates = []
                for key, value_bytes in txn.cursor(db=db_handle):
                    try:
                        parts = value_bytes.decode('utf-8').split('|')
                    except UnicodeDecodeError:
                        continue
                    if len(parts) < 2 or old_path not in parts[:2]:
                        continue
                    for idx in (0, 1):
                        if parts[idx] == old_path:
                            parts[idx] = new_path
                    updates.append((key, '|'.join(parts).encode('utf-8')))

                # Write after the scan so the cursor is not disturbed.
                for key, value in updates:
                    txn.put(key, value, db=db_handle)


class DuplicateDetector(QThread):
    """
    Worker thread for detecting duplicate images using perceptual hashing.

    Signals:
        progress_update(current, total, message): progress on a bar whose
            total is ``2 * len(paths_to_scan)``.
        duplicates_found(list): list of DuplicateResult.
        detection_finished(): emitted when the run ends.
    """
    progress_update = Signal(int, int, str)  # current, total, message
    duplicates_found = Signal(list)  # List of DuplicateResult
    detection_finished = Signal()

    def __init__(self, paths_to_scan, duplicate_cache, pool_manager,
                 method="histogram_hashing", threshold=90, force_full=False):
        """Configure a detection run.

        Args:
            paths_to_scan: Image paths to analyse.
            duplicate_cache: DuplicateCache used for hashes and pair state.
            pool_manager: Object whose ``get_pool()`` returns the thread pool
                used for hashing.
            method: Method label; only used for logging here.
            threshold: Similarity percentage (50-100).
            force_full: If True, ignore stored pending pairs and compare every
                hash instead of only new/changed files.
        """
        super().__init__()
        self.paths_to_scan = paths_to_scan
        self.duplicate_cache = duplicate_cache
        self.pool_manager = pool_manager
        self.method = method
        self.threshold = threshold  # Similarity percentage (50-100)
        self.force_full = force_full
        self._is_running = True

    def stop(self):
        """Request cancellation and block until the thread has finished."""
        self._is_running = False
        self.wait()

    def run(self):
        """Hash all files, compare the hashes and emit the duplicate pairs."""
        total_files = len(self.paths_to_scan)
        found_duplicates = []
        # To store frozenset((path1, path2)) for uniqueness
        unique_duplicate_pairs = set()
        last_update_time = 0

        pool = self.pool_manager.get_pool()

        # 1. Load existing pending duplicates from cache to avoid
        # recalculation (unless force_full)
        if not self.force_full:
            # Set built once: membership tests stay O(1) per pending pair.
            scan_set = set(self.paths_to_scan)
            for p in self.duplicate_cache.get_all_pending_duplicates():
                if p.path1 not in scan_set or p.path2 not in scan_set:
                    continue
                if p.similarity is None or p.similarity >= self.threshold:
                    found_duplicates.append(p)
                    unique_duplicate_pairs.add(frozenset((p.path1, p.path2)))

        # Convert similarity threshold (percentage) to Hamming distance
        distance_threshold = int(
            MAX_DHASH_DISTANCE * (100 - self.threshold) / 100)
        logger.info(
            f"Duplicate detection: Method={self.method}, "
            f"Similarity Threshold={self.threshold}%, Hamming "
            f"Distance Threshold={distance_threshold}")

        # 2. Phase 1: Hash Collection (Parallelized)
        path_to_hash = {}
        dirty_hashes_objs = set()
        dirty_paths = set()

        paths_to_hash_parallel = []
        processed_initial = 0
        for path in self.paths_to_scan:
            if not self._is_running:
                break
            try:
                stat_info = os.stat(path)
            except OSError:
                continue
            mtime = stat_info.st_mtime
            dev, inode = stat_info.st_dev, struct.pack('Q', stat_info.st_ino)

            # Update UI during initial cache check (Phase 1 part A)
            processed_initial += 1

            cached_h = self.duplicate_cache.get_hash_for_path(
                path, mtime, dev, inode)
            if cached_h:
                path_to_hash[path] = (cached_h, dev, inode)
            else:
                dirty_paths.add(path)
                paths_to_hash_parallel.append((path, mtime, dev, inode))

            if time.perf_counter() - last_update_time > 0.05:
                # Scale this part to 0-50% of the total bar
                self.progress_update.emit(
                    processed_initial, total_files * 2,
                    UITexts.DUPLICATE_MSG_HASHING.format(filename="..."))
                last_update_time = time.perf_counter()

        if paths_to_hash_parallel and self._is_running:
            # At least 1, so the range() step below can never be zero.
            batch_size = max(1, pool.maxThreadCount() * 2)
            results_mutex = QMutex()
            new_hashes = {}
            sem = QSemaphore(0)

            # Phase 1 part B: Parallel hashing for new/changed files
            processed_hashing = total_files - len(paths_to_hash_parallel)
            for start in range(0, len(paths_to_hash_parallel), batch_size):
                if not self._is_running:
                    break

                current_batch = paths_to_hash_parallel[start:start + batch_size]
                for p_data in current_batch:
                    pool.start(HashWorker(
                        p_data[0], self, new_hashes, results_mutex, sem))

                # Wait for one token per submitted job, polling so a stop
                # request is noticed within ~100 ms.
                for _ in current_batch:
                    while not sem.tryAcquire(1, 100):
                        if not self._is_running:
                            break
                    if not self._is_running:
                        break

                    processed_hashing += 1
                    if time.perf_counter() - last_update_time > 0.03:
                        self.progress_update.emit(
                            processed_hashing, total_files * 2,
                            UITexts.DUPLICATE_MSG_HASHING.format(filename="..."))
                        last_update_time = time.perf_counter()

            # Snapshot under the mutex: after an early stop, workers may
            # still be writing into the shared dict.
            with QMutexLocker(results_mutex):
                hashed = dict(new_hashes)

            for p, mtime, dev, inode in paths_to_hash_parallel:
                h = hashed.get(p)
                if h:
                    path_to_hash[p] = (h, dev, inode)
                    dirty_hashes_objs.add(imagehash.hex_to_hash(h))
                    self.duplicate_cache.add_hash_for_path(
                        p, h, mtime, dev, inode)

        if not self._is_running:
            self.detection_finished.emit()
            return

        # 3. Phase 2: Comparison (Optimized with BK-Tree)
        hash_map = collections.defaultdict(list)
        bk_tree = BKTree(lambda a, b: a - b)

        path_items = list(path_to_hash.items())
        total_items = len(path_items)
        for i, (p, (h_str, dev, inode)) in enumerate(path_items):
            if not self._is_running:
                break

            # Sub-phase: Indexing hashes into the BK-Tree for comparison
            if time.perf_counter() - last_update_time > 0.05 \
                    or i == 0 or i == total_items - 1:
                # Scale Indexing to 50% - 75% range of the total bar
                indexing_progress = int((i / total_items) * (total_files / 2))
                self.progress_update.emit(
                    total_files + indexing_progress, total_files * 2,
                    UITexts.DUPLICATE_MSG_ANALYZING.format(filename="..."))
                last_update_time = time.perf_counter()

            try:
                h_obj = imagehash.hex_to_hash(h_str)
            except (ValueError, TypeError) as e:
                # A corrupt cached hash must not abort the whole scan.
                logger.warning(f"Skipping invalid cached hash for {p}: {e}")
                continue

            if h_obj not in hash_map:
                bk_tree.add(h_obj)
            hash_map[h_obj].append((p, dev, inode))
            if self.force_full or p in dirty_paths:
                dirty_hashes_objs.add(h_obj)

        # Optimization: Only query the tree for hashes associated with new or
        # modified files.
        # This finds pairs (Dirty, Clean) and (Dirty, Dirty). (Clean, Clean)
        # were handled in previous runs.
        hashes_to_query = list(hash_map) if self.force_full \
            else list(dirty_hashes_objs)
        total_queries = len(hashes_to_query)
        pending_db_updates = []

        # Pre-load exceptions into memory to avoid thousands of DB lookups
        self.progress_update.emit(
            total_files, total_files * 2,
            UITexts.DUPLICATE_MSG_ANALYZING.format(filename="..."))
        exceptions_set = self.duplicate_cache.get_all_exceptions_set()

        if total_queries == 0:
            # Nothing new to analyze, jump to end of detection phase
            self.progress_update.emit(
                total_files * 2, total_files * 2,
                UITexts.DUPLICATE_MSG_ANALYZING.format(filename="... (OK)"))

        for i, h1 in enumerate(hashes_to_query):
            if not self._is_running:
                break

            items1 = hash_map[h1]

            # Scale Comparison to 75% - 100% range
            comparison_progress = int(
                ((i + 1) / total_queries) * (total_files / 2))

            # Update progress more frequently during analysis phase
            if time.perf_counter() - last_update_time > 0.05 \
                    or i == 0 or i == total_queries - 1:
                self.progress_update.emit(
                    int(total_files * 1.5 + comparison_progress),
                    total_files * 2,
                    UITexts.DUPLICATE_MSG_ANALYZING.format(filename="..."))
                last_update_time = time.perf_counter()

            # Query tree for similar hashes
            for h2, distance in bk_tree.query(h1, distance_threshold):
                items2 = hash_map[h2]
                for p1, dev1, ino1 in items1:
                    for p2, dev2, ino2 in items2:
                        if not self._is_running:
                            break

                        # Same file (also covers hard links).
                        if (dev1, ino1) == (dev2, ino2):
                            continue

                        # Optimization: Skip pair if BOTH were already verified
                        if not self.force_full \
                                and p1 not in dirty_paths \
                                and p2 not in dirty_paths:
                            continue

                        canonical = frozenset((p1, p2))
                        if canonical in unique_duplicate_pairs \
                                or canonical in exceptions_set:
                            continue

                        sim = int((1.0 - (distance / MAX_DHASH_DISTANCE)) * 100)
                        ts = int(time.time())
                        found_duplicates.append(
                            DuplicateResult(p1, p2, str(h1), False, sim, ts))
                        unique_duplicate_pairs.add(canonical)

                        # Frequent UI heartbeat for large duplicate groups
                        if time.perf_counter() - last_update_time > 0.05:
                            self.progress_update.emit(
                                int(total_files * 1.5 + comparison_progress),
                                total_files * 2,
                                UITexts.DUPLICATE_MSG_ANALYZING.format(
                                    filename="..."))
                            last_update_time = time.perf_counter()

                        # Collect for batch update to improve performance
                        pending_db_updates.append((p1, p2, sim, ts))

                        # Periodically flush pending updates to DB
                        if len(pending_db_updates) >= 50:
                            self.duplicate_cache.mark_as_pending_batch(
                                pending_db_updates)
                            pending_db_updates = []

        # Final flush of remaining updates
        if pending_db_updates:
            self.duplicate_cache.mark_as_pending_batch(pending_db_updates)

        self.duplicates_found.emit(found_duplicates)
        self.detection_finished.emit()