1636 lines
62 KiB
Python
1636 lines
62 KiB
Python
"""
|
|
Image Scanner Module for Bagheera Image Viewer.
|
|
|
|
This module handles asynchronous image discovery, thumbnail generation, and
|
|
persistent caching. It ensures the main application UI remains responsive by
|
|
offloading file I/O and image processing to background threads.
|
|
|
|
Key Features:
|
|
- Asynchronous Scanning: Directory traversal and image loading in separate threads.
|
|
- Two-Tier Caching:
|
|
- In-Memory: LRU-based cache with safe concurrent access.
|
|
- Disk: Persistent storage using LMDB (Lightning Memory-Mapped Database).
|
|
- Incremental Loading: Batched image processing for memory efficiency.
|
|
- Cache Maintenance: Automatic cleanup of invalid entries and size enforcement.
|
|
- Improved Synchronization: Context managers, proper mutex handling, no deadlocks.
|
|
- Better Path Handling: Uses pathlib for robust path operations.
|
|
- Enhanced Error Handling: Detailed logging and recovery mechanisms.
|
|
"""
|
|
import os
|
|
import logging
|
|
import shlex
|
|
import shutil
|
|
import struct
|
|
import subprocess
|
|
import time
|
|
import argparse
|
|
import collections
|
|
from pathlib import Path
|
|
from contextlib import contextmanager
|
|
import lmdb
|
|
from PySide6.QtCore import (QObject, QThread, Signal, QMutex, QReadWriteLock, QSize,
|
|
QWaitCondition, QByteArray, QBuffer, QIODevice, Qt, QTimer,
|
|
QRunnable, QThreadPool)
|
|
from PySide6.QtGui import QImage, QImageReader, QImageIOHandler
|
|
|
|
from constants import (
|
|
APP_CONFIG, CACHE_PATH, CACHE_MAX_SIZE, CONFIG_DIR, DISK_CACHE_MAX_BYTES,
|
|
IMAGE_EXTENSIONS, SEARCH_CMD, THUMBNAIL_SIZES, RATING_XATTR_NAME, XATTR_NAME,
|
|
UITexts, SCANNER_SETTINGS_DEFAULTS, HAVE_BAGHEERASEARCH_LIB
|
|
)
|
|
|
|
from imageviewer import ImageViewer
|
|
from metadatamanager import XattrManager
|
|
|
|
if HAVE_BAGHEERASEARCH_LIB:
|
|
from bagheera_search_lib import BagheeraSearcher
|
|
|
|
# Set up logging for better debugging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def generate_thumbnail(path, size):
    """Generates a QImage thumbnail for a given path and size.

    Returns the scaled QImage, or None when the file cannot be decoded.
    """
    try:
        reader = QImageReader(path)

        # Ask the decoder to scale during the decode pass when the format
        # supports it; this keeps memory and CPU usage low for large
        # sources (e.g. JPEG).
        if reader.supportsOption(QImageIOHandler.ImageOption.ScaledSize):
            full_size = reader.size()
            if full_size.isValid() and \
                    max(full_size.width(), full_size.height()) > size:
                boxed = QSize(full_size)
                boxed.scale(size, size, Qt.KeepAspectRatio)
                reader.setScaledSize(boxed)

        reader.setAutoTransform(True)
        img = reader.read()

        if img.isNull():
            err = reader.error()
            # AppleDouble sidecar files ("._*") that report UnknownError are
            # not real images; skip the retry entirely.
            if os.path.basename(path).startswith("._") and \
                    err == QImageReader.ImageReaderError.UnknownError:
                return None

            # Fallback: retry a plain (unscaled) read unless the file is
            # simply missing.
            if err != QImageReader.ImageReaderError.FileNotFoundError:
                retry = QImageReader(path)
                retry.setAutoTransform(True)
                img = retry.read()

        if img.isNull():
            return None

        # Normalize to the requested box. Qt.KeepAspectRatio handles both
        # up- and downscaling; SmoothTransformation gives better quality
        # when upscaling.
        return img.scaled(size, size, Qt.KeepAspectRatio, Qt.SmoothTransformation)
    except Exception as e:
        logger.error(f"Error generating thumbnail for {path}: {e}")
        return None
|
|
|
|
|
|
class CacheWriter(QThread):
    """
    Dedicated background thread for writing thumbnails to LMDB asynchronously.
    Batches writes to improve disk throughput and avoid blocking the scanner.
    """
    def __init__(self, cache):
        super().__init__()
        # Owning ThumbnailCache; provides _batch_write_to_lmdb().
        self.cache = cache
        # Use deque for flexible buffer management and deduplication
        self._queue = collections.deque()
        self._mutex = QMutex()
        self._condition_new_data = QWaitCondition()
        self._condition_space_available = QWaitCondition()
        # Soft limit for blocking producers (background threads)
        self._max_size = 50
        self._running = True

    def enqueue(self, item, block=False):
        """Queue an item for writing. Item: (device_id, inode_key, img, mtime, size,
        path)

        block=True applies backpressure: the caller sleeps until the queue
        has room (or the writer is stopping).  Pending items with the same
        (device, inode, size) key are replaced by the new item.
        """
        if not self._running:
            return

        self._mutex.lock()
        try:
            if block:
                # Backpressure for background threads (Scanner/Generator)
                while len(self._queue) >= self._max_size and self._running:
                    self._condition_space_available.wait(self._mutex)

                if not self._running:
                    return

            # Ensure we don't accept new items if stopping, especially when block=False
            if not self._running:
                return

            # --- Soft Cleaning: Deduplication ---
            # Remove redundant pending updates for the same image/size (e.g.
            # rapid rotations)
            if len(item) >= 5:
                new_dev, new_inode, _, _, new_size = item[:5]
                if self._queue:
                    # Rebuild deque excluding any item that matches the new one's key
                    self._queue = collections.deque(
                        q for q in self._queue
                        if not (len(q) >= 5 and
                                q[0] == new_dev and
                                q[1] == new_inode and
                                q[4] == new_size)
                    )

            # Always append the new item (it becomes the authoritative version)
            self._queue.append(item)
            self._condition_new_data.wakeOne()
        finally:
            self._mutex.unlock()

    def stop(self):
        # Flag shutdown and wake every waiter (both producers blocked on
        # space and the consumer blocked on data), then join the thread.
        self._mutex.lock()
        self._running = False
        # Do not clear the queue here; let the run loop drain it to prevent data loss.
        self._condition_new_data.wakeAll()
        self._condition_space_available.wakeAll()
        self._mutex.unlock()
        self.wait()

    def run(self):
        # Consumer loop: collect batches and hand them to the cache's LMDB
        # batch writer.  Runs at idle priority to stay out of the UI's way.
        self.setPriority(QThread.IdlePriority)
        while self._running or self._queue:
            self._mutex.lock()
            if not self._queue:
                if not self._running:
                    self._mutex.unlock()
                    break
                # Wait for new items
                self._condition_new_data.wait(self._mutex)

            # Auto-flush: if queue has data but not enough for a full batch,
            # wait up to 200ms for more data. If timeout, flush what we have.
            # Only wait if running (flush immediately on stop)
            if self._running and self._queue and len(self._queue) < 50:
                signaled = self._condition_new_data.wait(self._mutex, 200)
                if signaled and self._running and len(self._queue) < 50:
                    self._mutex.unlock()
                    continue

            if not self._queue:
                self._mutex.unlock()
                continue

            # Gather a batch of items
            # Adaptive batch size: if queue is backing up, increase transaction size
            # to improve throughput.
            if not self._running:
                # Flush everything if stopping
                batch_limit = len(self._queue)
            else:
                batch_limit = self._max_size

            batch = []
            while self._queue and len(batch) < batch_limit:
                batch.append(self._queue.popleft())

            # Notify producers that space might be available
            self._condition_space_available.wakeAll()
            self._mutex.unlock()

            if batch:
                try:
                    self.cache._batch_write_to_lmdb(batch)
                except Exception as e:
                    logger.error(f"CacheWriter batch write error: {e}")
