First commit

This commit is contained in:
Ignacio Serantes
2026-03-22 18:13:22 +01:00
commit 3fb55ee4f3
19 changed files with 1928 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
from .bagheera_search import BagheeraSearcher
def search(query, main_options=None, search_opts=None):
    """Simplified interface to the library.

    Creates a fresh ``BagheeraSearcher`` and forwards the query to its
    ``search`` generator.

    Args:
        query: Query text handed to ``BagheeraSearcher.search``.
        main_options: Options dictionary sent to the Baloo wrapper. Defaults
            to an empty dictionary.
        search_opts: Client-side options (``limit``, ``offset``, ``exclude``,
            ``recursive``, ...). Defaults to an unlimited ``limit``.

    Returns:
        The generator produced by ``BagheeraSearcher.search``.
    """
    bs = BagheeraSearcher()
    if main_options is None:
        main_options = {}
    if search_opts is None:
        # BagheeraSearcher.search stops as soon as search_opts["limit"] is
        # <= 0 and treats a missing key as 0, so a default without "limit"
        # would yield nothing.
        # NOTE(review): "no client-side cap" is an assumed default - confirm.
        search_opts = {"limit": float("inf")}
    # The searcher's search() takes both option dictionaries as required
    # positional arguments; passing only the query raised TypeError.
    return bs.search(query, main_options, search_opts)

View File

