Coverage for astrocyte/integrations/_sync_utils.py: 43%
14 statements
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
« prev ^ index » next coverage.py v7.15.0, created at 2026-07-04 05:24 +0000
1"""Shared async-to-sync bridge for integration adapters.
3The naive ``asyncio.run()`` fails when called from an already-running event loop
4(common in notebook kernels, ASGI frameworks, etc.). The ``ThreadPoolExecutor +
5asyncio.run`` workaround creates a NEW event loop in the worker thread, which
6breaks if the coroutine uses resources bound to the original loop (e.g. asyncpg
7connection pools, aiohttp sessions).
9This module provides a single helper that handles both cases safely:
10- No running loop: use ``asyncio.run()`` directly.
11- Running loop: schedule on the EXISTING loop via ``run_coroutine_threadsafe``,
12 then block the calling (sync) thread waiting for the result.
13"""
15from __future__ import annotations
17import asyncio
18from collections.abc import Coroutine
19from typing import TypeVar
21T = TypeVar("T")
24def _run_async_from_sync(coro: Coroutine[object, object, T]) -> T:
25 """Run an async coroutine from synchronous code, safely handling nested loops.
27 - If no event loop is running: creates one via ``asyncio.run()``.
28 - If an event loop IS running (e.g. Jupyter, ASGI): schedules the coroutine
29 on that loop via ``run_coroutine_threadsafe`` and blocks until complete.
30 This keeps async resources (connection pools, sessions) on their original loop.
31 """
32 try:
33 loop = asyncio.get_running_loop()
34 except RuntimeError:
35 loop = None
37 if loop is None or not loop.is_running():
38 return asyncio.run(coro)
40 # Schedule on the existing loop, block the sync caller thread
41 future = asyncio.run_coroutine_threadsafe(coro, loop)
42 return future.result()