Coverage for astrocyte/ingest/payload.py: 75%
61 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Shared JSON / field parsing for webhook and stream ingest (M4+)."""
3from __future__ import annotations
5import json
6from collections.abc import Mapping
8from astrocyte.errors import IngestError
9from astrocyte.types import Metadata
12def _coerce_metadata(meta_raw: object) -> Metadata | None:
13 if not isinstance(meta_raw, dict):
14 return None
15 out: Metadata = {}
16 for k, v in meta_raw.items():
17 if isinstance(v, (str, int, float, bool)) or v is None:
18 out[str(k)] = v # type: ignore[assignment]
19 return out if out else None
def parse_ingest_json_object(data: dict) -> tuple[str, str | None, str, Metadata | None]:
    """Parse the webhook-style JSON object into retain inputs.

    Recognised keys:

    * ``content`` or ``text`` (required): ``content`` is used when it is a non-empty
      string; otherwise ``text`` is tried.
    * ``principal`` (optional): converted with ``str()`` and stripped. A missing,
      ``null`` or blank value yields ``None``.
    * ``content_type`` (optional): used when it is a non-empty string, else ``"text"``.
    * ``metadata`` (optional): filtered through ``_coerce_metadata``.

    Returns:
        ``(text, principal, content_type, metadata)``.

    Raises:
        IngestError: if neither ``content`` nor ``text`` is a non-empty string.
    """
    text = data.get("content")
    if not (isinstance(text, str) and text):
        # Fall back to ``text`` when ``content`` is missing, empty, or not a string,
        # so a malformed ``content`` does not hide a usable ``text`` field.
        text = data.get("text")
    if not isinstance(text, str) or not text:
        raise IngestError("JSON must include string content or text")

    principal = data.get("principal")
    # A blank principal is treated as absent rather than passed on as "".
    pr: str | None = (str(principal).strip() if principal is not None else "") or None

    ct = data.get("content_type")
    content_type = ct if isinstance(ct, str) and ct else "text"

    metadata = _coerce_metadata(data.get("metadata"))
    return text, pr, content_type, metadata
def parse_ingest_stream_fields(fields: Mapping[str, str]) -> tuple[str, str | None, str, Metadata | None]:
    """Parse Redis Stream message fields into retain inputs.

    * If field ``payload`` is present, it must be a JSON string of an object — same schema as
      :func:`parse_ingest_json_object`.
    * Otherwise ``content`` or ``text`` is required; optional ``principal``, ``content_type``,
      ``metadata`` (JSON string).

    A blank ``principal`` (empty or whitespace-only) is returned as ``None``.
    ``content_type`` defaults to ``"text"``. Metadata that decodes to something other
    than an object, or that has no scalar values, yields ``None``.

    Returns:
        ``(text, principal, content_type, metadata)``.

    Raises:
        IngestError: on invalid ``payload``/``metadata`` JSON, a non-object ``payload``,
            or when no content is supplied.
    """
    if "payload" in fields:
        try:
            data = json.loads(fields["payload"])
        except json.JSONDecodeError as e:
            raise IngestError(f"invalid payload JSON: {e}") from e
        if not isinstance(data, dict):
            raise IngestError("payload must be a JSON object")
        return parse_ingest_json_object(data)

    text = fields.get("content") or fields.get("text")
    if not text:
        raise IngestError("stream fields must include content or text, or a payload field")

    pr_raw = fields.get("principal")
    # Strip before the emptiness test so "" and "   " are both treated as absent.
    pr: str | None = (str(pr_raw).strip() if pr_raw else "") or None

    content_type = str(fields.get("content_type") or "text")

    metadata: Metadata | None = None
    meta_s = fields.get("metadata")
    if meta_s:
        try:
            parsed = json.loads(meta_s)
        except json.JSONDecodeError as e:
            raise IngestError(f"invalid metadata JSON: {e}") from e
        metadata = _coerce_metadata(parsed)
    return text, pr, content_type, metadata
def parse_ingest_kafka_value(value: bytes | None) -> tuple[str, str | None, str, Metadata | None]:
    """Decode a Kafka record value: JSON object (webhook-shaped) or raw UTF-8 text.

    The value is decoded as UTF-8 first.

    * If the decoded text parses as a JSON object, it is handled by
      :func:`parse_ingest_json_object`.
    * Anything else is retained verbatim as plain text. This covers invalid JSON and
      valid JSON that is not an object (e.g. a bare string, number or array).
      The result has no principal, content type ``"text"`` and no metadata.

    Returns:
        ``(text, principal, content_type, metadata)``.

    Raises:
        IngestError: if the value is ``None``/empty, is not valid UTF-8, or is a JSON
            object rejected by :func:`parse_ingest_json_object`.
    """
    if not value:
        raise IngestError("kafka message value is empty")
    try:
        s = value.decode("utf-8")
    except UnicodeDecodeError as e:
        # Report undecodable bytes as the module's ingest error rather than letting a
        # raw UnicodeDecodeError escape to the consumer loop.
        raise IngestError(f"kafka message value is not valid UTF-8: {e}") from e
    try:
        data = json.loads(s)
    except json.JSONDecodeError:
        # Not JSON: treat the decoded string as plain text (see return below).
        data = None
    if isinstance(data, dict):
        return parse_ingest_json_object(data)
    return s, None, "text", None