@@ -0,0 +1,313 @@
"""
Bagheera Search Library
A Python interface for the Baloo search wrapper.
"""
import ctypes
import json
import re
import sys
from pathlib import Path
from typing import Dict, Any, Iterator, Optional, Union
from baloo_tools import get_resolution
from bagheera_query_parser_lib import parse_date
class BagheeraSearcher:
    """Run Baloo searches through the ``libbaloo_wrapper`` C library.

    Every file ID that has been looked at is remembered in ``ids_processed``
    so that the same file is not yielded twice; call :meth:`reset_state` to
    forget them before an unrelated search.
    """

    def __init__(self, lib_path: Optional[Union[str, Path]] = None) -> None:
        """Load the C wrapper and start with no processed IDs.

        Args:
            lib_path: Explicit location of the wrapper library. When omitted
                the library is looked up in a list of default directories.

        Raises:
            FileNotFoundError: If the wrapper library cannot be found.
        """
        self.ids_processed: set[int] = set()
        self.baloo_lib = self._load_baloo_wrapper(lib_path)

    def _load_baloo_wrapper(
        self, custom_path: Optional[Union[str, Path]]
    ) -> ctypes.CDLL:
        """Locate, load and configure the Baloo C wrapper library.

        Args:
            custom_path: Explicit library path; falsy to search the default
                directories instead.

        Returns:
            The loaded library with ``execute_baloo_query`` and
            ``get_file_properties`` prototypes configured.

        Raises:
            FileNotFoundError: If the library does not exist at the custom
                path or in any of the default directories.
        """
        lib_path: Optional[Path]
        if custom_path:
            lib_path = Path(custom_path)
            # Bind the names used by the error message for this branch too;
            # previously a missing custom path raised UnboundLocalError
            # instead of FileNotFoundError.
            lib_name = lib_path.name
            searched: Any = lib_path
        else:
            lib_name = "libbaloo_wrapper.so"
            if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
                # Frozen bundle: look in the 'lib' folder under sys._MEIPASS.
                base_dir = Path(getattr(sys, '_MEIPASS')) / 'lib'
            else:
                base_dir = Path(__file__).parent.absolute()

            search_paths = [base_dir]
            if sys.prefix != sys.base_prefix:
                # Running inside a virtual environment: try its lib dirs.
                venv_base = Path(sys.prefix)
                search_paths.append(venv_base / "lib64")
                search_paths.append(venv_base / "lib")
            search_paths.extend([
                Path("/lib64"),
                Path("/lib"),
                Path("/usr/lib64"),
                Path("/usr/lib"),
                Path("/usr/local/lib64"),
                Path("/usr/local/lib"),
            ])

            # First directory that actually contains the library wins.
            lib_path = next(
                (
                    candidate
                    for candidate in (d / lib_name for d in search_paths)
                    if candidate.exists()
                ),
                None,
            )
            searched = search_paths

        if lib_path is None or not lib_path.exists():
            raise FileNotFoundError(
                f"ERROR: Baloo wrapper '{lib_name}' not found at {searched}"
            )

        lib = ctypes.CDLL(str(lib_path))
        # Both entry points take a C string and return a C string.
        # NOTE(review): with restype c_char_p ctypes hands back a bytes copy
        # and drops the pointer, so a buffer allocated by the wrapper cannot
        # be freed from Python - confirm who owns the returned memory.
        lib.execute_baloo_query.argtypes = [ctypes.c_char_p]
        lib.execute_baloo_query.restype = ctypes.c_char_p
        lib.get_file_properties.argtypes = [ctypes.c_char_p]
        lib.get_file_properties.restype = ctypes.c_char_p
        return lib

    def check_keywords(
        self, text: str, query: str, file_path: str = "", file_id: int = 0
    ) -> bool:
        """Evaluate whether ``text`` satisfies a logical keyword query.

        Supports ``AND`` (also implied by adjacent terms), ``OR``,
        parentheses, ``width <op> height`` comparisons and the shape words
        ``PORTRAIT`` / ``LANDSCAPE`` / ``SQUARE``. Keywords are matched as
        case-insensitive substrings of ``text``.

        Args:
            text: Text to test (callers in this class pass the file path).
            query: Logical expression to evaluate.
            file_path: When non-empty, dimension and shape terms in the query
                are resolved from the resolution reported for ``file_id``.
            file_id: ID passed to ``get_resolution``.

        Returns:
            True if the query matches; False if it does not or if the query
            does not translate into a valid regular expression.
        """
        if file_path:
            try:
                width, height = get_resolution(file_id)
            except Exception:
                # Best effort: an unreadable resolution makes every
                # dimension term evaluate to false below.
                width, height = -1, -1

            def replace_dim(match: re.Match) -> str:
                """Turn one shape/dimension term into a truth placeholder."""
                if width <= 0 or height <= 0:
                    return "__false__"
                term = match.group(0).upper()
                if "PORTRAIT" in term:
                    return "__true__" if width < height else "__false__"
                if "LANDSCAPE" in term:
                    return "__true__" if width > height else "__false__"
                if "SQUARE" in term:
                    return "__true__" if width == height else "__false__"
                op = match.group(1)
                ops_map = {
                    "=": width == height,
                    ">": width > height,
                    "<": width < height,
                    ">=": width >= height,
                    "<=": width <= height,
                    "!=": width != height,
                }
                return "__true__" if ops_map.get(op, False) else "__false__"

            query = re.sub(
                r"\b(PORTRAIT|LANDSCAPE|SQUARE)\b",
                replace_dim,
                query,
                flags=re.IGNORECASE,
            )
            query = re.sub(
                r"\bwidth\s*(<=|>=|!=|<|>|=)\s*height\b",
                replace_dim,
                query,
                flags=re.IGNORECASE,
            )

        text = text.lower()

        # Split into parentheses and whitespace-delimited words. Operators
        # are recognised only as whole words: the former pattern listed
        # OR/AND as alternatives ahead of the word pattern, which cut
        # "ORANGE" into "OR" + "ANGE" and "ANDROID" into "AND" + "ROID".
        # Adjacent terms are concatenated lookaheads, i.e. an implicit AND,
        # so no explicit AND needs to be inserted between them.
        regex_parts = []
        for token in re.findall(r"\(|\)|[^\s()]+", query):
            if token in ("(", ")"):
                regex_parts.append(token)
            elif token == "OR":
                regex_parts.append("|")
            elif token == "AND":
                continue
            elif token == "__true__":
                regex_parts.append("(?=.*)")   # always succeeds
            elif token == "__false__":
                regex_parts.append("(?!)")     # never succeeds
            else:
                # The text was lower-cased, so lower-case the keyword too.
                regex_parts.append(rf"(?=.*{re.escape(token.lower())})")

        final_regex = "".join(regex_parts)
        try:
            return bool(re.search(f"^{final_regex}.*", text, re.DOTALL))
        except re.error:
            # e.g. unbalanced parentheses in the query.
            return False

    def get_baloo_info(self, file_path: str) -> Dict[str, str]:
        """Retrieve the properties Baloo reports for one file.

        The wrapper answers with ``key:value`` entries separated by ``|``;
        entries without a colon are ignored and only the first colon of an
        entry separates key from value.

        Args:
            file_path: Path of the file to look up.

        Returns:
            Mapping of property name to value; empty if the wrapper returns
            nothing.
        """
        result = self.baloo_lib.get_file_properties(file_path.encode("utf-8"))
        if not result:
            return {}

        properties: Dict[str, str] = {}
        for entry in result.decode("utf-8").split("|"):
            key, sep, value = entry.partition(":")
            if sep:
                properties[key] = value
        return properties

    def _execute_query(self, options: Dict[str, Any]) -> list:
        """Send ``options`` as JSON to the C wrapper and decode its answer.

        Args:
            options: Query options; serialised with ``json.dumps``.

        Returns:
            The decoded JSON result, or an empty list when the wrapper
            returns nothing or its output is not valid JSON.
        """
        query_json = json.dumps(options).encode("utf-8")
        result_ptr = self.baloo_lib.execute_baloo_query(query_json)
        if not result_ptr:
            return []
        try:
            raw_results = result_ptr.decode("utf-8")
            return json.loads(raw_results)
        except json.JSONDecodeError as e:
            print(f"JSON decode error from Baloo wrapper: {e}")
            return []

    def search_recursive(
        self,
        query_text: str,
        options: Dict[str, Any],
        search_opts: Dict[str, Any],
        files_count: int,
    ) -> Iterator[Dict[str, Any]]:
        """Run the nested query and yield its results item by item.

        Args:
            query_text: Query stored in ``options["query"]`` before running.
            options: Wrapper options; modified in place.
            search_opts: Client-side options. ``limit`` is decremented in
                place for every yielded item; ``offset`` and
                ``recursive_exclude`` are read.
            files_count: Number of accepted items counted so far, compared
                against ``offset``.

        Yields:
            Result dictionaries (each has an ``id`` hex string and a
            ``path``) that were not seen before and are not excluded.
        """
        options["query"] = query_text
        files = self._execute_query(options)
        for item in files:
            # A missing "limit" counts as 0, which stops immediately.
            if search_opts.get("limit", 0) <= 0:
                break
            file_id = int(item["id"], 16)
            if file_id in self.ids_processed:
                continue
            self.ids_processed.add(file_id)
            rec_exclude = search_opts.get("recursive_exclude")
            if not rec_exclude or not self.check_keywords(
                item["path"], rec_exclude, item["path"], file_id
            ):
                # Items before the offset are counted but not yielded.
                if files_count >= search_opts.get("offset", 0):
                    search_opts["limit"] -= 1
                    yield item
                # NOTE(review): the source lost its indentation; counting
                # every non-excluded item is the assumed placement.
                files_count += 1

    def search(
        self,
        query_text: str,
        main_options: Dict[str, Any],
        search_opts: Dict[str, Any],
    ) -> Iterator[Dict[str, Any]]:
        """Main search generator. Yields file dictionaries.

        Runs the main query; when ``search_opts["recursive"]`` is present
        (not None) each accepted result is used as the ``directory`` of a
        nested query whose results are yielded instead.

        Args:
            query_text: Main query; passed through ``parse_date`` first.
            main_options: Wrapper options. Modified in place (``query``,
                and in recursive mode ``type`` and ``directory``).
            search_opts: Client-side options: ``limit`` (iteration stops when
                it is missing or <= 0), ``exclude``, ``recursive``, ``type``,
                plus those read by :meth:`search_recursive`.

        Yields:
            Result dictionaries not seen before and not excluded.
        """
        main_options["query"] = parse_date(query_text)
        files = self._execute_query(main_options)
        if not files:
            return

        is_recursive = search_opts.get("recursive") is not None
        if is_recursive:
            # The nested query uses the type from search_opts, or no type.
            if search_opts.get("type"):
                main_options["type"] = search_opts["type"]
            elif "type" in main_options:
                main_options.pop("type")
            rec_query = search_opts.get("recursive")
            query_text = parse_date(rec_query) if rec_query else ""

        files_count = 0
        for item in files:
            if search_opts.get("limit", 0) <= 0:
                break
            file_id = int(item["id"], 16)
            if file_id in self.ids_processed:
                continue
            self.ids_processed.add(file_id)
            exclude_pattern = search_opts.get("exclude")
            if not exclude_pattern or not self.check_keywords(
                item["path"], exclude_pattern, item["path"], file_id
            ):
                if is_recursive:
                    main_options["directory"] = item["path"]
                    yield from self.search_recursive(
                        query_text, main_options, search_opts, files_count
                    )
                else:
                    yield item
                # NOTE(review): the source lost its indentation; counting
                # every accepted top-level item is the assumed placement.
                files_count += 1

    def reset_state(self) -> None:
        """Clear the processed IDs to allow for fresh consecutive searches."""
        self.ids_processed.clear()
