core: add orchestration and service definitions

This commit is contained in:
scawful
2025-12-30 13:36:53 -05:00
parent efc46027ff
commit f386327821
8 changed files with 638 additions and 0 deletions

View File

@@ -12,7 +12,9 @@ from .discovery import discover_contexts, get_project_stats
from .graph import build_graph, default_graph_path, write_graph
from .manager import AFSManager
from .models import MountType
from .orchestration import Orchestrator, TaskRequest
from .plugins import discover_plugins, load_plugins
from .services import ServiceManager
from .schema import AFSConfig, GeneralConfig, WorkspaceDirectory
from .validator import AFSValidator
@@ -161,6 +163,40 @@ def _plugins_command(args: argparse.Namespace) -> int:
return 0
def _services_list_command(args: argparse.Namespace) -> int:
manager = ServiceManager()
for definition in manager.list_definitions():
print(f"{definition.name}\t{definition.label}")
return 0
def _services_render_command(args: argparse.Namespace) -> int:
manager = ServiceManager()
print(manager.render_unit(args.name))
return 0
def _orchestrator_list_command(args: argparse.Namespace) -> int:
orchestrator = Orchestrator()
for agent in orchestrator.list_agents():
tags = ",".join(agent.tags) if agent.tags else "-"
print(f"{agent.name}\t{agent.role}\t{agent.backend}\t{tags}")
return 0
def _orchestrator_plan_command(args: argparse.Namespace) -> int:
orchestrator = Orchestrator()
request = TaskRequest(summary=args.summary, tags=args.tag or [], role=args.role)
plan = orchestrator.plan(request)
if plan.notes:
for note in plan.notes:
print(f"note: {note}")
for agent in plan.agents:
tags = ",".join(agent.tags) if agent.tags else "-"
print(f"{agent.name}\t{agent.role}\t{agent.backend}\t{tags}")
return 0
def _status_command(args: argparse.Namespace) -> int:
start_dir = Path(args.start_dir).expanduser().resolve() if args.start_dir else None
root = find_root(start_dir)
@@ -522,6 +558,28 @@ def build_parser() -> argparse.ArgumentParser:
plugins_parser.add_argument("--load", action="store_true", help="Attempt to import plugins.")
plugins_parser.set_defaults(func=_plugins_command)
services_parser = subparsers.add_parser("services", help="Service definitions.")
services_sub = services_parser.add_subparsers(dest="services_command")
services_list = services_sub.add_parser("list", help="List service definitions.")
services_list.set_defaults(func=_services_list_command)
services_render = services_sub.add_parser("render", help="Render service unit.")
services_render.add_argument("name", help="Service name.")
services_render.set_defaults(func=_services_render_command)
orch_parser = subparsers.add_parser("orchestrator", help="Orchestrator helpers.")
orch_sub = orch_parser.add_subparsers(dest="orchestrator_command")
orch_list = orch_sub.add_parser("list", help="List configured agents.")
orch_list.set_defaults(func=_orchestrator_list_command)
orch_plan = orch_sub.add_parser("plan", help="Plan agent routing.")
orch_plan.add_argument("summary", help="Task summary.")
orch_plan.add_argument("--tag", action="append", help="Tag to match.")
orch_plan.add_argument("--role", help="Role to match.")
orch_plan.set_defaults(func=_orchestrator_plan_command)
status_parser = subparsers.add_parser("status", help="Show context root status.")
status_parser.add_argument("--start-dir", help="Directory to search from.")
status_parser.set_defaults(func=_status_command)
@@ -713,6 +771,12 @@ def main(argv: Iterable[str] | None = None) -> int:
if args.command == "graph" and not getattr(args, "graph_command", None):
parser.print_help()
return 1
if args.command == "services" and not getattr(args, "services_command", None):
parser.print_help()
return 1
if args.command == "orchestrator" and not getattr(args, "orchestrator_command", None):
parser.print_help()
return 1
return args.func(args)

126
src/afs/orchestration.py Normal file
View File

