plugin: add dataset and resource indexing
README.md (+20)

@@ -10,3 +10,23 @@ Docs:
- `docs/STATUS.md`
- `docs/ROADMAP.md`
- `docs/REPO_FACTS.json`

Quickstart:
- `python -m afs_scawful datasets index`
- `python -m afs_scawful resources index`

Mounts (AFS Studio):
- Create `mounts.json` in `~/.config/afs/afs_scawful/` or `~/.config/afs/plugins/afs_scawful/config/`
- Optional override: `AFS_SCAWFUL_MOUNTS=/path/to/mounts.json`
- Mount entries are user-specific; keep this file out of version control.

Example `mounts.json`:
```json
{
  "mounts": [
    { "name": "Projects", "path": "~/projects" },
    { "name": "Training", "path": "~/Mounts/windows-training" },
    { "name": "Reference", "path": "~/docs/reference" }
  ]
}
```

docs/STATUS.md

@@ -1,7 +1,7 @@
# STATUS

Stage: Prototype
-Now: package stub; guardrails; config helpers for training paths/resources.
-Not yet: plugin features; generators.
-Next: minimal plugin layout; one small utility.
+Now: config helpers; dataset registry builder; resource indexer.
+Not yet: generators; training runtime.
+Next: hook registry/index outputs into AFS Studio.
Issues: no runtime yet.

scripts/build_dataset_registry.py (new file, +15)

@@ -0,0 +1,15 @@
"""Build the dataset registry for AFS Scawful."""

from __future__ import annotations

from afs_scawful.registry import index_datasets


def main() -> int:
    path = index_datasets()
    print(f"dataset_registry: {path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

scripts/rebuild_resource_index.py (new file, +17)

@@ -0,0 +1,17 @@
"""Rebuild the resource index for AFS Scawful."""

from __future__ import annotations

from afs_scawful.resource_index import ResourceIndexer


def main() -> int:
    indexer = ResourceIndexer()
    result = indexer.build_index()
    path = indexer.write_index(result)
    print(f"resource_index: {path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

src/afs_scawful/__init__.py

@@ -3,5 +3,18 @@
__version__ = "0.0.0"

from .config import load_training_paths, load_training_resources
+from .paths import resolve_datasets_root, resolve_index_root, resolve_training_root
+from .registry import build_dataset_registry, index_datasets, write_dataset_registry
+from .resource_index import ResourceIndexer

-__all__ = ["load_training_paths", "load_training_resources"]
+__all__ = [
+    "load_training_paths",
+    "load_training_resources",
+    "resolve_training_root",
+    "resolve_datasets_root",
+    "resolve_index_root",
+    "build_dataset_registry",
+    "write_dataset_registry",
+    "index_datasets",
+    "ResourceIndexer",
+]

src/afs_scawful/__main__.py (new file, +7)

@@ -0,0 +1,7 @@
"""AFS Scawful module entry point."""

from .cli import main


if __name__ == "__main__":
    raise SystemExit(main())

src/afs_scawful/cli.py (new file, +99)

@@ -0,0 +1,99 @@
"""AFS Scawful command-line helpers."""

from __future__ import annotations

import argparse
from pathlib import Path
from typing import Iterable

from .registry import index_datasets, build_dataset_registry, write_dataset_registry
from .resource_index import ResourceIndexer
from .paths import resolve_datasets_root, resolve_index_root


def _datasets_index_command(args: argparse.Namespace) -> int:
    datasets_root = (
        Path(args.root).expanduser().resolve() if args.root else resolve_datasets_root()
    )
    output_path = (
        Path(args.output).expanduser().resolve()
        if args.output
        else resolve_index_root() / "dataset_registry.json"
    )
    registry = build_dataset_registry(datasets_root)
    write_dataset_registry(registry, output_path)
    print(f"dataset_registry: {output_path}")
    return 0


def _resources_index_command(args: argparse.Namespace) -> int:
    indexer = ResourceIndexer(
        index_path=Path(args.output).expanduser().resolve()
        if args.output
        else None,
        resource_roots=[Path(path).expanduser().resolve() for path in args.root]
        if args.root
        else None,
        search_patterns=args.pattern if args.pattern else None,
        exclude_patterns=args.exclude if args.exclude else None,
    )
    result = indexer.build_index()
    output_path = indexer.write_index(result)
    print(f"resource_index: {output_path}")
    return 0


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="afs_scawful")
    subparsers = parser.add_subparsers(dest="command")

    datasets_parser = subparsers.add_parser("datasets", help="Dataset registry tools.")
    datasets_sub = datasets_parser.add_subparsers(dest="datasets_command")

    datasets_index = datasets_sub.add_parser("index", help="Build dataset registry.")
    datasets_index.add_argument("--root", help="Datasets root override.")
    datasets_index.add_argument("--output", help="Output registry path.")
    datasets_index.set_defaults(func=_datasets_index_command)

    resources_parser = subparsers.add_parser("resources", help="Resource index tools.")
    resources_sub = resources_parser.add_subparsers(dest="resources_command")

    resources_index = resources_sub.add_parser("index", help="Build resource index.")
    resources_index.add_argument(
        "--root",
        action="append",
        help="Resource root override (repeatable).",
    )
    resources_index.add_argument(
        "--pattern",
        action="append",
        help="Search pattern override (repeatable).",
    )
    resources_index.add_argument(
        "--exclude",
        action="append",
        help="Exclude pattern override (repeatable).",
    )
    resources_index.add_argument("--output", help="Output index path.")
    resources_index.set_defaults(func=_resources_index_command)

    return parser


def main(argv: Iterable[str] | None = None) -> int:
    parser = build_parser()
    args = parser.parse_args(argv)
    if not getattr(args, "command", None):
        parser.print_help()
        return 1
    if args.command == "datasets" and not getattr(args, "datasets_command", None):
        parser.print_help()
        return 1
    if args.command == "resources" and not getattr(args, "resources_command", None):
        parser.print_help()
        return 1
    return args.func(args)


if __name__ == "__main__":
    raise SystemExit(main())

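For reviewers who want to poke at the new subcommands without a console entry point, the parser can be driven in-process; a minimal sketch using only the flags defined above (every path is an illustrative placeholder, not a package default):

```python
# Sketch only: exercise the CLI in-process; all paths below are placeholders.
from afs_scawful.cli import main

# Build the dataset registry from an explicit root into an explicit output file.
main([
    "datasets", "index",
    "--root", "~/src/training/datasets",
    "--output", "~/src/training/index/dataset_registry.json",
])

# Rebuild the resource index over two roots, narrowed to assembly sources.
main([
    "resources", "index",
    "--root", "~/projects",
    "--root", "~/docs/reference",
    "--pattern", "**/*.asm",
    "--exclude", "third_party",
])
```

The equivalent shell invocations go through `python -m afs_scawful ...`, as shown in the README above.
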
src/afs_scawful/paths.py (new file, +53)

@@ -0,0 +1,53 @@
"""AFS Scawful training path helpers."""

from __future__ import annotations

from pathlib import Path
from typing import Any

from .config import load_training_paths


def default_training_root() -> Path:
    candidate = Path.home() / "src" / "training"
    if candidate.exists():
        return candidate
    return Path.home() / ".context" / "training"


def resolve_training_root(config_path: Path | None = None) -> Path:
    data = load_training_paths(config_path=config_path)
    paths: dict[str, Any] = data.get("paths", {}) if isinstance(data, dict) else {}
    training_root = paths.get("training_root") or paths.get("training")
    if isinstance(training_root, Path):
        return training_root
    if isinstance(training_root, str) and training_root:
        return Path(training_root).expanduser().resolve()
    datasets = paths.get("datasets")
    if isinstance(datasets, Path):
        return datasets.parent
    if isinstance(datasets, str) and datasets:
        return Path(datasets).expanduser().resolve().parent
    return default_training_root()


def resolve_datasets_root(config_path: Path | None = None) -> Path:
    data = load_training_paths(config_path=config_path)
    paths: dict[str, Any] = data.get("paths", {}) if isinstance(data, dict) else {}
    datasets = paths.get("datasets")
    if isinstance(datasets, Path):
        return datasets
    if isinstance(datasets, str) and datasets:
        return Path(datasets).expanduser().resolve()
    return resolve_training_root(config_path=config_path) / "datasets"


def resolve_index_root(config_path: Path | None = None) -> Path:
    data = load_training_paths(config_path=config_path)
    paths: dict[str, Any] = data.get("paths", {}) if isinstance(data, dict) else {}
    index_root = paths.get("index_root")
    if isinstance(index_root, Path):
        return index_root
    if isinstance(index_root, str) and index_root:
        return Path(index_root).expanduser().resolve()
    return resolve_training_root(config_path=config_path) / "index"

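The three resolvers above only inspect a flat `paths` mapping returned by `load_training_paths()`; a hedged sketch of the shape they expect (the concrete values are illustrative, and the on-disk config format is owned by `.config`, which this commit does not touch):

```python
# Illustrative only: the mapping shape resolve_*_root() reads.
example_paths_config = {
    "paths": {
        "training_root": "~/src/training",       # "training" is accepted as an alias
        "datasets": "~/src/training/datasets",   # default: <training_root>/datasets
        "index_root": "~/src/training/index",    # default: <training_root>/index
    }
}
# With no config at all, default_training_root() prefers ~/src/training if it
# exists and falls back to ~/.context/training.
```
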
src/afs_scawful/registry.py (new file, +135)

@@ -0,0 +1,135 @@
"""Dataset registry utilities for AFS Scawful."""

from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any

from .paths import resolve_datasets_root, resolve_index_root


@dataclass
class DatasetEntry:
    name: str
    path: Path
    size_bytes: int
    updated_at: str
    files: list[str]
    stats: dict[str, Any] | None = None
    metadata: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        payload: dict[str, Any] = {
            "name": self.name,
            "path": str(self.path),
            "size_bytes": self.size_bytes,
            "updated_at": self.updated_at,
            "files": list(self.files),
        }
        if self.stats:
            payload["stats"] = self.stats
        if self.metadata:
            payload["metadata"] = self.metadata
        return payload


def build_dataset_registry(datasets_root: Path) -> dict[str, Any]:
    entries: list[DatasetEntry] = []
    if not datasets_root.exists():
        return {
            "generated_at": datetime.now().isoformat(),
            "datasets": [],
        }

    for entry in sorted(datasets_root.iterdir()):
        if entry.is_dir():
            dataset_entry = _build_dataset_entry(entry)
            if dataset_entry:
                entries.append(dataset_entry)
        elif entry.is_file() and entry.suffix.lower() in {".jsonl", ".json"}:
            dataset_entry = _build_file_dataset_entry(entry)
            if dataset_entry:
                entries.append(dataset_entry)

    return {
        "generated_at": datetime.now().isoformat(),
        "datasets": [entry.to_dict() for entry in entries],
    }


def write_dataset_registry(registry: dict[str, Any], output_path: Path) -> Path:
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(registry, indent=2) + "\n", encoding="utf-8")
    return output_path


def index_datasets(
    datasets_root: Path | None = None,
    output_path: Path | None = None,
) -> Path:
    datasets_root = datasets_root or resolve_datasets_root()
    output_path = output_path or resolve_index_root() / "dataset_registry.json"
    registry = build_dataset_registry(datasets_root)
    return write_dataset_registry(registry, output_path)


def _build_dataset_entry(dataset_dir: Path) -> DatasetEntry | None:
    files = [file for file in dataset_dir.iterdir() if file.is_file()]
    if not files:
        return None

    known_files = {
        "train.jsonl",
        "val.jsonl",
        "validation.jsonl",
        "test.jsonl",
        "accepted.jsonl",
        "rejected.jsonl",
        "stats.json",
        "metadata.json",
        "user_annotations.json",
    }
    if not any(file.name in known_files for file in files):
        return None

    size_bytes = sum(file.stat().st_size for file in files)
    latest_mtime = max(file.stat().st_mtime for file in files)
    updated_at = datetime.fromtimestamp(latest_mtime).isoformat()
    stats = _load_json(dataset_dir / "stats.json")
    metadata = _load_json(dataset_dir / "metadata.json")

    return DatasetEntry(
        name=dataset_dir.name,
        path=dataset_dir,
        size_bytes=size_bytes,
        updated_at=updated_at,
        files=[file.name for file in files],
        stats=stats or None,
        metadata=metadata or None,
    )


def _build_file_dataset_entry(dataset_file: Path) -> DatasetEntry | None:
    size_bytes = dataset_file.stat().st_size
    updated_at = datetime.fromtimestamp(dataset_file.stat().st_mtime).isoformat()
    return DatasetEntry(
        name=dataset_file.stem,
        path=dataset_file,
        size_bytes=size_bytes,
        updated_at=updated_at,
        files=[dataset_file.name],
        stats=None,
        metadata=None,
    )


def _load_json(path: Path) -> dict[str, Any]:
    if not path.exists():
        return {}
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {}

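For orientation, the file that `write_dataset_registry()` emits (by default `dataset_registry.json` under the index root) mirrors `DatasetEntry.to_dict()`; a hedged sketch of its shape as a Python literal, with placeholder names and numbers:

```python
# Illustrative output shape; every value here is a placeholder.
example_registry = {
    "generated_at": "2025-01-01T00:00:00",
    "datasets": [
        {
            "name": "example_dataset",            # directory name under the datasets root
            "path": "/home/user/training/datasets/example_dataset",
            "size_bytes": 123456,                 # sum of the dataset's file sizes
            "updated_at": "2025-01-01T00:00:00",  # newest mtime among those files
            "files": ["train.jsonl", "val.jsonl", "stats.json"],
            "stats": {"examples": 1000},          # contents of stats.json, when present
        }
    ],
}
```
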
src/afs_scawful/resource_index.py (new file, +294)

@@ -0,0 +1,294 @@
"""Resource discovery and indexing for AFS Scawful."""

from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any

from .config import load_training_resources
from .paths import resolve_index_root


DEFAULT_SEARCH_PATTERNS = [
    "**/*.asm",
    "**/*.md",
    "**/*.txt",
    "**/*.inc",
    "**/*.s",
    "**/*.65s",
    "**/*.65c",
    "**/*.c",
    "**/*.h",
    "**/*.cpp",
    "**/*.cc",
    "**/*.cs",
    "**/*.pdf",
]

DEFAULT_EXCLUDE_NAMES = {
    "node_modules",
    ".git",
    "build",
    "dist",
    "__pycache__",
    "venv",
    ".venv",
    "target",
}


@dataclass
class ResourceFile:
    path: Path
    file_type: str
    size_bytes: int
    last_modified: str
    content_hash: str
    source_dir: str
    relative_path: str
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        return {
            "path": str(self.path),
            "file_type": self.file_type,
            "size_bytes": self.size_bytes,
            "last_modified": self.last_modified,
            "content_hash": self.content_hash,
            "source_dir": self.source_dir,
            "relative_path": self.relative_path,
            "metadata": self.metadata,
        }


@dataclass
class IndexResult:
    total_files: int
    by_type: dict[str, int]
    by_source: dict[str, int]
    files: list[ResourceFile]
    duplicates_found: int
    errors: list[str]
    duration_seconds: float
    indexed_at: str

    def to_dict(self) -> dict[str, Any]:
        return {
            "total_files": self.total_files,
            "by_type": self.by_type,
            "by_source": self.by_source,
            "duplicates_found": self.duplicates_found,
            "errors": self.errors,
            "duration_seconds": self.duration_seconds,
            "indexed_at": self.indexed_at,
        }


class ResourceIndexer:
    def __init__(
        self,
        *,
        index_path: Path | None = None,
        resource_roots: list[Path] | None = None,
        search_patterns: list[str] | None = None,
        exclude_patterns: list[str] | None = None,
    ) -> None:
        cfg = load_training_resources()
        config = cfg.get("resource_discovery", {}) if isinstance(cfg, dict) else {}

        self.resource_roots = resource_roots or _parse_paths(config.get("resource_roots"))
        self.search_patterns = search_patterns or _parse_patterns(
            config.get("search_patterns")
        )
        if not self.search_patterns:
            self.search_patterns = list(DEFAULT_SEARCH_PATTERNS)

        self.exclude_patterns = exclude_patterns or _parse_patterns(
            config.get("exclude_patterns")
        )

        self.index_path = index_path or _parse_index_path(config.get("index_path"))
        if self.index_path is None:
            self.index_path = resolve_index_root() / "resource_index.json"

        self._errors: list[str] = []
        self._hashes: set[str] = set()

    def build_index(self) -> IndexResult:
        start = datetime.now()
        files: list[ResourceFile] = []
        duplicates = 0

        for root in self.resource_roots:
            if not root.exists():
                self._errors.append(f"missing root: {root}")
                continue
            for pattern in self.search_patterns:
                for path in root.rglob(pattern):
                    if not path.is_file():
                        continue
                    if _should_exclude(path, self.exclude_patterns):
                        continue
                    resource = self._index_file(path, root)
                    if resource is None:
                        continue
                    if resource.content_hash and resource.content_hash in self._hashes:
                        duplicates += 1
                        continue
                    if resource.content_hash:
                        self._hashes.add(resource.content_hash)
                    files.append(resource)

        by_type: dict[str, int] = {}
        by_source: dict[str, int] = {}
        for resource in files:
            by_type[resource.file_type] = by_type.get(resource.file_type, 0) + 1
            source_name = Path(resource.source_dir).name
            by_source[source_name] = by_source.get(source_name, 0) + 1

        duration = (datetime.now() - start).total_seconds()
        return IndexResult(
            total_files=len(files),
            by_type=by_type,
            by_source=by_source,
            files=files,
            duplicates_found=duplicates,
            errors=self._errors,
            duration_seconds=duration,
            indexed_at=datetime.now().isoformat(),
        )

    def write_index(self, result: IndexResult) -> Path:
        payload = {
            "metadata": result.to_dict(),
            "files": [item.to_dict() for item in result.files],
        }
        self.index_path.parent.mkdir(parents=True, exist_ok=True)
        self.index_path.write_text(
            json.dumps(payload, indent=2) + "\n",
            encoding="utf-8",
        )
        return self.index_path

    def load_index(self) -> IndexResult | None:
        if not self.index_path.exists():
            return None
        try:
            payload = json.loads(self.index_path.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            return None
        files = [
            ResourceFile(
                path=Path(item["path"]),
                file_type=item["file_type"],
                size_bytes=item["size_bytes"],
                last_modified=item["last_modified"],
                content_hash=item.get("content_hash", ""),
                source_dir=item["source_dir"],
                relative_path=item["relative_path"],
                metadata=item.get("metadata", {}),
            )
            for item in payload.get("files", [])
        ]
        meta = payload.get("metadata", {})
        return IndexResult(
            total_files=meta.get("total_files", len(files)),
            by_type=meta.get("by_type", {}),
            by_source=meta.get("by_source", {}),
            files=files,
            duplicates_found=meta.get("duplicates_found", 0),
            errors=meta.get("errors", []),
            duration_seconds=meta.get("duration_seconds", 0.0),
            indexed_at=meta.get("indexed_at", ""),
        )

    def _index_file(self, path: Path, source_root: Path) -> ResourceFile | None:
        try:
            stat = path.stat()
        except OSError:
            self._errors.append(f"stat failed: {path}")
            return None

        file_type = _get_file_type(path)
        content_hash = _hash_file(path)
        relative = str(path.relative_to(source_root))
        return ResourceFile(
            path=path,
            file_type=file_type,
            size_bytes=stat.st_size,
            last_modified=datetime.fromtimestamp(stat.st_mtime).isoformat(),
            content_hash=content_hash,
            source_dir=str(source_root),
            relative_path=relative,
        )


def _get_file_type(path: Path) -> str:
    suffix = path.suffix.lower()
    type_map = {
        ".asm": "asm",
        ".s": "asm",
        ".65s": "asm",
        ".65c": "asm",
        ".inc": "asm_include",
        ".md": "markdown",
        ".txt": "text",
        ".c": "c",
        ".cc": "cpp",
        ".cpp": "cpp",
        ".h": "header",
        ".cs": "csharp",
        ".pdf": "pdf",
    }
    return type_map.get(suffix, "unknown")


def _hash_file(path: Path) -> str:
    try:
        content = path.read_bytes()
    except OSError:
        return ""
    return hashlib.md5(content).hexdigest()


def _should_exclude(path: Path, patterns: list[str] | None) -> bool:
    parts_lower = {part.lower() for part in path.parts}
    for name in DEFAULT_EXCLUDE_NAMES:
        if name.lower() in parts_lower:
            return True
    if not patterns:
        return False
    path_str = str(path)
    for pattern in patterns:
        if pattern and pattern in path_str:
            return True
    return False


def _parse_paths(raw: Any) -> list[Path]:
    if not raw:
        return []
    roots: list[Path] = []
    if isinstance(raw, list):
        for item in raw:
            if isinstance(item, str):
                roots.append(Path(item).expanduser().resolve())
    return roots


def _parse_patterns(raw: Any) -> list[str]:
    if not raw:
        return []
    if isinstance(raw, list):
        return [item for item in raw if isinstance(item, str)]
    return []


def _parse_index_path(raw: Any) -> Path | None:
    if isinstance(raw, str) and raw:
        return Path(raw).expanduser().resolve()
    return None

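`ResourceIndexer` pulls its defaults from the `resource_discovery` block returned by `load_training_resources()`; a minimal sketch of that mapping (the real loader lives in `.config`, untouched by this commit, and every value below is illustrative):

```python
# Illustrative: keys ResourceIndexer.__init__ reads under "resource_discovery".
example_resources_config = {
    "resource_discovery": {
        "resource_roots": ["~/projects", "~/docs/reference"],  # scanned recursively via rglob()
        "search_patterns": ["**/*.asm", "**/*.md"],            # empty -> DEFAULT_SEARCH_PATTERNS
        "exclude_patterns": ["third_party"],                   # matched as substrings of the path
        "index_path": "~/src/training/index/resource_index.json",
    }
}
# Keyword arguments passed to ResourceIndexer(...) take precedence over these values,
# and DEFAULT_EXCLUDE_NAMES (node_modules, .git, build, ...) are always skipped.
```
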