Module runflow.utils
Utility functions.
Expand source code
"""Utility functions."""
import asyncio
import importlib
import sys
from typing import Callable, Coroutine
def run_async(coro: Coroutine):
    """Run a coroutine to completion from synchronous code.

    Args:
        coro: The coroutine object to execute.

    Returns:
        The value produced by ``coro``. Earlier versions of this helper
        discarded the result and always returned ``None``; callers that
        ignore the return value are unaffected.
    """
    if sys.version_info < (3, 7):
        # asyncio.run() only exists on Python 3.7+, so drive the loop by hand.
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(coro)
    return asyncio.run(coro)
async def to_thread(func: Callable, *args, **kwargs):
    """Run a synchronous callable in a worker thread and await its result.

    Args:
        func: The callable to execute off the event loop thread.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func(*args, **kwargs)`` returns.
    """
    if sys.version_info < (3, 9):
        # asyncio.to_thread() was added in Python 3.9. Mirror its behaviour on
        # older interpreters, including running ``func`` inside a copy of the
        # caller's contextvars context so both code paths behave the same.
        # pylint: disable=import-outside-toplevel
        import contextvars
        import functools

        loop = asyncio.get_running_loop()
        ctx = contextvars.copy_context()
        call = functools.partial(ctx.run, func, *args, **kwargs)
        # ``None`` selects the loop's default executor.
        return await loop.run_in_executor(None, call)
    # pylint: disable=no-member
    return await asyncio.to_thread(func, *args, **kwargs)
def import_module(path: str):
    """Resolve a ``package.module:attr.chain`` string to the object it names.

    The text before the colon is imported as a module; the text after it is
    treated as a dot-separated chain of attributes looked up on that module.

    Args:
        path: Import path in the form ``path.to.module:class``.

    Returns:
        The object found at the end of the attribute chain.

    Raises:
        ImportError: If ``path`` does not split into exactly two parts on
            ``":"``, or if resolving it raises ``AttributeError`` or
            ``ValueError``. The original exception is chained as the cause.
    """
    try:
        module_path, attr_chain = path.split(":")
        target = importlib.import_module(module_path)
        for attr_name in attr_chain.split("."):
            target = getattr(target, attr_name)
    except (AttributeError, ValueError) as exc:
        raise ImportError(path) from exc
    return target
class RunflowValidators:
    """Namespace for reusable validation callbacks."""

    @staticmethod
    def not_empty(instance, attribute, value):
        """Reject falsy values.

        The ``(instance, attribute, value)`` signature looks like an
        attrs-style validator hook -- TODO confirm against callers.

        Args:
            instance: Object being validated; not used here.
            attribute: Interpolated into the error message to identify
                the offending field.
            value: The value to check.

        Raises:
            ValueError: If ``value`` is falsy.
        """
        if value:
            return
        raise ValueError(f"{attribute} must not be empty.")
Functions
def import_module(path: str)
-
Import a path like `path.to.module:class`.
Expand source code
def import_module(path: str):
    """Import a path like `path.to.module:class`.

    The part before the colon is imported as a module; the part after it is
    walked as a dot-separated attribute chain starting from that module.

    Raises:
        ImportError: When the path cannot be split into exactly two parts
            on ``":"`` or resolving it raises ``AttributeError`` or
            ``ValueError``; the original error is chained as the cause.
    """
    try:
        module_part, attr_part = path.split(":")
        resolved = importlib.import_module(module_part)
        for name in attr_part.split("."):
            resolved = getattr(resolved, name)
    except (AttributeError, ValueError) as cause:
        raise ImportError(path) from cause
    return resolved
def run_async(coro: Coroutine)
-
Wrap async function as sync call.
Expand source code
def run_async(coro: Coroutine):
    """Wrap async function as sync call.

    Args:
        coro: The coroutine object to run to completion.

    Returns:
        The coroutine's result. The result used to be dropped, so the
        function always returned ``None``.
    """
    if sys.version_info < (3, 7):
        # No asyncio.run() before Python 3.7; use the event loop directly.
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(coro)
    return asyncio.run(coro)
async def to_thread(func: Callable, *args, **kwargs)
-
Run sync function in thread.
Expand source code
async def to_thread(func: Callable, *args, **kwargs):
    """Run sync function in thread.

    Args:
        func: The synchronous callable to run in a worker thread.
        *args: Positional arguments passed through to ``func``.
        **kwargs: Keyword arguments passed through to ``func``.

    Returns:
        The return value of ``func(*args, **kwargs)``.
    """
    if sys.version_info < (3, 9):
        # Python < 3.9 has no asyncio.to_thread(). Reproduce it, including
        # the copy of the caller's contextvars context, so the fallback and
        # the stdlib path behave alike.
        # pylint: disable=import-outside-toplevel
        import contextvars
        import functools

        loop = asyncio.get_running_loop()
        ctx = contextvars.copy_context()
        call = functools.partial(ctx.run, func, *args, **kwargs)
        # ``None`` selects the loop's default executor.
        return await loop.run_in_executor(None, call)
    # pylint: disable=no-member
    return await asyncio.to_thread(func, *args, **kwargs)
Classes
class RunflowValidators
-
Expand source code
class RunflowValidators:
    """Collection of validation helpers exposed as static methods."""

    @staticmethod
    def not_empty(instance, attribute, value):
        """Raise ``ValueError`` when ``value`` is falsy.

        Args:
            instance: Unused by this check.
            attribute: Rendered into the error message.
            value: The value under validation.
        """
        if value:
            return
        raise ValueError(f"{attribute} must not be empty.")
Static methods
def not_empty(instance, attribute, value)
-
Expand source code
@staticmethod def not_empty(instance, attribute, value): if not value: raise ValueError(f"{attribute} must not be empty.")