Coverage for astrocyte/integrations/crewai.py: 97%

32 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""CrewAI integration — Astrocyte as crew/agent memory. 

2 

3Usage: 

4 from astrocyte import Astrocyte 

5 from astrocyte.integrations.crewai import AstrocyteCrewMemory 

6 

7 brain = Astrocyte.from_config("astrocyte.yaml") 

8 

9 crew = Crew( 

10 agents=[support_agent, research_agent], 

11 memory=AstrocyteCrewMemory(brain, bank_id="team-support"), 

12 # Optional: context=AstrocyteContext(principal="user:me") for ACL / OBO 

13 ) 

14 

15Maps: 

16 - save → brain.retain() 

17 - search → brain.recall() 

18 - Crew-level bank for shared memory; per-agent banks via agent_banks 

19""" 

20 

21from __future__ import annotations 

22 

23import logging 

24from typing import TYPE_CHECKING, Any 

25 

26if TYPE_CHECKING: 

27 from astrocyte._astrocyte import Astrocyte 

28 

29from astrocyte.types import AstrocyteContext 

30 

31logger = logging.getLogger("astrocyte.integrations.crewai") 

32 

33 

class AstrocyteCrewMemory:
    """CrewAI memory adapter that reads and writes through an Astrocyte brain.

    Provides the three operations of the CrewAI memory interface pattern and
    forwards each one to the brain:

    - ``save()``   -> ``brain.retain()``
    - ``search()`` -> ``brain.recall()``
    - ``reset()``  -> ``brain.clear_bank()``

    The crew shares the bank named by ``bank_id``; any agent listed in
    ``agent_banks`` is routed to its own bank instead.

    An optional ``context`` (:class:`~astrocyte.types.AstrocyteContext`) is
    passed unchanged on every brain call, for access control and OBO when
    ``access_control`` is enabled.

    This is a thin wrapper — all policy enforcement happens inside Astrocyte.
    """

    def __init__(
        self,
        brain: Astrocyte,
        bank_id: str,
        *,
        context: AstrocyteContext | None = None,
        agent_banks: dict[str, str] | None = None,
        auto_retain: bool = False,
    ) -> None:
        """Bind the adapter to a brain and a default (crew-level) bank.

        Args:
            brain: Astrocyte instance that receives every call.
            bank_id: Bank used when no per-agent bank applies.
            context: Optional context forwarded to each brain call.
            agent_banks: Optional mapping of agent id to bank id; a missing
                or empty mapping means every agent uses ``bank_id``.
            auto_retain: Stored on the instance as given; this class does not
                read it itself.
        """
        # Public attributes.
        self.brain = brain
        self.bank_id = bank_id
        self.auto_retain = auto_retain
        # Internal state.
        self._context = context
        self._agent_banks = agent_banks if agent_banks else {}

    def _resolve_bank(self, agent_id: str | None = None) -> str:
        """Return the agent's own bank if one is mapped, else the crew bank."""
        if not agent_id:
            # No (or empty) agent id: always the shared crew bank.
            return self.bank_id
        return self._agent_banks.get(agent_id, self.bank_id)

    async def save(
        self,
        content: str,
        *,
        agent_id: str | None = None,
        tags: list[str] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Retain ``content`` in the bank resolved for ``agent_id``.

        The metadata sent to the brain is a copy of ``metadata`` with
        ``"source"`` set to ``"crewai"`` and, when an agent id is given,
        ``"agent_id"`` set to it. Missing or empty ``tags`` default to
        ``["crewai"]``.

        A result whose ``stored`` flag is falsy is logged as a warning; this
        method does not raise for it.
        """
        target_bank = self._resolve_bank(agent_id)

        # Build a fresh dict so the caller's metadata is never mutated.
        enriched = {**(metadata or {}), "source": "crewai"}
        if agent_id:
            enriched["agent_id"] = agent_id

        outcome = await self.brain.retain(
            content,
            bank_id=target_bank,
            tags=tags if tags else ["crewai"],
            metadata=enriched,
            context=self._context,
        )
        if outcome.stored:
            return
        logger.warning("CrewAI save failed for bank %s: %s", target_bank, outcome.error)

    async def search(
        self,
        query: str,
        *,
        agent_id: str | None = None,
        max_results: int = 5,
        tags: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Recall memories matching ``query`` from the resolved bank.

        Returns:
            One dict per hit, in the order the brain returned them, with the
            keys ``"text"``, ``"score"``, ``"metadata"`` and ``"memory_id"``.
        """
        recalled = await self.brain.recall(
            query,
            bank_id=self._resolve_bank(agent_id),
            max_results=max_results,
            tags=tags,
            context=self._context,
        )
        # Flatten each hit object into a plain dict.
        return [
            {
                "text": match.text,
                "score": match.score,
                "metadata": match.metadata,
                "memory_id": match.memory_id,
            }
            for match in recalled.hits
        ]

    async def reset(self, *, agent_id: str | None = None) -> None:
        """Clear the bank resolved for ``agent_id`` (forget all)."""
        await self.brain.clear_bank(
            self._resolve_bank(agent_id),
            context=self._context,
        )