|
|
|
|
|
|
class CacheLoader(QThread):
    """
    Background thread to load and decode thumbnails from LMDB without blocking UI.
    Uses LIFO queue (deque) to prioritize most recently requested images.
    Implements a "drop-oldest" policy when full to ensure new requests (visible images)
    are prioritized over old pending ones (scrolled away).
    """
    def __init__(self, cache):
        super().__init__()
        # Owning ThumbnailCache; provides get_thumbnail()/_mark_load_complete().
        self.cache = cache
        # Use deque for LIFO behavior with drop-oldest capability
        self._queue = collections.deque()
        self._max_size = 50
        self._mutex = QMutex()
        self._condition = QWaitCondition()
        self._running = True

    def enqueue(self, item):
        """Queue a load request. Item: (path, size, mtime, inode, device_id).

        Returns (accepted, dropped) where dropped is the (path, size) of an
        evicted older request, or None.
        """
        if not self._running:
            return False, None

        dropped_item = None
        self._mutex.lock()
        try:
            # If queue is full, drop the OLDEST item (left) to make room for NEWEST
            # (right).
            # This ensures that during fast scrolling, we process what is currently
            # on screen (just added) rather than what was on screen moments ago.
            while len(self._queue) >= self._max_size:
                # Drop oldest and return it so caller can cleanup state
                dropped = self._queue.popleft()
                dropped_item = (dropped[0], dropped[1])  # path, size

            self._queue.append(item)
            self._condition.wakeOne()
            return True, dropped_item
        finally:
            self._mutex.unlock()

    def promote(self, path, size):
        """Moves an item to the front of the line (end of deque) if exists."""
        if not self._running:
            return

        self._mutex.lock()
        try:
            # Find item by path and size
            for item in self._queue:
                if item[0] == path and item[1] == size:
                    # Move to right end (LIFO pop side - highest priority)
                    self._queue.remove(item)
                    self._queue.append(item)
                    break
        finally:
            self._mutex.unlock()

    def stop(self):
        # Flag shutdown, wake the worker out of its wait, and join.
        self._running = False
        self._mutex.lock()
        self._condition.wakeAll()
        self._mutex.unlock()
        self.wait()

    def run(self):
        # Worker loop: decode queued thumbnails at idle priority; each hit is
        # announced via the cache's thumbnail_loaded signal.
        self.setPriority(QThread.IdlePriority)
        while self._running:
            self._mutex.lock()
            if not self._queue:
                # Wait up to 100ms for new items
                self._condition.wait(self._mutex, 100)

            if not self._queue:
                self._mutex.unlock()
                continue

            # LIFO: Pop from right (newest)
            item = self._queue.pop()
            self._mutex.unlock()

            path, size, mtime, inode, dev = item

            # Call synchronous get_thumbnail to fetch and decode
            # This puts the result into the RAM cache
            img, _ = self.cache.get_thumbnail(
                path, size, curr_mtime=mtime, inode=inode,
                device_id=dev, async_load=False
            )

            if img:
                self.cache.thumbnail_loaded.emit(path, size)

            # Always clear the pending marker, even on a failed load.
            self.cache._mark_load_complete(path, size)
|
|
|
|
|
|
class GenerationFuture:
    """Helper class to synchronize threads waiting for a thumbnail."""

    def __init__(self):
        self._lock = QMutex()
        self._cond = QWaitCondition()
        self._done = False

    def wait(self):
        """Block until complete() is called; returns at once if it already was."""
        self._lock.lock()
        try:
            while not self._done:
                self._cond.wait(self._lock)
        finally:
            self._lock.unlock()

    def complete(self):
        """Mark the future as finished and release every waiting thread."""
        self._lock.lock()
        try:
            self._done = True
            self._cond.wakeAll()
        finally:
            self._lock.unlock()
|
|
|
|
|
|
class ThumbnailCache(QObject):
    """
    Thread-safe in-memory thumbnail cache with LMDB disk persistence.

    Optimization: Uses (device, inode) as LMDB key instead of file paths.
    This gives:
    - Smaller keys (16 bytes vs variable path length)
    - Faster lookups and LMDB operations
    - Automatic handling of file moves/renames
    - Better cache efficiency
    """

    thumbnail_loaded = Signal(str, int)  # path, size

    def __init__(self):
        super().__init__()
        # In-memory cache: {path: {size: (QImage, mtime)}}; insertion order
        # doubles as LRU order (oldest first).
        self._thumbnail_cache = {}
        # Path -> (device, inode) mapping for fast lookup
        self._path_to_inode = {}
        self._loading_set = set()  # Track pending async loads (path, size)
        self._futures = {}  # Map (dev, inode, size) -> GenerationFuture
        self._futures_lock = QMutex()
        self._cache_lock = QReadWriteLock()
        self._db_lock = QMutex()  # Lock specifically for _db_handles access
        self._db_handles = {}  # Cache for LMDB database handles (dbi)
        self._cancel_loading = False
        # Running total of QImage bytes held in RAM (see _ensure_cache_limit)
        self._cache_bytes_size = 0
        self._cache_writer = None  # CacheWriter thread, started by lmdb_open()
        self._cache_loader = None  # CacheLoader thread, started by lmdb_open()

        self.lmdb_open()
|
|
|
|
    def lmdb_open(self):
        """Open (or create) the LMDB environment and start the async
        writer/loader threads.

        On failure the environment is set to None and the cache degrades to
        memory-only operation.
        """
        # Initialize LMDB environment
        cache_dir = Path(CONFIG_DIR)
        cache_dir.mkdir(parents=True, exist_ok=True)

        try:
            # map_size: maximum database size, capped at DISK_CACHE_MAX_BYTES
            # max_dbs: 512 named databases (one per device/size pair + main DB)
            self._lmdb_env = lmdb.open(
                CACHE_PATH,
                map_size=DISK_CACHE_MAX_BYTES,
                max_dbs=512,
                readonly=False,
                create=True
            )
            logger.info(f"LMDB cache opened: {CACHE_PATH}")

            # Start the async writer thread
            self._cache_writer = CacheWriter(self)
            self._cache_writer.start()

            # Start the async loader thread
            self._cache_loader = CacheLoader(self)
            self._cache_loader.start()

        except Exception as e:
            logger.error(f"Failed to open LMDB cache: {e}")
            self._lmdb_env = None
|
|
|
|
def lmdb_close(self):
|
|
if hasattr(self, '_cache_writer') and self._cache_writer:
|
|
self._cache_writer.stop()
|
|
self._cache_writer = None
|
|
|
|
if hasattr(self, '_cache_loader') and self._cache_loader:
|
|
self._cache_loader.stop()
|
|
self._cache_loader = None
|
|
self._loading_set.clear()
|
|
self._futures.clear()
|
|
|
|
self._db_handles.clear()
|
|
if hasattr(self, '_lmdb_env') and self._lmdb_env:
|
|
self._lmdb_env.close()
|
|
self._lmdb_env = None
|
|
|
|
    def __del__(self):
        # Best-effort cleanup: make sure the worker threads and the LMDB
        # environment are released even if the owner never called lmdb_close().
        self.lmdb_close()
