From 39b74ffed3d90e571e66d807c6561ab4d0111268 Mon Sep 17 00:00:00 2001 From: scawful Date: Tue, 30 Dec 2025 10:24:31 -0500 Subject: [PATCH] core: context management + graph export --- README.md | 7 + docs/STATUS.md | 2 +- src/afs/__init__.py | 19 +++ src/afs/cli.py | 380 +++++++++++++++++++++++++++++++++++++++++++ src/afs/discovery.py | 126 ++++++++++++++ src/afs/graph.py | 169 +++++++++++++++++++ src/afs/manager.py | 371 ++++++++++++++++++++++++++++++++++++++++++ src/afs/mapping.py | 67 ++++++++ src/afs/models.py | 97 +++++++++++ src/afs/policy.py | 66 ++++++++ src/afs/schema.py | 113 ++++++++++++- src/afs/validator.py | 55 +++++++ 12 files changed, 1470 insertions(+), 2 deletions(-) create mode 100644 src/afs/discovery.py create mode 100644 src/afs/graph.py create mode 100644 src/afs/manager.py create mode 100644 src/afs/mapping.py create mode 100644 src/afs/models.py create mode 100644 src/afs/policy.py create mode 100644 src/afs/validator.py diff --git a/README.md b/README.md index f3eb1ff..716c35a 100644 --- a/README.md +++ b/README.md @@ -16,3 +16,10 @@ Quickstart: - `python -m afs init --context-root ~/src/context --workspace-name trunk` - `python -m afs status` - `python -m afs workspace add --path ~/src/trunk --name trunk` +- `python -m afs context init --path ~/src/trunk` +- `python -m afs context validate --path ~/src/trunk` +- `python -m afs context discover --path ~/src/trunk` +- `python -m afs context ensure-all --path ~/src/trunk` +- `python -m afs graph export --path ~/src/trunk` + +Discovery skips directory names in `general.discovery_ignore` (default: legacy, archive, archives). diff --git a/docs/STATUS.md b/docs/STATUS.md index aad1b26..dbe990a 100644 --- a/docs/STATUS.md +++ b/docs/STATUS.md @@ -1,7 +1,7 @@ # STATUS Stage: Prototype -Now: init/status/workspace commands; minimal config + plugin discovery. +Now: init/status/workspace commands; context init/list/mount/validate/discover/ensure-all; graph export; minimal config + plugin discovery. Not yet: service runtime; full configuration schema validation. Next: one small utility; smoke-test stub. Issues: no runtime yet. diff --git a/src/afs/__init__.py b/src/afs/__init__.py index f1cb4d9..53e9af7 100644 --- a/src/afs/__init__.py +++ b/src/afs/__init__.py @@ -4,7 +4,13 @@ __version__ = "0.0.0" from .config import load_config, load_config_model from .core import find_root, resolve_context_root +from .discovery import discover_contexts, get_project_stats +from .graph import build_graph, default_graph_path, write_graph +from .manager import AFSManager +from .models import ContextRoot, MountPoint, MountType, ProjectMetadata from .plugins import discover_plugins, load_plugins +from .schema import DirectoryConfig, PolicyType +from .validator import AFSValidator __all__ = [ "load_config", @@ -13,4 +19,17 @@ __all__ = [ "load_plugins", "find_root", "resolve_context_root", + "discover_contexts", + "get_project_stats", + "build_graph", + "default_graph_path", + "write_graph", + "AFSManager", + "AFSValidator", + "MountType", + "MountPoint", + "ProjectMetadata", + "ContextRoot", + "DirectoryConfig", + "PolicyType", ] diff --git a/src/afs/cli.py b/src/afs/cli.py index d6e04be..4de250d 100644 --- a/src/afs/cli.py +++ b/src/afs/cli.py @@ -8,8 +8,13 @@ from typing import Iterable from .config import load_config, load_config_model from .core import find_root, resolve_context_root +from .discovery import discover_contexts, get_project_stats +from .graph import build_graph, default_graph_path, write_graph +from .manager import AFSManager +from .models import MountType from .plugins import discover_plugins, load_plugins from .schema import AFSConfig, GeneralConfig, WorkspaceDirectory +from .validator import AFSValidator AFS_DIRS = [ @@ -24,6 +29,34 @@ AFS_DIRS = [ ] +def _parse_mount_type(value: str) -> MountType: + try: + return MountType(value) + except ValueError as exc: + raise argparse.ArgumentTypeError(f"Unknown mount type: {value}") from exc + + +def _load_manager(config_path: Path | None) -> AFSManager: + config = load_config_model(config_path=config_path, merge_user=True) + return AFSManager(config=config) + + +def _resolve_context_paths( + args: argparse.Namespace, manager: AFSManager +) -> tuple[Path, Path, Path | None, str | None]: + project_path = Path(args.path).expanduser().resolve() if args.path else Path.cwd() + context_root = ( + Path(args.context_root).expanduser().resolve() if args.context_root else None + ) + context_dir = args.context_dir if args.context_dir else None + context_path = manager.resolve_context_path( + project_path, + context_root=context_root, + context_dir=context_dir, + ) + return project_path, context_path, context_root, context_dir + + def _ensure_context_root(root: Path) -> None: root.mkdir(parents=True, exist_ok=True) for name in AFS_DIRS: @@ -149,6 +182,197 @@ def _status_command(args: argparse.Namespace) -> int: return 0 +def _context_init_command(args: argparse.Namespace) -> int: + config_path = Path(args.config) if args.config else None + manager = _load_manager(config_path) + project_path, _context_path, context_root, context_dir = _resolve_context_paths( + args, manager + ) + context = manager.init( + path=project_path, + context_root=context_root, + context_dir=context_dir, + link_context=args.link_context, + force=args.force, + ) + print(f"context_path: {context.path}") + print(f"project: {context.project_name}") + return 0 + + +def _context_ensure_command(args: argparse.Namespace) -> int: + config_path = Path(args.config) if args.config else None + manager = _load_manager(config_path) + project_path, _context_path, context_root, context_dir = _resolve_context_paths( + args, manager + ) + context = manager.ensure( + path=project_path, + context_root=context_root, + context_dir=context_dir, + link_context=args.link_context, + ) + print(f"context_path: {context.path}") + print(f"project: {context.project_name}") + return 0 + + +def _context_list_command(args: argparse.Namespace) -> int: + config_path = Path(args.config) if args.config else None + manager = _load_manager(config_path) + _project_path, context_path, _context_root, _context_dir = _resolve_context_paths( + args, manager + ) + context = manager.list_context(context_path=context_path) + print(f"context_path: {context.path}") + print(f"project: {context.project_name}") + if not context.mounts: + print("mounts: (none)") + return 0 + for mount_type in MountType: + mounts = context.mounts.get(mount_type, []) + if not mounts: + continue + print(f"{mount_type.value}:") + for mount in mounts: + suffix = " (link)" if mount.is_symlink else "" + print(f"- {mount.name} -> {mount.source}{suffix}") + return 0 + + +def _context_mount_command(args: argparse.Namespace) -> int: + config_path = Path(args.config) if args.config else None + manager = _load_manager(config_path) + _project_path, context_path, _context_root, _context_dir = _resolve_context_paths( + args, manager + ) + mount_type = _parse_mount_type(args.mount_type) + source = Path(args.source).expanduser().resolve() + mount = manager.mount( + source=source, + mount_type=mount_type, + alias=args.alias, + context_path=context_path, + ) + print(f"mounted {mount.name} in {mount.mount_type.value}: {mount.source}") + return 0 + + +def _context_unmount_command(args: argparse.Namespace) -> int: + config_path = Path(args.config) if args.config else None + manager = _load_manager(config_path) + _project_path, context_path, _context_root, _context_dir = _resolve_context_paths( + args, manager + ) + mount_type = _parse_mount_type(args.mount_type) + removed = manager.unmount( + alias=args.alias, + mount_type=mount_type, + context_path=context_path, + ) + if not removed: + print(f"mount not found: {args.alias}") + return 1 + print(f"unmounted {args.alias} from {mount_type.value}") + return 0 + + +def _context_validate_command(args: argparse.Namespace) -> int: + config_path = Path(args.config) if args.config else None + manager = _load_manager(config_path) + _project_path, context_path, _context_root, _context_dir = _resolve_context_paths( + args, manager + ) + validator = AFSValidator(context_path, afs_directories=manager.config.directories) + status = validator.check_integrity() + missing = ", ".join(status.get("missing", [])) or "(none)" + errors = status.get("errors", []) + print(f"valid: {status.get('valid', False)}") + print(f"missing: {missing}") + if errors: + print(f"errors: {', '.join(errors)}") + return 0 if status.get("valid", False) else 1 + + +def _context_discover_command(args: argparse.Namespace) -> int: + config_path = Path(args.config) if args.config else None + config = load_config_model(config_path=config_path, merge_user=True) + search_paths = None + if args.path: + search_paths = [Path(path).expanduser() for path in args.path] + ignore_names = args.ignore if args.ignore else None + projects = discover_contexts( + search_paths=search_paths, + max_depth=args.max_depth, + ignore_names=ignore_names, + config=config, + ) + if not projects: + print("(no contexts)") + return 0 + for project in projects: + label = project.project_name + print(f"{label}\t{project.path}") + if args.stats: + stats = get_project_stats(projects) + pairs = [f"{key}={value}" for key, value in stats.items()] + print("stats: " + ", ".join(pairs)) + return 0 + + +def _context_ensure_all_command(args: argparse.Namespace) -> int: + config_path = Path(args.config) if args.config else None + config = load_config_model(config_path=config_path, merge_user=True) + search_paths = None + if args.path: + search_paths = [Path(path).expanduser() for path in args.path] + ignore_names = args.ignore if args.ignore else None + projects = discover_contexts( + search_paths=search_paths, + max_depth=args.max_depth, + ignore_names=ignore_names, + config=config, + ) + if not projects: + print("(no contexts)") + return 0 + + manager = AFSManager(config=config) + for project in projects: + if args.dry_run: + print(f"would ensure: {project.project_name}\t{project.path}") + continue + context = manager.ensure( + path=project.path.parent, + context_root=project.path, + ) + print(f"ensured: {context.project_name}\t{context.path}") + return 0 + + +def _graph_export_command(args: argparse.Namespace) -> int: + config_path = Path(args.config) if args.config else None + config = load_config_model(config_path=config_path, merge_user=True) + search_paths = None + if args.path: + search_paths = [Path(path).expanduser() for path in args.path] + ignore_names = args.ignore if args.ignore else None + graph = build_graph( + search_paths=search_paths, + max_depth=args.max_depth, + ignore_names=ignore_names, + config=config, + ) + output_path = ( + Path(args.output).expanduser().resolve() + if args.output + else default_graph_path(config) + ) + write_graph(graph, output_path) + print(f"graph: {output_path}") + return 0 + + def _workspace_registry_path() -> Path: config = load_config_model() return config.general.context_root / "workspaces.toml" @@ -302,6 +526,156 @@ def build_parser() -> argparse.ArgumentParser: status_parser.add_argument("--start-dir", help="Directory to search from.") status_parser.set_defaults(func=_status_command) + context_parser = subparsers.add_parser( + "context", help="Manage per-project .context directories." + ) + context_sub = context_parser.add_subparsers(dest="context_command") + + ctx_init = context_sub.add_parser("init", help="Initialize a project context.") + ctx_init.add_argument("--path", help="Project path (default: cwd).") + ctx_init.add_argument("--context-root", help="Context root path override.") + ctx_init.add_argument("--context-dir", help="Context directory name.") + ctx_init.add_argument( + "--link-context", + action="store_true", + help="Link project context to the specified context root.", + ) + ctx_init.add_argument("--force", action="store_true", help="Overwrite existing context.") + ctx_init.add_argument("--config", help="Config path for directory policies.") + ctx_init.set_defaults(func=_context_init_command) + + ctx_ensure = context_sub.add_parser("ensure", help="Ensure a project context exists.") + ctx_ensure.add_argument("--path", help="Project path (default: cwd).") + ctx_ensure.add_argument("--context-root", help="Context root path override.") + ctx_ensure.add_argument("--context-dir", help="Context directory name.") + ctx_ensure.add_argument( + "--link-context", + action="store_true", + help="Link project context to the specified context root.", + ) + ctx_ensure.add_argument("--config", help="Config path for directory policies.") + ctx_ensure.set_defaults(func=_context_ensure_command) + + ctx_list = context_sub.add_parser("list", help="List mounts for a project context.") + ctx_list.add_argument("--path", help="Project path (default: cwd).") + ctx_list.add_argument("--context-root", help="Context root path override.") + ctx_list.add_argument("--context-dir", help="Context directory name.") + ctx_list.add_argument("--config", help="Config path for directory policies.") + ctx_list.set_defaults(func=_context_list_command) + + ctx_mount = context_sub.add_parser("mount", help="Mount a resource into a context.") + ctx_mount.add_argument("source", help="Source path to mount.") + ctx_mount.add_argument( + "--mount-type", + required=True, + choices=[m.value for m in MountType], + help="Target mount type.", + ) + ctx_mount.add_argument("--alias", help="Alias for the mount point.") + ctx_mount.add_argument("--path", help="Project path (default: cwd).") + ctx_mount.add_argument("--context-root", help="Context root path override.") + ctx_mount.add_argument("--context-dir", help="Context directory name.") + ctx_mount.add_argument("--config", help="Config path for directory policies.") + ctx_mount.set_defaults(func=_context_mount_command) + + ctx_unmount = context_sub.add_parser("unmount", help="Remove a mounted resource.") + ctx_unmount.add_argument("alias", help="Alias of the mount point to remove.") + ctx_unmount.add_argument( + "--mount-type", + required=True, + choices=[m.value for m in MountType], + help="Mount type containing the alias.", + ) + ctx_unmount.add_argument("--path", help="Project path (default: cwd).") + ctx_unmount.add_argument("--context-root", help="Context root path override.") + ctx_unmount.add_argument("--context-dir", help="Context directory name.") + ctx_unmount.add_argument("--config", help="Config path for directory policies.") + ctx_unmount.set_defaults(func=_context_unmount_command) + + ctx_validate = context_sub.add_parser("validate", help="Validate context structure.") + ctx_validate.add_argument("--path", help="Project path (default: cwd).") + ctx_validate.add_argument("--context-root", help="Context root path override.") + ctx_validate.add_argument("--context-dir", help="Context directory name.") + ctx_validate.add_argument("--config", help="Config path for directory policies.") + ctx_validate.set_defaults(func=_context_validate_command) + + ctx_discover = context_sub.add_parser( + "discover", help="Discover .context directories." + ) + ctx_discover.add_argument( + "--path", + action="append", + help="Search root path (repeatable). Defaults to workspace directories.", + ) + ctx_discover.add_argument( + "--max-depth", + type=int, + default=3, + help="Maximum directory depth to scan.", + ) + ctx_discover.add_argument( + "--ignore", + action="append", + help="Directory name to ignore (repeatable).", + ) + ctx_discover.add_argument("--stats", action="store_true", help="Print summary stats.") + ctx_discover.add_argument("--config", help="Config path for directory policies.") + ctx_discover.set_defaults(func=_context_discover_command) + + ctx_ensure_all = context_sub.add_parser( + "ensure-all", help="Ensure all discovered contexts exist." + ) + ctx_ensure_all.add_argument( + "--path", + action="append", + help="Search root path (repeatable). Defaults to workspace directories.", + ) + ctx_ensure_all.add_argument( + "--max-depth", + type=int, + default=3, + help="Maximum directory depth to scan.", + ) + ctx_ensure_all.add_argument( + "--ignore", + action="append", + help="Directory name to ignore (repeatable).", + ) + ctx_ensure_all.add_argument( + "--dry-run", + action="store_true", + help="List contexts without writing.", + ) + ctx_ensure_all.add_argument("--config", help="Config path for directory policies.") + ctx_ensure_all.set_defaults(func=_context_ensure_all_command) + + graph_parser = subparsers.add_parser("graph", help="Export AFS graph data.") + graph_sub = graph_parser.add_subparsers(dest="graph_command") + + graph_export = graph_sub.add_parser("export", help="Export graph JSON.") + graph_export.add_argument( + "--path", + action="append", + help="Search root path (repeatable). Defaults to workspace directories.", + ) + graph_export.add_argument( + "--max-depth", + type=int, + default=3, + help="Maximum directory depth to scan.", + ) + graph_export.add_argument( + "--ignore", + action="append", + help="Directory name to ignore (repeatable).", + ) + graph_export.add_argument( + "--output", + help="Output path for graph JSON (default: context_root/index/afs_graph.json).", + ) + graph_export.add_argument("--config", help="Config path for directory policies.") + graph_export.set_defaults(func=_graph_export_command) + workspace_parser = subparsers.add_parser("workspace", help="Manage workspace links.") workspace_sub = workspace_parser.add_subparsers(dest="workspace_command") @@ -333,6 +707,12 @@ def main(argv: Iterable[str] | None = None) -> int: if args.command == "workspace" and not getattr(args, "workspace_command", None): parser.print_help() return 1 + if args.command == "context" and not getattr(args, "context_command", None): + parser.print_help() + return 1 + if args.command == "graph" and not getattr(args, "graph_command", None): + parser.print_help() + return 1 return args.func(args) diff --git a/src/afs/discovery.py b/src/afs/discovery.py new file mode 100644 index 0000000..c057539 --- /dev/null +++ b/src/afs/discovery.py @@ -0,0 +1,126 @@ +"""AFS discovery helpers for locating .context roots.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Iterable, Iterator + +from .config import load_config_model +from .manager import AFSManager +from .models import ContextRoot +from .schema import AFSConfig + + +def discover_contexts( + search_paths: Iterable[Path] | None = None, + *, + max_depth: int = 3, + ignore_names: Iterable[str] | None = None, + config: AFSConfig | None = None, +) -> list[ContextRoot]: + config = config or load_config_model() + manager = AFSManager(config=config) + roots = _resolve_search_paths(search_paths, config) + ignore_set = _normalize_ignore_names(ignore_names, config) + + contexts: list[ContextRoot] = [] + seen: set[Path] = set() + + for root in roots: + if root.name.lower() in ignore_set: + continue + for context_path in _find_context_dirs(root, max_depth, ignore_set): + resolved = context_path.resolve() + if resolved in seen: + continue + seen.add(resolved) + try: + contexts.append(manager.list_context(context_path=resolved)) + except Exception: + continue + + contexts.sort(key=lambda item: item.project_name.lower()) + return contexts + + +def get_project_stats(projects: list[ContextRoot]) -> dict[str, int]: + total_mounts = 0 + mounts_by_type: dict[str, int] = {} + + for project in projects: + for mount_type, mount_list in project.mounts.items(): + total_mounts += len(mount_list) + mounts_by_type[mount_type.value] = ( + mounts_by_type.get(mount_type.value, 0) + len(mount_list) + ) + + return { + "total_projects": len(projects), + "total_mounts": total_mounts, + **mounts_by_type, + } + + +def _resolve_search_paths( + search_paths: Iterable[Path] | None, + config: AFSConfig, +) -> list[Path]: + resolved: list[Path] = [] + seen: set[Path] = set() + + def _add_path(path: Path) -> None: + try: + resolved_path = path.expanduser().resolve() + except OSError: + return + if resolved_path in seen or not resolved_path.exists(): + return + seen.add(resolved_path) + resolved.append(resolved_path) + + if search_paths: + for entry in search_paths: + _add_path(entry) + return resolved + + for workspace in config.general.workspace_directories: + _add_path(workspace.path) + + if config.general.agent_workspaces_dir: + _add_path(config.general.agent_workspaces_dir) + + return resolved + + +def _find_context_dirs( + root: Path, + max_depth: int, + ignore_names: set[str], + current_depth: int = 0, +) -> Iterator[Path]: + if current_depth > max_depth: + return + + try: + for entry in root.iterdir(): + if entry.name.lower() in ignore_names: + continue + if entry.name == ".context" and entry.is_dir(): + yield entry + elif entry.is_dir() and not entry.name.startswith("."): + yield from _find_context_dirs( + entry, max_depth, ignore_names, current_depth + 1 + ) + except OSError: + return + + +def _normalize_ignore_names( + ignore_names: Iterable[str] | None, config: AFSConfig +) -> set[str]: + names: list[str] = [] + if config.general.discovery_ignore: + names.extend(config.general.discovery_ignore) + if ignore_names: + names.extend(ignore_names) + return {name.strip().lower() for name in names if name and name.strip()} diff --git a/src/afs/graph.py b/src/afs/graph.py new file mode 100644 index 0000000..31b1d62 --- /dev/null +++ b/src/afs/graph.py @@ -0,0 +1,169 @@ +"""Build a graph export for AFS contexts.""" + +from __future__ import annotations + +import json +from datetime import datetime +from pathlib import Path +from typing import Iterable + +from .config import load_config_model +from .discovery import discover_contexts +from .mapping import resolve_directory_name +from .models import ContextRoot, MountType +from .schema import AFSConfig +from . import __version__ + + +def build_graph( + search_paths: Iterable[Path] | None = None, + *, + max_depth: int = 3, + ignore_names: Iterable[str] | None = None, + config: AFSConfig | None = None, +) -> dict[str, object]: + config = config or load_config_model() + contexts = discover_contexts( + search_paths=search_paths, + max_depth=max_depth, + ignore_names=ignore_names, + config=config, + ) + + nodes: list[dict[str, object]] = [] + edges: list[dict[str, str]] = [] + contexts_payload: list[dict[str, object]] = [] + mounts_summary: dict[str, int] = {} + + for context in contexts: + ctx_id = _context_id(context) + nodes.append( + { + "id": ctx_id, + "type": "context", + "label": context.project_name, + "path": str(context.path), + } + ) + + dir_ids: dict[str, str] = {} + for mount_type in MountType: + dir_name = resolve_directory_name( + mount_type, + afs_directories=config.directories, + metadata=context.metadata, + ) + dir_id = _dir_id(context, mount_type) + dir_ids[mount_type.value] = dir_id + nodes.append( + { + "id": dir_id, + "type": "mount_dir", + "label": dir_name, + "mount_type": mount_type.value, + "path": str(context.path / dir_name), + } + ) + edges.append({"from": ctx_id, "to": dir_id, "kind": "contains"}) + + mounts_payload: list[dict[str, object]] = [] + for mount_type, mounts in context.mounts.items(): + dir_name = resolve_directory_name( + mount_type, + afs_directories=config.directories, + metadata=context.metadata, + ) + for mount in mounts: + mount_id = _mount_id(context, mount_type, mount.name) + mount_path = context.path / dir_name / mount.name + nodes.append( + { + "id": mount_id, + "type": "mount", + "label": mount.name, + "mount_type": mount_type.value, + "path": str(mount_path), + "source": str(mount.source), + "is_symlink": mount.is_symlink, + } + ) + edges.append( + { + "from": dir_ids.get(mount_type.value, ctx_id), + "to": mount_id, + "kind": "contains", + } + ) + mounts_payload.append( + { + "id": mount_id, + "name": mount.name, + "mount_type": mount_type.value, + "path": str(mount_path), + "source": str(mount.source), + "is_symlink": mount.is_symlink, + } + ) + mounts_summary[mount_type.value] = ( + mounts_summary.get(mount_type.value, 0) + 1 + ) + + contexts_payload.append( + { + "id": ctx_id, + "name": context.project_name, + "path": str(context.path), + "metadata": context.metadata.to_dict(), + "mounts": mounts_payload, + } + ) + + summary = { + "total_contexts": len(contexts), + "total_mounts": sum(mounts_summary.values()), + "mounts_by_type": mounts_summary, + } + + return { + "meta": { + "generated_at": datetime.now().isoformat(), + "afs_version": __version__, + "context_root": str(config.general.context_root), + "max_depth": max_depth, + "ignore": list(ignore_names or config.general.discovery_ignore), + }, + "workspaces": [ + { + "path": str(ws.path), + "description": ws.description, + } + for ws in config.general.workspace_directories + ], + "contexts": contexts_payload, + "nodes": nodes, + "edges": edges, + "summary": summary, + } + + +def write_graph(graph: dict[str, object], output_path: Path) -> Path: + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(json.dumps(graph, indent=2) + "\n", encoding="utf-8") + return output_path + + +def default_graph_path(config: AFSConfig | None = None) -> Path: + config = config or load_config_model() + return config.general.context_root / "index" / "afs_graph.json" + + +def _context_id(context: ContextRoot) -> str: + return f"ctx:{context.path}" + + +def _dir_id(context: ContextRoot, mount_type: MountType) -> str: + return f"dir:{context.path}:{mount_type.value}" + + +def _mount_id(context: ContextRoot, mount_type: MountType, name: str) -> str: + return f"mount:{context.path}:{mount_type.value}:{name}" diff --git a/src/afs/manager.py b/src/afs/manager.py new file mode 100644 index 0000000..f30e2e8 --- /dev/null +++ b/src/afs/manager.py @@ -0,0 +1,371 @@ +"""AFS manager for .context directories.""" + +from __future__ import annotations + +import json +import shutil +from datetime import datetime +from pathlib import Path +from typing import Optional + +from .config import load_config_model +from .mapping import resolve_directory_map, resolve_directory_name +from .models import ContextRoot, MountPoint, MountType, ProjectMetadata +from .schema import AFSConfig, DirectoryConfig + + +class AFSManager: + """Manage AFS context roots for projects.""" + + CONTEXT_DIR_DEFAULT = ".context" + METADATA_FILE = "metadata.json" + STATE_FILE = "state.md" + DEFERRED_FILE = "deferred.md" + METACOGNITION_FILE = "metacognition.json" + GOALS_FILE = "goals.json" + EMOTIONS_FILE = "emotions.json" + EPISTEMIC_FILE = "epistemic.json" + + DEFAULT_STATE_TEMPLATE = "# Agent State\n\n" + DEFAULT_DEFERRED_TEMPLATE = "# Deferred\n\n" + + def __init__( + self, + config: AFSConfig | None = None, + directories: list[DirectoryConfig] | None = None, + ) -> None: + self.config = config or load_config_model() + self._directories = directories or list(self.config.directories) + self._directory_map = resolve_directory_map(afs_directories=self._directories) + + def resolve_context_path( + self, + project_path: Path, + context_root: Path | None = None, + context_dir: str | None = None, + ) -> Path: + if context_root: + return context_root.expanduser().resolve() + context_dir = context_dir or self.CONTEXT_DIR_DEFAULT + return project_path.resolve() / context_dir + + def ensure( + self, + path: Path = Path("."), + *, + context_root: Path | None = None, + context_dir: str | None = None, + link_context: bool = False, + ) -> ContextRoot: + project_path = path.resolve() + context_path = self.resolve_context_path( + project_path, + context_root=context_root, + context_dir=context_dir, + ) + + self._ensure_context_dirs(context_path) + metadata = self._ensure_metadata(context_path, project_path) + self._ensure_cognitive_scaffold(context_path) + if link_context and context_root: + link_path = project_path / (context_dir or self.CONTEXT_DIR_DEFAULT) + self._ensure_link(link_path, context_path, force=False) + return self.list_context(context_path=context_path, metadata=metadata) + + def init( + self, + path: Path = Path("."), + *, + context_root: Path | None = None, + context_dir: str | None = None, + link_context: bool = False, + force: bool = False, + ) -> ContextRoot: + project_path = path.resolve() + context_path = self.resolve_context_path( + project_path, + context_root=context_root, + context_dir=context_dir, + ) + + if link_context and context_root: + link_path = project_path / (context_dir or self.CONTEXT_DIR_DEFAULT) + self._ensure_context_dirs(context_path) + metadata = self._ensure_metadata(context_path, project_path) + self._ensure_cognitive_scaffold(context_path) + self._ensure_link(link_path, context_path, force=force) + return self.list_context(context_path=context_path, metadata=metadata) + + if context_path.exists(): + if not force: + raise FileExistsError(f"AFS already exists at {context_path}") + self._remove_context_path(context_path) + + self._ensure_context_dirs(context_path) + metadata = self._ensure_metadata(context_path, project_path) + self._ensure_cognitive_scaffold(context_path) + return self.list_context(context_path=context_path, metadata=metadata) + + def mount( + self, + source: Path, + mount_type: MountType, + alias: Optional[str] = None, + context_path: Optional[Path] = None, + ) -> MountPoint: + if context_path is None: + context_path = Path(".") / self.CONTEXT_DIR_DEFAULT + + source = source.expanduser().resolve() + if not source.exists(): + raise FileNotFoundError(f"Source {source} does not exist") + + context_path = context_path.resolve() + if not context_path.exists(): + raise FileNotFoundError(f"No AFS context at {context_path}") + + metadata = self._load_metadata(context_path) + directory_name = resolve_directory_name( + mount_type, + afs_directories=self._directories, + metadata=metadata, + ) + alias = alias or source.name + destination = context_path / directory_name / alias + + if destination.exists(): + raise FileExistsError( + f"Mount point '{alias}' already exists in {mount_type.value}" + ) + + destination.symlink_to(source) + return MountPoint( + name=alias, + source=source, + mount_type=mount_type, + is_symlink=True, + ) + + def unmount( + self, + alias: str, + mount_type: MountType, + context_path: Optional[Path] = None, + ) -> bool: + if context_path is None: + context_path = Path(".") / self.CONTEXT_DIR_DEFAULT + + metadata = self._load_metadata(context_path) + directory_name = resolve_directory_name( + mount_type, + afs_directories=self._directories, + metadata=metadata, + ) + mount_path = context_path / directory_name / alias + if mount_path.exists() or mount_path.is_symlink(): + mount_path.unlink() + return True + return False + + def list_context( + self, + context_path: Optional[Path] = None, + metadata: ProjectMetadata | None = None, + ) -> ContextRoot: + if context_path is None: + context_path = Path(".") / self.CONTEXT_DIR_DEFAULT + + context_path = context_path.resolve() + if not context_path.exists(): + raise FileNotFoundError("No AFS initialized") + + if metadata is None: + metadata = self._load_metadata(context_path) + + if metadata is None: + metadata = ProjectMetadata() + + mounts: dict[MountType, list[MountPoint]] = {} + directory_map = resolve_directory_map( + afs_directories=self._directories, + metadata=metadata, + ) + + for mount_type in MountType: + subdir = context_path / directory_map.get(mount_type, mount_type.value) + if not subdir.exists(): + continue + + mount_list: list[MountPoint] = [] + for item in subdir.iterdir(): + if item.name in {".keep", self.METADATA_FILE}: + continue + source = item.resolve() if item.is_symlink() else item + mount_list.append( + MountPoint( + name=item.name, + source=source, + mount_type=mount_type, + is_symlink=item.is_symlink(), + ) + ) + + mounts[mount_type] = mount_list + + return ContextRoot( + path=context_path, + project_name=context_path.parent.name, + metadata=metadata, + mounts=mounts, + ) + + def clean(self, context_path: Optional[Path] = None) -> None: + if context_path is None: + context_path = Path(".") / self.CONTEXT_DIR_DEFAULT + + if context_path.exists(): + self._remove_context_path(context_path) + + def update_metadata( + self, + context_path: Optional[Path] = None, + *, + description: Optional[str] = None, + agents: Optional[list[str]] = None, + ) -> ProjectMetadata: + if context_path is None: + context_path = Path(".") / self.CONTEXT_DIR_DEFAULT + + metadata_path = context_path / self.METADATA_FILE + if not metadata_path.exists(): + raise FileNotFoundError("No AFS initialized") + + metadata = self._load_metadata(context_path) or ProjectMetadata() + + if description is not None: + metadata.description = description + if agents is not None: + metadata.agents = agents + + self._write_metadata(metadata_path, metadata) + return metadata + + def _ensure_context_dirs(self, context_path: Path) -> None: + context_path.mkdir(parents=True, exist_ok=True) + for dir_config in self._directories: + subdir = context_path / dir_config.name + subdir.mkdir(parents=True, exist_ok=True) + keep = subdir / ".keep" + if not keep.exists(): + keep.touch() + + def _ensure_metadata(self, context_path: Path, project_path: Path) -> ProjectMetadata: + metadata_path = context_path / self.METADATA_FILE + directory_map = { + mount_type.value: name for mount_type, name in self._directory_map.items() + } + if not metadata_path.exists(): + metadata = ProjectMetadata( + created_at=datetime.now().isoformat(), + description=f"AFS for {project_path.name}", + directories=directory_map, + ) + self._write_metadata(metadata_path, metadata) + return metadata + + metadata = self._load_metadata(context_path) + if metadata is None: + metadata = ProjectMetadata( + created_at=datetime.now().isoformat(), + description=f"AFS for {project_path.name}", + directories=directory_map, + ) + self._write_metadata(metadata_path, metadata) + return metadata + + if not metadata.directories and directory_map: + metadata.directories = directory_map + self._write_metadata(metadata_path, metadata) + return metadata + + def _ensure_cognitive_scaffold(self, context_path: Path) -> None: + if not self.config.cognitive.enabled: + return + + metadata = self._load_metadata(context_path) + scratchpad_dir = context_path / resolve_directory_name( + MountType.SCRATCHPAD, + afs_directories=self._directories, + metadata=metadata, + ) + memory_dir = context_path / resolve_directory_name( + MountType.MEMORY, + afs_directories=self._directories, + metadata=metadata, + ) + scratchpad_dir.mkdir(parents=True, exist_ok=True) + memory_dir.mkdir(parents=True, exist_ok=True) + + state_file = scratchpad_dir / self.STATE_FILE + if not state_file.exists(): + state_file.write_text(self.DEFAULT_STATE_TEMPLATE, encoding="utf-8") + + deferred_file = scratchpad_dir / self.DEFERRED_FILE + if not deferred_file.exists(): + deferred_file.write_text(self.DEFAULT_DEFERRED_TEMPLATE, encoding="utf-8") + + if self.config.cognitive.record_metacognition: + meta_file = scratchpad_dir / self.METACOGNITION_FILE + if not meta_file.exists(): + meta_file.write_text("{}\n", encoding="utf-8") + + if self.config.cognitive.record_goals: + goals_file = scratchpad_dir / self.GOALS_FILE + if not goals_file.exists(): + goals_file.write_text("[]\n", encoding="utf-8") + + if self.config.cognitive.record_emotions: + emotions_file = scratchpad_dir / self.EMOTIONS_FILE + if not emotions_file.exists(): + emotions_file.write_text("[]\n", encoding="utf-8") + + if self.config.cognitive.record_epistemic: + epistemic_file = scratchpad_dir / self.EPISTEMIC_FILE + if not epistemic_file.exists(): + epistemic_file.write_text("{}\n", encoding="utf-8") + + def _ensure_link(self, link_path: Path, target: Path, force: bool) -> None: + if link_path.is_symlink(): + if link_path.resolve() == target.resolve(): + return + if not force: + raise FileExistsError(f"Context link already exists at {link_path}") + link_path.unlink() + elif link_path.exists(): + if not force: + raise FileExistsError(f"Context path already exists at {link_path}") + self._remove_context_path(link_path) + + link_path.symlink_to(target) + + def _remove_context_path(self, context_path: Path) -> None: + if context_path.is_symlink(): + context_path.unlink() + elif context_path.exists(): + shutil.rmtree(context_path) + + def _load_metadata(self, context_path: Path) -> ProjectMetadata | None: + metadata_path = context_path / self.METADATA_FILE + if not metadata_path.exists(): + return None + try: + payload = json.loads(metadata_path.read_text(encoding="utf-8")) + except json.JSONDecodeError: + return None + return ProjectMetadata.from_dict(payload) + + def _write_metadata(self, path: Path, metadata: ProjectMetadata) -> None: + path.write_text( + json.dumps(metadata.to_dict(), indent=2, default=str) + "\n", + encoding="utf-8", + ) diff --git a/src/afs/mapping.py b/src/afs/mapping.py new file mode 100644 index 0000000..4a8d10e --- /dev/null +++ b/src/afs/mapping.py @@ -0,0 +1,67 @@ +"""Helpers for mapping AFS roles to on-disk directory names.""" + +from __future__ import annotations + +from typing import Iterable + +from .models import MountType, ProjectMetadata +from .schema import DirectoryConfig + + +def _role_to_mount_type(role_name: str) -> MountType | None: + try: + return MountType(role_name) + except ValueError: + return None + + +def build_directory_map_from_config( + afs_directories: Iterable[DirectoryConfig] | None, +) -> dict[MountType, str]: + mapping: dict[MountType, str] = {} + if not afs_directories: + return mapping + + for dir_config in afs_directories: + role_name = dir_config.role.value if dir_config.role else dir_config.name + mount_type = _role_to_mount_type(role_name) + if not mount_type: + continue + mapping[mount_type] = dir_config.name + return mapping + + +def build_directory_map_from_metadata( + metadata: ProjectMetadata | None, +) -> dict[MountType, str]: + mapping: dict[MountType, str] = {} + if not metadata or not metadata.directories: + return mapping + + for role_name, dir_name in metadata.directories.items(): + mount_type = _role_to_mount_type(role_name) + if not mount_type: + continue + mapping[mount_type] = dir_name + return mapping + + +def resolve_directory_map( + *, + afs_directories: Iterable[DirectoryConfig] | None = None, + metadata: ProjectMetadata | None = None, +) -> dict[MountType, str]: + mapping = build_directory_map_from_metadata(metadata) + if not mapping: + mapping = build_directory_map_from_config(afs_directories) + return mapping + + +def resolve_directory_name( + mount_type: MountType, + *, + afs_directories: Iterable[DirectoryConfig] | None = None, + metadata: ProjectMetadata | None = None, +) -> str: + mapping = resolve_directory_map(afs_directories=afs_directories, metadata=metadata) + return mapping.get(mount_type, mount_type.value) diff --git a/src/afs/models.py b/src/afs/models.py new file mode 100644 index 0000000..7aaab1c --- /dev/null +++ b/src/afs/models.py @@ -0,0 +1,97 @@ +"""Core AFS data models.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from pathlib import Path +from typing import Any + + +class MountType(str, Enum): + """Supported AFS directory roles.""" + + MEMORY = "memory" + KNOWLEDGE = "knowledge" + TOOLS = "tools" + SCRATCHPAD = "scratchpad" + HISTORY = "history" + HIVEMIND = "hivemind" + GLOBAL = "global" + ITEMS = "items" + + +@dataclass(frozen=True) +class MountPoint: + """A mounted resource inside an AFS directory.""" + + name: str + source: Path + mount_type: MountType + is_symlink: bool = True + + +@dataclass +class ProjectMetadata: + """Metadata for an AFS context root.""" + + created_at: str = field(default_factory=lambda: datetime.now().isoformat()) + description: str = "" + agents: list[str] = field(default_factory=list) + directories: dict[str, str] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: dict[str, Any] | None) -> "ProjectMetadata": + data = data or {} + created_at = data.get("created_at") + if isinstance(created_at, datetime): + created_at = created_at.isoformat() + if not isinstance(created_at, str): + created_at = datetime.now().isoformat() + description = data.get("description") if isinstance(data.get("description"), str) else "" + agents = [agent for agent in data.get("agents", []) if isinstance(agent, str)] + directories: dict[str, str] = {} + raw_dirs = data.get("directories") + if isinstance(raw_dirs, dict): + for key, value in raw_dirs.items(): + directories[str(key)] = str(value) + return cls( + created_at=created_at, + description=description, + agents=agents, + directories=directories, + ) + + def to_dict(self) -> dict[str, Any]: + return { + "created_at": self.created_at, + "description": self.description, + "agents": list(self.agents), + "directories": dict(self.directories), + } + + +@dataclass +class ContextRoot: + """An AFS .context directory.""" + + path: Path + project_name: str + metadata: ProjectMetadata = field(default_factory=ProjectMetadata) + mounts: dict[MountType, list[MountPoint]] = field(default_factory=dict) + + @property + def is_valid(self) -> bool: + required = [mount_type.value for mount_type in MountType] + directory_map = self.metadata.directories if self.metadata else {} + return all( + (self.path / directory_map.get(role, role)).exists() for role in required + ) + + @property + def total_mounts(self) -> int: + return sum(len(mounts) for mounts in self.mounts.values()) + + def get_mounts(self, mount_type: MountType) -> list[MountPoint]: + return self.mounts.get(mount_type, []) diff --git a/src/afs/policy.py b/src/afs/policy.py new file mode 100644 index 0000000..b24c27d --- /dev/null +++ b/src/afs/policy.py @@ -0,0 +1,66 @@ +"""AFS permission policy enforcement.""" + +from __future__ import annotations + +from .models import MountType +from .schema import DirectoryConfig, PolicyType + + +class PolicyEnforcer: + """Enforces AFS directory policies.""" + + def __init__(self, directories: list[DirectoryConfig]): + self._policies: dict[MountType, PolicyType] = {} + for directory in directories: + role_name = directory.role.value if directory.role else directory.name + try: + mount_type = MountType(role_name) + except ValueError: + continue + self._policies[mount_type] = directory.policy + + def get_policy(self, mount_type: MountType) -> PolicyType: + return self._policies.get(mount_type, PolicyType.READ_ONLY) + + def can_read(self, mount_type: MountType) -> bool: + return True + + def can_write(self, mount_type: MountType) -> bool: + policy = self.get_policy(mount_type) + return policy in (PolicyType.WRITABLE, PolicyType.EXECUTABLE) + + def can_execute(self, mount_type: MountType) -> bool: + return self.get_policy(mount_type) == PolicyType.EXECUTABLE + + def validate_operation(self, mount_type: MountType, operation: str) -> tuple[bool, str]: + policy = self.get_policy(mount_type) + + if operation == "read": + return (True, "") + + if operation == "write": + if policy in (PolicyType.WRITABLE, PolicyType.EXECUTABLE): + return (True, "") + return ( + False, + f"{mount_type.value} is {policy.value}, writing not allowed", + ) + + if operation == "execute": + if policy == PolicyType.EXECUTABLE: + return (True, "") + return ( + False, + f"{mount_type.value} is {policy.value}, execution not allowed", + ) + + return (False, f"Unknown operation: {operation}") + + def get_policy_description(self, mount_type: MountType) -> str: + policy = self.get_policy(mount_type) + descriptions = { + PolicyType.READ_ONLY: "Read-only (no modifications allowed)", + PolicyType.WRITABLE: "Writable (modifications allowed)", + PolicyType.EXECUTABLE: "Executable (can run scripts/binaries)", + } + return descriptions.get(policy, "Unknown policy") diff --git a/src/afs/schema.py b/src/afs/schema.py index 16fb273..5373e2d 100644 --- a/src/afs/schema.py +++ b/src/afs/schema.py @@ -3,14 +3,100 @@ from __future__ import annotations from dataclasses import dataclass, field +from enum import Enum from pathlib import Path from typing import Any +from .models import MountType + def _as_path(value: str | Path) -> Path: return value if isinstance(value, Path) else Path(value).expanduser().resolve() +def default_discovery_ignore() -> list[str]: + return ["legacy", "archive", "archives"] + + +class PolicyType(str, Enum): + READ_ONLY = "read_only" + WRITABLE = "writable" + EXECUTABLE = "executable" + + +@dataclass +class DirectoryConfig: + name: str + policy: PolicyType + description: str = "" + role: MountType | None = None + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "DirectoryConfig": + name = str(data.get("name", "")).strip() + role_raw = data.get("role") + role = None + if isinstance(role_raw, str): + try: + role = MountType(role_raw) + except ValueError: + role = None + if not name and role: + name = role.value + policy_raw = data.get("policy", PolicyType.READ_ONLY.value) + try: + policy = PolicyType(policy_raw) + except ValueError: + policy = PolicyType.READ_ONLY + description = data.get("description") if isinstance(data.get("description"), str) else "" + return cls(name=name, policy=policy, description=description, role=role) + + +def default_directory_configs() -> list[DirectoryConfig]: + return [ + DirectoryConfig( + name="memory", + policy=PolicyType.READ_ONLY, + role=MountType.MEMORY, + ), + DirectoryConfig( + name="knowledge", + policy=PolicyType.READ_ONLY, + role=MountType.KNOWLEDGE, + ), + DirectoryConfig( + name="tools", + policy=PolicyType.EXECUTABLE, + role=MountType.TOOLS, + ), + DirectoryConfig( + name="scratchpad", + policy=PolicyType.WRITABLE, + role=MountType.SCRATCHPAD, + ), + DirectoryConfig( + name="history", + policy=PolicyType.READ_ONLY, + role=MountType.HISTORY, + ), + DirectoryConfig( + name="hivemind", + policy=PolicyType.WRITABLE, + role=MountType.HIVEMIND, + ), + DirectoryConfig( + name="global", + policy=PolicyType.WRITABLE, + role=MountType.GLOBAL, + ), + DirectoryConfig( + name="items", + policy=PolicyType.WRITABLE, + role=MountType.ITEMS, + ), + ] + + @dataclass class WorkspaceDirectory: path: Path @@ -31,6 +117,7 @@ class GeneralConfig: ) python_executable: Path | None = None workspace_directories: list[WorkspaceDirectory] = field(default_factory=list) + discovery_ignore: list[str] = field(default_factory=default_discovery_ignore) @classmethod def from_dict(cls, data: dict[str, Any]) -> "GeneralConfig": @@ -42,6 +129,11 @@ class GeneralConfig: for item in data.get("workspace_directories", []) if isinstance(item, dict) ] + raw_ignore = data.get("discovery_ignore") + if isinstance(raw_ignore, list): + discovery_ignore = [item for item in raw_ignore if isinstance(item, str)] + else: + discovery_ignore = default_discovery_ignore() return cls( context_root=_as_path(context_root) if context_root @@ -53,6 +145,7 @@ class GeneralConfig: if python_executable else None, workspace_directories=workspace_directories, + discovery_ignore=discovery_ignore, ) @@ -112,6 +205,7 @@ class CognitiveConfig: class AFSConfig: general: GeneralConfig = field(default_factory=GeneralConfig) plugins: PluginsConfig = field(default_factory=PluginsConfig) + directories: list[DirectoryConfig] = field(default_factory=default_directory_configs) cognitive: CognitiveConfig = field(default_factory=CognitiveConfig) @classmethod @@ -119,5 +213,22 @@ class AFSConfig: data = data or {} general = GeneralConfig.from_dict(data.get("general", {})) plugins = PluginsConfig.from_dict(data.get("plugins", {})) + directories = _parse_directory_config(data) cognitive = CognitiveConfig.from_dict(data.get("cognitive", {})) - return cls(general=general, plugins=plugins, cognitive=cognitive) + return cls( + general=general, + plugins=plugins, + directories=directories, + cognitive=cognitive, + ) + + +def _parse_directory_config(data: dict[str, Any]) -> list[DirectoryConfig]: + raw = data.get("directories") + if raw is None: + raw = data.get("afs_directories") + if raw is None: + return default_directory_configs() + if not isinstance(raw, list): + return default_directory_configs() + return [DirectoryConfig.from_dict(item) for item in raw if isinstance(item, dict)] diff --git a/src/afs/validator.py b/src/afs/validator.py new file mode 100644 index 0000000..1bda9a0 --- /dev/null +++ b/src/afs/validator.py @@ -0,0 +1,55 @@ +"""AFS context validator.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from .mapping import resolve_directory_map +from .models import MountType, ProjectMetadata +from .schema import DirectoryConfig + + +class AFSValidator: + def __init__( + self, + context_root: Path, + afs_directories: list[DirectoryConfig] | None = None, + ) -> None: + self.root = context_root + self._afs_directories = afs_directories + + def check_integrity(self) -> dict[str, Any]: + if not self.root.exists(): + return { + "valid": False, + "missing": [], + "errors": ["context root does not exist"], + } + + metadata = _load_metadata(self.root) + directory_map = resolve_directory_map( + afs_directories=self._afs_directories, + metadata=metadata, + ) + required_dirs = [directory_map.get(mt, mt.value) for mt in MountType] + status: dict[str, Any] = {"valid": True, "missing": [], "errors": []} + + for directory in required_dirs: + if not (self.root / directory).is_dir(): + status["valid"] = False + status["missing"].append(directory) + + return status + + +def _load_metadata(context_root: Path) -> ProjectMetadata | None: + metadata_path = context_root / "metadata.json" + if not metadata_path.exists(): + return None + try: + payload = json.loads(metadata_path.read_text(encoding="utf-8")) + except json.JSONDecodeError: + return None + return ProjectMetadata.from_dict(payload)