Coverage for astrocyte/integrations/_sync_utils.py: 43%

14 statements  

« prev     ^ index     » next       coverage.py v7.15.0, created at 2026-07-04 05:24 +0000

1"""Shared async-to-sync bridge for integration adapters. 

2 

3The naive ``asyncio.run()`` fails when called from an already-running event loop 

4(common in notebook kernels, ASGI frameworks, etc.). The ``ThreadPoolExecutor + 

5asyncio.run`` workaround creates a NEW event loop in the worker thread, which 

6breaks if the coroutine uses resources bound to the original loop (e.g. asyncpg 

7connection pools, aiohttp sessions). 

8 

9This module provides a single helper that handles both cases safely: 

10- No running loop: use ``asyncio.run()`` directly. 

11- Running loop: schedule on the EXISTING loop via ``run_coroutine_threadsafe``, 

12 then block the calling (sync) thread waiting for the result. 

13""" 

14 

15from __future__ import annotations 

16 

17import asyncio 

18from collections.abc import Coroutine 

19from typing import TypeVar 

20 

21T = TypeVar("T") 

22 

23 

24def _run_async_from_sync(coro: Coroutine[object, object, T]) -> T: 

25 """Run an async coroutine from synchronous code, safely handling nested loops. 

26 

27 - If no event loop is running: creates one via ``asyncio.run()``. 

28 - If an event loop IS running (e.g. Jupyter, ASGI): schedules the coroutine 

29 on that loop via ``run_coroutine_threadsafe`` and blocks until complete. 

30 This keeps async resources (connection pools, sessions) on their original loop. 

31 """ 

32 try: 

33 loop = asyncio.get_running_loop() 

34 except RuntimeError: 

35 loop = None 

36 

37 if loop is None or not loop.is_running(): 

38 return asyncio.run(coro) 

39 

40 # Schedule on the existing loop, block the sync caller thread 

41 future = asyncio.run_coroutine_threadsafe(coro, loop) 

42 return future.result()