|
|
|
|
@staticmethod
|
|
def _get_inode_key(path):
|
|
"""Get inode (8 bytes) for a file path."""
|
|
try:
|
|
stat_info = os.stat(path)
|
|
# Pack inode as uint64 (8 bytes)
|
|
return struct.pack('Q', stat_info.st_ino)
|
|
except (OSError, AttributeError):
|
|
return None
|
|
|
|
@staticmethod
|
|
def _get_device_id(path):
|
|
"""Get device ID for a file path."""
|
|
try:
|
|
stat_info = os.stat(path)
|
|
return stat_info.st_dev
|
|
except OSError:
|
|
return 0
|
|
|
|
    def _get_device_db(self, device_id, size, write=False, txn=None):
        """Get or create a named database for the given device.

        Returns an LMDB dbi handle, or None when the DB does not exist (a
        read-path cache miss) or cannot be opened.  Handles are memoised in
        self._db_handles; all access happens under self._db_lock.
        """
        env = self._lmdb_env
        if not env:
            return None

        db_name = f"dev_{device_id}_{size}".encode('utf-8')

        # Protect access to _db_handles which is not thread-safe by default
        self._db_lock.lock()
        try:
            # Return cached handle if available
            if db_name in self._db_handles:
                return self._db_handles[db_name]

            # Not in cache, try to open/create.  create=write means pure
            # reads never create empty databases.
            db = env.open_db(db_name, create=write, integerkey=False, txn=txn)
            self._db_handles[db_name] = db
            return db

        except lmdb.NotFoundError:
            # Expected when reading from non-existent DB (cache miss)
            return None
        except Exception as e:
            logger.error(f"Error opening device DB for dev {device_id} "
                         f"size {size}: {e}")
            return None
        finally:
            self._db_lock.unlock()
|
|
|
|
    @contextmanager
    def _write_lock(self):
        """Context manager for write-safe (exclusive) access to the in-memory
        cache structures; always released, even on exceptions."""
        self._cache_lock.lockForWrite()
        try:
            yield
        finally:
            self._cache_lock.unlock()
|
|
|
|
    @contextmanager
    def _read_lock(self):
        """Context manager for read-safe (shared) access to the in-memory
        cache structures; always released, even on exceptions."""
        self._cache_lock.lockForRead()
        try:
            yield
        finally:
            self._cache_lock.unlock()
|
|
|
|
def _ensure_cache_limit(self):
|
|
"""Enforces cache size limit by evicting oldest entries.
|
|
Must be called with a write lock held."""
|
|
# Safety limit: 512MB for thumbnails in RAM to prevent system freeze
|
|
MAX_RAM_BYTES = 512 * 1024 * 1024
|
|
|
|
while len(self._thumbnail_cache) > 0 and (
|
|
len(self._thumbnail_cache) >= CACHE_MAX_SIZE or
|
|
self._cache_bytes_size > MAX_RAM_BYTES):
|
|
oldest_path = next(iter(self._thumbnail_cache))
|
|
cached_sizes = self._thumbnail_cache.pop(oldest_path)
|
|
for img, _ in cached_sizes.values():
|
|
if img:
|
|
self._cache_bytes_size -= img.sizeInBytes()
|
|
self._path_to_inode.pop(oldest_path, None)
|
|
|
|
def _get_tier_for_size(self, requested_size):
|
|
"""Determines the ideal thumbnail tier based on the requested size."""
|
|
if requested_size < 192:
|
|
return 128
|
|
if requested_size < 320:
|
|
return 256
|
|
return 512
|
|
|
|
def _resolve_file_identity(self, path, curr_mtime, inode, device_id):
|
|
"""Helper to resolve file mtime, device, and inode."""
|
|
mtime = curr_mtime
|
|
dev_id = device_id if device_id is not None else 0
|
|
inode_key = struct.pack('Q', inode) if inode is not None else None
|
|
|
|
if mtime is None or dev_id == 0 or not inode_key:
|
|
try:
|
|
stat_res = os.stat(path)
|
|
mtime = stat_res.st_mtime
|
|
dev_id = stat_res.st_dev
|
|
inode_key = struct.pack('Q', stat_res.st_ino)
|
|
except OSError:
|
|
return None, 0, None
|
|
|
|
return mtime, dev_id, inode_key
|
|
|
|
    def _queue_async_load(self, path, size, mtime, dev_id, inode_key):
        """Helper to queue async load via the CacheLoader thread.

        Membership in self._loading_set prevents duplicate requests; an
        already-pending request is promoted instead of re-queued.
        """
        with self._write_lock():
            if (path, size) in self._loading_set:
                # If already queued, promote to process sooner (LIFO)
                if self._cache_loader:
                    self._cache_loader.promote(path, size)
                return

            if not self._cache_loader:
                return

            inode_int = struct.unpack('Q', inode_key)[0] if inode_key else 0

            # Optimistically add to set
            self._loading_set.add((path, size))
            success, dropped = self._cache_loader.enqueue(
                (path, size, mtime, inode_int, dev_id))

            # Keep the pending set consistent with what the queue accepted.
            if dropped:
                self._loading_set.discard(dropped)
            elif not success:
                self._loading_set.discard((path, size))
|
|
|
|
    def _check_disk_cache(self, path, search_order, mtime, dev_id, inode_key):
        """Helper to check LMDB synchronously.

        Walks the sizes in search_order, validates the stored mtime against
        the current one, decodes the PNG payload, and promotes a hit into
        the in-memory cache.  Returns (QImage, mtime) on a hit, (None, 0)
        otherwise.
        """
        if not self._lmdb_env or not inode_key or dev_id == 0:
            return None, 0

        txn = None
        try:
            txn = self._lmdb_env.begin(write=False)
            for size in search_order:
                db = self._get_device_db(dev_id, size, write=False, txn=txn)
                if not db:
                    continue

                value_bytes = txn.get(inode_key, db=db)
                if value_bytes and len(value_bytes) > 8:
                    # First 8 bytes are the stored mtime (double); a mismatch
                    # means the file changed since it was cached.
                    stored_mtime = struct.unpack('d', value_bytes[:8])[0]
                    if stored_mtime != mtime:
                        continue

                    payload = value_bytes[8:]
                    # Newer records embed the source path:
                    # 'PTH\0' + uint32 length + path + PNG data.
                    if len(payload) > 4 and payload.startswith(b'PTH\0'):
                        path_len = struct.unpack('I', payload[4:8])[0]
                        img_data = payload[8 + path_len:]
                    else:
                        img_data = payload

                    img = QImage()
                    if img.loadFromData(img_data, "PNG"):
                        # Promote the hit into the in-memory LRU cache.
                        with self._write_lock():
                            if path not in self._thumbnail_cache:
                                self._ensure_cache_limit()
                                self._thumbnail_cache[path] = {}
                            self._thumbnail_cache[path][size] = (img, mtime)
                            self._cache_bytes_size += img.sizeInBytes()
                            self._path_to_inode[path] = (dev_id, inode_key)
                        return img, mtime
        except Exception as e:
            logger.debug(f"Cache lookup error for {path}: {e}")
        finally:
            # Read-only transaction: abort is the cheap way to end it.
            if txn:
                txn.abort()

        return None, 0
|
|
|
|
    def get_thumbnail(self, path, requested_size, curr_mtime=None,
                      inode=None, device_id=None, async_load=False):
        """
        Safely retrieve a thumbnail from cache, finding the best available size.

        With async_load=True, a memory-cache miss queues a background load
        (CacheLoader) and returns (None, 0) immediately; the result is
        announced later via the thumbnail_loaded signal.

        Returns: tuple (QImage or None, mtime) or (None, 0) if not found.
        """
        # 1. Determine the ideal tier and create a prioritized search order:
        #    exact tier first, then larger sizes (downscalable), then smaller
        #    ones (largest-first) as a last resort.
        target_tier = self._get_tier_for_size(requested_size)
        search_order = [target_tier] + \
            sorted([s for s in THUMBNAIL_SIZES if s > target_tier]) + \
            sorted([s for s in THUMBNAIL_SIZES if s < target_tier], reverse=True)

        # 2. Resolve file identity (mtime, dev, inode)
        mtime, dev_id, inode_key = self._resolve_file_identity(
            path, curr_mtime, inode, device_id)

        if mtime is None:
            return None, 0

        # 3. Check memory cache (fastest)
        with self._read_lock():
            cached_sizes = self._thumbnail_cache.get(path)
            if cached_sizes:
                for size in search_order:
                    if size in cached_sizes:
                        img, cached_mtime = cached_sizes[size]
                        # An mtime mismatch means the file changed on disk.
                        if img and not img.isNull() and cached_mtime == mtime:
                            return img, mtime

        # 4. Handle Async Request
        if async_load:
            self._queue_async_load(path, target_tier, mtime, dev_id, inode_key)
            return None, 0

        # 5. Check Disk Cache (Sync fallback)
        return self._check_disk_cache(path, search_order, mtime, dev_id, inode_key)
