Module runflow.run

Run a flow spec.

Expand source code
"""Run a flow spec."""

from typing import Optional

from .core import Flow
from .utils import run_async


async def runflow_async(
    path: str = None,
    source: str = None,
    module: str = None,
    flow: Flow = None,
    vars: Optional[dict] = None,
):
    """Run a flow object (async)."""
    _flow = Flow.load(path=path, source=source, module=module, flow=flow)
    assert _flow and isinstance(_flow, Flow)
    await _flow.run(vars or {})


def runflow(
    path: str = None,
    source: str = None,
    module: str = None,
    flow: Flow = None,
    vars: Optional[dict] = None,
):
    """Run a flow object (sync)."""
    run_async(
        runflow_async(
            path=path, source=source, module=module, flow=flow, vars=vars
        )
    )

Functions

def runflow(path: str = None, source: str = None, module: str = None, flow: Flow = None, vars: Optional[dict] = None)

Run a flow object (sync).

Expand source code
def runflow(
    path: str = None,
    source: str = None,
    module: str = None,
    flow: Flow = None,
    vars: Optional[dict] = None,
):
    """Run a flow object (sync)."""
    run_async(
        runflow_async(
            path=path, source=source, module=module, flow=flow, vars=vars
        )
    )
async def runflow_async(path: str = None, source: str = None, module: str = None, flow: Flow = None, vars: Optional[dict] = None)

Run a flow object (async).

Expand source code
async def runflow_async(
    path: str = None,
    source: str = None,
    module: str = None,
    flow: Flow = None,
    vars: Optional[dict] = None,
):
    """Run a flow object (async)."""
    _flow = Flow.load(path=path, source=source, module=module, flow=flow)
    assert _flow and isinstance(_flow, Flow)
    await _flow.run(vars or {})