if __name__ == "__main__":
    # Quick integration smoke test. The guard used to appear twice with the
    # same body, which ran the whole check twice per invocation.
    print(f"Testing {__file__} integration:")
    try:
        searcher = BagheeraSearcher()
        print("✔ Library and wrapper loaded successfully.")

        # Trial search, limited to a single result.
        test_main_opts = {"limit": 1}
        test_search_opts = {"limit": 1}

        print("Searching for recent files...")
        results = list(searcher.search(
            "MODIFIED TODAY", test_main_opts, test_search_opts
        ))

        if results:
            print(f"✔ Found: {results[0].get('path')}")
        else:
            print("? No files found for today, but search executed correctly.")
    except FileNotFoundError as e:
        # Raised when the wrapper library cannot be located.
        print(f"✘ Setup error: {e}")
    except Exception as e:
        # Top-level boundary of the smoke test: report anything else.
        print(f"✘ Unexpected error: {e}")

View File

@@ -0,0 +1,284 @@
"""
Bagheera Search Library
A Python interface for the Baloo search wrapper.
"""
import ctypes
import json
import re
import sys
from pathlib import Path
from typing import Dict, Any, Iterator, Optional, Union
from baloo_tools import get_resolution
from date_query_parser import parse_date
class BagheeraSearcher:
    """Run Baloo searches through the ``libbaloo_wrapper`` C library.

    File IDs that were already examined are kept in ``ids_processed`` so a
    file is never yielded twice; :meth:`reset_state` empties that set.
    """

    def __init__(self, lib_path: Optional[Union[str, Path]] = None) -> None:
        """Load the C wrapper and start with no processed IDs.

        Args:
            lib_path: Explicit location of the wrapper library. When omitted
                the library is expected next to this module (or in the
                ``lib`` folder of a frozen bundle).

        Raises:
            FileNotFoundError: If the wrapper library does not exist.
        """
        self.ids_processed: set[int] = set()
        self.baloo_lib = self._load_baloo_wrapper(lib_path)

    def _load_baloo_wrapper(
        self, custom_path: Optional[Union[str, Path]]
    ) -> ctypes.CDLL:
        """Locate, load and configure the Baloo C wrapper library.

        Args:
            custom_path: Explicit library path; falsy to use the default
                location.

        Returns:
            The loaded library with ``execute_baloo_query`` and
            ``get_file_properties`` prototypes configured.

        Raises:
            FileNotFoundError: If the library file does not exist.
        """
        if custom_path:
            lib_path = Path(custom_path)
        else:
            if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
                # Frozen bundle: look in the 'lib' folder under sys._MEIPASS.
                current_dir = Path(getattr(sys, '_MEIPASS')) / 'lib'
            else:
                current_dir = Path(__file__).parent.absolute()
            lib_name = "libbaloo_wrapper.so"
            lib_path = current_dir / lib_name

        if not lib_path.exists():
            raise FileNotFoundError(
                f"ERROR: Baloo wrapper '{lib_path.name}' not found at {lib_path}"
            )

        lib = ctypes.CDLL(str(lib_path))
        # Both entry points take a C string and return a C string.
        # NOTE(review): with restype c_char_p ctypes hands back a bytes copy
        # and drops the pointer, so a buffer allocated by the wrapper cannot
        # be freed from Python - confirm who owns the returned memory.
        lib.execute_baloo_query.argtypes = [ctypes.c_char_p]
        lib.execute_baloo_query.restype = ctypes.c_char_p
        lib.get_file_properties.argtypes = [ctypes.c_char_p]
        lib.get_file_properties.restype = ctypes.c_char_p
        return lib

    def check_keywords(
        self, text: str, query: str, file_path: str = "", file_id: int = 0
    ) -> bool:
        """Evaluate whether ``text`` satisfies a logical keyword query.

        Supports ``AND`` (also implied by adjacent terms), ``OR``,
        parentheses, ``width <op> height`` comparisons and the shape words
        ``PORTRAIT`` / ``LANDSCAPE`` / ``SQUARE``. Keywords are matched as
        case-insensitive substrings of ``text``.

        Args:
            text: Text to test (callers in this class pass the file path).
            query: Logical expression to evaluate.
            file_path: When non-empty, dimension and shape terms in the query
                are resolved from the resolution reported for ``file_id``.
            file_id: ID passed to ``get_resolution``.

        Returns:
            True if the query matches; False if it does not or if the query
            does not translate into a valid regular expression.
        """
        if file_path:
            try:
                img_w, img_h = get_resolution(file_id)
            except Exception:
                # Best effort: an unreadable resolution makes every
                # dimension term evaluate to false below.
                img_w, img_h = -1, -1

            def replace_dim(match: re.Match) -> str:
                """Turn one shape/dimension term into a truth placeholder."""
                if img_w <= 0 or img_h <= 0:
                    return "__false__"
                term = match.group(0).upper()
                if "PORTRAIT" in term:
                    return "__true__" if img_w < img_h else "__false__"
                if "LANDSCAPE" in term:
                    return "__true__" if img_w > img_h else "__false__"
                if "SQUARE" in term:
                    return "__true__" if img_w == img_h else "__false__"
                op = match.group(1)
                ops_map = {
                    "=": img_w == img_h,
                    ">": img_w > img_h,
                    "<": img_w < img_h,
                    ">=": img_w >= img_h,
                    "<=": img_w <= img_h,
                    "!=": img_w != img_h,
                }
                return "__true__" if ops_map.get(op, False) else "__false__"

            query = re.sub(
                r"\b(PORTRAIT|LANDSCAPE|SQUARE)\b",
                replace_dim,
                query,
                flags=re.IGNORECASE,
            )
            query = re.sub(
                r"\bwidth\s*(<=|>=|!=|<|>|=)\s*height\b",
                replace_dim,
                query,
                flags=re.IGNORECASE,
            )

        text = text.lower()

        # Split into parentheses and whitespace-delimited words. Operators
        # are recognised only as whole words: the former pattern listed
        # OR/AND as alternatives ahead of the word pattern, which cut
        # "ORANGE" into "OR" + "ANGE" and "ANDROID" into "AND" + "ROID".
        # Adjacent terms are concatenated lookaheads, i.e. an implicit AND,
        # so no explicit AND needs to be inserted between them.
        regex_parts = []
        for token in re.findall(r"\(|\)|[^\s()]+", query):
            if token in ("(", ")"):
                regex_parts.append(token)
            elif token == "OR":
                regex_parts.append("|")
            elif token == "AND":
                continue
            elif token == "__true__":
                regex_parts.append("(?=.*)")   # always succeeds
            elif token == "__false__":
                regex_parts.append("(?!)")     # never succeeds
            else:
                # The text was lower-cased above; without lower-casing the
                # keyword as well, any keyword containing an uppercase
                # letter could never match.
                regex_parts.append(rf"(?=.*{re.escape(token.lower())})")

        final_regex = "".join(regex_parts)
        try:
            return bool(re.search(f"^{final_regex}.*", text, re.DOTALL))
        except re.error:
            # e.g. unbalanced parentheses in the query.
            return False

    def get_baloo_info(self, file_path: str) -> Dict[str, str]:
        """Retrieve the properties Baloo reports for one file.

        The wrapper answers with ``key:value`` entries separated by ``|``;
        entries without a colon are ignored and only the first colon of an
        entry separates key from value.

        Args:
            file_path: Path of the file to look up.

        Returns:
            Mapping of property name to value; empty if the wrapper returns
            nothing.
        """
        result = self.baloo_lib.get_file_properties(file_path.encode("utf-8"))
        if not result:
            return {}

        properties: Dict[str, str] = {}
        for entry in result.decode("utf-8").split("|"):
            key, sep, value = entry.partition(":")
            if sep:
                properties[key] = value
        return properties

    def _execute_query(self, options: Dict[str, Any]) -> list:
        """Send ``options`` as JSON to the C wrapper and decode its answer.

        Args:
            options: Query options; serialised with ``json.dumps``.

        Returns:
            The decoded JSON result, or an empty list when the wrapper
            returns nothing or its output is not valid JSON.
        """
        query_json = json.dumps(options).encode("utf-8")
        result_ptr = self.baloo_lib.execute_baloo_query(query_json)
        if not result_ptr:
            return []
        try:
            raw_results = result_ptr.decode("utf-8")
            return json.loads(raw_results)
        except json.JSONDecodeError as e:
            print(f"JSON decode error from Baloo wrapper: {e}")
            return []

    def search_recursive(
        self,
        query_text: str,
        options: Dict[str, Any],
        search_opts: Dict[str, Any],
        files_count: int,
    ) -> Iterator[Dict[str, Any]]:
        """Run the nested query and yield its results item by item.

        Args:
            query_text: Query stored in ``options["query"]`` before running.
            options: Wrapper options; modified in place.
            search_opts: Client-side options. ``limit`` is decremented in
                place for every yielded item; ``offset`` and
                ``recursive_exclude`` are read.
            files_count: Number of accepted items counted so far, compared
                against ``offset``.

        Yields:
            Result dictionaries (each has an ``id`` hex string and a
            ``path``) that were not seen before and are not excluded.
        """
        options["query"] = query_text
        files = self._execute_query(options)
        for item in files:
            # A missing "limit" counts as 0, which stops immediately.
            if search_opts.get("limit", 0) <= 0:
                break
            file_id = int(item["id"], 16)
            if file_id in self.ids_processed:
                continue
            self.ids_processed.add(file_id)
            rec_exclude = search_opts.get("recursive_exclude")
            if not rec_exclude or not self.check_keywords(
                item["path"], rec_exclude, item["path"], file_id
            ):
                # Items before the offset are counted but not yielded.
                if files_count >= search_opts.get("offset", 0):
                    search_opts["limit"] -= 1
                    yield item
                # NOTE(review): the source lost its indentation; counting
                # every non-excluded item is the assumed placement.
                files_count += 1

    def search(
        self,
        query_text: str,
        main_options: Dict[str, Any],
        search_opts: Dict[str, Any],
    ) -> Iterator[Dict[str, Any]]:
        """Main search generator. Yields file dictionaries.

        Runs the main query; when ``search_opts["recursive"]`` is present
        (not None) each accepted result is used as the ``directory`` of a
        nested query whose results are yielded instead.

        Args:
            query_text: Main query; passed through ``parse_date`` first.
            main_options: Wrapper options. Modified in place (``query``,
                and in recursive mode ``type`` and ``directory``).
            search_opts: Client-side options: ``limit`` (iteration stops when
                it is missing or <= 0), ``exclude``, ``recursive``, ``type``,
                plus those read by :meth:`search_recursive`.

        Yields:
            Result dictionaries not seen before and not excluded.
        """
        main_options["query"] = parse_date(query_text)
        files = self._execute_query(main_options)
        if not files:
            return

        is_recursive = search_opts.get("recursive") is not None
        if is_recursive:
            # The nested query uses the type from search_opts, or no type.
            if search_opts.get("type"):
                main_options["type"] = search_opts["type"]
            elif "type" in main_options:
                main_options.pop("type")
            rec_query = search_opts.get("recursive")
            query_text = parse_date(rec_query) if rec_query else ""

        files_count = 0
        for item in files:
            if search_opts.get("limit", 0) <= 0:
                break
            file_id = int(item["id"], 16)
            if file_id in self.ids_processed:
                continue
            self.ids_processed.add(file_id)
            exclude_pattern = search_opts.get("exclude")
            if not exclude_pattern or not self.check_keywords(
                item["path"], exclude_pattern, item["path"], file_id
            ):
                if is_recursive:
                    main_options["directory"] = item["path"]
                    yield from self.search_recursive(
                        query_text, main_options, search_opts, files_count
                    )
                else:
                    yield item
                # NOTE(review): the source lost its indentation; counting
                # every accepted top-level item is the assumed placement.
                files_count += 1

    def reset_state(self) -> None:
        """Clear the processed IDs to allow for fresh consecutive searches."""
        self.ids_processed.clear()
# from bagheera_search_lib import BagheeraSearcher
#
# def main():
# # ... your existing argparse logic ...
#
# try:
# # Initialize the library
# searcher = BagheeraSearcher()
#
# # Consume the generator
# for file_info in searcher.search(query_text, main_options, other_options):
# output = file_info['path']
# if other_options.get('konsole'):
# output = f"file:/'{output}'"
# if other_options.get('id'):
# output += f" [ID: {file_info['id']}]"
#
# print(output)
#
# except FileNotFoundError as e:
# print(e)
# sys.exit(1)
#
# if __name__ == "__main__":
# try:
# # Initialize the library
# searcher = BagheeraSearcher()
# # Consume the generator
# for file_info in searcher.search(query_text, main_options, other_options):
# output = file_info['path']
# if other_options.get('konsole'):
# output = f"file:/'{output}'"
# if other_options.get('id'):
# output += f" [ID: {file_info['id']}]"
# print(output)
# except FileNotFoundError as e:
# print(e)
# sys.exit(1)