From c3342100e0ea71dcb70adfe88169ff1f6be4c8f9 Mon Sep 17 00:00:00 2001
From: scawful
Date: Tue, 30 Dec 2025 12:33:07 -0500
Subject: [PATCH] plugin: add dataset and resource indexing

---
 README.md                         |  20 ++
 docs/STATUS.md                    |   6 +-
 scripts/build_dataset_registry.py |  15 ++
 scripts/rebuild_resource_index.py |  17 ++
 src/afs_scawful/__init__.py       |  15 +-
 src/afs_scawful/__main__.py       |   7 +
 src/afs_scawful/cli.py            |  99 ++++++++++
 src/afs_scawful/paths.py          |  53 ++++++
 src/afs_scawful/registry.py       | 135 ++++++++++++++
 src/afs_scawful/resource_index.py | 294 ++++++++++++++++++++++++++++++
 10 files changed, 657 insertions(+), 4 deletions(-)
 create mode 100644 scripts/build_dataset_registry.py
 create mode 100644 scripts/rebuild_resource_index.py
 create mode 100644 src/afs_scawful/__main__.py
 create mode 100644 src/afs_scawful/cli.py
 create mode 100644 src/afs_scawful/paths.py
 create mode 100644 src/afs_scawful/registry.py
 create mode 100644 src/afs_scawful/resource_index.py

diff --git a/README.md b/README.md
index b54441b..b950ca2 100644
--- a/README.md
+++ b/README.md
@@ -10,3 +10,23 @@ Docs:
 - `docs/STATUS.md`
 - `docs/ROADMAP.md`
 - `docs/REPO_FACTS.json`
+
+Quickstart:
+- `python -m afs_scawful datasets index`
+- `python -m afs_scawful resources index`
+
+Mounts (AFS Studio):
+- Create `mounts.json` in `~/.config/afs/afs_scawful/` or `~/.config/afs/plugins/afs_scawful/config/`
+- Optional override: `AFS_SCAWFUL_MOUNTS=/path/to/mounts.json`
+- Mount entries are user-specific; keep this file out of version control.
+
+Example `mounts.json`:
+```json
+{
+  "mounts": [
+    { "name": "Projects", "path": "~/projects" },
+    { "name": "Training", "path": "~/Mounts/windows-training" },
+    { "name": "Reference", "path": "~/docs/reference" }
+  ]
+}
+```
diff --git a/docs/STATUS.md b/docs/STATUS.md
index 4453bdf..e97411c 100644
--- a/docs/STATUS.md
+++ b/docs/STATUS.md
@@ -1,7 +1,7 @@
 # STATUS
 
 Stage: Prototype
-Now: package stub; guardrails; config helpers for training paths/resources.
-Not yet: plugin features; generators.
-Next: minimal plugin layout; one small utility.
+Now: config helpers; dataset registry builder; resource indexer.
+Not yet: generators; training runtime.
+Next: hook registry/index outputs into AFS Studio.
 Issues: no runtime yet.
diff --git a/scripts/build_dataset_registry.py b/scripts/build_dataset_registry.py
new file mode 100644
index 0000000..92d9acd
--- /dev/null
+++ b/scripts/build_dataset_registry.py
@@ -0,0 +1,15 @@
+"""Build the dataset registry for AFS Scawful."""
+
+from __future__ import annotations
+
+from afs_scawful.registry import index_datasets
+
+
+def main() -> int:
+    path = index_datasets()
+    print(f"dataset_registry: {path}")
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/scripts/rebuild_resource_index.py b/scripts/rebuild_resource_index.py
new file mode 100644
index 0000000..88e1eaa
--- /dev/null
+++ b/scripts/rebuild_resource_index.py
@@ -0,0 +1,17 @@
+"""Rebuild the resource index for AFS Scawful."""
+
+from __future__ import annotations
+
+from afs_scawful.resource_index import ResourceIndexer
+
+
+def main() -> int:
+    indexer = ResourceIndexer()
+    result = indexer.build_index()
+    path = indexer.write_index(result)
+    print(f"resource_index: {path}")
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/src/afs_scawful/__init__.py b/src/afs_scawful/__init__.py
index 2ca6331..6f4a5e6 100644
--- a/src/afs_scawful/__init__.py
+++ b/src/afs_scawful/__init__.py
@@ -3,5 +3,18 @@ __version__ = "0.0.0"
 
 from .config import load_training_paths, load_training_resources