|
|
|
|
def _mark_load_complete(self, path, size):
|
|
"""Remove item from pending loading set."""
|
|
with self._write_lock():
|
|
# Uncomment to trace individual completions:
|
|
# logger.debug(f"Load complete: {path}")
|
|
self._loading_set.discard((path, size))
|
|
|
|
    def set_thumbnail(self, path, img, mtime, size, inode=None, device_id=None,
                      block=False):
        """Safely store a thumbnail of a specific size in cache.

        Stores into the in-memory LRU immediately and queues an asynchronous
        LMDB write; block=True applies backpressure on the writer queue.
        Returns True on success, False when the image or the file identity
        is unusable.
        """
        if not img or img.isNull() or size not in THUMBNAIL_SIZES:
            return False

        img_size = img.sizeInBytes()

        if inode is not None and device_id is not None:
            dev_id = device_id
            inode_key = struct.pack('Q', inode)
        else:
            dev_id = self._get_device_id(path)
            inode_key = self._get_inode_key(path)

        if not inode_key or dev_id == 0:
            return False

        with self._write_lock():
            if path not in self._thumbnail_cache:
                self._ensure_cache_limit()
                self._thumbnail_cache[path] = {}
            else:
                # Move to end to mark as recently used (LRU behavior)
                # We pop and re-insert the existing entry
                entry = self._thumbnail_cache.pop(path)
                self._thumbnail_cache[path] = entry

            # If replacing, subtract old size
            if size in self._thumbnail_cache.get(path, {}):
                old_img, _ = self._thumbnail_cache[path][size]
                if old_img:
                    self._cache_bytes_size -= old_img.sizeInBytes()

            self._thumbnail_cache[path][size] = (img, mtime)
            self._path_to_inode[path] = (dev_id, inode_key)
            self._cache_bytes_size += img_size

        # Enqueue asynchronous write to LMDB
        if self._cache_writer:
            self._cache_writer.enqueue(
                (dev_id, inode_key, img, mtime, size, path), block=block)
        return True
|
|
|
|
    def invalidate_path(self, path):
        """Removes all cached data for a specific path from memory and disk."""
        inode_info = None
        with self._write_lock():
            if path in self._thumbnail_cache:
                cached_sizes = self._thumbnail_cache.pop(path)
                for img, _ in cached_sizes.values():
                    if img:
                        self._cache_bytes_size -= img.sizeInBytes()
                inode_info = self._path_to_inode.pop(path, None)

        # LMDB delete runs outside the in-memory lock to avoid holding it
        # across disk I/O.
        device_id, inode_key = inode_info if inode_info else (None, None)
        self._delete_from_lmdb(path, device_id=device_id, inode_key=inode_key)
|
|
|
|
    def remove_if_missing(self, paths_to_check):
        """Remove cache entries for files that no longer exist on disk.

        NOTE(review): paths_to_check is currently unused — the scan always
        covers every path in the in-memory cache (callers pass []).
        Returns the number of entries removed.
        """
        to_remove = []

        # Snapshot keys under the read lock; the filesystem existence checks
        # below must not run while holding it.
        with self._read_lock():
            cached_paths = list(self._thumbnail_cache.keys())

        for path in cached_paths:
            if not os.path.exists(path):
                to_remove.append(path)

        if to_remove:
            entries_to_delete = []
            with self._write_lock():
                for path in to_remove:
                    if path in self._thumbnail_cache:
                        cached_sizes = self._thumbnail_cache.pop(path)
                        for img, _ in cached_sizes.values():
                            if img:
                                self._cache_bytes_size -= img.sizeInBytes()
                        inode_info = self._path_to_inode.pop(path, None)
                        entries_to_delete.append((path, inode_info))
            # Delete from LMDB outside the lock
            for path, inode_info in entries_to_delete:
                device_id, inode_key = inode_info if inode_info else (None, None)
                self._delete_from_lmdb(path, device_id=device_id, inode_key=inode_key)
            logger.info(f"Removed {len(to_remove)} missing entries from cache")

        return len(to_remove)
|
|
|
|
    def clean_orphans(self, stop_check=None):
        """
        Scans the LMDB database for entries where the original file no longer exists.
        This is a heavy operation and should be run in a background thread.

        stop_check: optional zero-arg callable; when it returns True the scan
        aborts early and the count removed so far is returned.  Only records
        that embed their source path ('PTH\\0' format) can be validated;
        legacy records without a path are left untouched.
        """
        if not self._lmdb_env:
            return 0

        orphans_removed = 0

        # 1. Get all named databases (one per device/size)
        db_names = []
        try:
            with self._lmdb_env.begin(write=False) as txn:
                cursor = txn.cursor()
                for key, _ in cursor:
                    if key.startswith(b'dev_'):
                        db_names.append(key)
        except Exception as e:
            logger.error(f"Error listing DBs: {e}")
            return 0

        # 2. Iterate each DB and check paths
        for db_name in db_names:
            if stop_check and stop_check():
                return orphans_removed

            try:
                # Parse device ID from name "dev_{id}_{size}"
                parts = db_name.decode('utf-8').split('_')
                if len(parts) >= 3:
                    db_dev = int(parts[1])
                else:
                    db_dev = 0

                # We open the DB to scan its keys
                with self._lmdb_env.begin(write=True) as txn:
                    db = self._lmdb_env.open_db(db_name, txn=txn, create=False)
                    cursor = txn.cursor(db=db)
                    for key, value in cursor:
                        if stop_check and stop_check():
                            return orphans_removed

                        # mtime(8) then 'PTH\0' marker for path-bearing records
                        if len(value) > 12 and value[8:12] == b'PTH\0':
                            path_len = struct.unpack('I', value[12:16])[0]
                            path_bytes = value[16:16+path_len]
                            path = path_bytes.decode('utf-8', errors='ignore')

                            should_delete = False
                            try:
                                # Stale if the file vanished or its identity
                                # (device/inode) no longer matches the key.
                                st = os.stat(path)
                                key_inode = struct.unpack('Q', key)[0]
                                if st.st_dev != db_dev or st.st_ino != key_inode:
                                    should_delete = True
                            except OSError:
                                should_delete = True

                            if should_delete:
                                cursor.delete()
                                orphans_removed += 1
            except Exception as e:
                logger.error(f"Error cleaning DB {db_name}: {e}")

        return orphans_removed
