diff --git a/src/afs/cli.py b/src/afs/cli.py index 4de250d..fc201ce 100644 --- a/src/afs/cli.py +++ b/src/afs/cli.py @@ -12,7 +12,9 @@ from .discovery import discover_contexts, get_project_stats from .graph import build_graph, default_graph_path, write_graph from .manager import AFSManager from .models import MountType +from .orchestration import Orchestrator, TaskRequest from .plugins import discover_plugins, load_plugins +from .services import ServiceManager from .schema import AFSConfig, GeneralConfig, WorkspaceDirectory from .validator import AFSValidator @@ -161,6 +163,40 @@ def _plugins_command(args: argparse.Namespace) -> int: return 0 +def _services_list_command(args: argparse.Namespace) -> int: + manager = ServiceManager() + for definition in manager.list_definitions(): + print(f"{definition.name}\t{definition.label}") + return 0 + + +def _services_render_command(args: argparse.Namespace) -> int: + manager = ServiceManager() + print(manager.render_unit(args.name)) + return 0 + + +def _orchestrator_list_command(args: argparse.Namespace) -> int: + orchestrator = Orchestrator() + for agent in orchestrator.list_agents(): + tags = ",".join(agent.tags) if agent.tags else "-" + print(f"{agent.name}\t{agent.role}\t{agent.backend}\t{tags}") + return 0 + + +def _orchestrator_plan_command(args: argparse.Namespace) -> int: + orchestrator = Orchestrator() + request = TaskRequest(summary=args.summary, tags=args.tag or [], role=args.role) + plan = orchestrator.plan(request) + if plan.notes: + for note in plan.notes: + print(f"note: {note}") + for agent in plan.agents: + tags = ",".join(agent.tags) if agent.tags else "-" + print(f"{agent.name}\t{agent.role}\t{agent.backend}\t{tags}") + return 0 + + def _status_command(args: argparse.Namespace) -> int: start_dir = Path(args.start_dir).expanduser().resolve() if args.start_dir else None root = find_root(start_dir) @@ -522,6 +558,28 @@ def build_parser() -> argparse.ArgumentParser: plugins_parser.add_argument("--load", action="store_true", help="Attempt to import plugins.") plugins_parser.set_defaults(func=_plugins_command) + services_parser = subparsers.add_parser("services", help="Service definitions.") + services_sub = services_parser.add_subparsers(dest="services_command") + + services_list = services_sub.add_parser("list", help="List service definitions.") + services_list.set_defaults(func=_services_list_command) + + services_render = services_sub.add_parser("render", help="Render service unit.") + services_render.add_argument("name", help="Service name.") + services_render.set_defaults(func=_services_render_command) + + orch_parser = subparsers.add_parser("orchestrator", help="Orchestrator helpers.") + orch_sub = orch_parser.add_subparsers(dest="orchestrator_command") + + orch_list = orch_sub.add_parser("list", help="List configured agents.") + orch_list.set_defaults(func=_orchestrator_list_command) + + orch_plan = orch_sub.add_parser("plan", help="Plan agent routing.") + orch_plan.add_argument("summary", help="Task summary.") + orch_plan.add_argument("--tag", action="append", help="Tag to match.") + orch_plan.add_argument("--role", help="Role to match.") + orch_plan.set_defaults(func=_orchestrator_plan_command) + status_parser = subparsers.add_parser("status", help="Show context root status.") status_parser.add_argument("--start-dir", help="Directory to search from.") status_parser.set_defaults(func=_status_command) @@ -713,6 +771,12 @@ def main(argv: Iterable[str] | None = None) -> int: if args.command == "graph" and not getattr(args, "graph_command", None): parser.print_help() return 1 + if args.command == "services" and not getattr(args, "services_command", None): + parser.print_help() + return 1 + if args.command == "orchestrator" and not getattr(args, "orchestrator_command", None): + parser.print_help() + return 1 return args.func(args) diff --git a/src/afs/orchestration.py b/src/afs/orchestration.py new file mode 100644 index 0000000..81d1984 --- /dev/null +++ b/src/afs/orchestration.py @@ -0,0 +1,126 @@ +"""Minimal orchestration helpers for routing tasks to agents.""" + +from __future__ import annotations + +import argparse +from dataclasses import dataclass, field +from typing import Iterable + +from .config import load_config_model +from .schema import AgentConfig, OrchestratorConfig + + +@dataclass +class TaskRequest: + summary: str + tags: list[str] = field(default_factory=list) + role: str | None = None + + +@dataclass +class OrchestrationPlan: + summary: str + agents: list[AgentConfig] + notes: list[str] = field(default_factory=list) + + +class Orchestrator: + def __init__(self, config: OrchestratorConfig | None = None) -> None: + self.config = config or load_config_model().orchestrator + + def list_agents(self) -> list[AgentConfig]: + return list(self.config.default_agents) + + def plan(self, request: TaskRequest) -> OrchestrationPlan: + if not self.config.enabled: + return OrchestrationPlan( + summary=request.summary, + agents=[], + notes=["orchestrator disabled"], + ) + + candidates = list(self.config.default_agents) + if request.role: + candidates = [a for a in candidates if a.role == request.role] + + if request.tags: + tagged = [ + agent + for agent in candidates + if set(request.tags) & set(agent.tags) + ] + if tagged: + candidates = tagged + + if not candidates: + return OrchestrationPlan( + summary=request.summary, + agents=[], + notes=["no matching agents"], + ) + + selected = candidates[: self.config.max_agents] + notes = [] + if len(candidates) > len(selected): + notes.append("truncated agent list to max_agents") + + return OrchestrationPlan( + summary=request.summary, + agents=selected, + notes=notes, + ) + + +def _list_command(args: argparse.Namespace) -> int: + orchestrator = Orchestrator() + agents = orchestrator.list_agents() + for agent in agents: + tags = ",".join(agent.tags) if agent.tags else "-" + print(f"{agent.name}\t{agent.role}\t{agent.backend}\t{tags}") + return 0 + + +def _plan_command(args: argparse.Namespace) -> int: + orchestrator = Orchestrator() + request = TaskRequest( + summary=args.summary, + tags=args.tag or [], + role=args.role, + ) + plan = orchestrator.plan(request) + if plan.notes: + for note in plan.notes: + print(f"note: {note}") + for agent in plan.agents: + tags = ",".join(agent.tags) if agent.tags else "-" + print(f"{agent.name}\t{agent.role}\t{agent.backend}\t{tags}") + return 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="afs-orchestrator") + sub = parser.add_subparsers(dest="command") + + list_cmd = sub.add_parser("list", help="List configured agents.") + list_cmd.set_defaults(func=_list_command) + + plan_cmd = sub.add_parser("plan", help="Plan a routing decision.") + plan_cmd.add_argument("summary", help="Task summary.") + plan_cmd.add_argument("--tag", action="append", help="Tag to match.") + plan_cmd.add_argument("--role", help="Role to match.") + plan_cmd.set_defaults(func=_plan_command) + + return parser + + +def main(argv: Iterable[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + if not getattr(args, "command", None): + parser.print_help() + return 1 + return args.func(args) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/afs/schema.py b/src/afs/schema.py index 5373e2d..4faf09d 100644 --- a/src/afs/schema.py +++ b/src/afs/schema.py @@ -182,6 +182,110 @@ class PluginsConfig: ) +@dataclass +class AgentConfig: + name: str + role: str = "general" + backend: str = "local" + description: str = "" + tags: list[str] = field(default_factory=list) + auto_start: bool = False + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "AgentConfig": + tags = data.get("tags", []) + if isinstance(tags, list): + tags = [tag for tag in tags if isinstance(tag, str)] + else: + tags = [] + return cls( + name=str(data.get("name", "")).strip(), + role=str(data.get("role", "general")).strip() or "general", + backend=str(data.get("backend", "local")).strip() or "local", + description=str(data.get("description", "")).strip(), + tags=tags, + auto_start=bool(data.get("auto_start", False)), + ) + + +@dataclass +class OrchestratorConfig: + enabled: bool = False + max_agents: int = 5 + default_agents: list[AgentConfig] = field(default_factory=list) + auto_routing: bool = True + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "OrchestratorConfig": + agents_raw = data.get("default_agents", []) + agents = [ + AgentConfig.from_dict(item) + for item in agents_raw + if isinstance(item, dict) + ] + max_agents = data.get("max_agents", cls().max_agents) + return cls( + enabled=bool(data.get("enabled", False)), + max_agents=int(max_agents) if isinstance(max_agents, int) else cls().max_agents, + default_agents=agents, + auto_routing=bool(data.get("auto_routing", True)), + ) + + +@dataclass +class ServiceConfig: + name: str + enabled: bool = True + auto_start: bool = False + command: list[str] = field(default_factory=list) + working_directory: Path | None = None + environment: dict[str, str] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "ServiceConfig": + command = data.get("command", []) + if isinstance(command, list): + command = [str(item) for item in command] + else: + command = [] + env = data.get("environment", {}) + if isinstance(env, dict): + environment = {str(key): str(value) for key, value in env.items()} + else: + environment = {} + working_directory = data.get("working_directory") + return cls( + name=str(data.get("name", "")).strip(), + enabled=bool(data.get("enabled", True)), + auto_start=bool(data.get("auto_start", False)), + command=command, + working_directory=_as_path(working_directory) + if working_directory + else None, + environment=environment, + ) + + +@dataclass +class ServicesConfig: + enabled: bool = False + services: dict[str, ServiceConfig] = field(default_factory=dict) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "ServicesConfig": + enabled = bool(data.get("enabled", False)) + raw_services = data.get("services", {}) + parsed: dict[str, ServiceConfig] = {} + if isinstance(raw_services, dict): + for name, payload in raw_services.items(): + if not isinstance(payload, dict): + continue + payload = dict(payload) + payload.setdefault("name", name) + parsed[name] = ServiceConfig.from_dict(payload) + return cls(enabled=enabled, services=parsed) + + @dataclass class CognitiveConfig: enabled: bool = False @@ -207,6 +311,8 @@ class AFSConfig: plugins: PluginsConfig = field(default_factory=PluginsConfig) directories: list[DirectoryConfig] = field(default_factory=default_directory_configs) cognitive: CognitiveConfig = field(default_factory=CognitiveConfig) + orchestrator: OrchestratorConfig = field(default_factory=OrchestratorConfig) + services: ServicesConfig = field(default_factory=ServicesConfig) @classmethod def from_dict(cls, data: dict[str, Any] | None) -> "AFSConfig": @@ -215,11 +321,15 @@ class AFSConfig: plugins = PluginsConfig.from_dict(data.get("plugins", {})) directories = _parse_directory_config(data) cognitive = CognitiveConfig.from_dict(data.get("cognitive", {})) + orchestrator = OrchestratorConfig.from_dict(data.get("orchestrator", {})) + services = ServicesConfig.from_dict(data.get("services", {})) return cls( general=general, plugins=plugins, directories=directories, cognitive=cognitive, + orchestrator=orchestrator, + services=services, ) diff --git a/src/afs/services/__init__.py b/src/afs/services/__init__.py new file mode 100644 index 0000000..2dceb6d --- /dev/null +++ b/src/afs/services/__init__.py @@ -0,0 +1,12 @@ +"""AFS service management primitives.""" + +from .manager import ServiceManager +from .models import ServiceDefinition, ServiceState, ServiceStatus, ServiceType + +__all__ = [ + "ServiceManager", + "ServiceDefinition", + "ServiceState", + "ServiceStatus", + "ServiceType", +] diff --git a/src/afs/services/manager.py b/src/afs/services/manager.py new file mode 100644 index 0000000..8111e1a --- /dev/null +++ b/src/afs/services/manager.py @@ -0,0 +1,198 @@ +"""Minimal service manager for AFS background tasks.""" + +from __future__ import annotations + +import json +import platform +import sys +from pathlib import Path +from typing import Iterable + +from ..config import load_config_model +from ..schema import AFSConfig, ServiceConfig +from .models import ServiceDefinition, ServiceState, ServiceStatus, ServiceType + + +class ServiceManager: + """Build and render service definitions without mutating the system.""" + + def __init__( + self, + config: AFSConfig | None = None, + *, + service_root: Path | None = None, + platform_name: str | None = None, + ) -> None: + self.config = config or load_config_model() + self.service_root = service_root or Path.home() / ".config" / "afs" / "services" + self.platform_name = platform_name or platform.system().lower() + + def list_definitions(self) -> list[ServiceDefinition]: + definitions = self._builtin_definitions() + merged = self._merge_config(definitions) + return sorted(merged.values(), key=lambda item: item.name) + + def get_definition(self, name: str) -> ServiceDefinition | None: + merged = self._merge_config(self._builtin_definitions()) + return merged.get(name) + + def render_unit(self, name: str) -> str: + definition = self.get_definition(name) + if not definition: + raise KeyError(f"Unknown service: {name}") + + if self.platform_name.startswith("darwin"): + payload = render_launchd_plist(definition) + return json.dumps(payload, indent=2) + return render_systemd_unit(definition) + + def status(self, name: str) -> ServiceStatus: + definition = self.get_definition(name) + if not definition: + return ServiceStatus(name=name, state=ServiceState.UNKNOWN, enabled=False) + return ServiceStatus(name=definition.name, state=ServiceState.UNKNOWN, enabled=False) + + def _merge_config( + self, definitions: dict[str, ServiceDefinition] + ) -> dict[str, ServiceDefinition]: + merged = dict(definitions) + for name, config in self.config.services.services.items(): + if not config.enabled: + merged.pop(name, None) + continue + base = merged.get(name) + if base: + merged[name] = _merge_definition(base, config) + elif config.command: + merged[name] = ServiceDefinition( + name=config.name, + label=config.name, + command=list(config.command), + working_directory=config.working_directory, + environment=dict(config.environment), + service_type=ServiceType.DAEMON, + keep_alive=True, + run_at_load=config.auto_start, + ) + return merged + + def _builtin_definitions(self) -> dict[str, ServiceDefinition]: + python = self._resolve_python_executable() + repo_root = self._find_repo_root() + environment = self._service_environment() + + return { + "orchestrator": ServiceDefinition( + name="orchestrator", + label="AFS Orchestrator", + description="Routing and coordination for local agents", + command=[python, "-m", "afs.orchestration", "--daemon"], + working_directory=repo_root, + environment=environment, + service_type=ServiceType.DAEMON, + keep_alive=True, + run_at_load=False, + ), + "context-discovery": ServiceDefinition( + name="context-discovery", + label="AFS Context Discovery", + description="Discover and index AFS contexts", + command=[python, "-m", "afs", "context", "discover"], + working_directory=repo_root, + environment=environment, + service_type=ServiceType.ONESHOT, + keep_alive=False, + run_at_load=False, + ), + "context-graph-export": ServiceDefinition( + name="context-graph-export", + label="AFS Context Graph Export", + description="Export AFS context graph JSON", + command=[python, "-m", "afs", "graph", "export"], + working_directory=repo_root, + environment=environment, + service_type=ServiceType.ONESHOT, + keep_alive=False, + run_at_load=False, + ), + } + + def _resolve_python_executable(self) -> str: + if self.config.general.python_executable: + return str(self.config.general.python_executable) + return sys.executable + + def _find_repo_root(self) -> Path | None: + for parent in Path(__file__).resolve().parents: + if (parent / "pyproject.toml").exists(): + return parent + return None + + def _service_environment(self) -> dict[str, str]: + env: dict[str, str] = {} + repo_root = self._find_repo_root() + if repo_root and (repo_root / "src").exists(): + env["PYTHONPATH"] = str(repo_root / "src") + user_config = Path.home() / ".config" / "afs" / "config.toml" + if user_config.exists(): + env["AFS_CONFIG_PATH"] = str(user_config) + env["AFS_PREFER_USER_CONFIG"] = "1" + return env + + +def _merge_definition( + base: ServiceDefinition, override: ServiceConfig +) -> ServiceDefinition: + command = list(override.command) if override.command else list(base.command) + environment = dict(base.environment) + environment.update(override.environment) + return ServiceDefinition( + name=base.name, + label=base.label, + description=base.description, + command=command, + working_directory=override.working_directory or base.working_directory, + environment=environment, + service_type=base.service_type, + keep_alive=base.keep_alive, + run_at_load=override.auto_start, + ) + + +def render_launchd_plist(definition: ServiceDefinition) -> dict[str, object]: + payload: dict[str, object] = { + "Label": f"afs.{definition.name}", + "ProgramArguments": list(definition.command), + "RunAtLoad": bool(definition.run_at_load), + "KeepAlive": bool(definition.keep_alive), + } + if definition.working_directory: + payload["WorkingDirectory"] = str(definition.working_directory) + if definition.environment: + payload["EnvironmentVariables"] = dict(definition.environment) + return payload + + +def render_systemd_unit(definition: ServiceDefinition) -> str: + lines = [ + "[Unit]", + f"Description={definition.label}", + "", + "[Service]", + f"ExecStart={' '.join(definition.command)}", + ] + if definition.working_directory: + lines.append(f"WorkingDirectory={definition.working_directory}") + if definition.environment: + for key, value in definition.environment.items(): + lines.append(f"Environment={key}={value}") + if definition.service_type == ServiceType.ONESHOT: + lines.append("Type=oneshot") + if definition.keep_alive: + lines.append("Restart=on-failure") + lines.extend([ + "", + "[Install]", + "WantedBy=default.target", + ]) + return "\n".join(lines) diff --git a/src/afs/services/models.py b/src/afs/services/models.py new file mode 100644 index 0000000..60b9fe0 --- /dev/null +++ b/src/afs/services/models.py @@ -0,0 +1,73 @@ +"""Data models for AFS service management.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from pathlib import Path +from typing import Any + + +class ServiceState(str, Enum): + STOPPED = "stopped" + RUNNING = "running" + FAILED = "failed" + STARTING = "starting" + STOPPING = "stopping" + UNKNOWN = "unknown" + + +class ServiceType(str, Enum): + DAEMON = "daemon" + ONESHOT = "oneshot" + + +@dataclass +class ServiceDefinition: + name: str + label: str + description: str = "" + command: list[str] = field(default_factory=list) + working_directory: Path | None = None + environment: dict[str, str] = field(default_factory=dict) + service_type: ServiceType = ServiceType.DAEMON + keep_alive: bool = True + run_at_load: bool = False + + def to_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "label": self.label, + "description": self.description, + "command": list(self.command), + "working_directory": str(self.working_directory) + if self.working_directory + else None, + "environment": dict(self.environment), + "service_type": self.service_type.value, + "keep_alive": self.keep_alive, + "run_at_load": self.run_at_load, + } + + +@dataclass +class ServiceStatus: + name: str + state: ServiceState + pid: int | None = None + enabled: bool = False + last_started: datetime | None = None + last_stopped: datetime | None = None + error_message: str | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "state": self.state.value, + "pid": self.pid, + "enabled": self.enabled, + "last_started": self.last_started.isoformat() if self.last_started else None, + "last_stopped": self.last_stopped.isoformat() if self.last_stopped else None, + "error_message": self.error_message, + } diff --git a/tests/test_orchestration.py b/tests/test_orchestration.py new file mode 100644 index 0000000..ae7014b --- /dev/null +++ b/tests/test_orchestration.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from afs.orchestration import Orchestrator, TaskRequest +from afs.schema import AgentConfig, OrchestratorConfig + + +def test_orchestrator_disabled_returns_note() -> None: + config = OrchestratorConfig(enabled=False) + orchestrator = Orchestrator(config=config) + plan = orchestrator.plan(TaskRequest(summary="Test")) + assert not plan.agents + assert "orchestrator disabled" in plan.notes + + +def test_orchestrator_matches_tags() -> None: + config = OrchestratorConfig( + enabled=True, + max_agents=2, + default_agents=[ + AgentConfig(name="planner", role="planner", tags=["plan"]), + AgentConfig(name="builder", role="coder", tags=["build"]), + ], + ) + orchestrator = Orchestrator(config=config) + plan = orchestrator.plan(TaskRequest(summary="Build", tags=["build"])) + assert len(plan.agents) == 1 + assert plan.agents[0].name == "builder" diff --git a/tests/test_services.py b/tests/test_services.py new file mode 100644 index 0000000..ce2f951 --- /dev/null +++ b/tests/test_services.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from afs.schema import AFSConfig, ServiceConfig, ServicesConfig +from afs.services.manager import ServiceManager + + +def test_service_manager_lists_builtins() -> None: + manager = ServiceManager(config=AFSConfig(), platform_name="linux") + names = [definition.name for definition in manager.list_definitions()] + assert "orchestrator" in names + + +def test_service_config_can_disable_service() -> None: + services = ServicesConfig( + enabled=True, + services={ + "orchestrator": ServiceConfig(name="orchestrator", enabled=False), + }, + ) + manager = ServiceManager(config=AFSConfig(services=services), platform_name="linux") + names = [definition.name for definition in manager.list_definitions()] + assert "orchestrator" not in names + + +def test_service_render_contains_execstart() -> None: + manager = ServiceManager(config=AFSConfig(), platform_name="linux") + unit = manager.render_unit("orchestrator") + assert "ExecStart=" in unit