Coverage for astrocyte/integrations/crewai.py: 97%
32 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""CrewAI integration — Astrocyte as crew/agent memory.
3Usage:
4 from astrocyte import Astrocyte
5 from astrocyte.integrations.crewai import AstrocyteCrewMemory
7 brain = Astrocyte.from_config("astrocyte.yaml")
9 crew = Crew(
10 agents=[support_agent, research_agent],
11 memory=AstrocyteCrewMemory(brain, bank_id="team-support"),
12 # Optional: context=AstrocyteContext(principal="user:me") for ACL / OBO
13 )
15Maps:
16 - save → brain.retain()
17 - search → brain.recall()
18 - Crew-level bank for shared memory; per-agent banks via agent_banks
19"""
21from __future__ import annotations
23import logging
24from typing import TYPE_CHECKING, Any
26if TYPE_CHECKING:
27 from astrocyte._astrocyte import Astrocyte
29from astrocyte.types import AstrocyteContext
31logger = logging.getLogger("astrocyte.integrations.crewai")
class AstrocyteCrewMemory:
    """Astrocyte-backed memory for CrewAI crews and agents.

    Implements the interface pattern expected by CrewAI's memory system:
    save(), search(), reset().

    Pass optional ``context`` (:class:`~astrocyte.types.AstrocyteContext`) for
    access control and OBO when ``access_control`` is enabled.

    Bank routing: every operation accepts an optional ``agent_id``. When that
    id has an entry in ``agent_banks`` the mapped bank is used; otherwise the
    shared crew-level ``bank_id`` is used.

    Thin wrapper — all policy enforcement happens inside Astrocyte.
    """

    def __init__(
        self,
        brain: Astrocyte,
        bank_id: str,
        *,
        context: AstrocyteContext | None = None,
        agent_banks: dict[str, str] | None = None,
        auto_retain: bool = False,
    ) -> None:
        """Bind the memory adapter to a brain and a default bank.

        Args:
            brain: Astrocyte instance that ``retain``/``recall``/``clear_bank``
                calls are delegated to.
            bank_id: Shared crew-level bank, used whenever no per-agent bank
                applies.
            context: Optional context forwarded unchanged to every brain call.
            agent_banks: Optional mapping of agent id to bank id. The mapping
                is copied at construction time.
            auto_retain: Stored on the instance as ``auto_retain``; no method
                of this class reads it.
        """
        self.brain = brain
        self.bank_id = bank_id
        self._context = context
        # Snapshot the mapping instead of aliasing the caller's dict: a later
        # mutation by the caller must not silently re-route save/search/reset
        # (reset is destructive) to a different bank.
        self._agent_banks: dict[str, str] = dict(agent_banks) if agent_banks else {}
        self.auto_retain = auto_retain

    def _resolve_bank(self, agent_id: str | None = None) -> str:
        """Per-agent bank or shared crew bank.

        Returns the bank mapped to ``agent_id`` when one is configured; falls
        back to ``self.bank_id`` for ``None``, empty, or unmapped ids.
        """
        if agent_id and agent_id in self._agent_banks:
            return self._agent_banks[agent_id]
        return self.bank_id

    async def save(
        self,
        content: str,
        *,
        agent_id: str | None = None,
        tags: list[str] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Save content to memory (retain).

        Args:
            content: Text to retain.
            agent_id: Optional agent id; selects a per-agent bank when mapped
                and is recorded in the metadata under ``"agent_id"``.
            tags: Tags passed to ``brain.retain``; ``["crewai"]`` is used when
                omitted or empty.
            metadata: Extra metadata. It is copied, never mutated; the copy's
                ``"source"`` key is always set to ``"crewai"``.

        A result with a falsy ``stored`` is logged as a warning rather than
        raised.
        """
        bank = self._resolve_bank(agent_id)
        # Work on a copy so the caller's dict is left untouched.
        meta = dict(metadata or {})
        meta["source"] = "crewai"
        if agent_id:
            meta["agent_id"] = agent_id

        result = await self.brain.retain(
            content,
            bank_id=bank,
            tags=tags or ["crewai"],
            metadata=meta,
            context=self._context,
        )
        if not result.stored:
            logger.warning("CrewAI save failed for bank %s: %s", bank, result.error)

    async def search(
        self,
        query: str,
        *,
        agent_id: str | None = None,
        max_results: int = 5,
        tags: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Search memory (recall). Returns list of dicts.

        Args:
            query: Query passed to ``brain.recall``.
            agent_id: Optional agent id; selects a per-agent bank when mapped.
            max_results: Forwarded to ``brain.recall`` as ``max_results``.
            tags: Forwarded to ``brain.recall`` unchanged (``None`` allowed).

        Returns:
            One dict per entry of the recall result's ``hits``, with the keys
            ``"text"``, ``"score"``, ``"metadata"`` and ``"memory_id"``.
        """
        bank = self._resolve_bank(agent_id)
        result = await self.brain.recall(
            query,
            bank_id=bank,
            max_results=max_results,
            tags=tags,
            context=self._context,
        )
        return [
            {
                "text": hit.text,
                "score": hit.score,
                "metadata": hit.metadata,
                "memory_id": hit.memory_id,
            }
            for hit in result.hits
        ]

    async def reset(self, *, agent_id: str | None = None) -> None:
        """Reset memory for a bank (forget all).

        Calls ``brain.clear_bank`` on the per-agent bank when ``agent_id`` is
        mapped, otherwise on the shared crew bank.
        """
        bank = self._resolve_bank(agent_id)
        await self.brain.clear_bank(bank, context=self._context)