core: context management + graph export

This commit is contained in:
scawful
2025-12-30 10:24:31 -05:00
parent 369bfe1a01
commit 39b74ffed3
12 changed files with 1470 additions and 2 deletions

View File

@@ -16,3 +16,10 @@ Quickstart:
- `python -m afs init --context-root ~/src/context --workspace-name trunk`
- `python -m afs status`
- `python -m afs workspace add --path ~/src/trunk --name trunk`
- `python -m afs context init --path ~/src/trunk`
- `python -m afs context validate --path ~/src/trunk`
- `python -m afs context discover --path ~/src/trunk`
- `python -m afs context ensure-all --path ~/src/trunk`
- `python -m afs graph export --path ~/src/trunk`
Discovery skips directory names in `general.discovery_ignore` (default: legacy, archive, archives).

View File

@@ -1,7 +1,7 @@
# STATUS
Stage: Prototype
Now: init/status/workspace commands; minimal config + plugin discovery.
Now: init/status/workspace commands; context init/list/mount/validate/discover/ensure-all; graph export; minimal config + plugin discovery.
Not yet: service runtime; full configuration schema validation.
Next: one small utility; smoke-test stub.
Issues: no runtime yet.

View File

@@ -4,7 +4,13 @@ __version__ = "0.0.0"
from .config import load_config, load_config_model
from .core import find_root, resolve_context_root
from .discovery import discover_contexts, get_project_stats
from .graph import build_graph, default_graph_path, write_graph
from .manager import AFSManager
from .models import ContextRoot, MountPoint, MountType, ProjectMetadata
from .plugins import discover_plugins, load_plugins
from .schema import DirectoryConfig, PolicyType
from .validator import AFSValidator
__all__ = [
"load_config",
@@ -13,4 +19,17 @@ __all__ = [
"load_plugins",
"find_root",
"resolve_context_root",
"discover_contexts",
"get_project_stats",
"build_graph",
"default_graph_path",
"write_graph",
"AFSManager",
"AFSValidator",
"MountType",
"MountPoint",
"ProjectMetadata",
"ContextRoot",
"DirectoryConfig",
"PolicyType",
]

View File

@@ -8,8 +8,13 @@ from typing import Iterable
from .config import load_config, load_config_model
from .core import find_root, resolve_context_root
from .discovery import discover_contexts, get_project_stats
from .graph import build_graph, default_graph_path, write_graph
from .manager import AFSManager
from .models import MountType
from .plugins import discover_plugins, load_plugins
from .schema import AFSConfig, GeneralConfig, WorkspaceDirectory
from .validator import AFSValidator
AFS_DIRS = [
@@ -24,6 +29,34 @@ AFS_DIRS = [
]
def _parse_mount_type(value: str) -> MountType:
try:
return MountType(value)
except ValueError as exc:
raise argparse.ArgumentTypeError(f"Unknown mount type: {value}") from exc
def _load_manager(config_path: Path | None) -> AFSManager:
config = load_config_model(config_path=config_path, merge_user=True)
return AFSManager(config=config)
def _resolve_context_paths(
args: argparse.Namespace, manager: AFSManager
) -> tuple[Path, Path, Path | None, str | None]:
project_path = Path(args.path).expanduser().resolve() if args.path else Path.cwd()
context_root = (
Path(args.context_root).expanduser().resolve() if args.context_root else None
)
context_dir = args.context_dir if args.context_dir else None
context_path = manager.resolve_context_path(
project_path,
context_root=context_root,
context_dir=context_dir,
)
return project_path, context_path, context_root, context_dir
def _ensure_context_root(root: Path) -> None:
root.mkdir(parents=True, exist_ok=True)
for name in AFS_DIRS:
@@ -149,6 +182,197 @@ def _status_command(args: argparse.Namespace) -> int:
return 0
def _context_init_command(args: argparse.Namespace) -> int:
config_path = Path(args.config) if args.config else None
manager = _load_manager(config_path)
project_path, _context_path, context_root, context_dir = _resolve_context_paths(
args, manager
)
context = manager.init(
path=project_path,
context_root=context_root,
context_dir=context_dir,
link_context=args.link_context,
force=args.force,
)
print(f"context_path: {context.path}")
print(f"project: {context.project_name}")
return 0
def _context_ensure_command(args: argparse.Namespace) -> int:
config_path = Path(args.config) if args.config else None
manager = _load_manager(config_path)
project_path, _context_path, context_root, context_dir = _resolve_context_paths(
args, manager
)
context = manager.ensure(
path=project_path,
context_root=context_root,
context_dir=context_dir,
link_context=args.link_context,
)
print(f"context_path: {context.path}")
print(f"project: {context.project_name}")
return 0
def _context_list_command(args: argparse.Namespace) -> int:
config_path = Path(args.config) if args.config else None
manager = _load_manager(config_path)
_project_path, context_path, _context_root, _context_dir = _resolve_context_paths(
args, manager
)
context = manager.list_context(context_path=context_path)
print(f"context_path: {context.path}")
print(f"project: {context.project_name}")
if not context.mounts:
print("mounts: (none)")
return 0
for mount_type in MountType:
mounts = context.mounts.get(mount_type, [])
if not mounts:
continue
print(f"{mount_type.value}:")
for mount in mounts:
suffix = " (link)" if mount.is_symlink else ""
print(f"- {mount.name} -> {mount.source}{suffix}")
return 0
def _context_mount_command(args: argparse.Namespace) -> int:
config_path = Path(args.config) if args.config else None
manager = _load_manager(config_path)
_project_path, context_path, _context_root, _context_dir = _resolve_context_paths(
args, manager
)
mount_type = _parse_mount_type(args.mount_type)
source = Path(args.source).expanduser().resolve()
mount = manager.mount(
source=source,
mount_type=mount_type,
alias=args.alias,
context_path=context_path,
)
print(f"mounted {mount.name} in {mount.mount_type.value}: {mount.source}")
return 0
def _context_unmount_command(args: argparse.Namespace) -> int:
config_path = Path(args.config) if args.config else None
manager = _load_manager(config_path)
_project_path, context_path, _context_root, _context_dir = _resolve_context_paths(
args, manager
)
mount_type = _parse_mount_type(args.mount_type)
removed = manager.unmount(
alias=args.alias,
mount_type=mount_type,
context_path=context_path,
)
if not removed:
print(f"mount not found: {args.alias}")
return 1
print(f"unmounted {args.alias} from {mount_type.value}")
return 0
def _context_validate_command(args: argparse.Namespace) -> int:
config_path = Path(args.config) if args.config else None
manager = _load_manager(config_path)
_project_path, context_path, _context_root, _context_dir = _resolve_context_paths(
args, manager
)
validator = AFSValidator(context_path, afs_directories=manager.config.directories)
status = validator.check_integrity()
missing = ", ".join(status.get("missing", [])) or "(none)"
errors = status.get("errors", [])
print(f"valid: {status.get('valid', False)}")
print(f"missing: {missing}")
if errors:
print(f"errors: {', '.join(errors)}")
return 0 if status.get("valid", False) else 1
def _context_discover_command(args: argparse.Namespace) -> int:
config_path = Path(args.config) if args.config else None
config = load_config_model(config_path=config_path, merge_user=True)
search_paths = None
if args.path:
search_paths = [Path(path).expanduser() for path in args.path]
ignore_names = args.ignore if args.ignore else None
projects = discover_contexts(
search_paths=search_paths,
max_depth=args.max_depth,
ignore_names=ignore_names,
config=config,
)
if not projects:
print("(no contexts)")
return 0
for project in projects:
label = project.project_name
print(f"{label}\t{project.path}")
if args.stats:
stats = get_project_stats(projects)
pairs = [f"{key}={value}" for key, value in stats.items()]
print("stats: " + ", ".join(pairs))
return 0
def _context_ensure_all_command(args: argparse.Namespace) -> int:
config_path = Path(args.config) if args.config else None
config = load_config_model(config_path=config_path, merge_user=True)
search_paths = None
if args.path:
search_paths = [Path(path).expanduser() for path in args.path]
ignore_names = args.ignore if args.ignore else None
projects = discover_contexts(
search_paths=search_paths,
max_depth=args.max_depth,
ignore_names=ignore_names,
config=config,
)
if not projects:
print("(no contexts)")
return 0
manager = AFSManager(config=config)
for project in projects:
if args.dry_run:
print(f"would ensure: {project.project_name}\t{project.path}")
continue
context = manager.ensure(
path=project.path.parent,
context_root=project.path,
)
print(f"ensured: {context.project_name}\t{context.path}")
return 0
def _graph_export_command(args: argparse.Namespace) -> int:
config_path = Path(args.config) if args.config else None
config = load_config_model(config_path=config_path, merge_user=True)
search_paths = None
if args.path:
search_paths = [Path(path).expanduser() for path in args.path]
ignore_names = args.ignore if args.ignore else None
graph = build_graph(
search_paths=search_paths,
max_depth=args.max_depth,
ignore_names=ignore_names,
config=config,
)
output_path = (
Path(args.output).expanduser().resolve()
if args.output
else default_graph_path(config)
)
write_graph(graph, output_path)
print(f"graph: {output_path}")
return 0
def _workspace_registry_path() -> Path:
config = load_config_model()
return config.general.context_root / "workspaces.toml"
@@ -302,6 +526,156 @@ def build_parser() -> argparse.ArgumentParser:
status_parser.add_argument("--start-dir", help="Directory to search from.")
status_parser.set_defaults(func=_status_command)
context_parser = subparsers.add_parser(
"context", help="Manage per-project .context directories."
)
context_sub = context_parser.add_subparsers(dest="context_command")
ctx_init = context_sub.add_parser("init", help="Initialize a project context.")
ctx_init.add_argument("--path", help="Project path (default: cwd).")
ctx_init.add_argument("--context-root", help="Context root path override.")
ctx_init.add_argument("--context-dir", help="Context directory name.")
ctx_init.add_argument(
"--link-context",
action="store_true",
help="Link project context to the specified context root.",
)
ctx_init.add_argument("--force", action="store_true", help="Overwrite existing context.")
ctx_init.add_argument("--config", help="Config path for directory policies.")
ctx_init.set_defaults(func=_context_init_command)
ctx_ensure = context_sub.add_parser("ensure", help="Ensure a project context exists.")
ctx_ensure.add_argument("--path", help="Project path (default: cwd).")
ctx_ensure.add_argument("--context-root", help="Context root path override.")
ctx_ensure.add_argument("--context-dir", help="Context directory name.")
ctx_ensure.add_argument(
"--link-context",
action="store_true",
help="Link project context to the specified context root.",
)
ctx_ensure.add_argument("--config", help="Config path for directory policies.")
ctx_ensure.set_defaults(func=_context_ensure_command)
ctx_list = context_sub.add_parser("list", help="List mounts for a project context.")
ctx_list.add_argument("--path", help="Project path (default: cwd).")
ctx_list.add_argument("--context-root", help="Context root path override.")
ctx_list.add_argument("--context-dir", help="Context directory name.")
ctx_list.add_argument("--config", help="Config path for directory policies.")
ctx_list.set_defaults(func=_context_list_command)
ctx_mount = context_sub.add_parser("mount", help="Mount a resource into a context.")
ctx_mount.add_argument("source", help="Source path to mount.")
ctx_mount.add_argument(
"--mount-type",
required=True,
choices=[m.value for m in MountType],
help="Target mount type.",
)
ctx_mount.add_argument("--alias", help="Alias for the mount point.")
ctx_mount.add_argument("--path", help="Project path (default: cwd).")
ctx_mount.add_argument("--context-root", help="Context root path override.")
ctx_mount.add_argument("--context-dir", help="Context directory name.")
ctx_mount.add_argument("--config", help="Config path for directory policies.")
ctx_mount.set_defaults(func=_context_mount_command)
ctx_unmount = context_sub.add_parser("unmount", help="Remove a mounted resource.")
ctx_unmount.add_argument("alias", help="Alias of the mount point to remove.")
ctx_unmount.add_argument(
"--mount-type",
required=True,
choices=[m.value for m in MountType],
help="Mount type containing the alias.",
)
ctx_unmount.add_argument("--path", help="Project path (default: cwd).")
ctx_unmount.add_argument("--context-root", help="Context root path override.")
ctx_unmount.add_argument("--context-dir", help="Context directory name.")
ctx_unmount.add_argument("--config", help="Config path for directory policies.")
ctx_unmount.set_defaults(func=_context_unmount_command)
ctx_validate = context_sub.add_parser("validate", help="Validate context structure.")
ctx_validate.add_argument("--path", help="Project path (default: cwd).")
ctx_validate.add_argument("--context-root", help="Context root path override.")
ctx_validate.add_argument("--context-dir", help="Context directory name.")
ctx_validate.add_argument("--config", help="Config path for directory policies.")
ctx_validate.set_defaults(func=_context_validate_command)
ctx_discover = context_sub.add_parser(
"discover", help="Discover .context directories."
)
ctx_discover.add_argument(
"--path",
action="append",
help="Search root path (repeatable). Defaults to workspace directories.",
)
ctx_discover.add_argument(
"--max-depth",
type=int,
default=3,
help="Maximum directory depth to scan.",
)
ctx_discover.add_argument(
"--ignore",
action="append",
help="Directory name to ignore (repeatable).",
)
ctx_discover.add_argument("--stats", action="store_true", help="Print summary stats.")
ctx_discover.add_argument("--config", help="Config path for directory policies.")
ctx_discover.set_defaults(func=_context_discover_command)
ctx_ensure_all = context_sub.add_parser(
"ensure-all", help="Ensure all discovered contexts exist."
)
ctx_ensure_all.add_argument(
"--path",
action="append",
help="Search root path (repeatable). Defaults to workspace directories.",
)
ctx_ensure_all.add_argument(
"--max-depth",
type=int,
default=3,
help="Maximum directory depth to scan.",
)
ctx_ensure_all.add_argument(
"--ignore",
action="append",
help="Directory name to ignore (repeatable).",
)
ctx_ensure_all.add_argument(
"--dry-run",
action="store_true",
help="List contexts without writing.",
)
ctx_ensure_all.add_argument("--config", help="Config path for directory policies.")
ctx_ensure_all.set_defaults(func=_context_ensure_all_command)
graph_parser = subparsers.add_parser("graph", help="Export AFS graph data.")
graph_sub = graph_parser.add_subparsers(dest="graph_command")
graph_export = graph_sub.add_parser("export", help="Export graph JSON.")
graph_export.add_argument(
"--path",
action="append",
help="Search root path (repeatable). Defaults to workspace directories.",
)
graph_export.add_argument(
"--max-depth",
type=int,
default=3,
help="Maximum directory depth to scan.",
)
graph_export.add_argument(
"--ignore",
action="append",
help="Directory name to ignore (repeatable).",
)
graph_export.add_argument(
"--output",
help="Output path for graph JSON (default: context_root/index/afs_graph.json).",
)
graph_export.add_argument("--config", help="Config path for directory policies.")
graph_export.set_defaults(func=_graph_export_command)
workspace_parser = subparsers.add_parser("workspace", help="Manage workspace links.")
workspace_sub = workspace_parser.add_subparsers(dest="workspace_command")
@@ -333,6 +707,12 @@ def main(argv: Iterable[str] | None = None) -> int:
if args.command == "workspace" and not getattr(args, "workspace_command", None):
parser.print_help()
return 1
if args.command == "context" and not getattr(args, "context_command", None):
parser.print_help()
return 1
if args.command == "graph" and not getattr(args, "graph_command", None):
parser.print_help()
return 1
return args.func(args)

126
src/afs/discovery.py Normal file
View File

@@ -0,0 +1,126 @@
"""AFS discovery helpers for locating .context roots."""
from __future__ import annotations
from pathlib import Path
from typing import Iterable, Iterator
from .config import load_config_model
from .manager import AFSManager
from .models import ContextRoot
from .schema import AFSConfig
def discover_contexts(
search_paths: Iterable[Path] | None = None,
*,
max_depth: int = 3,
ignore_names: Iterable[str] | None = None,
config: AFSConfig | None = None,
) -> list[ContextRoot]:
config = config or load_config_model()
manager = AFSManager(config=config)
roots = _resolve_search_paths(search_paths, config)
ignore_set = _normalize_ignore_names(ignore_names, config)
contexts: list[ContextRoot] = []
seen: set[Path] = set()
for root in roots:
if root.name.lower() in ignore_set:
continue
for context_path in _find_context_dirs(root, max_depth, ignore_set):
resolved = context_path.resolve()
if resolved in seen:
continue
seen.add(resolved)
try:
contexts.append(manager.list_context(context_path=resolved))
except Exception:
continue
contexts.sort(key=lambda item: item.project_name.lower())
return contexts
def get_project_stats(projects: list[ContextRoot]) -> dict[str, int]:
total_mounts = 0
mounts_by_type: dict[str, int] = {}
for project in projects:
for mount_type, mount_list in project.mounts.items():
total_mounts += len(mount_list)
mounts_by_type[mount_type.value] = (
mounts_by_type.get(mount_type.value, 0) + len(mount_list)
)
return {
"total_projects": len(projects),
"total_mounts": total_mounts,
**mounts_by_type,
}
def _resolve_search_paths(
search_paths: Iterable[Path] | None,
config: AFSConfig,
) -> list[Path]:
resolved: list[Path] = []
seen: set[Path] = set()
def _add_path(path: Path) -> None:
try:
resolved_path = path.expanduser().resolve()
except OSError:
return
if resolved_path in seen or not resolved_path.exists():
return
seen.add(resolved_path)
resolved.append(resolved_path)
if search_paths:
for entry in search_paths:
_add_path(entry)
return resolved
for workspace in config.general.workspace_directories:
_add_path(workspace.path)
if config.general.agent_workspaces_dir:
_add_path(config.general.agent_workspaces_dir)
return resolved
def _find_context_dirs(
root: Path,
max_depth: int,
ignore_names: set[str],
current_depth: int = 0,
) -> Iterator[Path]:
if current_depth > max_depth:
return
try:
for entry in root.iterdir():
if entry.name.lower() in ignore_names:
continue
if entry.name == ".context" and entry.is_dir():
yield entry
elif entry.is_dir() and not entry.name.startswith("."):
yield from _find_context_dirs(
entry, max_depth, ignore_names, current_depth + 1
)
except OSError:
return
def _normalize_ignore_names(
ignore_names: Iterable[str] | None, config: AFSConfig
) -> set[str]:
names: list[str] = []
if config.general.discovery_ignore:
names.extend(config.general.discovery_ignore)
if ignore_names:
names.extend(ignore_names)
return {name.strip().lower() for name in names if name and name.strip()}

169
src/afs/graph.py Normal file
View File

@@ -0,0 +1,169 @@
"""Build a graph export for AFS contexts."""
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Iterable
from .config import load_config_model
from .discovery import discover_contexts
from .mapping import resolve_directory_name
from .models import ContextRoot, MountType
from .schema import AFSConfig
from . import __version__
def build_graph(
search_paths: Iterable[Path] | None = None,
*,
max_depth: int = 3,
ignore_names: Iterable[str] | None = None,
config: AFSConfig | None = None,
) -> dict[str, object]:
config = config or load_config_model()
contexts = discover_contexts(
search_paths=search_paths,
max_depth=max_depth,
ignore_names=ignore_names,
config=config,
)
nodes: list[dict[str, object]] = []
edges: list[dict[str, str]] = []
contexts_payload: list[dict[str, object]] = []
mounts_summary: dict[str, int] = {}
for context in contexts:
ctx_id = _context_id(context)
nodes.append(
{
"id": ctx_id,
"type": "context",
"label": context.project_name,
"path": str(context.path),
}
)
dir_ids: dict[str, str] = {}
for mount_type in MountType:
dir_name = resolve_directory_name(
mount_type,
afs_directories=config.directories,
metadata=context.metadata,
)
dir_id = _dir_id(context, mount_type)
dir_ids[mount_type.value] = dir_id
nodes.append(
{
"id": dir_id,
"type": "mount_dir",
"label": dir_name,
"mount_type": mount_type.value,
"path": str(context.path / dir_name),
}
)
edges.append({"from": ctx_id, "to": dir_id, "kind": "contains"})
mounts_payload: list[dict[str, object]] = []
for mount_type, mounts in context.mounts.items():
dir_name = resolve_directory_name(
mount_type,
afs_directories=config.directories,
metadata=context.metadata,
)
for mount in mounts:
mount_id = _mount_id(context, mount_type, mount.name)
mount_path = context.path / dir_name / mount.name
nodes.append(
{
"id": mount_id,
"type": "mount",
"label": mount.name,
"mount_type": mount_type.value,
"path": str(mount_path),
"source": str(mount.source),
"is_symlink": mount.is_symlink,
}
)
edges.append(
{
"from": dir_ids.get(mount_type.value, ctx_id),
"to": mount_id,
"kind": "contains",
}
)
mounts_payload.append(
{
"id": mount_id,
"name": mount.name,
"mount_type": mount_type.value,
"path": str(mount_path),
"source": str(mount.source),
"is_symlink": mount.is_symlink,
}
)
mounts_summary[mount_type.value] = (
mounts_summary.get(mount_type.value, 0) + 1
)
contexts_payload.append(
{
"id": ctx_id,
"name": context.project_name,
"path": str(context.path),
"metadata": context.metadata.to_dict(),
"mounts": mounts_payload,
}
)
summary = {
"total_contexts": len(contexts),
"total_mounts": sum(mounts_summary.values()),
"mounts_by_type": mounts_summary,
}
return {
"meta": {
"generated_at": datetime.now().isoformat(),
"afs_version": __version__,
"context_root": str(config.general.context_root),
"max_depth": max_depth,
"ignore": list(ignore_names or config.general.discovery_ignore),
},
"workspaces": [
{
"path": str(ws.path),
"description": ws.description,
}
for ws in config.general.workspace_directories
],
"contexts": contexts_payload,
"nodes": nodes,
"edges": edges,
"summary": summary,
}
def write_graph(graph: dict[str, object], output_path: Path) -> Path:
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(graph, indent=2) + "\n", encoding="utf-8")
return output_path
def default_graph_path(config: AFSConfig | None = None) -> Path:
config = config or load_config_model()
return config.general.context_root / "index" / "afs_graph.json"
def _context_id(context: ContextRoot) -> str:
return f"ctx:{context.path}"
def _dir_id(context: ContextRoot, mount_type: MountType) -> str:
return f"dir:{context.path}:{mount_type.value}"
def _mount_id(context: ContextRoot, mount_type: MountType, name: str) -> str:
return f"mount:{context.path}:{mount_type.value}:{name}"

371
src/afs/manager.py Normal file
View File

@@ -0,0 +1,371 @@
"""AFS manager for .context directories."""
from __future__ import annotations
import json
import shutil
from datetime import datetime
from pathlib import Path
from typing import Optional
from .config import load_config_model
from .mapping import resolve_directory_map, resolve_directory_name
from .models import ContextRoot, MountPoint, MountType, ProjectMetadata
from .schema import AFSConfig, DirectoryConfig
class AFSManager:
"""Manage AFS context roots for projects."""
CONTEXT_DIR_DEFAULT = ".context"
METADATA_FILE = "metadata.json"
STATE_FILE = "state.md"
DEFERRED_FILE = "deferred.md"
METACOGNITION_FILE = "metacognition.json"
GOALS_FILE = "goals.json"
EMOTIONS_FILE = "emotions.json"
EPISTEMIC_FILE = "epistemic.json"
DEFAULT_STATE_TEMPLATE = "# Agent State\n\n"
DEFAULT_DEFERRED_TEMPLATE = "# Deferred\n\n"
def __init__(
self,
config: AFSConfig | None = None,
directories: list[DirectoryConfig] | None = None,
) -> None:
self.config = config or load_config_model()
self._directories = directories or list(self.config.directories)
self._directory_map = resolve_directory_map(afs_directories=self._directories)
def resolve_context_path(
self,
project_path: Path,
context_root: Path | None = None,
context_dir: str | None = None,
) -> Path:
if context_root:
return context_root.expanduser().resolve()
context_dir = context_dir or self.CONTEXT_DIR_DEFAULT
return project_path.resolve() / context_dir
def ensure(
self,
path: Path = Path("."),
*,
context_root: Path | None = None,
context_dir: str | None = None,
link_context: bool = False,
) -> ContextRoot:
project_path = path.resolve()
context_path = self.resolve_context_path(
project_path,
context_root=context_root,
context_dir=context_dir,
)
self._ensure_context_dirs(context_path)
metadata = self._ensure_metadata(context_path, project_path)
self._ensure_cognitive_scaffold(context_path)
if link_context and context_root:
link_path = project_path / (context_dir or self.CONTEXT_DIR_DEFAULT)
self._ensure_link(link_path, context_path, force=False)
return self.list_context(context_path=context_path, metadata=metadata)
def init(
self,
path: Path = Path("."),
*,
context_root: Path | None = None,
context_dir: str | None = None,
link_context: bool = False,
force: bool = False,
) -> ContextRoot:
project_path = path.resolve()
context_path = self.resolve_context_path(
project_path,
context_root=context_root,
context_dir=context_dir,
)
if link_context and context_root:
link_path = project_path / (context_dir or self.CONTEXT_DIR_DEFAULT)
self._ensure_context_dirs(context_path)
metadata = self._ensure_metadata(context_path, project_path)
self._ensure_cognitive_scaffold(context_path)
self._ensure_link(link_path, context_path, force=force)
return self.list_context(context_path=context_path, metadata=metadata)
if context_path.exists():
if not force:
raise FileExistsError(f"AFS already exists at {context_path}")
self._remove_context_path(context_path)
self._ensure_context_dirs(context_path)
metadata = self._ensure_metadata(context_path, project_path)
self._ensure_cognitive_scaffold(context_path)
return self.list_context(context_path=context_path, metadata=metadata)
def mount(
self,
source: Path,
mount_type: MountType,
alias: Optional[str] = None,
context_path: Optional[Path] = None,
) -> MountPoint:
if context_path is None:
context_path = Path(".") / self.CONTEXT_DIR_DEFAULT
source = source.expanduser().resolve()
if not source.exists():
raise FileNotFoundError(f"Source {source} does not exist")
context_path = context_path.resolve()
if not context_path.exists():
raise FileNotFoundError(f"No AFS context at {context_path}")
metadata = self._load_metadata(context_path)
directory_name = resolve_directory_name(
mount_type,
afs_directories=self._directories,
metadata=metadata,
)
alias = alias or source.name
destination = context_path / directory_name / alias
if destination.exists():
raise FileExistsError(
f"Mount point '{alias}' already exists in {mount_type.value}"
)
destination.symlink_to(source)
return MountPoint(
name=alias,
source=source,
mount_type=mount_type,
is_symlink=True,
)
def unmount(
self,
alias: str,
mount_type: MountType,
context_path: Optional[Path] = None,
) -> bool:
if context_path is None:
context_path = Path(".") / self.CONTEXT_DIR_DEFAULT
metadata = self._load_metadata(context_path)
directory_name = resolve_directory_name(
mount_type,
afs_directories=self._directories,
metadata=metadata,
)
mount_path = context_path / directory_name / alias
if mount_path.exists() or mount_path.is_symlink():
mount_path.unlink()
return True
return False
def list_context(
self,
context_path: Optional[Path] = None,
metadata: ProjectMetadata | None = None,
) -> ContextRoot:
if context_path is None:
context_path = Path(".") / self.CONTEXT_DIR_DEFAULT
context_path = context_path.resolve()
if not context_path.exists():
raise FileNotFoundError("No AFS initialized")
if metadata is None:
metadata = self._load_metadata(context_path)
if metadata is None:
metadata = ProjectMetadata()
mounts: dict[MountType, list[MountPoint]] = {}
directory_map = resolve_directory_map(
afs_directories=self._directories,
metadata=metadata,
)
for mount_type in MountType:
subdir = context_path / directory_map.get(mount_type, mount_type.value)
if not subdir.exists():
continue
mount_list: list[MountPoint] = []
for item in subdir.iterdir():
if item.name in {".keep", self.METADATA_FILE}:
continue
source = item.resolve() if item.is_symlink() else item
mount_list.append(
MountPoint(
name=item.name,
source=source,
mount_type=mount_type,
is_symlink=item.is_symlink(),
)
)
mounts[mount_type] = mount_list
return ContextRoot(
path=context_path,
project_name=context_path.parent.name,
metadata=metadata,
mounts=mounts,
)
def clean(self, context_path: Optional[Path] = None) -> None:
if context_path is None:
context_path = Path(".") / self.CONTEXT_DIR_DEFAULT
if context_path.exists():
self._remove_context_path(context_path)
def update_metadata(
self,
context_path: Optional[Path] = None,
*,
description: Optional[str] = None,
agents: Optional[list[str]] = None,
) -> ProjectMetadata:
if context_path is None:
context_path = Path(".") / self.CONTEXT_DIR_DEFAULT
metadata_path = context_path / self.METADATA_FILE
if not metadata_path.exists():
raise FileNotFoundError("No AFS initialized")
metadata = self._load_metadata(context_path) or ProjectMetadata()
if description is not None:
metadata.description = description
if agents is not None:
metadata.agents = agents
self._write_metadata(metadata_path, metadata)
return metadata
def _ensure_context_dirs(self, context_path: Path) -> None:
context_path.mkdir(parents=True, exist_ok=True)
for dir_config in self._directories:
subdir = context_path / dir_config.name
subdir.mkdir(parents=True, exist_ok=True)
keep = subdir / ".keep"
if not keep.exists():
keep.touch()
def _ensure_metadata(self, context_path: Path, project_path: Path) -> ProjectMetadata:
metadata_path = context_path / self.METADATA_FILE
directory_map = {
mount_type.value: name for mount_type, name in self._directory_map.items()
}
if not metadata_path.exists():
metadata = ProjectMetadata(
created_at=datetime.now().isoformat(),
description=f"AFS for {project_path.name}",
directories=directory_map,
)
self._write_metadata(metadata_path, metadata)
return metadata
metadata = self._load_metadata(context_path)
if metadata is None:
metadata = ProjectMetadata(
created_at=datetime.now().isoformat(),
description=f"AFS for {project_path.name}",
directories=directory_map,
)
self._write_metadata(metadata_path, metadata)
return metadata
if not metadata.directories and directory_map:
metadata.directories = directory_map
self._write_metadata(metadata_path, metadata)
return metadata
def _ensure_cognitive_scaffold(self, context_path: Path) -> None:
if not self.config.cognitive.enabled:
return
metadata = self._load_metadata(context_path)
scratchpad_dir = context_path / resolve_directory_name(
MountType.SCRATCHPAD,
afs_directories=self._directories,
metadata=metadata,
)
memory_dir = context_path / resolve_directory_name(
MountType.MEMORY,
afs_directories=self._directories,
metadata=metadata,
)
scratchpad_dir.mkdir(parents=True, exist_ok=True)
memory_dir.mkdir(parents=True, exist_ok=True)
state_file = scratchpad_dir / self.STATE_FILE
if not state_file.exists():
state_file.write_text(self.DEFAULT_STATE_TEMPLATE, encoding="utf-8")
deferred_file = scratchpad_dir / self.DEFERRED_FILE
if not deferred_file.exists():
deferred_file.write_text(self.DEFAULT_DEFERRED_TEMPLATE, encoding="utf-8")
if self.config.cognitive.record_metacognition:
meta_file = scratchpad_dir / self.METACOGNITION_FILE
if not meta_file.exists():
meta_file.write_text("{}\n", encoding="utf-8")
if self.config.cognitive.record_goals:
goals_file = scratchpad_dir / self.GOALS_FILE
if not goals_file.exists():
goals_file.write_text("[]\n", encoding="utf-8")
if self.config.cognitive.record_emotions:
emotions_file = scratchpad_dir / self.EMOTIONS_FILE
if not emotions_file.exists():
emotions_file.write_text("[]\n", encoding="utf-8")
if self.config.cognitive.record_epistemic:
epistemic_file = scratchpad_dir / self.EPISTEMIC_FILE
if not epistemic_file.exists():
epistemic_file.write_text("{}\n", encoding="utf-8")
def _ensure_link(self, link_path: Path, target: Path, force: bool) -> None:
if link_path.is_symlink():
if link_path.resolve() == target.resolve():
return
if not force:
raise FileExistsError(f"Context link already exists at {link_path}")
link_path.unlink()
elif link_path.exists():
if not force:
raise FileExistsError(f"Context path already exists at {link_path}")
self._remove_context_path(link_path)
link_path.symlink_to(target)
def _remove_context_path(self, context_path: Path) -> None:
if context_path.is_symlink():
context_path.unlink()
elif context_path.exists():
shutil.rmtree(context_path)
def _load_metadata(self, context_path: Path) -> ProjectMetadata | None:
metadata_path = context_path / self.METADATA_FILE
if not metadata_path.exists():
return None
try:
payload = json.loads(metadata_path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return None
return ProjectMetadata.from_dict(payload)
def _write_metadata(self, path: Path, metadata: ProjectMetadata) -> None:
path.write_text(
json.dumps(metadata.to_dict(), indent=2, default=str) + "\n",
encoding="utf-8",
)

67
src/afs/mapping.py Normal file
View File

@@ -0,0 +1,67 @@
"""Helpers for mapping AFS roles to on-disk directory names."""
from __future__ import annotations
from typing import Iterable
from .models import MountType, ProjectMetadata
from .schema import DirectoryConfig
def _role_to_mount_type(role_name: str) -> MountType | None:
try:
return MountType(role_name)
except ValueError:
return None
def build_directory_map_from_config(
afs_directories: Iterable[DirectoryConfig] | None,
) -> dict[MountType, str]:
mapping: dict[MountType, str] = {}
if not afs_directories:
return mapping
for dir_config in afs_directories:
role_name = dir_config.role.value if dir_config.role else dir_config.name
mount_type = _role_to_mount_type(role_name)
if not mount_type:
continue
mapping[mount_type] = dir_config.name
return mapping
def build_directory_map_from_metadata(
metadata: ProjectMetadata | None,
) -> dict[MountType, str]:
mapping: dict[MountType, str] = {}
if not metadata or not metadata.directories:
return mapping
for role_name, dir_name in metadata.directories.items():
mount_type = _role_to_mount_type(role_name)
if not mount_type:
continue
mapping[mount_type] = dir_name
return mapping
def resolve_directory_map(
*,
afs_directories: Iterable[DirectoryConfig] | None = None,
metadata: ProjectMetadata | None = None,
) -> dict[MountType, str]:
mapping = build_directory_map_from_metadata(metadata)
if not mapping:
mapping = build_directory_map_from_config(afs_directories)
return mapping
def resolve_directory_name(
mount_type: MountType,
*,
afs_directories: Iterable[DirectoryConfig] | None = None,
metadata: ProjectMetadata | None = None,
) -> str:
mapping = resolve_directory_map(afs_directories=afs_directories, metadata=metadata)
return mapping.get(mount_type, mount_type.value)

97
src/afs/models.py Normal file
View File

@@ -0,0 +1,97 @@
"""Core AFS data models."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any
class MountType(str, Enum):
"""Supported AFS directory roles."""
MEMORY = "memory"
KNOWLEDGE = "knowledge"
TOOLS = "tools"
SCRATCHPAD = "scratchpad"
HISTORY = "history"
HIVEMIND = "hivemind"
GLOBAL = "global"
ITEMS = "items"
@dataclass(frozen=True)
class MountPoint:
"""A mounted resource inside an AFS directory."""
name: str
source: Path
mount_type: MountType
is_symlink: bool = True
@dataclass
class ProjectMetadata:
"""Metadata for an AFS context root."""
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
description: str = ""
agents: list[str] = field(default_factory=list)
directories: dict[str, str] = field(default_factory=dict)
@classmethod
def from_dict(cls, data: dict[str, Any] | None) -> "ProjectMetadata":
data = data or {}
created_at = data.get("created_at")
if isinstance(created_at, datetime):
created_at = created_at.isoformat()
if not isinstance(created_at, str):
created_at = datetime.now().isoformat()
description = data.get("description") if isinstance(data.get("description"), str) else ""
agents = [agent for agent in data.get("agents", []) if isinstance(agent, str)]
directories: dict[str, str] = {}
raw_dirs = data.get("directories")
if isinstance(raw_dirs, dict):
for key, value in raw_dirs.items():
directories[str(key)] = str(value)
return cls(
created_at=created_at,
description=description,
agents=agents,
directories=directories,
)
def to_dict(self) -> dict[str, Any]:
return {
"created_at": self.created_at,
"description": self.description,
"agents": list(self.agents),
"directories": dict(self.directories),
}
@dataclass
class ContextRoot:
"""An AFS .context directory."""
path: Path
project_name: str
metadata: ProjectMetadata = field(default_factory=ProjectMetadata)
mounts: dict[MountType, list[MountPoint]] = field(default_factory=dict)
@property
def is_valid(self) -> bool:
required = [mount_type.value for mount_type in MountType]
directory_map = self.metadata.directories if self.metadata else {}
return all(
(self.path / directory_map.get(role, role)).exists() for role in required
)
@property
def total_mounts(self) -> int:
return sum(len(mounts) for mounts in self.mounts.values())
def get_mounts(self, mount_type: MountType) -> list[MountPoint]:
return self.mounts.get(mount_type, [])

66
src/afs/policy.py Normal file
View File

@@ -0,0 +1,66 @@
"""AFS permission policy enforcement."""
from __future__ import annotations
from .models import MountType
from .schema import DirectoryConfig, PolicyType
class PolicyEnforcer:
"""Enforces AFS directory policies."""
def __init__(self, directories: list[DirectoryConfig]):
self._policies: dict[MountType, PolicyType] = {}
for directory in directories:
role_name = directory.role.value if directory.role else directory.name
try:
mount_type = MountType(role_name)
except ValueError:
continue
self._policies[mount_type] = directory.policy
def get_policy(self, mount_type: MountType) -> PolicyType:
return self._policies.get(mount_type, PolicyType.READ_ONLY)
def can_read(self, mount_type: MountType) -> bool:
return True
def can_write(self, mount_type: MountType) -> bool:
policy = self.get_policy(mount_type)
return policy in (PolicyType.WRITABLE, PolicyType.EXECUTABLE)
def can_execute(self, mount_type: MountType) -> bool:
return self.get_policy(mount_type) == PolicyType.EXECUTABLE
def validate_operation(self, mount_type: MountType, operation: str) -> tuple[bool, str]:
policy = self.get_policy(mount_type)
if operation == "read":
return (True, "")
if operation == "write":
if policy in (PolicyType.WRITABLE, PolicyType.EXECUTABLE):
return (True, "")
return (
False,
f"{mount_type.value} is {policy.value}, writing not allowed",
)
if operation == "execute":
if policy == PolicyType.EXECUTABLE:
return (True, "")
return (
False,
f"{mount_type.value} is {policy.value}, execution not allowed",
)
return (False, f"Unknown operation: {operation}")
def get_policy_description(self, mount_type: MountType) -> str:
policy = self.get_policy(mount_type)
descriptions = {
PolicyType.READ_ONLY: "Read-only (no modifications allowed)",
PolicyType.WRITABLE: "Writable (modifications allowed)",
PolicyType.EXECUTABLE: "Executable (can run scripts/binaries)",
}
return descriptions.get(policy, "Unknown policy")

View File

@@ -3,14 +3,100 @@
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any
from .models import MountType
def _as_path(value: str | Path) -> Path:
return value if isinstance(value, Path) else Path(value).expanduser().resolve()
def default_discovery_ignore() -> list[str]:
return ["legacy", "archive", "archives"]
class PolicyType(str, Enum):
READ_ONLY = "read_only"
WRITABLE = "writable"
EXECUTABLE = "executable"
@dataclass
class DirectoryConfig:
name: str
policy: PolicyType
description: str = ""
role: MountType | None = None
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "DirectoryConfig":
name = str(data.get("name", "")).strip()
role_raw = data.get("role")
role = None
if isinstance(role_raw, str):
try:
role = MountType(role_raw)
except ValueError:
role = None
if not name and role:
name = role.value
policy_raw = data.get("policy", PolicyType.READ_ONLY.value)
try:
policy = PolicyType(policy_raw)
except ValueError:
policy = PolicyType.READ_ONLY
description = data.get("description") if isinstance(data.get("description"), str) else ""
return cls(name=name, policy=policy, description=description, role=role)
def default_directory_configs() -> list[DirectoryConfig]:
return [
DirectoryConfig(
name="memory",
policy=PolicyType.READ_ONLY,
role=MountType.MEMORY,
),
DirectoryConfig(
name="knowledge",
policy=PolicyType.READ_ONLY,
role=MountType.KNOWLEDGE,
),
DirectoryConfig(
name="tools",
policy=PolicyType.EXECUTABLE,
role=MountType.TOOLS,
),
DirectoryConfig(
name="scratchpad",
policy=PolicyType.WRITABLE,
role=MountType.SCRATCHPAD,
),
DirectoryConfig(
name="history",
policy=PolicyType.READ_ONLY,
role=MountType.HISTORY,
),
DirectoryConfig(
name="hivemind",
policy=PolicyType.WRITABLE,
role=MountType.HIVEMIND,
),
DirectoryConfig(
name="global",
policy=PolicyType.WRITABLE,
role=MountType.GLOBAL,
),
DirectoryConfig(
name="items",
policy=PolicyType.WRITABLE,
role=MountType.ITEMS,
),
]
@dataclass
class WorkspaceDirectory:
path: Path
@@ -31,6 +117,7 @@ class GeneralConfig:
)
python_executable: Path | None = None
workspace_directories: list[WorkspaceDirectory] = field(default_factory=list)
discovery_ignore: list[str] = field(default_factory=default_discovery_ignore)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "GeneralConfig":
@@ -42,6 +129,11 @@ class GeneralConfig:
for item in data.get("workspace_directories", [])
if isinstance(item, dict)
]
raw_ignore = data.get("discovery_ignore")
if isinstance(raw_ignore, list):
discovery_ignore = [item for item in raw_ignore if isinstance(item, str)]
else:
discovery_ignore = default_discovery_ignore()
return cls(
context_root=_as_path(context_root)
if context_root
@@ -53,6 +145,7 @@ class GeneralConfig:
if python_executable
else None,
workspace_directories=workspace_directories,
discovery_ignore=discovery_ignore,
)
@@ -112,6 +205,7 @@ class CognitiveConfig:
class AFSConfig:
general: GeneralConfig = field(default_factory=GeneralConfig)
plugins: PluginsConfig = field(default_factory=PluginsConfig)
directories: list[DirectoryConfig] = field(default_factory=default_directory_configs)
cognitive: CognitiveConfig = field(default_factory=CognitiveConfig)
@classmethod
@@ -119,5 +213,22 @@ class AFSConfig:
data = data or {}
general = GeneralConfig.from_dict(data.get("general", {}))
plugins = PluginsConfig.from_dict(data.get("plugins", {}))
directories = _parse_directory_config(data)
cognitive = CognitiveConfig.from_dict(data.get("cognitive", {}))
return cls(general=general, plugins=plugins, cognitive=cognitive)
return cls(
general=general,
plugins=plugins,
directories=directories,
cognitive=cognitive,
)
def _parse_directory_config(data: dict[str, Any]) -> list[DirectoryConfig]:
raw = data.get("directories")
if raw is None:
raw = data.get("afs_directories")
if raw is None:
return default_directory_configs()
if not isinstance(raw, list):
return default_directory_configs()
return [DirectoryConfig.from_dict(item) for item in raw if isinstance(item, dict)]

55
src/afs/validator.py Normal file
View File

@@ -0,0 +1,55 @@
"""AFS context validator."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from .mapping import resolve_directory_map
from .models import MountType, ProjectMetadata
from .schema import DirectoryConfig
class AFSValidator:
def __init__(
self,
context_root: Path,
afs_directories: list[DirectoryConfig] | None = None,
) -> None:
self.root = context_root
self._afs_directories = afs_directories
def check_integrity(self) -> dict[str, Any]:
if not self.root.exists():
return {
"valid": False,
"missing": [],
"errors": ["context root does not exist"],
}
metadata = _load_metadata(self.root)
directory_map = resolve_directory_map(
afs_directories=self._afs_directories,
metadata=metadata,
)
required_dirs = [directory_map.get(mt, mt.value) for mt in MountType]
status: dict[str, Any] = {"valid": True, "missing": [], "errors": []}
for directory in required_dirs:
if not (self.root / directory).is_dir():
status["valid"] = False
status["missing"].append(directory)
return status
def _load_metadata(context_root: Path) -> ProjectMetadata | None:
metadata_path = context_root / "metadata.json"
if not metadata_path.exists():
return None
try:
payload = json.loads(metadata_path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return None
return ProjectMetadata.from_dict(payload)