Package runflow

Runflow is a tool to define and run your workflows.

Please read more on runflow.org.

Expand source code
"""
Runflow is a tool to define and run your workflows.

Please read more on [runflow.org](https://runflow.org).
"""
__all__ = [
    "Flow",
    "Task",
    "runflow",
    "runflow_async",
    "cli",
    "registry",
    "get_task_class",
    "register_task_class",
    "RunflowError",
    "RunflowSyntaxError",
    "RunflowReferenceError",
    "RunflowTaskTypeError",
    "RunflowTaskError",
    "RunflowAcyclicTasksError",
]


try:
    from importlib.metadata import entry_points  # type: ignore
except ImportError:  # python < 3.8
    try:
        from importlib_metadata import entry_points  # type: ignore
    except ImportError:
        entry_points = None  # type: ignore

from . import autoloader  # noqa
from .cli import cli
from .core import Flow, Task
from .errors import (
    RunflowAcyclicTasksError,
    RunflowError,
    RunflowReferenceError,
    RunflowSyntaxError,
    RunflowTaskError,
    RunflowTaskTypeError,
)
from .registry import (
    registry,
    register_task_class,
    get_task_class,
)
from .run import runflow, runflow_async


if entry_points is not None:
    try:
        _entry_points = entry_points()
    except TypeError:
        pass  # importlib-metadata < 0.8
    else:
        # Python 3.10+ / importlib_metadata >= 3.9.0
        _tasks = (
            _entry_points.select(group="runflow.tasks")  # type: ignore
            if hasattr(_entry_points, "select")
            else _entry_points.get("runflow.tasks", [])
        )
        for _task in _tasks:
            register_task_class(_task.name, _task.value)


if __name__ == "__main__":
    cli()
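
Third-party packages can hook into the entry-point loading shown above: every entry point in the runflow.tasks group is handed to register_task_class as a name plus an import string. A minimal packaging sketch, assuming a hypothetical my_runflow_tasks distribution built with setuptools (the entry-point value format is whatever runflow.utils.import_module accepts; "module:Class" is assumed here):

# setup.py of the hypothetical "my_runflow_tasks" distribution.
from setuptools import find_packages, setup

setup(
    name="my_runflow_tasks",
    version="0.1.0",
    packages=find_packages(),
    entry_points={
        # Task type "my_task" becomes available to flow specs once this
        # distribution is installed; runflow registers it lazily at import time.
        "runflow.tasks": [
            "my_task = my_runflow_tasks.tasks:MyTask",
        ],
    },
)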

Sub-modules

runflow.autoloader

Auto-load a .hcl file using Python import string …

runflow.community

runflow.community is a package of community-facing tasks.

runflow.contribs

runflow.contribs is a package for all core tasks.

runflow.core

Core module for runflow.

runflow.errors

Runflow errors.

runflow.hcl2

This module is a fork of amplify-education/python-hcl2 …

runflow.hcl2_parser
runflow.registry
runflow.run

Run a flow spec.

runflow.utils

Utility functions.

Functions

def cli(argv=None)

Command: runflow.

Expand source code
def cli(argv=None):
    """Command: `runflow`."""
    parser = cli_parser()
    args, _rest = parser.parse_known_args(argv)

    logging_format = "[%(asctime)-15s] %(message)s"
    logging.basicConfig(level=args.log_level, format=logging_format)

    if args.subparser_name in SUBCOMMANDS:
        SUBCOMMANDS[args.subparser_name](args)
    else:
        parser.print_help()
def get_task_class(name: str)

Get the task class by name.

Expand source code
def get_task_class(name: str):
    """Get the task class by name."""
    if name not in registry:
        if name not in task_implementations:
            raise ValueError(f"Unknown task type: {name}")
        impl = import_module(task_implementations[name]["class"])
        register_task_class(name, impl)
    return registry[name]["class"]
def register_task_class(name: str, cls: Union[str, Type], overwrite: bool = False)

Register a task class with a name.

Expand source code
def register_task_class(
    name: str, cls: Union[str, Type], overwrite: bool = False
):
    """Register a task class with a name."""
    if isinstance(cls, str):
        if name in task_implementations and not overwrite:
            raise ValueError(
                f"Task name {name} has been taken in the "
                "default task implementation."
            )
        task_implementations[name] = {"class": cls}
        return
    if name in registry and not overwrite:
        raise ValueError(f"Task name {name} has been taken in the registry.")
    if not hasattr(cls, "run"):
        raise ValueError("cls is not runnable.")
    _registry[name] = {"class": cls}
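
The registry can also be used directly from Python, which is handy for ad-hoc task types. A minimal sketch (GreetTask and the "greet" name are made up for illustration); anything exposing a run method can be registered, and passing an import string instead of a class defers the import until get_task_class first resolves the name:

from runflow import get_task_class, register_task_class

class GreetTask:
    """A toy task: any class exposing a `run` method qualifies."""

    def __init__(self, message):
        self.message = message

    def run(self):
        print(self.message)
        return {"echoed": self.message}

register_task_class("greet", GreetTask)  # register the class object itself
# register_task_class("lazy_greet", "mypkg.tasks.GreetTask")  # or lazily, by import string

assert get_task_class("greet") is GreetTask
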
def runflow(path: str = None, source: str = None, module: str = None, flow: Flow = None, vars: Optional[dict] = None)

Run a flow object (sync).

Expand source code
def runflow(
    path: str = None,
    source: str = None,
    module: str = None,
    flow: Flow = None,
    vars: Optional[dict] = None,
):
    """Run a flow object (sync)."""
    run_async(
        runflow_async(
            path=path, source=source, module=module, flow=flow, vars=vars
        )
    )
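
A hedged end-to-end sketch: register a toy task class, then hand runflow a flow spec as a string. The block layout (one flow block containing task "<type>" "<name>" blocks) mirrors what Flow.from_spec expects; the exact HCL surface syntax accepted by runflow's hcl2 fork is an assumption here:

from runflow import register_task_class, runflow

class GreetTask:
    def __init__(self, message):
        self.message = message

    def run(self):
        print(self.message)

register_task_class("greet", GreetTask)

SPEC = """
flow "hello" {
  task "greet" "say_hi" {
    message = "hello, world"
  }
}
"""

runflow(source=SPEC)  # blocks until the flow finishes
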
async def runflow_async(path: str = None, source: str = None, module: str = None, flow: Flow = None, vars: Optional[dict] = None)

Run a flow object (async).

Expand source code
async def runflow_async(
    path: str = None,
    source: str = None,
    module: str = None,
    flow: Flow = None,
    vars: Optional[dict] = None,
):
    """Run a flow object (async)."""
    _flow = Flow.load(path=path, source=source, module=module, flow=flow)
    assert _flow and isinstance(_flow, Flow)
    await _flow.run(vars or {})
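
Inside an already-running event loop, await the coroutine directly instead of the sync wrapper. A short sketch (the flows/etl.hcl path and the env variable are hypothetical):

import asyncio

from runflow import runflow_async

async def main():
    # Runs the flow alongside other coroutines in the same loop.
    await runflow_async(path="flows/etl.hcl", vars={"env": "staging"})

asyncio.run(main())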

Classes

class Flow (name, runner_cls=None)

A Flow object manages the flow graph and the order of task execution.

Expand source code
class Flow:
    """Flow object manages the flow graph and the order of task executions."""

    def __init__(self, name, runner_cls=None):
        self.name = name
        self.graph = networkx.DiGraph()
        self.runner = (runner_cls or SequentialRunner)(self)
        self.vars = {}
        self.functions = {}

    def __iter__(self):
        """Iterate through all tasks in a dependent order."""
        try:
            yield from networkx.topological_sort(self.graph)
        except networkx.exception.NetworkXUnfeasible as err:
            raise RunflowAcyclicTasksError(str(err)) from err

    @property
    def exception(self):
        return next(
            (
                task_result.exception
                for task_result in self.runner.results.values()
                if task_result.status == TaskStatus.FAILED
            ),
            None,
        )

    @classmethod
    def from_spec(cls, source):
        """Load flow from a .hcl file content."""
        try:
            flow = hcl2.loads(source)
        except LarkError as err:
            raise RunflowSyntaxError(str(err)) from err

        assert "flow" in flow, "Need a flow block"
        assert len(flow["flow"]) == 1, "Runflow spec should have only one flow"

        flow_spec = flow["flow"][0]
        flow_name, flow_body = next(iter(flow_spec.items()))
        flow = cls(name=flow_name)
        flow.load_flow_spec_body(flow_body)
        return flow

    @classmethod
    def from_specfile(cls, path):
        """Load flow from a given file path."""
        with open(path) as file:
            flow_spec = file.read()
        return cls.from_spec(flow_spec)

    @classmethod
    def load(cls, path=None, source=None, module=None, flow=None):
        if path:
            return Flow.from_specfile(path)

        if source:
            return Flow.from_spec(source)

        if module:
            return import_module(module)

        return flow

    def add_task(self, task):
        """Add task to flow graph."""
        self.graph.add_node(task)

    def set_dependency(self, task, depends_on):
        """Let `task` depends on `depends_on`,

        e.g. `depends_on` is a predecessor edge of `task`.

        """
        self.graph.add_edge(depends_on, task)

    def set_default_var(self, name, value):
        """Set default value for variable."""
        self.vars[name] = value

    def load_function(self, func_name, import_string):
        """Load the imported function to task func namespace."""
        function = utils.import_module(import_string)
        self.functions[func_name] = function

    # pylint: disable=no-self-use
    def load_flow_tasks_from_spec(self, tasks_spec):
        """Load the `task` blocks."""
        for task_spec in tasks_spec:
            for task_type in task_spec:
                task_class = get_task_class(task_type)
                for task_name in task_spec[task_type]:
                    task_payload = dict(task_spec[task_type][task_name])
                    yield Task(task_type, task_class, task_name, task_payload)

    def load_task_by_task_reference(self, depends_on):
        """Find task by a reference like `task.TASK_TYPE.TASK_NAME`."""
        task_key = depends_on.split(".")
        assert len(task_key) == 3 and task_key[0] == "task"

        _, task_type, task_name = task_key

        for task in self.graph.nodes:
            if task.type == task_type and task.name == task_name:
                return task

        raise RunflowSyntaxError(
            f"Task depends on {task_key} "
            f"but the dependent task does not exist"
        )

    def load_flow_tasks_dependencies(self, task):
        """Find task dependencies."""
        deps_set = set()
        for key, value in task.payload.items():
            hcl2.resolve_deps(key, deps_set)
            hcl2.resolve_deps(value, deps_set)

        for dep in deps_set:
            yield self.load_task_by_task_reference(dep)

    def set_tasks_dependencies(self):
        """Walk the task graph and sort out the task dependencies."""
        for task in self.graph.nodes:
            explicit_deps = self.load_flow_tasks_dependencies(task)
            for dep in explicit_deps:
                self.set_dependency(task, dep)

    def load_flow_variables(self, vars_spec):
        """Load the `variable` block."""
        for var_spec in vars_spec:
            var_name = next(iter(var_spec.keys()))
            var_value_spec = next(iter(var_spec.values()))
            var_value = config(
                f"RUNFLOW_VAR_{var_name}", default=None
            ) or var_value_spec.get("default", None)
            var_required = var_value_spec.get("required")
            if var_required and var_value is None:
                raise NameError(f"{var_name} is required but not provided.")
            self.set_default_var(var_name, var_value)

    # pylint: disable=no-self-use
    def load_flow_imported_tasks(self, tasks):
        """Load the `import.tasks` attribute."""
        for task_name, task_impl in tasks.items():
            register_task_class(task_name, task_impl)

    def load_flow_imported_functions(self, functions):
        """Load the `import.functions` attribute."""
        for func_name, func_import in functions.items():
            self.load_function(func_name, func_import)

    def load_flow_imports(self, imports):
        """Load the `import` block."""
        for _import in imports:
            self.load_flow_imported_tasks(_import.get("tasks", []))
            self.load_flow_imported_functions(_import.get("functions", {}))

    def load_flow_tasks(self, tasks):
        for task in self.load_flow_tasks_from_spec(tasks):
            self.add_task(task)

    def load_flow_spec_body(self, spec):
        """Load the body of a flow block."""
        self.load_flow_imports(spec.get("import", []))
        self.load_flow_variables(spec.get("variable", []))
        self.load_flow_tasks(spec.get("task", []))
        self.set_tasks_dependencies()

    def make_run_context(self, vars=None):
        """Prepare the context for a task run."""
        context = {
            "var": dict(self.vars, **dict(vars or {})),
            "task": {},
            "func": self.functions,
        }
        for task in self:
            context["task"].setdefault(task.type, {})
            context["task"][task.type].setdefault(task.name, {})
        return context

    def as_task(self, vars=None):
        return FlowRunTask(flow=self, vars=vars)

    async def run(self, vars=None):
        """Run a flow."""
        context = self.make_run_context(vars)
        await self.runner.run(context)
        return context
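
Flows are usually loaded from a spec, but the same graph can be assembled in Python. A hedged sketch using the default SequentialRunner, the sync runflow wrapper, and two toy task classes (ExtractTask and LoadTask are made-up names):

from runflow import Flow, Task, runflow

class ExtractTask:
    def __init__(self, **payload):
        self.payload = payload

    def run(self):
        return {"rows": [1, 2, 3]}

class LoadTask(ExtractTask):
    def run(self):
        print("loading", self.payload)
        return {}

flow = Flow(name="etl")
extract = Task("extract", ExtractTask, "pull", {"table": "users"})
load = Task("load", LoadTask, "push", {"target": "warehouse"})

flow.add_task(extract)
flow.add_task(load)
flow.set_dependency(load, extract)  # "pull" must finish before "push" starts

runflow(flow=flow)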

Static methods

def from_spec(source)

Load flow from .hcl file content.

Expand source code
@classmethod
def from_spec(cls, source):
    """Load flow from a .hcl file content."""
    try:
        flow = hcl2.loads(source)
    except LarkError as err:
        raise RunflowSyntaxError(str(err)) from err

    assert "flow" in flow, "Need a flow block"
    assert len(flow["flow"]) == 1, "Runflow spec should have only one flow"

    flow_spec = flow["flow"][0]
    flow_name, flow_body = next(iter(flow_spec.items()))
    flow = cls(name=flow_name)
    flow.load_flow_spec_body(flow_body)
    return flow
def from_specfile(path)

Load flow from a given file path.

Expand source code
@classmethod
def from_specfile(cls, path):
    """Load flow from a given file path."""
    with open(path) as file:
        flow_spec = file.read()
    return cls.from_spec(flow_spec)
def load(path=None, source=None, module=None, flow=None)
Expand source code
@classmethod
def load(cls, path=None, source=None, module=None, flow=None):
    if path:
        return Flow.from_specfile(path)

    if source:
        return Flow.from_spec(source)

    if module:
        return import_module(module)

    return flow

Instance variables

var exception
Expand source code
@property
def exception(self):
    return next(
        (
            task_result.exception
            for task_result in self.runner.results.values()
            if task_result.status == TaskStatus.FAILED
        ),
        None,
    )

Methods

def add_task(self, task)

Add task to flow graph.

Expand source code
def add_task(self, task):
    """Add task to flow graph."""
    self.graph.add_node(task)
def as_task(self, vars=None)
Expand source code
def as_task(self, vars=None):
    return FlowRunTask(flow=self, vars=vars)
def load_flow_imported_functions(self, functions)

Load the import.functions attribute.

Expand source code
def load_flow_imported_functions(self, functions):
    """Load the `import.functions` attribute."""
    for func_name, func_import in functions.items():
        self.load_function(func_name, func_import)
def load_flow_imported_tasks(self, tasks)

Load the import.tasks attribute.

Expand source code
def load_flow_imported_tasks(self, tasks):
    """Load the `import.tasks` attribute."""
    for task_name, task_impl in tasks.items():
        register_task_class(task_name, task_impl)
def load_flow_imports(self, imports)

Load the import block.

Expand source code
def load_flow_imports(self, imports):
    """Load the `import` block."""
    for _import in imports:
        self.load_flow_imported_tasks(_import.get("tasks", []))
        self.load_flow_imported_functions(_import.get("functions", {}))
def load_flow_spec_body(self, spec)

Load the body of a flow block.

Expand source code
def load_flow_spec_body(self, spec):
    """Load the body of a flow block."""
    self.load_flow_imports(spec.get("import", []))
    self.load_flow_variables(spec.get("variable", []))
    self.load_flow_tasks(spec.get("task", []))
    self.set_tasks_dependencies()
def load_flow_tasks(self, tasks)
Expand source code
def load_flow_tasks(self, tasks):
    for task in self.load_flow_tasks_from_spec(tasks):
        self.add_task(task)
def load_flow_tasks_dependencies(self, task)

Find task dependencies.

Expand source code
def load_flow_tasks_dependencies(self, task):
    """Find task dependencies."""
    deps_set = set()
    for key, value in task.payload.items():
        hcl2.resolve_deps(key, deps_set)
        hcl2.resolve_deps(value, deps_set)

    for dep in deps_set:
        yield self.load_task_by_task_reference(dep)
def load_flow_tasks_from_spec(self, tasks_spec)

Load the task blocks.

Expand source code
def load_flow_tasks_from_spec(self, tasks_spec):
    """Load the `task` blocks."""
    for task_spec in tasks_spec:
        for task_type in task_spec:
            task_class = get_task_class(task_type)
            for task_name in task_spec[task_type]:
                task_payload = dict(task_spec[task_type][task_name])
                yield Task(task_type, task_class, task_name, task_payload)
def load_flow_variables(self, vars_spec)

Load the variable block.

Expand source code
def load_flow_variables(self, vars_spec):
    """Load the `variable` block."""
    for var_spec in vars_spec:
        var_name = next(iter(var_spec.keys()))
        var_value_spec = next(iter(var_spec.values()))
        var_value = config(
            f"RUNFLOW_VAR_{var_name}", default=None
        ) or var_value_spec.get("default", None)
        var_required = var_value_spec.get("required")
        if var_required and var_value is None:
            raise NameError(f"{var_name} is required but not provided.")
        self.set_default_var(var_name, var_value)
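
Variable defaults can be overridden from the environment: for a variable named name, a RUNFLOW_VAR_name value (read through config, python-decouple style) takes precedence over the spec's default, and a variable marked required with no value at all raises NameError. A hedged sketch, assuming standard HCL attribute syntax and the "greet" task type registered in the earlier sketch:

import os

from runflow import runflow

SPEC = """
flow "hello" {
  variable "name" {
    default = "world"
  }

  task "greet" "say_hi" {
    message = "hello, ${var.name}"
  }
}
"""

os.environ["RUNFLOW_VAR_name"] = "runflow"  # overrides the default "world"
runflow(source=SPEC)                        # the task sees var.name == "runflow"
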
def load_function(self, func_name, import_string)

Load the imported function into the task func namespace.

Expand source code
def load_function(self, func_name, import_string):
    """Load the imported function to task func namespace."""
    function = utils.import_module(import_string)
    self.functions[func_name] = function
def load_task_by_task_reference(self, depends_on)

Find task by a reference like task.TASK_TYPE.TASK_NAME.

Expand source code
def load_task_by_task_reference(self, depends_on):
    """Find task by a reference like `task.TASK_TYPE.TASK_NAME`."""
    task_key = depends_on.split(".")
    assert len(task_key) == 3 and task_key[0] == "task"

    _, task_type, task_name = task_key

    for task in self.graph.nodes:
        if task.type == task_type and task.name == task_name:
            return task

    raise RunflowSyntaxError(
        f"Task depends on {task_key} "
        f"but the dependent task does not exist"
    )
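
References use the three-part form task.TASK_TYPE.TASK_NAME; anything else trips the assertion, and a reference to a task that is not in the graph raises RunflowSyntaxError. For example, with the programmatically built flow from the earlier sketch:

task = flow.load_task_by_task_reference("task.extract.pull")
print(task)  # task.extract.pull
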
def make_run_context(self, vars=None)

Prepare the context for a task run.

Expand source code
def make_run_context(self, vars=None):
    """Prepare the context for a task run."""
    context = {
        "var": dict(self.vars, **dict(vars or {})),
        "task": {},
        "func": self.functions,
    }
    for task in self:
        context["task"].setdefault(task.type, {})
        context["task"][task.type].setdefault(task.name, {})
    return context
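
The returned context is a plain dict with three namespaces. For the toy two-task flow from the earlier sketch it would look roughly like this before any task has written its outputs (a sketch of the shape, not exact output):

context = flow.make_run_context(vars={"env": "staging"})
# {
#     "var": {"env": "staging"},          # flow defaults merged with the overrides
#     "task": {"extract": {"pull": {}}, "load": {"push": {}}},  # filled in as tasks run
#     "func": {},                         # functions loaded via the import block
# }
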
async def run(self, vars=None)

Run a flow.

Expand source code
async def run(self, vars=None):
    """Run a flow."""
    context = self.make_run_context(vars)
    await self.runner.run(context)
    return context
def set_default_var(self, name, value)

Set default value for variable.

Expand source code
def set_default_var(self, name, value):
    """Set default value for variable."""
    self.vars[name] = value
def set_dependency(self, task, depends_on)

Make task depend on depends_on,

i.e. depends_on becomes a predecessor of task in the graph.

Expand source code
def set_dependency(self, task, depends_on):
    """Let `task` depends on `depends_on`,

    e.g. `depends_on` is a predecessor edge of `task`.

    """
    self.graph.add_edge(depends_on, task)
def set_tasks_dependencies(self)

Walk the task graph and sort out the task dependencies.

Expand source code
def set_tasks_dependencies(self):
    """Walk the task graph and sort out the task dependencies."""
    for task in self.graph.nodes:
        explicit_deps = self.load_flow_tasks_dependencies(task)
        for dep in explicit_deps:
            self.set_dependency(task, dep)
class RunflowAcyclicTasksError (*args, **kwargs)

The task has a circular dependency.

Expand source code
class RunflowAcyclicTasksError(RunflowError):
    """The task has circular dependency."""

Ancestors

  • RunflowError
  • builtins.Exception
  • builtins.BaseException

class RunflowError (*args, **kwargs)

Base Runflow error.

Expand source code
class RunflowError(Exception):
    """Base Runflow error."""

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

  • RunflowAcyclicTasksError
  • RunflowReferenceError
  • RunflowSyntaxError
  • RunflowTaskError
  • RunflowTaskTypeError

class RunflowReferenceError (*args, **kwargs)

The variable is not declared in the flow spec.

Expand source code
class RunflowReferenceError(RunflowError):
    """The variable is not declared in the flow spec."""

Ancestors

  • RunflowError
  • builtins.Exception
  • builtins.BaseException

class RunflowSyntaxError (*args, **kwargs)

The flow spec has a syntax error.

Expand source code
class RunflowSyntaxError(RunflowError):
    """The flow spec has a syntax error."""

Ancestors

  • RunflowError
  • builtins.Exception
  • builtins.BaseException

class RunflowTaskError (*args, **kwargs)

The task execution failed.

Expand source code
class RunflowTaskError(RunflowError):
    """The task execution is failed."""

Ancestors

  • RunflowError
  • builtins.Exception
  • builtins.BaseException

class RunflowTaskTypeError (*args, **kwargs)

The task type is not registered.

Expand source code
class RunflowTaskTypeError(RunflowError):
    """The task type is not registered."""

Ancestors

  • RunflowError
  • builtins.Exception
  • builtins.BaseException

class Task (type, runner, name, payload)

A Task object maintains the running status.

Expand source code
class Task:
    """Task object maintains the running status."""

    def __init__(self, type, runner, name, payload):
        self.type = type
        self.runner = runner
        self.name = name
        self.payload = payload
        self.namespace = ""

    def __repr__(self):
        return (
            f"Task(type={self.type}, "
            f"name={self.name}, "
            f"payload={self.payload})"
        )

    def __str__(self):
        return (
            f"{self.namespace}{self.namespace and ' > ' or ''}"
            f"task.{self.type}.{self.name}"
        )

    def __hash__(self):
        return hash(repr(self))

    def __eq__(self, o):
        return self.type == o.type and self.name == o.name

    def set_namespace(self, namespace):
        self.namespace = namespace

    def should_run(self, context):
        return all(
            hcl2.evaluate(depends_on, context)
            for depends_on in self.payload.get(DEPENDS_ON_KEY, [])
        )

    def should_timeout(self, task, context):
        if "_timeout" not in self.payload:
            return None
        _timeout = hcl2.evaluate(self.payload["_timeout"], context)
        assert isinstance(_timeout, (int, float))
        return _timeout

    def should_rerun(self, task, context):  # pylint: disable=too-many-branches
        if "_retry" not in self.payload:
            return task.run

        _retry = hcl2.evaluate(self.payload.get("_retry", {}), context)
        _retry_params = {}
        stop_after = _retry.get("stop_after", "1 times")
        stopers = []
        for _stop_after in stop_after.split("|"):
            _stop_after = _stop_after.strip()
            if "times" in _stop_after:
                attempts = int(_stop_after.replace("times", "").strip())
                stoper = stop_after_attempt(attempts)
            elif "seconds" in _stop_after:
                seconds = float(_stop_after.replace("seconds", "").strip())
                stoper = stop_after_delay(seconds)
            else:
                raise ValueError(f"invalid _retry.stop_after: {_stop_after}")
            stopers.append(stoper)

        _retry_params["stop"] = reduce(lambda a, b: a | b, stopers)
        _retry_params["wait"] = _retry.get("wait", None)
        _retry_params["reraise"] = True
        return retry(**_retry_params)(task.run)

    def eval_payload(self, context):
        return hcl2.evaluate(
            {k: v for k, v in self.payload.items() if not k.startswith("_")},
            context,
        )

    async def run(self, context):
        """Run a task."""
        if self.type == "hcl2_template":
            context = dict(
                context,
                **hcl2.evaluate(self.payload.get("context", {}), context),
            )

        if not self.should_run(context):
            logger.info('"%s" is canceled due to falsy deps.', str(self))
            return TaskResult(TaskStatus.CANCELED)

        _payload = self.eval_payload(context)
        context["task"][self.type][self.name].update(_payload)

        task_result = TaskResult(TaskStatus.PENDING)

        try:
            logger.info('"%s" is started.', str(self))
            if isinstance(self.runner, Flow):
                task = self.runner.as_task(_payload)
            else:
                task = self.runner(**_payload)

            if isinstance(task, FlowRunTask):
                task.namespace = str(self)

            task_run = self.should_rerun(task, context)
            task_result.result = (
                await asyncio.wait_for(
                    task_run()
                    if inspect.iscoroutinefunction(task.run)
                    else utils.to_thread(task_run),
                    timeout=self.should_timeout(task, context),
                )
                or {}
            )
            task_result.result.update(_payload)
            logger.info('"%s" is successful.', str(self))
        except Exception as err:  # pylint: disable=broad-except
            task_result.exception = err
            logger.info('"%s" is failed.', str(self))
            traceback.print_exc()

        return task_result
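
Payload keys that start with an underscore are runtime hints consumed by run() rather than passed to the task implementation: _timeout caps the run via asyncio.wait_for and _retry wraps it with tenacity. A hedged sketch reusing the toy ExtractTask from the earlier Flow example (the hint values shown are assumptions about the accepted shapes):

from runflow import Flow, Task, runflow

flow = Flow(name="etl-with-hints")
flow.add_task(
    Task(
        "extract",
        ExtractTask,  # toy runner class from the earlier sketch
        "pull",
        {
            "table": "users",                                  # passed to ExtractTask(...)
            "_timeout": 30,                                    # seconds, via asyncio.wait_for
            "_retry": {"stop_after": "3 times | 60 seconds"},  # tenacity stop condition
        },
    )
)
runflow(flow=flow)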

Methods

def eval_payload(self, context)
Expand source code
def eval_payload(self, context):
    return hcl2.evaluate(
        {k: v for k, v in self.payload.items() if not k.startswith("_")},
        context,
    )
async def run(self, context)

Run a task.

Expand source code
async def run(self, context):
    """Run a task."""
    if self.type == "hcl2_template":
        context = dict(
            context,
            **hcl2.evaluate(self.payload.get("context", {}), context),
        )

    if not self.should_run(context):
        logger.info('"%s" is canceled due to falsy deps.', str(self))
        return TaskResult(TaskStatus.CANCELED)

    _payload = self.eval_payload(context)
    context["task"][self.type][self.name].update(_payload)

    task_result = TaskResult(TaskStatus.PENDING)

    try:
        logger.info('"%s" is started.', str(self))
        if isinstance(self.runner, Flow):
            task = self.runner.as_task(_payload)
        else:
            task = self.runner(**_payload)

        if isinstance(task, FlowRunTask):
            task.namespace = str(self)

        task_run = self.should_rerun(task, context)
        task_result.result = (
            await asyncio.wait_for(
                task_run()
                if inspect.iscoroutinefunction(task.run)
                else utils.to_thread(task_run),
                timeout=self.should_timeout(task, context),
            )
            or {}
        )
        task_result.result.update(_payload)
        logger.info('"%s" is successful.', str(self))
    except Exception as err:  # pylint: disable=broad-except
        task_result.exception = err
        logger.info('"%s" is failed.', str(self))
        traceback.print_exc()

    return task_result
def set_namespace(self, namespace)
Expand source code
def set_namespace(self, namespace):
    self.namespace = namespace
def should_rerun(self, task, context)
Expand source code
def should_rerun(self, task, context):  # pylint: disable=too-many-branches
    if "_retry" not in self.payload:
        return task.run

    _retry = hcl2.evaluate(self.payload.get("_retry", {}), context)
    _retry_params = {}
    stop_after = _retry.get("stop_after", "1 times")
    stopers = []
    for _stop_after in stop_after.split("|"):
        _stop_after = _stop_after.strip()
        if "times" in _stop_after:
            attempts = int(_stop_after.replace("times", "").strip())
            stoper = stop_after_attempt(attempts)
        elif "seconds" in _stop_after:
            seconds = float(_stop_after.replace("seconds", "").strip())
            stoper = stop_after_delay(seconds)
        else:
            raise ValueError(f"invalid _retry.stop_after: {_stop_after}")
        stopers.append(stoper)

    _retry_params["stop"] = reduce(lambda a, b: a | b, stopers)
    _retry_params["wait"] = _retry.get("wait", None)
    _retry_params["reraise"] = True
    return retry(**_retry_params)(task.run)
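
The stop_after value is a tiny grammar: one or more clauses joined by "|", each either "N times" (maximum attempts) or "N seconds" (maximum total delay), folded together with tenacity's | operator. Roughly equivalent, as a sketch:

from tenacity import retry, stop_after_attempt, stop_after_delay

# _retry = { stop_after = "3 times | 60 seconds" } behaves roughly like:
@retry(stop=stop_after_attempt(3) | stop_after_delay(60), reraise=True)
def run():
    ...
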
def should_run(self, context)
Expand source code
def should_run(self, context):
    return all(
        hcl2.evaluate(depends_on, context)
        for depends_on in self.payload.get(DEPENDS_ON_KEY, [])
    )
def should_timeout(self, task, context)
Expand source code
def should_timeout(self, task, context):
    if "_timeout" not in self.payload:
        return None
    _timeout = hcl2.evaluate(self.payload["_timeout"], context)
    assert isinstance(_timeout, (int, float))
    return _timeout