|
|
|
|
def clear_cache(self):
|
|
"""Clear both in-memory and LMDB cache."""
|
|
with self._write_lock():
|
|
self._thumbnail_cache.clear()
|
|
self._path_to_inode.clear()
|
|
self._cache_bytes_size = 0
|
|
|
|
self.lmdb_close()
|
|
try:
|
|
if os.path.exists(CACHE_PATH):
|
|
shutil.rmtree(CACHE_PATH)
|
|
self.lmdb_open()
|
|
logger.info("LMDB cache cleared by removing directory.")
|
|
except Exception as e:
|
|
logger.error(f"Error clearing LMDB by removing directory: {e}")
|
|
|
|
    def _batch_write_to_lmdb(self, batch):
        """Write a batch of thumbnails to LMDB in a single transaction.

        Each batch item is (device_id, inode_key, img, mtime, size[, path]).
        The stored value is mtime(8-byte double) + optional path header
        ('PTH\\0' + uint32 length + utf-8 path) + PNG bytes.
        """
        env = self._lmdb_env
        if not env or not batch:
            return

        data_to_write = []

        # 1. Prepare data (image encoding) outside the transaction lock
        # This is CPU intensive, better done without holding the DB lock if possible,
        # though LMDB write lock mostly blocks other writers.
        for item in batch:
            # Small sleep to yield GIL/CPU to UI thread during heavy batch encoding
            QThread.msleep(1)

            if len(item) == 6:
                device_id, inode_key, img, mtime, size, path = item
            else:
                # Legacy 5-tuple without an embedded path
                device_id, inode_key, img, mtime, size = item
                path = None

            if not img or img.isNull():
                continue

            try:
                img_data = self._image_to_bytes(img)
                if not img_data:
                    continue

                # Pack mtime as a double (8 bytes) and prepend to image data
                mtime_bytes = struct.pack('d', mtime)

                # New format: mtime(8) + 'PTH\0'(4) + len(4) + path + img
                if path:
                    path_encoded = path.encode('utf-8')
                    header = mtime_bytes + b'PTH\0' + \
                        struct.pack('I', len(path_encoded)) + path_encoded
                    value_bytes = header + img_data
                else:
                    value_bytes = mtime_bytes + img_data

                data_to_write.append((device_id, size, inode_key, value_bytes))
            except Exception as e:
                logger.error(f"Error converting image for LMDB: {e}")

        if not data_to_write:
            return

        # 2. Commit to DB in one transaction
        try:
            with env.begin(write=True) as txn:
                for device_id, size, inode_key, value_bytes in data_to_write:
                    # Ensure DB exists (creates if needed) using the current transaction
                    db = self._get_device_db(device_id, size, write=True, txn=txn)
                    if db:
                        try:
                            txn.put(inode_key, value_bytes, db=db)
                        except lmdb.Error as e:
                            # Handle potential stale DB handles (EINVAL)
                            # This happens if a previous transaction created the handle
                            # but aborted.
                            if "Invalid argument" in str(e):
                                db_name = f"dev_{device_id}_{size}".encode('utf-8')
                                self._db_lock.lock()
                                if db_name in self._db_handles:
                                    del self._db_handles[db_name]
                                self._db_lock.unlock()
                                # Retry open and put with fresh handle
                                db = self._get_device_db(
                                    device_id, size, write=True, txn=txn)
                                if db:
                                    txn.put(inode_key, value_bytes, db=db)
                            else:
                                raise
        except Exception as e:
            logger.error(f"Error committing batch to LMDB: {e}")
            # If transaction failed, handles created within it are now invalid.
            # Clear cache to be safe.
            self._db_lock.lock()
            self._db_handles.clear()
            self._db_lock.unlock()
|
|
|
|
    def _delete_from_lmdb(self, path, device_id=None, inode_key=None):
        """Delete all thumbnail sizes for a path from LMDB.

        device_id/inode_key may be supplied by the caller (useful when the
        file is already gone from disk); otherwise both are derived from a
        fresh stat() of *path*.
        """
        env = self._lmdb_env
        if not env:
            return

        if device_id is None or inode_key is None:
            device_id = self._get_device_id(path)
            inode_key = self._get_inode_key(path)

        if not inode_key or device_id == 0:
            return

        for size in THUMBNAIL_SIZES:
            try:
                db = self._get_device_db(device_id, size, write=True)
                if not db:
                    continue

                with env.begin(write=True) as txn:
                    txn.delete(inode_key, db=db)
            except Exception as e:
                logger.error(f"Error deleting from LMDB for size {size}: {e}")
|
|
|
|
@staticmethod
|
|
def _image_to_bytes(img):
|
|
"""Convert QImage to PNG bytes."""
|
|
buf = None
|
|
try:
|
|
ba = QByteArray()
|
|
buf = QBuffer(ba)
|
|
if not buf.open(QIODevice.WriteOnly):
|
|
logger.error("Failed to open buffer for image conversion")
|
|
return None
|
|
|
|
if not img.save(buf, "PNG"):
|
|
logger.error("Failed to save image to buffer")
|
|
return None
|
|
return ba.data()
|
|
except Exception as e:
|
|
logger.error(f"Error converting image to bytes: {e}")
|
|
return None
|
|
finally:
|
|
if buf:
|
|
buf.close()
|
|
|
|
def get_cache_stats(self):
|
|
"""Get current cache statistics."""
|
|
with self._read_lock():
|
|
# Count all thumbnails, including different sizes of the same image.
|
|
count = sum(len(sizes) for sizes in self._thumbnail_cache.values())
|
|
size = self._cache_bytes_size
|
|
return count, size
|
|
|
|
    def rename_entry(self, old_path, new_path):
        """Move a cache entry from one path to another.

        Re-keys the in-memory entry, updates the path->(dev, inode) map, and
        re-queues disk writes so the path embedded in the LMDB records stays
        current.  Returns True when an entry was actually moved.
        """
        # Get new identity to check for cross-filesystem moves
        try:
            stat_info = os.stat(new_path)
            new_dev = stat_info.st_dev
            new_inode_key = struct.pack('Q', stat_info.st_ino)
        except OSError:
            return False

        entries_to_rewrite = []
        old_inode_info = None

        with self._write_lock():
            if old_path not in self._thumbnail_cache:
                return False

            self._thumbnail_cache[new_path] = self._thumbnail_cache.pop(old_path)
            old_inode_info = self._path_to_inode.pop(old_path, None)

            if old_inode_info:
                self._path_to_inode[new_path] = (new_dev, new_inode_key)
                # Always prepare data to persist to update the embedded path in LMDB
                for size, (img, mtime) in self._thumbnail_cache[new_path].items():
                    entries_to_rewrite.append((size, img, mtime))

        # Perform LMDB operations outside the lock
        if not old_inode_info:
            return False

        old_dev, old_inode = old_inode_info

        # Only delete old key if physical identity changed (cross-filesystem moves)
        if old_inode_info != (new_dev, new_inode_key):
            self._delete_from_lmdb(old_path, device_id=old_dev, inode_key=old_inode)

        if self._cache_writer:
            for size, img, mtime in entries_to_rewrite:
                self._cache_writer.enqueue(
                    (new_dev, new_inode_key, img, mtime, size, new_path),
                    block=False)

        return True
|
|
|
|
    @contextmanager
    def generation_lock(self, path, size, curr_mtime=None, inode=None,
                        device_id=None):
        """
        Context manager to coordinate thumbnail generation between threads.
        Prevents double work: if one thread is generating, others wait.

        Yields:
            bool: True if the caller should generate the thumbnail.
                  False if the caller waited and should check cache again.
        """
        # Resolve identity for locking key
        mtime, dev_id, inode_key = self._resolve_file_identity(
            path, curr_mtime, inode, device_id)

        if not inode_key or dev_id == 0:
            # Cannot lock reliably without stable ID, allow generation
            yield True
            return

        key = (dev_id, inode_key, size)
        future = None
        owner = False

        # The first thread to register the key owns generation; later
        # arrivals share its GenerationFuture and wait on it.
        self._futures_lock.lock()
        if key in self._futures:
            future = self._futures[key]
        else:
            future = GenerationFuture()
            self._futures[key] = future
            owner = True
        self._futures_lock.unlock()

        if owner:
            try:
                yield True
            finally:
                # Release waiters and deregister, even if generation raised.
                future.complete()
                self._futures_lock.lock()
                if key in self._futures and self._futures[key] is future:
                    del self._futures[key]
                self._futures_lock.unlock()
        else:
            # Another thread is generating, wait for it
            future.wait()
            yield False