@@ -0,0 +1,126 @@
"""Minimal orchestration helpers for routing tasks to agents."""
from __future__ import annotations
import argparse
from dataclasses import dataclass, field
from typing import Iterable
from .config import load_config_model
from .schema import AgentConfig, OrchestratorConfig
@dataclass
class TaskRequest:
summary: str
tags: list[str] = field(default_factory=list)
role: str | None = None
@dataclass
class OrchestrationPlan:
summary: str
agents: list[AgentConfig]
notes: list[str] = field(default_factory=list)
class Orchestrator:
def __init__(self, config: OrchestratorConfig | None = None) -> None:
self.config = config or load_config_model().orchestrator
def list_agents(self) -> list[AgentConfig]:
return list(self.config.default_agents)
def plan(self, request: TaskRequest) -> OrchestrationPlan:
if not self.config.enabled:
return OrchestrationPlan(
summary=request.summary,
agents=[],
notes=["orchestrator disabled"],
)
candidates = list(self.config.default_agents)
if request.role:
candidates = [a for a in candidates if a.role == request.role]
if request.tags:
tagged = [
agent
for agent in candidates
if set(request.tags) & set(agent.tags)
]
if tagged:
candidates = tagged
if not candidates:
return OrchestrationPlan(
summary=request.summary,
agents=[],
notes=["no matching agents"],
)
selected = candidates[: self.config.max_agents]
notes = []
if len(candidates) > len(selected):
notes.append("truncated agent list to max_agents")
return OrchestrationPlan(
summary=request.summary,
agents=selected,
notes=notes,
)
def _list_command(args: argparse.Namespace) -> int:
orchestrator = Orchestrator()
agents = orchestrator.list_agents()
for agent in agents:
tags = ",".join(agent.tags) if agent.tags else "-"
print(f"{agent.name}\t{agent.role}\t{agent.backend}\t{tags}")
return 0
def _plan_command(args: argparse.Namespace) -> int:
orchestrator = Orchestrator()
request = TaskRequest(
summary=args.summary,
tags=args.tag or [],
role=args.role,
)
plan = orchestrator.plan(request)
if plan.notes:
for note in plan.notes:
print(f"note: {note}")
for agent in plan.agents:
tags = ",".join(agent.tags) if agent.tags else "-"
print(f"{agent.name}\t{agent.role}\t{agent.backend}\t{tags}")
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="afs-orchestrator")
sub = parser.add_subparsers(dest="command")
list_cmd = sub.add_parser("list", help="List configured agents.")
list_cmd.set_defaults(func=_list_command)
plan_cmd = sub.add_parser("plan", help="Plan a routing decision.")
plan_cmd.add_argument("summary", help="Task summary.")
plan_cmd.add_argument("--tag", action="append", help="Tag to match.")
plan_cmd.add_argument("--role", help="Role to match.")
plan_cmd.set_defaults(func=_plan_command)
return parser
def main(argv: Iterable[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
if not getattr(args, "command", None):
parser.print_help()
return 1
return args.func(args)
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -182,6 +182,110 @@ class PluginsConfig:
)
@dataclass
class AgentConfig:
name: str
role: str = "general"
backend: str = "local"
description: str = ""
tags: list[str] = field(default_factory=list)
auto_start: bool = False
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "AgentConfig":
tags = data.get("tags", [])
if isinstance(tags, list):
tags = [tag for tag in tags if isinstance(tag, str)]
else:
tags = []
return cls(
name=str(data.get("name", "")).strip(),
role=str(data.get("role", "general")).strip() or "general",
backend=str(data.get("backend", "local")).strip() or "local",
description=str(data.get("description", "")).strip(),
tags=tags,
auto_start=bool(data.get("auto_start", False)),
)
@dataclass
class OrchestratorConfig:
enabled: bool = False
max_agents: int = 5
default_agents: list[AgentConfig] = field(default_factory=list)
auto_routing: bool = True
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "OrchestratorConfig":
agents_raw = data.get("default_agents", [])
agents = [
AgentConfig.from_dict(item)
for item in agents_raw
if isinstance(item, dict)
]
max_agents = data.get("max_agents", cls().max_agents)
return cls(
enabled=bool(data.get("enabled", False)),
max_agents=int(max_agents) if isinstance(max_agents, int) else cls().max_agents,
default_agents=agents,
auto_routing=bool(data.get("auto_routing", True)),
)
@dataclass
class ServiceConfig:
name: str
enabled: bool = True
auto_start: bool = False
command: list[str] = field(default_factory=list)
working_directory: Path | None = None
environment: dict[str, str] = field(default_factory=dict)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ServiceConfig":
command = data.get("command", [])
if isinstance(command, list):
command = [str(item) for item in command]
else:
command = []
env = data.get("environment", {})
if isinstance(env, dict):
environment = {str(key): str(value) for key, value in env.items()}
else:
environment = {}
working_directory = data.get("working_directory")
return cls(
name=str(data.get("name", "")).strip(),
enabled=bool(data.get("enabled", True)),
auto_start=bool(data.get("auto_start", False)),
command=command,
working_directory=_as_path(working_directory)
if working_directory
else None,
environment=environment,
)
@dataclass
class ServicesConfig:
enabled: bool = False
services: dict[str, ServiceConfig] = field(default_factory=dict)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ServicesConfig":
enabled = bool(data.get("enabled", False))
raw_services = data.get("services", {})
parsed: dict[str, ServiceConfig] = {}
if isinstance(raw_services, dict):
for name, payload in raw_services.items():
if not isinstance(payload, dict):
continue
payload = dict(payload)
payload.setdefault("name", name)
parsed[name] = ServiceConfig.from_dict(payload)
return cls(enabled=enabled, services=parsed)
@dataclass
class CognitiveConfig:
enabled: bool = False
@@ -207,6 +311,8 @@ class AFSConfig:
plugins: PluginsConfig = field(default_factory=PluginsConfig)
directories: list[DirectoryConfig] = field(default_factory=default_directory_configs)
cognitive: CognitiveConfig = field(default_factory=CognitiveConfig)
orchestrator: OrchestratorConfig = field(default_factory=OrchestratorConfig)
services: ServicesConfig = field(default_factory=ServicesConfig)
@classmethod
def from_dict(cls, data: dict[str, Any] | None) -> "AFSConfig":
@@ -215,11 +321,15 @@ class AFSConfig:
plugins = PluginsConfig.from_dict(data.get("plugins", {}))
directories = _parse_directory_config(data)
cognitive = CognitiveConfig.from_dict(data.get("cognitive", {}))
orchestrator = OrchestratorConfig.from_dict(data.get("orchestrator", {}))
services = ServicesConfig.from_dict(data.get("services", {}))
return cls(
general=general,
plugins=plugins,
directories=directories,
cognitive=cognitive,
orchestrator=orchestrator,
services=services,
)

View File

@@ -0,0 +1,12 @@
"""AFS service management primitives."""
from .manager import ServiceManager
from .models import ServiceDefinition, ServiceState, ServiceStatus, ServiceType
__all__ = [
"ServiceManager",
"ServiceDefinition",
"ServiceState",
"ServiceStatus",
"ServiceType",
]

198
src/afs/services/manager.py Normal file
View File

@@ -0,0 +1,198 @@
"""Minimal service manager for AFS background tasks."""
from __future__ import annotations
import json
import platform
import sys
from pathlib import Path
from typing import Iterable
from ..config import load_config_model
from ..schema import AFSConfig, ServiceConfig
from .models import ServiceDefinition, ServiceState, ServiceStatus, ServiceType
class ServiceManager:
"""Build and render service definitions without mutating the system."""
def __init__(
self,
config: AFSConfig | None = None,
*,
service_root: Path | None = None,
platform_name: str | None = None,
) -> None:
self.config = config or load_config_model()
self.service_root = service_root or Path.home() / ".config" / "afs" / "services"
self.platform_name = platform_name or platform.system().lower()
def list_definitions(self) -> list[ServiceDefinition]:
definitions = self._builtin_definitions()
merged = self._merge_config(definitions)
return sorted(merged.values(), key=lambda item: item.name)
def get_definition(self, name: str) -> ServiceDefinition | None:
merged = self._merge_config(self._builtin_definitions())
return merged.get(name)
def render_unit(self, name: str) -> str:
definition = self.get_definition(name)
if not definition:
raise KeyError(f"Unknown service: {name}")
if self.platform_name.startswith("darwin"):
payload = render_launchd_plist(definition)
return json.dumps(payload, indent=2)
return render_systemd_unit(definition)
def status(self, name: str) -> ServiceStatus:
definition = self.get_definition(name)
if not definition:
return ServiceStatus(name=name, state=ServiceState.UNKNOWN, enabled=False)
return ServiceStatus(name=definition.name, state=ServiceState.UNKNOWN, enabled=False)
def _merge_config(
self, definitions: dict[str, ServiceDefinition]
) -> dict[str, ServiceDefinition]:
merged = dict(definitions)
for name, config in self.config.services.services.items():
if not config.enabled:
merged.pop(name, None)
continue
base = merged.get(name)
if base:
merged[name] = _merge_definition(base, config)
elif config.command:
merged[name] = ServiceDefinition(
name=config.name,
label=config.name,
command=list(config.command),
working_directory=config.working_directory,
environment=dict(config.environment),
service_type=ServiceType.DAEMON,
keep_alive=True,
run_at_load=config.auto_start,
)
return merged
def _builtin_definitions(self) -> dict[str, ServiceDefinition]:
python = self._resolve_python_executable()
repo_root = self._find_repo_root()
environment = self._service_environment()
return {
"orchestrator": ServiceDefinition(
name="orchestrator",
label="AFS Orchestrator",
description="Routing and coordination for local agents",
command=[python, "-m", "afs.orchestration", "--daemon"],
working_directory=repo_root,
environment=environment,
service_type=ServiceType.DAEMON,
keep_alive=True,
run_at_load=False,
),
"context-discovery": ServiceDefinition(
name="context-discovery",
label="AFS Context Discovery",
description="Discover and index AFS contexts",
command=[python, "-m", "afs", "context", "discover"],
working_directory=repo_root,
environment=environment,
service_type=ServiceType.ONESHOT,
keep_alive=False,
run_at_load=False,
),
"context-graph-export": ServiceDefinition(
name="context-graph-export",
label="AFS Context Graph Export",
description="Export AFS context graph JSON",
command=[python, "-m", "afs", "graph", "export"],
working_directory=repo_root,
environment=environment,
service_type=ServiceType.ONESHOT,
keep_alive=False,
run_at_load=False,
),
}
def _resolve_python_executable(self) -> str:
if self.config.general.python_executable:
return str(self.config.general.python_executable)
return sys.executable
def _find_repo_root(self) -> Path | None:
for parent in Path(__file__).resolve().parents:
if (parent / "pyproject.toml").exists():
return parent
return None
def _service_environment(self) -> dict[str, str]:
env: dict[str, str] = {}
repo_root = self._find_repo_root()
if repo_root and (repo_root / "src").exists():
env["PYTHONPATH"] = str(repo_root / "src")
user_config = Path.home() / ".config" / "afs" / "config.toml"
if user_config.exists():
env["AFS_CONFIG_PATH"] = str(user_config)
env["AFS_PREFER_USER_CONFIG"] = "1"
return env
def _merge_definition(
base: ServiceDefinition, override: ServiceConfig
) -> ServiceDefinition:
command = list(override.command) if override.command else list(base.command)
environment = dict(base.environment)
environment.update(override.environment)
return ServiceDefinition(
name=base.name,
label=base.label,
description=base.description,
command=command,
working_directory=override.working_directory or base.working_directory,
environment=environment,
service_type=base.service_type,
keep_alive=base.keep_alive,
run_at_load=override.auto_start,
)
def render_launchd_plist(definition: ServiceDefinition) -> dict[str, object]:
payload: dict[str, object] = {
"Label": f"afs.{definition.name}",
"ProgramArguments": list(definition.command),
"RunAtLoad": bool(definition.run_at_load),
"KeepAlive": bool(definition.keep_alive),
}
if definition.working_directory:
payload["WorkingDirectory"] = str(definition.working_directory)
if definition.environment:
payload["EnvironmentVariables"] = dict(definition.environment)
return payload
def render_systemd_unit(definition: ServiceDefinition) -> str:
lines = [
"[Unit]",
f"Description={definition.label}",
"",
"[Service]",
f"ExecStart={' '.join(definition.command)}",
]
if definition.working_directory:
lines.append(f"WorkingDirectory={definition.working_directory}")
if definition.environment:
for key, value in definition.environment.items():
lines.append(f"Environment={key}={value}")
if definition.service_type == ServiceType.ONESHOT:
lines.append("Type=oneshot")
if definition.keep_alive:
lines.append("Restart=on-failure")
lines.extend([
"",
"[Install]",
"WantedBy=default.target",
])
return "\n".join(lines)

View File

@@ -0,0 +1,73 @@
"""Data models for AFS service management."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any
class ServiceState(str, Enum):
STOPPED = "stopped"
RUNNING = "running"
FAILED = "failed"
STARTING = "starting"
STOPPING = "stopping"
UNKNOWN = "unknown"
class ServiceType(str, Enum):
DAEMON = "daemon"
ONESHOT = "oneshot"
@dataclass
class ServiceDefinition:
name: str
label: str
description: str = ""
command: list[str] = field(default_factory=list)
working_directory: Path | None = None
environment: dict[str, str] = field(default_factory=dict)
service_type: ServiceType = ServiceType.DAEMON
keep_alive: bool = True
run_at_load: bool = False
def to_dict(self) -> dict[str, Any]:
return {
"name": self.name,
"label": self.label,
"description": self.description,
"command": list(self.command),
"working_directory": str(self.working_directory)
if self.working_directory
else None,
"environment": dict(self.environment),
"service_type": self.service_type.value,
"keep_alive": self.keep_alive,
"run_at_load": self.run_at_load,
}
@dataclass
class ServiceStatus:
name: str
state: ServiceState
pid: int | None = None
enabled: bool = False
last_started: datetime | None = None
last_stopped: datetime | None = None
error_message: str | None = None
def to_dict(self) -> dict[str, Any]:
return {
"name": self.name,
"state": self.state.value,
"pid": self.pid,
"enabled": self.enabled,
"last_started": self.last_started.isoformat() if self.last_started else None,
"last_stopped": self.last_stopped.isoformat() if self.last_stopped else None,
"error_message": self.error_message,
}

View File

@@ -0,0 +1,27 @@
from __future__ import annotations
from afs.orchestration import Orchestrator, TaskRequest
from afs.schema import AgentConfig, OrchestratorConfig
def test_orchestrator_disabled_returns_note() -> None:
config = OrchestratorConfig(enabled=False)
orchestrator = Orchestrator(config=config)
plan = orchestrator.plan(TaskRequest(summary="Test"))
assert not plan.agents
assert "orchestrator disabled" in plan.notes
def test_orchestrator_matches_tags() -> None:
config = OrchestratorConfig(
enabled=True,
max_agents=2,
default_agents=[
AgentConfig(name="planner", role="planner", tags=["plan"]),
AgentConfig(name="builder", role="coder", tags=["build"]),
],
)
orchestrator = Orchestrator(config=config)
plan = orchestrator.plan(TaskRequest(summary="Build", tags=["build"]))
assert len(plan.agents) == 1
assert plan.agents[0].name == "builder"

28
tests/test_services.py Normal file
View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from afs.schema import AFSConfig, ServiceConfig, ServicesConfig
from afs.services.manager import ServiceManager
def test_service_manager_lists_builtins() -> None:
manager = ServiceManager(config=AFSConfig(), platform_name="linux")
names = [definition.name for definition in manager.list_definitions()]
assert "orchestrator" in names
def test_service_config_can_disable_service() -> None:
services = ServicesConfig(
enabled=True,
services={
"orchestrator": ServiceConfig(name="orchestrator", enabled=False),
},
)
manager = ServiceManager(config=AFSConfig(services=services), platform_name="linux")
names = [definition.name for definition in manager.list_definitions()]
assert "orchestrator" not in names
def test_service_render_contains_execstart() -> None:
manager = ServiceManager(config=AFSConfig(), platform_name="linux")
unit = manager.render_unit("orchestrator")
assert "ExecStart=" in unit