+from .paths import resolve_datasets_root, resolve_index_root, resolve_training_root
+from .registry import build_dataset_registry, index_datasets, write_dataset_registry
+from .resource_index import ResourceIndexer
 
-__all__ = ["load_training_paths", "load_training_resources"]
+__all__ = [
+    "load_training_paths",
+    "load_training_resources",
+    "resolve_training_root",
+    "resolve_datasets_root",
+    "resolve_index_root",
+    "build_dataset_registry",
+    "write_dataset_registry",
+    "index_datasets",
+    "ResourceIndexer",
+]
 
diff --git a/src/afs_scawful/__main__.py b/src/afs_scawful/__main__.py
new file mode 100644
index 0000000..ab1c4fd
--- /dev/null
+++ b/src/afs_scawful/__main__.py
@@ -0,0 +1,7 @@
+"""AFS Scawful module entry point."""
+
+from .cli import main
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/src/afs_scawful/cli.py b/src/afs_scawful/cli.py
new file mode 100644
index 0000000..184ccd3
--- /dev/null
+++ b/src/afs_scawful/cli.py
@@ -0,0 +1,99 @@
+"""AFS Scawful command-line helpers."""
+
+from __future__ import annotations
+
+import argparse
+from pathlib import Path
+from typing import Iterable
+
+from .registry import index_datasets, build_dataset_registry, write_dataset_registry
+from .resource_index import ResourceIndexer
+from .paths import resolve_datasets_root, resolve_index_root
+
+
+def _datasets_index_command(args: argparse.Namespace) -> int:
+    datasets_root = (
+        Path(args.root).expanduser().resolve() if args.root else resolve_datasets_root()
+    )
+    output_path = (
+        Path(args.output).expanduser().resolve()
+        if args.output
+        else resolve_index_root() / "dataset_registry.json"
+    )
+    registry = build_dataset_registry(datasets_root)
+    write_dataset_registry(registry, output_path)
+    print(f"dataset_registry: {output_path}")
+    return 0
+
+
+def _resources_index_command(args: argparse.Namespace) -> int:
+    indexer = ResourceIndexer(
+        index_path=Path(args.output).expanduser().resolve()
+        if args.output
+        else None,
+        resource_roots=[Path(path).expanduser().resolve() for path in args.root]
+        if args.root
+        else None,
+        search_patterns=args.pattern if args.pattern else None,
+        exclude_patterns=args.exclude if args.exclude else None,
+    )
+    result = indexer.build_index()
+    output_path = indexer.write_index(result)
+    print(f"resource_index: {output_path}")
+    return 0
+
+
+def build_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(prog="afs_scawful")
+    subparsers = parser.add_subparsers(dest="command")
+
+    datasets_parser = subparsers.add_parser("datasets", help="Dataset registry tools.")
+    datasets_sub = datasets_parser.add_subparsers(dest="datasets_command")
+
+    datasets_index = datasets_sub.add_parser("index", help="Build dataset registry.")
+    datasets_index.add_argument("--root", help="Datasets root override.")
+    datasets_index.add_argument("--output", help="Output registry path.")
+    datasets_index.set_defaults(func=_datasets_index_command)
+
+    resources_parser = subparsers.add_parser("resources", help="Resource index tools.")
+    resources_sub = resources_parser.add_subparsers(dest="resources_command")
+
+    resources_index = resources_sub.add_parser("index", help="Build resource index.")
+    resources_index.add_argument(
+        "--root",
+        action="append",
+        help="Resource root override (repeatable).",
+    )
+    resources_index.add_argument(
+        "--pattern",
+        action="append",
+        help="Search pattern override (repeatable).",
+    )
+    resources_index.add_argument(
+        "--exclude",
+        action="append",
+        help="Exclude pattern override (repeatable).",
+    )
+    resources_index.add_argument("--output", help="Output index path.")
+    resources_index.set_defaults(func=_resources_index_command)
+
+    return parser
+
+
+def main(argv: Iterable[str] | None = None) -> int:
+    parser = build_parser()
+    args = parser.parse_args(argv)
+    if not getattr(args, "command", None):
+        parser.print_help()
+        return 1
+    if args.command == "datasets" and not getattr(args, "datasets_command", None):
+        parser.print_help()
+        return 1
+    if args.command == "resources" and not getattr(args, "resources_command", None):
+        parser.print_help()
+        return 1
+    return args.func(args)
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/src/afs_scawful/paths.py b/src/afs_scawful/paths.py
new file mode 100644
index 0000000..0b0badb
--- /dev/null
+++ b/src/afs_scawful/paths.py
@@ -0,0 +1,53 @@
+"""AFS Scawful training path helpers."""
+
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any
+
+from .config import load_training_paths
+
+
+def default_training_root() -> Path:
+    candidate = Path.home() / "src" / "training"
+    if candidate.exists():
+        return candidate
+    return Path.home() / ".context" / "training"
+
+
+def resolve_training_root(config_path: Path | None = None) -> Path:
+    data = load_training_paths(config_path=config_path)
+    paths: dict[str, Any] = data.get("paths", {}) if isinstance(data, dict) else {}
+    training_root = paths.get("training_root") or paths.get("training")
+    if isinstance(training_root, Path):
+        return training_root
+    if isinstance(training_root, str) and training_root:
+        return Path(training_root).expanduser().resolve()
+    datasets = paths.get("datasets")
+    if isinstance(datasets, Path):
+        return datasets.parent
+    if isinstance(datasets, str) and datasets:
+        return Path(datasets).expanduser().resolve().parent
+    return default_training_root()
+
+
+def resolve_datasets_root(config_path: Path | None = None) -> Path:
+    data = load_training_paths(config_path=config_path)
+    paths: dict[str, Any] = data.get("paths", {}) if isinstance(data, dict) else {}
+    datasets = paths.get("datasets")
+    if isinstance(datasets, Path):
+        return datasets
+    if isinstance(datasets, str) and datasets:
+        return Path(datasets).expanduser().resolve()
+    return resolve_training_root(config_path=config_path) / "datasets"
+
+
+def resolve_index_root(config_path: Path | None = None) -> Path:
+    data = load_training_paths(config_path=config_path)
+    paths: dict[str, Any] = data.get("paths", {}) if isinstance(data, dict) else {}
+    index_root = paths.get("index_root")
+    if isinstance(index_root, Path):
+        return index_root
+    if isinstance(index_root, str) and index_root:
+        return Path(index_root).expanduser().resolve()
+    return resolve_training_root(config_path=config_path) / "index"
diff --git a/src/afs_scawful/registry.py b/src/afs_scawful/registry.py
new file mode 100644
index 0000000..13f6afe
--- /dev/null
+++ b/src/afs_scawful/registry.py
@@ -0,0 +1,135 @@
+"""Dataset registry utilities for AFS Scawful."""
+
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+from typing import Any
+
+from .paths import resolve_datasets_root, resolve_index_root
+
+
+@dataclass
+class DatasetEntry:
+    name: str
+    path: Path
+    size_bytes: int
+    updated_at: str
+    files: list[str]
+    stats: dict[str, Any] | None = None
+    metadata: dict[str, Any] | None = None
+
+    def to_dict(self) -> dict[str, Any]:
+        payload: dict[str, Any] = {
+            "name": self.name,
+            "path": str(self.path),
+            "size_bytes": self.size_bytes,
+            "updated_at": self.updated_at,
+            "files": list(self.files),
+        }
+        if self.stats:
+            payload["stats"] = self.stats
+        if self.metadata:
+            payload["metadata"] = self.metadata
+        return payload
+
+
+def build_dataset_registry(datasets_root: Path) -> dict[str, Any]:
+    entries: list[DatasetEntry] = []
+    if not datasets_root.exists():
+        return {
+            "generated_at": datetime.now().isoformat(),
+            "datasets": [],
+        }
+
+    for entry in sorted(datasets_root.iterdir()):
+        if entry.is_dir():
+            dataset_entry = _build_dataset_entry(entry)
+            if dataset_entry:
+                entries.append(dataset_entry)
+        elif entry.is_file() and entry.suffix.lower() in {".jsonl", ".json"}:
+            dataset_entry = _build_file_dataset_entry(entry)
+            if dataset_entry:
+                entries.append(dataset_entry)
+
+    return {
+        "generated_at": datetime.now().isoformat(),
+        "datasets": [entry.to_dict() for entry in entries],
+    }
+
+
+def write_dataset_registry(registry: dict[str, Any], output_path: Path) -> Path:
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    output_path.write_text(json.dumps(registry, indent=2) + "\n", encoding="utf-8")
+    return output_path
+
+
+def index_datasets(
+    datasets_root: Path | None = None,
+    output_path: Path | None = None,
+) -> Path:
+    datasets_root = datasets_root or resolve_datasets_root()
+    output_path = output_path or resolve_index_root() / "dataset_registry.json"
+    registry = build_dataset_registry(datasets_root)
+    return write_dataset_registry(registry, output_path)
+
+
+def _build_dataset_entry(dataset_dir: Path) -> DatasetEntry | None:
+    files = [file for file in dataset_dir.iterdir() if file.is_file()]
+    if not files:
+        return None
+
+    known_files = {
+        "train.jsonl",
+        "val.jsonl",
+        "validation.jsonl",
+        "test.jsonl",
+        "accepted.jsonl",
+        "rejected.jsonl",
+        "stats.json",
+        "metadata.json",
+        "user_annotations.json",
+    }
+    if not any(file.name in known_files for file in files):
+        return None
+
+    size_bytes = sum(file.stat().st_size for file in files)
+    latest_mtime = max(file.stat().st_mtime for file in files)
+    updated_at = datetime.fromtimestamp(latest_mtime).isoformat()
+    stats = _load_json(dataset_dir / "stats.json")
+    metadata = _load_json(dataset_dir / "metadata.json")
+
+    return DatasetEntry(
+        name=dataset_dir.name,
+        path=dataset_dir,
+        size_bytes=size_bytes,
+        updated_at=updated_at,
+        files=[file.name for file in files],
+        stats=stats or None,
+        metadata=metadata or None,
+    )
+
+
+def _build_file_dataset_entry(dataset_file: Path) -> DatasetEntry | None:
+    size_bytes = dataset_file.stat().st_size
+    updated_at = datetime.fromtimestamp(dataset_file.stat().st_mtime).isoformat()
+    return DatasetEntry(
+        name=dataset_file.stem,
+        path=dataset_file,
+        size_bytes=size_bytes,
+        updated_at=updated_at,
+        files=[dataset_file.name],
+        stats=None,
+        metadata=None,
+    )
+
+
+def _load_json(path: Path) -> dict[str, Any]:
+    if not path.exists():
+        return {}
+    try:
+        return json.loads(path.read_text(encoding="utf-8"))
+    except json.JSONDecodeError:
+        return {}
diff --git a/src/afs_scawful/resource_index.py b/src/afs_scawful/resource_index.py
new file mode 100644
index 0000000..f392b79
--- /dev/null
+++ b/src/afs_scawful/resource_index.py
@@ -0,0 +1,294 @@
+"""Resource discovery and indexing for AFS Scawful."""
+
+from __future__ import annotations
+
+import hashlib
+import json
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Any
+
+from .config import load_training_resources
+from .paths import resolve_index_root
+
+
+DEFAULT_SEARCH_PATTERNS = [
+    "**/*.asm",
+    "**/*.md",
+    "**/*.txt",
+    "**/*.inc",
+    "**/*.s",
+    "**/*.65s",
+    "**/*.65c",
+    "**/*.c",
+    "**/*.h",
+    "**/*.cpp",
+    "**/*.cc",
+    "**/*.cs",
+    "**/*.pdf",
+]
+
+DEFAULT_EXCLUDE_NAMES = {
+    "node_modules",
+    ".git",
+    "build",
+    "dist",
+    "__pycache__",
+    "venv",
+    ".venv",
+    "target",
+}
+
+
+@dataclass
+class ResourceFile:
+    path: Path
+    file_type: str
+    size_bytes: int
+    last_modified: str
+    content_hash: str
+    source_dir: str
+    relative_path: str
+    metadata: dict[str, Any] = field(default_factory=dict)
+
+    def to_dict(self) -> dict[str, Any]:
+        return {
+            "path": str(self.path),
+            "file_type": self.file_type,
+            "size_bytes": self.size_bytes,
+            "last_modified": self.last_modified,
+            "content_hash": self.content_hash,
+            "source_dir": self.source_dir,
+            "relative_path": self.relative_path,
+            "metadata": self.metadata,
+        }
+
+
+@dataclass
+class IndexResult:
+    total_files: int
+    by_type: dict[str, int]
+    by_source: dict[str, int]
+    files: list[ResourceFile]
+    duplicates_found: int
+    errors: list[str]
+    duration_seconds: float
+    indexed_at: str
+
+    def to_dict(self) -> dict[str, Any]:
+        return {
+            "total_files": self.total_files,
+            "by_type": self.by_type,
+            "by_source": self.by_source,
+            "duplicates_found": self.duplicates_found,
+            "errors": self.errors,
+            "duration_seconds": self.duration_seconds,
+            "indexed_at": self.indexed_at,
+        }
+
+
+class ResourceIndexer:
+    def __init__(
+        self,
+        *,
+        index_path: Path | None = None,
+        resource_roots: list[Path] | None = None,
+        search_patterns: list[str] | None = None,
+        exclude_patterns: list[str] | None = None,
+    ) -> None:
+        cfg = load_training_resources()
+        config = cfg.get("resource_discovery", {}) if isinstance(cfg, dict) else {}
+
+        self.resource_roots = resource_roots or _parse_paths(config.get("resource_roots"))
+        self.search_patterns = search_patterns or _parse_patterns(
+            config.get("search_patterns")
+        )
+        if not self.search_patterns:
+            self.search_patterns = list(DEFAULT_SEARCH_PATTERNS)
+
+        self.exclude_patterns = exclude_patterns or _parse_patterns(
+            config.get("exclude_patterns")
+        )
+
+        self.index_path = index_path or _parse_index_path(config.get("index_path"))
+        if self.index_path is None:
+            self.index_path = resolve_index_root() / "resource_index.json"
+
+        self._errors: list[str] = []
+        self._hashes: set[str] = set()
+
+    def build_index(self) -> IndexResult:
+        start = datetime.now()
+        files: list[ResourceFile] = []
+        duplicates = 0
+
+        for root in self.resource_roots:
+            if not root.exists():
+                self._errors.append(f"missing root: {root}")
+                continue
+            for pattern in self.search_patterns:
+                for path in root.rglob(pattern):
+                    if not path.is_file():
+                        continue
+                    if _should_exclude(path, self.exclude_patterns):
+                        continue
+                    resource = self._index_file(path, root)
+                    if resource is None:
+                        continue
+                    if resource.content_hash and resource.content_hash in self._hashes:
+                        duplicates += 1
+                        continue
+                    if resource.content_hash:
+                        self._hashes.add(resource.content_hash)
+                    files.append(resource)
+
+        by_type: dict[str, int] = {}
+        by_source: dict[str, int] = {}
+        for resource in files:
+            by_type[resource.file_type] = by_type.get(resource.file_type, 0) + 1
+            source_name = Path(resource.source_dir).name
+            by_source[source_name] = by_source.get(source_name, 0) + 1
+
+        duration = (datetime.now() - start).total_seconds()
+        return IndexResult(
+            total_files=len(files),
+            by_type=by_type,
+            by_source=by_source,
+            files=files,
+            duplicates_found=duplicates,
+            errors=self._errors,
+            duration_seconds=duration,
+            indexed_at=datetime.now().isoformat(),
+        )
+
+    def write_index(self, result: IndexResult) -> Path:
+        payload = {
+            "metadata": result.to_dict(),
+            "files": [item.to_dict() for item in result.files],
+        }
+        self.index_path.parent.mkdir(parents=True, exist_ok=True)
+        self.index_path.write_text(
+            json.dumps(payload, indent=2) + "\n",
+            encoding="utf-8",
+        )
+        return self.index_path
+
+    def load_index(self) -> IndexResult | None:
+        if not self.index_path.exists():
+            return None
+        try:
+            payload = json.loads(self.index_path.read_text(encoding="utf-8"))
+        except json.JSONDecodeError:
+            return None
+        files = [
+            ResourceFile(
+                path=Path(item["path"]),
+                file_type=item["file_type"],
+                size_bytes=item["size_bytes"],
+                last_modified=item["last_modified"],
+                content_hash=item.get("content_hash", ""),
+                source_dir=item["source_dir"],
+                relative_path=item["relative_path"],
+                metadata=item.get("metadata", {}),
+            )
+            for item in payload.get("files", [])
+        ]
+        meta = payload.get("metadata", {})
+        return IndexResult(
+            total_files=meta.get("total_files", len(files)),
+            by_type=meta.get("by_type", {}),
+            by_source=meta.get("by_source", {}),
+            files=files,
+            duplicates_found=meta.get("duplicates_found", 0),
+            errors=meta.get("errors", []),
+            duration_seconds=meta.get("duration_seconds", 0.0),
+            indexed_at=meta.get("indexed_at", ""),
+        )
+
+    def _index_file(self, path: Path, source_root: Path) -> ResourceFile | None:
+        try:
+            stat = path.stat()
+        except OSError:
+            self._errors.append(f"stat failed: {path}")
+            return None
+
+        file_type = _get_file_type(path)
+        content_hash = _hash_file(path)
+        relative = str(path.relative_to(source_root))
+        return ResourceFile(
+            path=path,
+            file_type=file_type,
+            size_bytes=stat.st_size,
+            last_modified=datetime.fromtimestamp(stat.st_mtime).isoformat(),
+            content_hash=content_hash,
+            source_dir=str(source_root),
+            relative_path=relative,
+        )
+
+
+def _get_file_type(path: Path) -> str:
+    suffix = path.suffix.lower()
+    type_map = {
+        ".asm": "asm",
+        ".s": "asm",
+        ".65s": "asm",
+        ".65c": "asm",
+        ".inc": "asm_include",
+        ".md": "markdown",
+        ".txt": "text",
+        ".c": "c",
+        ".cc": "cpp",
+        ".cpp": "cpp",
+ ".h": "header", + ".cs": "csharp", + ".pdf": "pdf", + } + return type_map.get(suffix, "unknown") + + +def _hash_file(path: Path) -> str: + try: + content = path.read_bytes() + except OSError: + return "" + return hashlib.md5(content).hexdigest() + + +def _should_exclude(path: Path, patterns: list[str] | None) -> bool: + parts_lower = {part.lower() for part in path.parts} + for name in DEFAULT_EXCLUDE_NAMES: + if name.lower() in parts_lower: + return True + if not patterns: + return False + path_str = str(path) + for pattern in patterns: + if pattern and pattern in path_str: + return True + return False + + +def _parse_paths(raw: Any) -> list[Path]: + if not raw: + return [] + roots: list[Path] = [] + if isinstance(raw, list): + for item in raw: + if isinstance(item, str): + roots.append(Path(item).expanduser().resolve()) + return roots + + +def _parse_patterns(raw: Any) -> list[str]: + if not raw: + return [] + if isinstance(raw, list): + return [item for item in raw if isinstance(item, str)] + return [] + + +def _parse_index_path(raw: Any) -> Path | None: + if isinstance(raw, str) and raw: + return Path(raw).expanduser().resolve() + return None