|
|
|
|
|
|
class CacheCleaner(QThread):
    """Background worker that purges cache entries for files no longer on disk."""

    # Emitted when cleaning ends; payload is the number of entries removed.
    finished_clean = Signal(int)

    def __init__(self, cache):
        super().__init__()
        self.cache = cache
        self._is_running = True

    def stop(self):
        """Requests a graceful shutdown of the cleaner."""
        self._is_running = False

    def run(self):
        """Performs deep cache maintenance at idle priority."""
        self.setPriority(QThread.IdlePriority)
        if not self._is_running:
            # Stopped before we even started; nothing to report.
            return
        # First pass: drop entries whose source files have disappeared.
        total_removed = self.cache.remove_if_missing([])
        # Second pass: sweep orphaned records, honoring stop requests.
        if self._is_running:
            total_removed += self.cache.clean_orphans(
                stop_check=lambda: not self._is_running)
        self.finished_clean.emit(total_removed)
|
|
|
|
|
|
class ThumbnailRunnable(QRunnable):
    """Pool task that ensures one thumbnail (path, size) exists in the cache."""

    def __init__(self, cache, path, size, signal_emitter):
        super().__init__()
        self.cache = cache
        self.path = path
        self.size = size
        self.emitter = signal_emitter

    def run(self):
        """Validates or regenerates a single cached thumbnail, then reports."""
        try:
            # One stat call provides everything needed for cache identity.
            info = os.stat(self.path)
            file_mtime = info.st_mtime
            file_inode = info.st_ino
            file_dev = info.st_dev

            # Cheap cache probe first; generation is the expensive path.
            cached, cached_mtime = self.cache.get_thumbnail(
                self.path, self.size, curr_mtime=file_mtime,
                inode=file_inode, device_id=file_dev, async_load=False)

            stale = not cached or cached_mtime != file_mtime
            if stale:
                # The generation lock ensures only one thread renders this
                # (path, size) pair; non-owners simply skip.
                with self.cache.generation_lock(
                        self.path, self.size, file_mtime,
                        file_inode, file_dev) as is_owner:
                    if is_owner:
                        fresh = generate_thumbnail(self.path, self.size)
                        if fresh and not fresh.isNull():
                            self.cache.set_thumbnail(
                                self.path, fresh, file_mtime, self.size,
                                inode=file_inode, device_id=file_dev,
                                block=True)
        except Exception as e:
            logger.error(f"Error generating thumbnail for {self.path}: {e}")
        finally:
            # Progress must tick even on failure so totals stay accurate.
            self.emitter.emit_progress()
|
|
|
|
|
|
class ThumbnailGenerator(QThread):
    """
    Background thread to generate thumbnails for a specific size for a list of
    already discovered files.

    Work is fanned out to a QThreadPool; aggregate progress is reported
    through the `progress` signal.
    """
    generation_complete = Signal()
    progress = Signal(int, int)  # processed, total

    class SignalEmitter(QObject):
        """Helper to emit signals from runnables back to this generator."""
        progress_tick = Signal()

        def emit_progress(self):
            self.progress_tick.emit()

    def __init__(self, cache, paths, size):
        """
        Args:
            cache: Thumbnail cache shared with the worker runnables.
            paths: List of file paths to process.
            size: Thumbnail size to generate for every path.
        """
        super().__init__()
        self.cache = cache
        self.paths = paths
        self.size = size
        self._abort = False

    def stop(self):
        """Stops the worker thread gracefully."""
        self._abort = True
        self.wait()

    def run(self):
        """
        Main execution loop. Uses a thread pool to process paths in parallel.
        """
        pool = QThreadPool()
        max_threads = APP_CONFIG.get(
            "generation_threads",
            SCANNER_SETTINGS_DEFAULTS.get("generation_threads", 4))
        pool.setMaxThreadCount(max_threads)

        emitter = self.SignalEmitter()
        processed_count = 0
        total = len(self.paths)
        counter_mutex = QMutex()

        def on_tick():
            # Invoked directly from pool threads (DirectConnection), so the
            # shared counter must be mutex-protected: Python's `+=` is a
            # read-modify-write and can race across threads.  Emitting a Qt
            # signal from a worker thread is safe; delivery to receivers in
            # other threads is queued automatically.
            nonlocal processed_count
            counter_mutex.lock()
            processed_count += 1
            current = processed_count
            counter_mutex.unlock()
            if current % 5 == 0 or current == total:
                self.progress.emit(current, total)

        # BUGFIX: previously this used Qt.QueuedConnection, which queues the
        # slot call into THIS thread's event loop — but run() never calls
        # exec() and blocks in waitForDone(), so the queued ticks were never
        # dispatched and no progress was ever reported.  DirectConnection
        # runs on_tick synchronously in the emitting pool thread instead.
        emitter.progress_tick.connect(on_tick, Qt.DirectConnection)

        for path in self.paths:
            if self._abort:
                break
            pool.start(ThumbnailRunnable(self.cache, path, self.size, emitter))

        pool.waitForDone()
        self.generation_complete.emit()
