Coverage for astrocyte/ingest/payload.py: 75%

61 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Shared JSON / field parsing for webhook and stream ingest (M4+).""" 

2 

3from __future__ import annotations 

4 

5import json 

6from collections.abc import Mapping 

7 

8from astrocyte.errors import IngestError 

9from astrocyte.types import Metadata 

10 

11 

12def _coerce_metadata(meta_raw: object) -> Metadata | None: 

13 if not isinstance(meta_raw, dict): 

14 return None 

15 out: Metadata = {} 

16 for k, v in meta_raw.items(): 

17 if isinstance(v, (str, int, float, bool)) or v is None: 

18 out[str(k)] = v # type: ignore[assignment] 

19 return out if out else None 

20 

21 

def parse_ingest_json_object(data: dict) -> tuple[str, str | None, str, Metadata | None]:
    """Extract retain inputs from a webhook-style JSON object.

    Returns a ``(text, principal, content_type, metadata)`` tuple:

    * ``text`` -- the ``content`` value, or the ``text`` value when ``content`` is
      missing or falsy; it must be a non-empty string.
    * ``principal`` -- ``str()`` of the ``principal`` value with surrounding
      whitespace stripped, or ``None`` when the key is absent or null.
    * ``content_type`` -- the ``content_type`` value when it is a non-empty string,
      otherwise ``"text"``.
    * ``metadata`` -- the ``metadata`` value passed through ``_coerce_metadata``.

    Raises:
        IngestError: if neither ``content`` nor ``text`` yields a non-empty string.
    """
    body = data.get("content") or data.get("text")
    if not (body and isinstance(body, str)):
        raise IngestError("JSON must include string content or text")

    who = data.get("principal")
    principal_out: str | None = None
    if who is not None:
        principal_out = str(who).strip()

    declared_type = data.get("content_type") or "text"
    # Non-string content types collapse to the default.
    if isinstance(declared_type, str):
        kind = str(declared_type)
    else:
        kind = "text"

    return body, principal_out, kind, _coerce_metadata(data.get("metadata"))

34 

35 

def parse_ingest_stream_fields(fields: Mapping[str, str]) -> tuple[str, str | None, str, Metadata | None]:
    """Turn the fields of a Redis Stream message into retain inputs.

    Two layouts are accepted:

    * A ``payload`` field holding a JSON-encoded object, interpreted exactly as
      :func:`parse_ingest_json_object` does. When ``payload`` is present the other
      fields are not consulted.
    * Flat fields: ``content`` or ``text`` (required), plus optional ``principal``,
      ``content_type`` and ``metadata`` (a JSON string).

    Returns:
        A ``(text, principal, content_type, metadata)`` tuple.

    Raises:
        IngestError: if ``payload`` or ``metadata`` is not valid JSON, if ``payload``
            does not decode to a JSON object, or if no text is supplied.
    """
    if "payload" in fields:
        payload_json = fields["payload"]
        try:
            decoded = json.loads(payload_json)
        except json.JSONDecodeError as e:
            raise IngestError(f"invalid payload JSON: {e}") from e
        if isinstance(decoded, dict):
            return parse_ingest_json_object(decoded)
        raise IngestError("payload must be a JSON object")

    body = fields.get("content") or fields.get("text")
    if not body:
        raise IngestError("stream fields must include content or text, or a payload field")

    principal_field = fields.get("principal")
    # An absent or empty principal is reported as None.
    principal = str(principal_field).strip() if principal_field else None
    content_type = str(fields.get("content_type") or "text")

    meta_json = fields.get("metadata")
    if not meta_json:
        return body, principal, content_type, None
    try:
        meta_obj = json.loads(meta_json)
    except json.JSONDecodeError as e:
        raise IngestError(f"invalid metadata JSON: {e}") from e
    return body, principal, content_type, _coerce_metadata(meta_obj)

70 

71 

def parse_ingest_kafka_value(value: bytes | None) -> tuple[str, str | None, str, Metadata | None]:
    """Decode a Kafka record value: JSON object (webhook-shaped) or raw UTF-8 text.

    The value is decoded as UTF-8. If the decoded string is a JSON object, it is
    handed to :func:`parse_ingest_json_object`. Any other content -- non-JSON text,
    or JSON that is not an object (array, number, string, ...) -- is returned
    verbatim as plain text with no principal, content type ``"text"`` and no
    metadata.

    Returns:
        A ``(text, principal, content_type, metadata)`` tuple.

    Raises:
        IngestError: if ``value`` is ``None`` or empty, if it is not valid UTF-8, or
            if a JSON object is rejected by :func:`parse_ingest_json_object`.
    """
    if not value:
        raise IngestError("kafka message value is empty")
    try:
        s = value.decode("utf-8")
    except UnicodeDecodeError as e:
        # Report undecodable bytes the same way as other malformed ingest input,
        # instead of leaking a raw UnicodeDecodeError to the consumer loop.
        raise IngestError(f"kafka message value is not valid UTF-8: {e}") from e
    try:
        data = json.loads(s)
    except json.JSONDecodeError:
        # Value is not JSON; treat the decoded string as plain text.
        return s, None, "text", None
    if isinstance(data, dict):
        return parse_ingest_json_object(data)
    # Valid JSON that is not an object is still passed through as plain text.
    return s, None, "text", None