|
|
|
|
|
|
class ImageScanner(QThread):
    """
    Background thread for scanning directories and loading images.

    Discovers image files from directories, explicit file lists, or search
    queries, then loads them incrementally in batches.  Results are emitted
    as lists of tuples via `images_found`; the UI requests subsequent
    batches with `load_images()`.
    """
    # Each emitted item:
    # (path, QImage_thumb, mtime, tags, rating, inode, device_id)
    images_found = Signal(list)
    progress_msg = Signal(str)
    progress_percent = Signal(int)
    finished_scan = Signal(int)  # Total images found
    more_files_available = Signal(int, int)  # Last loaded index, remainder

    def __init__(self, cache, paths, is_file_list=False, viewers=None):
        """
        Args:
            cache: Shared thumbnail cache.
            paths: Directories, files, or search queries to scan.
            is_file_list: When True, `paths` holds explicit files; their
                parent directories are recorded as the scan roots.
            viewers: Optional ImageViewer instances kept in sync during
                discovery.
        """
        if not paths or not isinstance(paths, (list, tuple)):
            logger.warning("ImageScanner initialized with empty or invalid paths")
            paths = []
        super().__init__()
        self.cache = cache
        self.all_files = []
        self._viewers = viewers
        self._seen_files = set()
        self._is_file_list = is_file_list
        if self._is_file_list:
            self.paths = []
            for p in paths:
                if os.path.isfile(p):
                    p_str = str(p)
                    if p_str not in self._seen_files:
                        self.all_files.append(p_str)
                        self._seen_files.add(p_str)
                    # Record the parent directory once as a scan root.
                    path = os.path.dirname(p)
                    if path not in self.paths:
                        self.paths.append(path)
                else:
                    self.paths.append(p)
        else:
            self.paths = paths
        self._is_running = True
        self._auto_load_enabled = False
        self.count = 0
        self.index = 0
        self._paused = False
        self.mutex = QMutex()
        self.condition = QWaitCondition()
        self.pending_tasks = []
        self._priority_queue = collections.deque()
        self._processed_paths = set()

        # Queue the initial batch so run() starts loading immediately.
        self.pending_tasks.append((0, APP_CONFIG.get(
            "scan_batch_size", SCANNER_SETTINGS_DEFAULTS["scan_batch_size"])))
        self._last_update_time = 0

        logger.info(f"ImageScanner initialized with {len(paths)} paths")

    def set_auto_load(self, enabled):
        """Enable or disable automatic loading of all subsequent images."""
        self._auto_load_enabled = enabled

    def set_paused(self, paused):
        """Pauses or resumes the scanning process."""
        self._paused = paused

    def prioritize(self, paths):
        """Adds paths to the priority queue for immediate processing."""
        self.mutex.lock()
        for p in paths:
            self._priority_queue.append(p)
        self.mutex.unlock()

    def run(self):
        """Discovers all files, then services batch-load requests until stopped."""
        # IdlePriority ensures the UI thread gets CPU time to process events
        # (mouse, draw).
        self.setPriority(QThread.IdlePriority)
        self.progress_msg.emit(UITexts.SCANNING_DIRS)
        self.scan_files()

        while self._is_running:
            self.mutex.lock()
            # Sleep until load_images() or stop() wakes us.
            while not self.pending_tasks and self._is_running:
                self.condition.wait(self.mutex)

            if not self._is_running:
                self.mutex.unlock()
                break

            i, to_load = self.pending_tasks.pop(0)
            self.mutex.unlock()

            self._process_images(i, to_load)

    def _update_viewers(self, force=False):
        """Pushes the current (sorted) file list to any attached viewers.

        Throttled to at most 5 updates/sec unless `force` is True.
        """
        if not self._viewers:
            return

        current_time = time.time()
        # Throttle updates to avoid saturating the event loop (max 5 times/sec)
        if not force and (current_time - self._last_update_time < 0.2):
            return
        self._last_update_time = current_time

        # Image viewers standalone unique sort, this must be improved.
        all_files_sorted_by_name_ascending = sorted(self.all_files)
        # Iterate over a copy to avoid runtime errors if list changes in main thread
        for w in list(self._viewers):
            try:
                if isinstance(w, ImageViewer):
                    # singleShot with the viewer as context object: the call
                    # is dropped automatically if the viewer is destroyed.
                    QTimer.singleShot(
                        0, w, lambda viewer=w, files=all_files_sorted_by_name_ascending:
                        viewer.update_image_list(files))
            except Exception:
                # Handle cases where the viewer might be closing/deleted
                # (RuntimeError from a dead QObject) or other issues.
                pass

    def scan_files(self):
        """Resolves each configured path into files, directories, or searches."""
        for path in self.paths:
            if not self._is_running:
                return
            try:
                # Strip file URI prefixes produced by drag & drop / clipboard.
                if path.startswith("file://"):
                    path = path[7:]
                elif path.startswith("file:/"):
                    path = path[6:]

                # Check if path exists before resolving absolute path to avoid
                # creating fake paths for search queries.
                expanded_path = os.path.expanduser(path)
                if os.path.exists(expanded_path):
                    p = Path(os.path.abspath(expanded_path))
                    if p.is_file():
                        p_str = str(p)
                        if p_str not in self._seen_files:
                            self.all_files.append(p_str)
                            self._seen_files.add(p_str)
                    elif p.is_dir():
                        self._scan_directory(p, 0)
                        self._update_viewers()
                else:
                    # Non-existent "path": treat it as a search query.
                    self._search(path)
                    self._update_viewers()
            except Exception as e:
                logger.error(f"Error scanning path {path}: {e}")

        # Ensure final update reaches viewers
        self._update_viewers(force=True)

    def _scan_directory(self, dir_path, current_depth):
        """Recursively collects image files up to the configured max depth."""
        if not self._is_running or current_depth > APP_CONFIG.get(
                "scan_max_level", SCANNER_SETTINGS_DEFAULTS["scan_max_level"]):
            return
        try:
            for item in dir_path.iterdir():
                if not self._is_running:
                    return
                if item.is_file() and item.suffix.lower() in IMAGE_EXTENSIONS:
                    p = os.path.abspath(str(item))
                    if p not in self._seen_files:
                        self.all_files.append(p)
                        self._seen_files.add(p)
                        self._update_viewers()
                elif item.is_dir():
                    self._scan_directory(item, current_depth + 1)
        except (PermissionError, OSError):
            # Unreadable directories are skipped silently by design.
            pass

    def _parse_query(self, query):
        """Parses a bagheera-search query string into its components.

        Returns:
            tuple: (query_text, main_options, other_options).  On a parse
            error returns (None, {}, {}) so callers can unpack safely.
        """
        parser = argparse.ArgumentParser(prog="bagheera-search", add_help=False)

        # Main arguments
        parser.add_argument("query", nargs="?", default="")
        parser.add_argument("-d", "--directory")
        parser.add_argument("-e", "--exclude", nargs="?", const="",
                            default=None)
        parser.add_argument("-l", "--limit", type=int)
        parser.add_argument("-o", "--offset", type=int)
        parser.add_argument("-r", "--recursive", nargs="?", const="", default=None)
        parser.add_argument("-x", "--recursive-exclude", nargs="?", const="",
                            default=None)
        parser.add_argument("-s", "--sort")
        parser.add_argument("-t", "--type")

        # Date filters
        parser.add_argument("--day", type=int)
        parser.add_argument("--month", type=int)
        parser.add_argument("--year", type=int)

        try:
            args_list = shlex.split(str(query))
            args, unknown_args = parser.parse_known_args(args_list)

            if args.day is not None and args.month is None:
                raise ValueError("Missing --month (required when --day is used)")

            if args.month is not None and args.year is None:
                raise ValueError("Missing --year (required when --month is used)")

            # Unknown tokens are folded back into the free-text query.
            query_parts = [args.query] if args.query else []
            if unknown_args:
                query_parts.extend(unknown_args)

            query_text = " ".join(query_parts)

            # Build options dictionary
            main_options = {}
            if args.recursive is not None:
                main_options["type"] = "folder"
            else:
                if args.limit is not None:
                    main_options["limit"] = args.limit
                if args.offset is not None:
                    main_options["offset"] = args.offset
                if args.type:
                    main_options["type"] = args.type

            if args.directory:
                main_options["directory"] = args.directory
            if args.year is not None:
                main_options["year"] = args.year
            if args.month is not None:
                main_options["month"] = args.month
            if args.day is not None:
                main_options["day"] = args.day
            if args.sort:
                main_options["sort"] = args.sort

            # BUGFIX: normalize "flag given without a value" (const='') to
            # None.  The previous conditions (`args.exclude and
            # args.exclude == ''`) could never be true — a string cannot be
            # both truthy and empty — so the normalization was dead code.
            if args.exclude == '':
                args.exclude = None
            if args.recursive_exclude == '':
                args.recursive_exclude = None
            other_options = {
                "exclude": args.exclude,
                "id": False,
                "konsole": False,
                "limit": args.limit if args.limit and args.recursive is not None
                else 99999999999,
                "offset": args.offset if args.offset and args.recursive is not None
                else 0,
                "recursive": args.recursive,
                "recursive_indent": "",
                "recursive_exclude": args.recursive_exclude,
                "sort": args.sort,
                "type": args.type if args.recursive is not None else None,
                "verbose": False,
            }

            return query_text, main_options, other_options

        except ValueError as e:
            print(f"Arguments error: {e}")
            # BUGFIX: was `return None, []` (a 2-tuple), which crashed every
            # caller with a ValueError when unpacking three values.
            return None, {}, {}
        except Exception as e:
            print(f"Unexpected error parsing query: {e}")
            return None, {}, {}

    def _search(self, query):
        """Runs the query through the configured search backend and collects paths."""
        engine = APP_CONFIG.get("search_engine", "Bagheera")
        if HAVE_BAGHEERASEARCH_LIB and (engine == "Bagheera" or not SEARCH_CMD):
            query_text, main_options, other_options = self._parse_query(query)
            if query_text is None:
                # Parse failed (already reported); nothing to search for.
                return
            try:
                searcher = BagheeraSearcher()
                for item in searcher.search(query_text, main_options, other_options):
                    if not self._is_running:
                        break
                    p = item["path"].strip()
                    if p and os.path.exists(os.path.expanduser(p)):
                        if p not in self._seen_files:
                            self.all_files.append(p)
                            self._seen_files.add(p)
                            self._update_viewers()
            except Exception as e:
                print(f"Error during bagheerasearch library call: {e}")

        elif SEARCH_CMD:
            try:
                cmd = SEARCH_CMD + shlex.split(str(query))
                out = subprocess.check_output(cmd, text=True).splitlines()
                for p in out:
                    if not self._is_running:
                        break
                    p = p.strip()
                    if p.startswith("file://"):
                        p = p[7:]
                    if p and os.path.exists(os.path.expanduser(p)):
                        if p not in self._seen_files:
                            self.all_files.append(p)
                            self._seen_files.add(p)
                            self._update_viewers()
            except Exception as e:
                print(f"Error during {SEARCH_CMD} subprocess call: {e}")

    def load_images(self, i, to_load):
        """Queues a batch-load request starting at index `i` for the run loop."""
        if i < 0:
            i = 0
        if i >= len(self.all_files):
            return
        self.mutex.lock()
        self.pending_tasks.append((i, to_load))
        self.condition.wakeAll()
        self.mutex.unlock()

    def _process_images(self, i, to_load):
        """Loads up to `to_load` images starting at index `i`, emitting batches."""
        if i >= len(self.all_files):
            self.finished_scan.emit(self.count)
            return

        images_loaded = 0
        batch = []
        while i < len(self.all_files):

            if not self._is_running:
                return
            self.msleep(1)  # Force yield to UI thread per item

            while self._paused and self._is_running:
                self.msleep(100)

            # 1. Check priority queue first
            priority_path = None
            self.mutex.lock()
            while self._priority_queue:
                p = self._priority_queue.popleft()
                if p not in self._processed_paths and p in self._seen_files:
                    priority_path = p
                    break
            self.mutex.unlock()

            # 2. Determine file to process
            if priority_path:
                f_path = priority_path
                # Don't increment 'i' yet, we are processing out of order
            else:
                f_path = self.all_files[i]
                i += 1  # Only advance sequential index if processing sequentially

            if f_path not in self._processed_paths \
                    and Path(f_path).suffix.lower() in IMAGE_EXTENSIONS:
                # Pass the batch list to store result instead of emitting immediately
                was_loaded = self._process_single_image(f_path, batch)

                # Emit batch if size is enough (responsiveness optimization)
                # Dynamic batching: Start small for instant feedback.
                # Keep batches small enough to prevent UI starvation during
                # rapid cache reads.
                if self.count <= 100:
                    target_batch_size = 20
                else:
                    target_batch_size = 200

                if len(batch) >= target_batch_size:
                    self.images_found.emit(batch)
                    batch = []
                    # Yield briefly to let the main thread process the emitted
                    # batch (update UI), preventing UI freeze during fast
                    # cache reading.
                    self.msleep(10)

                if was_loaded:
                    self._processed_paths.add(f_path)
                    images_loaded += 1
                    if images_loaded >= to_load and to_load > 0:
                        if batch:  # Emit remaining items
                            self.images_found.emit(batch)

                        # BUGFIX: `i` already points at the next unprocessed
                        # file (it was advanced right after the current path
                        # was taken), so it IS the resume index.  The old
                        # `i + 1` silently skipped one file per batch.
                        next_index = i
                        total_files = len(self.all_files)
                        self.index = next_index
                        self.progress_msg.emit(UITexts.LOADED_PARTIAL.format(
                            self.count, total_files - next_index))

                        if total_files > 0:
                            percent = int((self.count / total_files) * 100)
                            self.progress_percent.emit(percent)

                        self.more_files_available.emit(next_index, total_files)
                        # This loads all images continuously without pausing
                        # only if explicitly requested
                        if self._auto_load_enabled:
                            self.load_images(
                                next_index,
                                APP_CONFIG.get("scan_batch_size",
                                               SCANNER_SETTINGS_DEFAULTS[
                                                   "scan_batch_size"]))
                        return

            if self.count % 10 == 0:  # Update progress less frequently
                self.progress_msg.emit(
                    UITexts.LOADING_SCAN.format(self.count, len(self.all_files)))
                if len(self.all_files) > 0:
                    percent = int((self.count / len(self.all_files)) * 100)
                    self.progress_percent.emit(percent)

        self.index = len(self.all_files)
        if batch:
            self.images_found.emit(batch)
        self.progress_percent.emit(100)
        self.finished_scan.emit(self.count)

    def _load_metadata(self, path_or_fd):
        """Loads tag and rating data for a path or file descriptor.

        Returns:
            tuple: (sorted list of unique tags, int rating; 0 when unset
            or unparsable).
        """
        tags = []

        raw_tags = XattrManager.get_attribute(path_or_fd, XATTR_NAME)
        if raw_tags:
            # Comma-separated attribute value → deduplicated, sorted list.
            tags = sorted(list(set(t.strip()
                                   for t in raw_tags.split(',') if t.strip())))

        raw_rating = XattrManager.get_attribute(path_or_fd, RATING_XATTR_NAME, "0")
        try:
            rating = int(raw_rating)
        except ValueError:
            rating = 0
        return tags, rating

    def _process_single_image(self, f_path, batch_list):
        """Ensures thumbnails exist for `f_path` and appends its record to `batch_list`.

        Returns:
            bool: True on success, False if the file could not be processed.
        """
        from constants import SCANNER_GENERATE_SIZES

        fd = None
        try:
            # Optimize: Open file once to reuse FD for stat and xattrs
            fd = os.open(f_path, os.O_RDONLY)
            stat_res = os.fstat(fd)
            curr_mtime = stat_res.st_mtime
            curr_inode = stat_res.st_ino
            curr_dev = stat_res.st_dev

            smallest_thumb_for_signal = None

            # Ensure required thumbnails exist
            for size in SCANNER_GENERATE_SIZES:
                # Check if a valid thumbnail for this size exists
                thumb, mtime = self.cache.get_thumbnail(f_path, size,
                                                        curr_mtime=curr_mtime,
                                                        inode=curr_inode,
                                                        device_id=curr_dev)
                if not thumb or mtime != curr_mtime:
                    # Use generation lock to prevent multiple threads
                    # generating the same thumb
                    with self.cache.generation_lock(
                            f_path, size, curr_mtime,
                            curr_inode, curr_dev) as should_gen:
                        if should_gen:
                            # I am the owner, I generate the thumbnail
                            new_thumb = generate_thumbnail(f_path, size)
                            if new_thumb and not new_thumb.isNull():
                                self.cache.set_thumbnail(
                                    f_path, new_thumb, curr_mtime, size,
                                    inode=curr_inode, device_id=curr_dev,
                                    block=True)
                                if size == min(SCANNER_GENERATE_SIZES):
                                    smallest_thumb_for_signal = new_thumb
                        else:
                            # Another thread generated it, re-fetch to use it
                            # for the signal
                            if size == min(SCANNER_GENERATE_SIZES):
                                re_thumb, _ = self.cache.get_thumbnail(
                                    f_path, size, curr_mtime=curr_mtime,
                                    inode=curr_inode, device_id=curr_dev,
                                    async_load=False)
                                smallest_thumb_for_signal = re_thumb
                elif size == min(SCANNER_GENERATE_SIZES):
                    # valid thumb exists, use it for signal
                    smallest_thumb_for_signal = thumb

            tags, rating = self._load_metadata(fd)
            batch_list.append((f_path, smallest_thumb_for_signal,
                               curr_mtime, tags, rating, curr_inode, curr_dev))
            self.count += 1
            return True
        except Exception as e:
            logger.error(f"Error processing image {f_path}: {e}")
            return False
        finally:
            if fd is not None:
                try:
                    os.close(fd)
                except OSError:
                    pass

    def stop(self):
        """Stops the scanner thread, waking it if it is blocked on the queue."""
        self._is_running = False
        self.mutex.lock()
        self.condition.wakeAll()
        self.mutex.unlock()
        self.wait()
|