Module runflow.core

Core module for runflow.

Expand source code
"""Core module for runflow."""

import enum
import inspect
import logging
import traceback
import asyncio
from functools import reduce

import networkx
from decouple import config
from tenacity import (
    retry,
    stop_after_delay,
    stop_after_attempt,
)

from . import hcl2, utils
from .errors import (
    RunflowAcyclicTasksError,
    RunflowSyntaxError,
)
from .hcl2_parser import LarkError
from .registry import get_task_class, register_task_class
from .utils import import_module

logger = logging.getLogger(__name__)


DEPENDS_ON_KEY = "_depends_on"


class TaskStatus(enum.Enum):
    """Task execution status."""

    PENDING = enum.auto()
    SUCCESS = enum.auto()
    FAILED = enum.auto()
    CANCELED = enum.auto()


class TaskResult:
    """Task execution result."""

    def __init__(self, status):
        self.status = status
        self._result = None
        self._exception = None

    @property
    def result(self):
        """Get task result."""
        if self._exception:
            raise ValueError(
                "Task has no result due to a failed run."
            ) from self._exception
        return self._result

    @result.setter
    def result(self, _result):
        """Set task result."""
        self.status = TaskStatus.SUCCESS
        self._result = _result

    @property
    def exception(self):
        """Get task exception."""
        if self._result:
            raise ValueError("Task has no exception due to a successful run.")
        return self._exception

    @exception.setter
    def exception(self, _exception):
        """Set task exception."""
        self.status = TaskStatus.FAILED
        self._exception = _exception


class Task:
    """Task object maintains the running status."""

    def __init__(self, type, runner, name, payload):
        self.type = type
        self.runner = runner
        self.name = name
        self.payload = payload
        self.namespace = ""

    def __repr__(self):
        return (
            f"Task(type={self.type}, "
            f"name={self.name}, "
            f"payload={self.payload})"
        )

    def __str__(self):
        return (
            f"{self.namespace}{self.namespace and ' > ' or ''}"
            f"task.{self.type}.{self.name}"
        )

    def __hash__(self):
        return hash(repr(self))

    def __eq__(self, o):
        return self.type == o.type and self.name == o.name

    def set_namespace(self, namespace):
        self.namespace = namespace

    def should_run(self, context):
        return all(
            hcl2.evaluate(depends_on, context)
            for depends_on in self.payload.get(DEPENDS_ON_KEY, [])
        )

    def should_timeout(self, task, context):
        if "_timeout" not in self.payload:
            return None
        _timeout = hcl2.evaluate(self.payload["_timeout"], context)
        assert isinstance(_timeout, (int, float))
        return _timeout

    def should_rerun(self, task, context):  # pylint: disable=too-many-branches
        if "_retry" not in self.payload:
            return task.run

        _retry = hcl2.evaluate(self.payload.get("_retry", {}), context)
        _retry_params = {}
        stop_after = _retry.get("stop_after", "1 times")
        stopers = []
        for _stop_after in stop_after.split("|"):
            _stop_after = _stop_after.strip()
            if "times" in _stop_after:
                attempts = int(_stop_after.replace("times", "").strip())
                stoper = stop_after_attempt(attempts)
            elif "seconds" in _stop_after:
                seconds = float(_stop_after.replace("seconds", "").strip())
                stoper = stop_after_delay(seconds)
            else:
                raise ValueError(f"invalid _retry.stop_after: {_stop_after}")
            stopers.append(stoper)

        _retry_params["stop"] = reduce(lambda a, b: a | b, stopers)
        _retry_params["wait"] = _retry.get("wait", None)
        _retry_params["reraise"] = True
        return retry(**_retry_params)(task.run)

    def eval_payload(self, context):
        return hcl2.evaluate(
            {k: v for k, v in self.payload.items() if not k.startswith("_")},
            context,
        )

    async def run(self, context):
        """Run a task."""
        if self.type == "hcl2_template":
            context = dict(
                context,
                **hcl2.evaluate(self.payload.get("context", {}), context),
            )

        if not self.should_run(context):
            logger.info('"%s" is canceled due to falsy deps.', str(self))
            return TaskResult(TaskStatus.CANCELED)

        _payload = self.eval_payload(context)
        context["task"][self.type][self.name].update(_payload)

        task_result = TaskResult(TaskStatus.PENDING)

        try:
            logger.info('"%s" is started.', str(self))
            if isinstance(self.runner, Flow):
                task = self.runner.as_task(_payload)
            else:
                task = self.runner(**_payload)

            if isinstance(task, FlowRunTask):
                task.namespace = str(self)

            task_run = self.should_rerun(task, context)
            task_result.result = (
                await asyncio.wait_for(
                    task_run()
                    if inspect.iscoroutinefunction(task.run)
                    else utils.to_thread(task_run),
                    timeout=self.should_timeout(task, context),
                )
                or {}
            )
            task_result.result.update(_payload)
            logger.info('"%s" is successful.', str(self))
        except Exception as err:  # pylint: disable=broad-except
            task_result.exception = err
            logger.info('"%s" is failed.', str(self))
            traceback.print_exc()

        return task_result


class FlowRunTask:
    def __init__(
        self,
        path=None,
        source=None,
        module=None,
        flow=None,
        vars=None,
        export=None,
    ):
        self.path = path
        self.source = source
        self.module = module
        self.flow = flow
        self.vars = vars or {}
        self.export = export or []
        self.namespace = ""

    async def run(self):
        flow = Flow.load(
            path=self.path,
            source=self.source,
            module=self.module,
            flow=self.flow,
        )
        for task in flow:
            task.set_namespace(self.namespace)

        flow_context = await flow.run(vars=self.vars)

        if flow.exception:
            raise flow.exception

        return {
            key: hcl2.evaluate(hcl2.loads(value, "eval"), flow_context)
            for export in self.export
            for key, value in export.items()
        }


class SequentialRunner:
    """This runner runs the flow tasks sequentially."""

    def __init__(self, flow):
        self.flow = flow
        self.results = {}

    def check_depends_on(self, task):
        for upstream_task in self.flow.graph.predecessors(task):
            if self.results[upstream_task].status in (
                TaskStatus.FAILED,
                TaskStatus.CANCELED,
            ):
                return False
        return True

    async def run_task(self, task, context):
        if not self.check_depends_on(task):
            logger.info(
                '"%s" is canceled due to previous task failed run.',
                f"task.{task.type}.{task.name}",
            )
            return TaskResult(TaskStatus.CANCELED)

        task_result = await task.run(context)

        if task_result.status == TaskStatus.SUCCESS:
            context["task"][task.type][task.name] = task_result.result

        return task_result

    async def run(self, context):
        """Run flow tasks."""
        for task in self.flow:
            self.results[task] = await self.run_task(task, context)


# pylint: disable=too-many-public-methods
class Flow:
    """Flow object manages the flow graph and the order of task executions."""

    def __init__(self, name, runner_cls=None):
        self.name = name
        self.graph = networkx.DiGraph()
        self.runner = (runner_cls or SequentialRunner)(self)
        self.vars = {}
        self.functions = {}

    def __iter__(self):
        """Iterate through all tasks in a dependent order."""
        try:
            yield from networkx.topological_sort(self.graph)
        except networkx.exception.NetworkXUnfeasible as err:
            raise RunflowAcyclicTasksError(str(err)) from err

    @property
    def exception(self):
        return next(
            (
                task_result.exception
                for task_result in self.runner.results.values()
                if task_result.status == TaskStatus.FAILED
            ),
            None,
        )

    @classmethod
    def from_spec(cls, source):
        """Load flow from a .hcl file content."""
        try:
            flow = hcl2.loads(source)
        except LarkError as err:
            raise RunflowSyntaxError(str(err)) from err

        assert "flow" in flow, "Need a flow block"
        assert len(flow["flow"]) == 1, "Runflow spec should have only one flow"

        flow_spec = flow["flow"][0]
        flow_name, flow_body = next(iter(flow_spec.items()))
        flow = cls(name=flow_name)
        flow.load_flow_spec_body(flow_body)
        return flow

    @classmethod
    def from_specfile(cls, path):
        """Load flow from a given file path."""
        with open(path) as file:
            flow_spec = file.read()
        return cls.from_spec(flow_spec)

    @classmethod
    def load(cls, path=None, source=None, module=None, flow=None):
        if path:
            return Flow.from_specfile(path)

        if source:
            return Flow.from_spec(source)

        if module:
            return import_module(module)

        return flow

    def add_task(self, task):
        """Add task to flow graph."""
        self.graph.add_node(task)

    def set_dependency(self, task, depends_on):
        """Let `task` depends on `depends_on`,

        e.g. `depends_on` is a predecessor edge of `task`.

        """
        self.graph.add_edge(depends_on, task)

    def set_default_var(self, name, value):
        """Set default value for variable."""
        self.vars[name] = value

    def load_function(self, func_name, import_string):
        """Load the imported function to task func namespace."""
        function = utils.import_module(import_string)
        self.functions[func_name] = function

    # pylint: disable=no-self-use
    def load_flow_tasks_from_spec(self, tasks_spec):
        """Load the `task` blocks."""
        for task_spec in tasks_spec:
            for task_type in task_spec:
                task_class = get_task_class(task_type)
                for task_name in task_spec[task_type]:
                    task_payload = dict(task_spec[task_type][task_name])
                    yield Task(task_type, task_class, task_name, task_payload)

    def load_task_by_task_reference(self, depends_on):
        """Find task by a reference like `task.TASK_TYPE.TASK_NAME`."""
        task_key = depends_on.split(".")
        assert len(task_key) == 3 and task_key[0] == "task"

        _, task_type, task_name = task_key

        for task in self.graph.nodes:
            if task.type == task_type and task.name == task_name:
                return task

        raise RunflowSyntaxError(
            f"Task depends on {task_key} "
            f"but the dependent task does not exist"
        )

    def load_flow_tasks_dependencies(self, task):
        """Find task dependencies."""
        deps_set = set()
        for key, value in task.payload.items():
            hcl2.resolve_deps(key, deps_set)
            hcl2.resolve_deps(value, deps_set)

        for dep in deps_set:
            yield self.load_task_by_task_reference(dep)

    def set_tasks_dependencies(self):
        """Walk the task graph and sort out the task dependencies."""
        for task in self.graph.nodes:
            explicit_deps = self.load_flow_tasks_dependencies(task)
            for dep in explicit_deps:
                self.set_dependency(task, dep)

    def load_flow_variables(self, vars_spec):
        """Load the `variable` block."""
        for var_spec in vars_spec:
            var_name = next(iter(var_spec.keys()))
            var_value_spec = next(iter(var_spec.values()))
            var_value = config(
                f"RUNFLOW_VAR_{var_name}", default=None
            ) or var_value_spec.get("default", None)
            var_required = var_value_spec.get("required")
            if var_required and var_value is None:
                raise NameError(f"{var_name} is required but not provided.")
            self.set_default_var(var_name, var_value)

    # pylint: disable=no-self-use
    def load_flow_imported_tasks(self, tasks):
        """Load the `import.tasks` attribute."""
        for task_name, task_impl in tasks.items():
            register_task_class(task_name, task_impl)

    def load_flow_imported_functions(self, functions):
        """Load the `import.functions` attribute."""
        for func_name, func_import in functions.items():
            self.load_function(func_name, func_import)

    def load_flow_imports(self, imports):
        """Load the `import` block."""
        for _import in imports:
            self.load_flow_imported_tasks(_import.get("tasks", []))
            self.load_flow_imported_functions(_import.get("functions", {}))

    def load_flow_tasks(self, tasks):
        for task in self.load_flow_tasks_from_spec(tasks):
            self.add_task(task)

    def load_flow_spec_body(self, spec):
        """Load the body of a flow block."""
        self.load_flow_imports(spec.get("import", []))
        self.load_flow_variables(spec.get("variable", []))
        self.load_flow_tasks(spec.get("task", []))
        self.set_tasks_dependencies()

    def make_run_context(self, vars=None):
        """Prepare the context for a task run."""
        context = {
            "var": dict(self.vars, **dict(vars or {})),
            "task": {},
            "func": self.functions,
        }
        for task in self:
            context["task"].setdefault(task.type, {})
            context["task"][task.type].setdefault(task.name, {})
        return context

    def as_task(self, vars=None):
        return FlowRunTask(flow=self, vars=vars)

    async def run(self, vars=None):
        """Run a flow."""
        context = self.make_run_context(vars)
        await self.runner.run(context)
        return context

Classes

class Flow (name, runner_cls=None)

Flow object manages the flow graph and the order of task executions.
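
A minimal usage sketch, assuming a task type such as bash_run is registered and that the spec below is valid runflow HCL (both are illustrative, not guaranteed):

import asyncio

from runflow.core import Flow

spec = """
flow "hello" {
  variable "name" {
    default = "world"
  }

  task "bash_run" "echo" {
    command = "echo hello ${var.name}"
  }
}
"""

flow = Flow.from_spec(spec)                      # parse the spec and build the task graph
context = asyncio.run(flow.run(vars={"name": "runflow"}))
print(context["task"]["bash_run"]["echo"])       # evaluated payload plus task outputs
if flow.exception:                               # first exception from a failed task, if any
    raise flow.exception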

Expand source code
class Flow:
    """Flow object manages the flow graph and the order of task executions."""

    def __init__(self, name, runner_cls=None):
        self.name = name
        self.graph = networkx.DiGraph()
        self.runner = (runner_cls or SequentialRunner)(self)
        self.vars = {}
        self.functions = {}

    def __iter__(self):
        """Iterate through all tasks in a dependent order."""
        try:
            yield from networkx.topological_sort(self.graph)
        except networkx.exception.NetworkXUnfeasible as err:
            raise RunflowAcyclicTasksError(str(err)) from err

    @property
    def exception(self):
        return next(
            (
                task_result.exception
                for task_result in self.runner.results.values()
                if task_result.status == TaskStatus.FAILED
            ),
            None,
        )

    @classmethod
    def from_spec(cls, source):
        """Load flow from a .hcl file content."""
        try:
            flow = hcl2.loads(source)
        except LarkError as err:
            raise RunflowSyntaxError(str(err)) from err

        assert "flow" in flow, "Need a flow block"
        assert len(flow["flow"]) == 1, "Runflow spec should have only one flow"

        flow_spec = flow["flow"][0]
        flow_name, flow_body = next(iter(flow_spec.items()))
        flow = cls(name=flow_name)
        flow.load_flow_spec_body(flow_body)
        return flow

    @classmethod
    def from_specfile(cls, path):
        """Load flow from a given file path."""
        with open(path) as file:
            flow_spec = file.read()
        return cls.from_spec(flow_spec)

    @classmethod
    def load(cls, path=None, source=None, module=None, flow=None):
        if path:
            return Flow.from_specfile(path)

        if source:
            return Flow.from_spec(source)

        if module:
            return import_module(module)

        return flow

    def add_task(self, task):
        """Add task to flow graph."""
        self.graph.add_node(task)

    def set_dependency(self, task, depends_on):
        """Let `task` depends on `depends_on`,

        e.g. `depends_on` is a predecessor edge of `task`.

        """
        self.graph.add_edge(depends_on, task)

    def set_default_var(self, name, value):
        """Set default value for variable."""
        self.vars[name] = value

    def load_function(self, func_name, import_string):
        """Load the imported function to task func namespace."""
        function = utils.import_module(import_string)
        self.functions[func_name] = function

    # pylint: disable=no-self-use
    def load_flow_tasks_from_spec(self, tasks_spec):
        """Load the `task` blocks."""
        for task_spec in tasks_spec:
            for task_type in task_spec:
                task_class = get_task_class(task_type)
                for task_name in task_spec[task_type]:
                    task_payload = dict(task_spec[task_type][task_name])
                    yield Task(task_type, task_class, task_name, task_payload)

    def load_task_by_task_reference(self, depends_on):
        """Find task by a reference like `task.TASK_TYPE.TASK_NAME`."""
        task_key = depends_on.split(".")
        assert len(task_key) == 3 and task_key[0] == "task"

        _, task_type, task_name = task_key

        for task in self.graph.nodes:
            if task.type == task_type and task.name == task_name:
                return task

        raise RunflowSyntaxError(
            f"Task depends on {task_key} "
            f"but the dependent task does not exist"
        )

    def load_flow_tasks_dependencies(self, task):
        """Find task dependencies."""
        deps_set = set()
        for key, value in task.payload.items():
            hcl2.resolve_deps(key, deps_set)
            hcl2.resolve_deps(value, deps_set)

        for dep in deps_set:
            yield self.load_task_by_task_reference(dep)

    def set_tasks_dependencies(self):
        """Walk the task graph and sort out the task dependencies."""
        for task in self.graph.nodes:
            explicit_deps = self.load_flow_tasks_dependencies(task)
            for dep in explicit_deps:
                self.set_dependency(task, dep)

    def load_flow_variables(self, vars_spec):
        """Load the `variable` block."""
        for var_spec in vars_spec:
            var_name = next(iter(var_spec.keys()))
            var_value_spec = next(iter(var_spec.values()))
            var_value = config(
                f"RUNFLOW_VAR_{var_name}", default=None
            ) or var_value_spec.get("default", None)
            var_required = var_value_spec.get("required")
            if var_required and var_value is None:
                raise NameError(f"{var_name} is required but not provided.")
            self.set_default_var(var_name, var_value)

    # pylint: disable=no-self-use
    def load_flow_imported_tasks(self, tasks):
        """Load the `import.tasks` attribute."""
        for task_name, task_impl in tasks.items():
            register_task_class(task_name, task_impl)

    def load_flow_imported_functions(self, functions):
        """Load the `import.functions` attribute."""
        for func_name, func_import in functions.items():
            self.load_function(func_name, func_import)

    def load_flow_imports(self, imports):
        """Load the `import` block."""
        for _import in imports:
            self.load_flow_imported_tasks(_import.get("tasks", []))
            self.load_flow_imported_functions(_import.get("functions", {}))

    def load_flow_tasks(self, tasks):
        for task in self.load_flow_tasks_from_spec(tasks):
            self.add_task(task)

    def load_flow_spec_body(self, spec):
        """Load the body of a flow block."""
        self.load_flow_imports(spec.get("import", []))
        self.load_flow_variables(spec.get("variable", []))
        self.load_flow_tasks(spec.get("task", []))
        self.set_tasks_dependencies()

    def make_run_context(self, vars=None):
        """Prepare the context for a task run."""
        context = {
            "var": dict(self.vars, **dict(vars or {})),
            "task": {},
            "func": self.functions,
        }
        for task in self:
            context["task"].setdefault(task.type, {})
            context["task"][task.type].setdefault(task.name, {})
        return context

    def as_task(self, vars=None):
        return FlowRunTask(flow=self, vars=vars)

    async def run(self, vars=None):
        """Run a flow."""
        context = self.make_run_context(vars)
        await self.runner.run(context)
        return context

Static methods

def from_spec(source)

Load a flow from the contents of a .hcl file.
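
A parse failure surfaces as RunflowSyntaxError; a short sketch (the malformed spec is illustrative):

from runflow.core import Flow
from runflow.errors import RunflowSyntaxError

try:
    Flow.from_spec('flow "broken" {')   # unbalanced block fails HCL parsing
except RunflowSyntaxError as err:
    print(err)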

Expand source code
@classmethod
def from_spec(cls, source):
    """Load flow from a .hcl file content."""
    try:
        flow = hcl2.loads(source)
    except LarkError as err:
        raise RunflowSyntaxError(str(err)) from err

    assert "flow" in flow, "Need a flow block"
    assert len(flow["flow"]) == 1, "Runflow spec should have only one flow"

    flow_spec = flow["flow"][0]
    flow_name, flow_body = next(iter(flow_spec.items()))
    flow = cls(name=flow_name)
    flow.load_flow_spec_body(flow_body)
    return flow
def from_specfile(path)

Load flow from a given file path.

Expand source code
@classmethod
def from_specfile(cls, path):
    """Load flow from a given file path."""
    with open(path) as file:
        flow_spec = file.read()
    return cls.from_spec(flow_spec)
def load(path=None, source=None, module=None, flow=None)
Expand source code
@classmethod
def load(cls, path=None, source=None, module=None, flow=None):
    if path:
        return Flow.from_specfile(path)

    if source:
        return Flow.from_spec(source)

    if module:
        return import_module(module)

    return flow

Instance variables

var exception
Expand source code
@property
def exception(self):
    return next(
        (
            task_result.exception
            for task_result in self.runner.results.values()
            if task_result.status == TaskStatus.FAILED
        ),
        None,
    )

Methods

def add_task(self, task)

Add task to flow graph.

Expand source code
def add_task(self, task):
    """Add task to flow graph."""
    self.graph.add_node(task)
def as_task(self, vars=None)
Expand source code
def as_task(self, vars=None):
    return FlowRunTask(flow=self, vars=vars)
def load_flow_imported_functions(self, functions)

Load the import.functions attribute.

Expand source code
def load_flow_imported_functions(self, functions):
    """Load the `import.functions` attribute."""
    for func_name, func_import in functions.items():
        self.load_function(func_name, func_import)
def load_flow_imported_tasks(self, tasks)

Load the import.tasks attribute.

Expand source code
def load_flow_imported_tasks(self, tasks):
    """Load the `import.tasks` attribute."""
    for task_name, task_impl in tasks.items():
        register_task_class(task_name, task_impl)
def load_flow_imports(self, imports)

Load the import block.

Expand source code
def load_flow_imports(self, imports):
    """Load the `import` block."""
    for _import in imports:
        self.load_flow_imported_tasks(_import.get("tasks", []))
        self.load_flow_imported_functions(_import.get("functions", {}))
def load_flow_spec_body(self, spec)

Load the body of a flow block.

Expand source code
def load_flow_spec_body(self, spec):
    """Load the body of a flow block."""
    self.load_flow_imports(spec.get("import", []))
    self.load_flow_variables(spec.get("variable", []))
    self.load_flow_tasks(spec.get("task", []))
    self.set_tasks_dependencies()
def load_flow_tasks(self, tasks)
Expand source code
def load_flow_tasks(self, tasks):
    for task in self.load_flow_tasks_from_spec(tasks):
        self.add_task(task)
def load_flow_tasks_dependencies(self, task)

Find task dependencies.

Expand source code
def load_flow_tasks_dependencies(self, task):
    """Find task dependencies."""
    deps_set = set()
    for key, value in task.payload.items():
        hcl2.resolve_deps(key, deps_set)
        hcl2.resolve_deps(value, deps_set)

    for dep in deps_set:
        yield self.load_task_by_task_reference(dep)
def load_flow_tasks_from_spec(self, tasks_spec)

Load the task blocks.

Expand source code
def load_flow_tasks_from_spec(self, tasks_spec):
    """Load the `task` blocks."""
    for task_spec in tasks_spec:
        for task_type in task_spec:
            task_class = get_task_class(task_type)
            for task_name in task_spec[task_type]:
                task_payload = dict(task_spec[task_type][task_name])
                yield Task(task_type, task_class, task_name, task_payload)
def load_flow_variables(self, vars_spec)

Load the variable block.

Expand source code
def load_flow_variables(self, vars_spec):
    """Load the `variable` block."""
    for var_spec in vars_spec:
        var_name = next(iter(var_spec.keys()))
        var_value_spec = next(iter(var_spec.values()))
        var_value = config(
            f"RUNFLOW_VAR_{var_name}", default=None
        ) or var_value_spec.get("default", None)
        var_required = var_value_spec.get("required")
        if var_required and var_value is None:
            raise NameError(f"{var_name} is required but not provided.")
        self.set_default_var(var_name, var_value)
def load_function(self, func_name, import_string)

Load the imported function into the task func namespace.

Expand source code
def load_function(self, func_name, import_string):
    """Load the imported function to task func namespace."""
    function = utils.import_module(import_string)
    self.functions[func_name] = function
def load_task_by_task_reference(self, depends_on)

Find task by a reference like task.TASK_TYPE.TASK_NAME.

Expand source code
def load_task_by_task_reference(self, depends_on):
    """Find task by a reference like `task.TASK_TYPE.TASK_NAME`."""
    task_key = depends_on.split(".")
    assert len(task_key) == 3 and task_key[0] == "task"

    _, task_type, task_name = task_key

    for task in self.graph.nodes:
        if task.type == task_type and task.name == task_name:
            return task

    raise RunflowSyntaxError(
        f"Task depends on {task_key} "
        f"but the dependent task does not exist"
    )
def make_run_context(self, vars=None)

Prepare the context for a task run.
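
The returned mapping has the shape below (values are illustrative; "task" starts with one empty dict per task and is filled in as tasks finish):

context = {
    "var": {"name": "world"},            # defaults merged with caller-supplied vars
    "task": {"bash_run": {"echo": {}}},  # keyed by task type, then task name
    "func": {},                          # functions loaded from the import block
}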

Expand source code
def make_run_context(self, vars=None):
    """Prepare the context for a task run."""
    context = {
        "var": dict(self.vars, **dict(vars or {})),
        "task": {},
        "func": self.functions,
    }
    for task in self:
        context["task"].setdefault(task.type, {})
        context["task"][task.type].setdefault(task.name, {})
    return context
async def run(self, vars=None)

Run a flow.

Expand source code
async def run(self, vars=None):
    """Run a flow."""
    context = self.make_run_context(vars)
    await self.runner.run(context)
    return context
def set_default_var(self, name, value)

Set default value for variable.

Expand source code
def set_default_var(self, name, value):
    """Set default value for variable."""
    self.vars[name] = value
def set_dependency(self, task, depends_on)

Let task depend on depends_on,

i.e. add an edge from depends_on to task.

Expand source code
def set_dependency(self, task, depends_on):
    """Let `task` depends on `depends_on`,

    e.g. `depends_on` is a predecessor edge of `task`.

    """
    self.graph.add_edge(depends_on, task)
def set_tasks_dependencies(self)

Walk the task graph and sort out the task dependencies.

Expand source code
def set_tasks_dependencies(self):
    """Walk the task graph and sort out the task dependencies."""
    for task in self.graph.nodes:
        explicit_deps = self.load_flow_tasks_dependencies(task)
        for dep in explicit_deps:
            self.set_dependency(task, dep)
class FlowRunTask (path=None, source=None, module=None, flow=None, vars=None, export=None)
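
Runs another flow as a task of the current flow; export entries map output names to HCL expressions evaluated against the sub-flow's run context. A sketch (the spec path and task reference are hypothetical):

import asyncio

from runflow.core import FlowRunTask

subflow = FlowRunTask(
    path="flows/subflow.hcl",
    vars={"name": "runflow"},
    export=[{"greeting": "task.bash_run.echo.stdout"}],
)
exported = asyncio.run(subflow.run())    # {"greeting": <evaluated expression>}
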
Expand source code
class FlowRunTask:
    def __init__(
        self,
        path=None,
        source=None,
        module=None,
        flow=None,
        vars=None,
        export=None,
    ):
        self.path = path
        self.source = source
        self.module = module
        self.flow = flow
        self.vars = vars or {}
        self.export = export or []
        self.namespace = ""

    async def run(self):
        flow = Flow.load(
            path=self.path,
            source=self.source,
            module=self.module,
            flow=self.flow,
        )
        for task in flow:
            task.set_namespace(self.namespace)

        flow_context = await flow.run(vars=self.vars)

        if flow.exception:
            raise flow.exception

        return {
            key: hcl2.evaluate(hcl2.loads(value, "eval"), flow_context)
            for export in self.export
            for key, value in export.items()
        }

Methods

async def run(self)
Expand source code
async def run(self):
    flow = Flow.load(
        path=self.path,
        source=self.source,
        module=self.module,
        flow=self.flow,
    )
    for task in flow:
        task.set_namespace(self.namespace)

    flow_context = await flow.run(vars=self.vars)

    if flow.exception:
        raise flow.exception

    return {
        key: hcl2.evaluate(hcl2.loads(value, "eval"), flow_context)
        for export in self.export
        for key, value in export.items()
    }
class SequentialRunner (flow)

This runner runs the flow tasks sequentially.
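
It is the default runner; passing it explicitly is equivalent (sketch):

from runflow.core import Flow, SequentialRunner

flow = Flow(name="example", runner_cls=SequentialRunner)
# After flow.run(...), flow.runner.results maps each Task to its TaskResult.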

Expand source code
class SequentialRunner:
    """This runner runs the flow tasks sequentially."""

    def __init__(self, flow):
        self.flow = flow
        self.results = {}

    def check_depends_on(self, task):
        for upstream_task in self.flow.graph.predecessors(task):
            if self.results[upstream_task].status in (
                TaskStatus.FAILED,
                TaskStatus.CANCELED,
            ):
                return False
        return True

    async def run_task(self, task, context):
        if not self.check_depends_on(task):
            logger.info(
                '"%s" is canceled due to previous task failed run.',
                f"task.{task.type}.{task.name}",
            )
            return TaskResult(TaskStatus.CANCELED)

        task_result = await task.run(context)

        if task_result.status == TaskStatus.SUCCESS:
            context["task"][task.type][task.name] = task_result.result

        return task_result

    async def run(self, context):
        """Run flow tasks."""
        for task in self.flow:
            self.results[task] = await self.run_task(task, context)

Methods

def check_depends_on(self, task)
Expand source code
def check_depends_on(self, task):
    for upstream_task in self.flow.graph.predecessors(task):
        if self.results[upstream_task].status in (
            TaskStatus.FAILED,
            TaskStatus.CANCELED,
        ):
            return False
    return True
async def run(self, context)

Run flow tasks.

Expand source code
async def run(self, context):
    """Run flow tasks."""
    for task in self.flow:
        self.results[task] = await self.run_task(task, context)
async def run_task(self, task, context)
Expand source code
async def run_task(self, task, context):
    if not self.check_depends_on(task):
        logger.info(
            '"%s" is canceled due to previous task failed run.',
            f"task.{task.type}.{task.name}",
        )
        return TaskResult(TaskStatus.CANCELED)

    task_result = await task.run(context)

    if task_result.status == TaskStatus.SUCCESS:
        context["task"][task.type][task.name] = task_result.result

    return task_result
class Task (type, runner, name, payload)

Task object maintains the running status.
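
Tasks are normally created from a flow spec, but can be driven directly; a minimal sketch (EchoTask and its payload are hypothetical, and plain payload values are assumed to evaluate to themselves):

import asyncio

from runflow.core import Task, TaskStatus

class EchoTask:
    def __init__(self, message):
        self.message = message

    def run(self):                      # a sync run() is dispatched to a thread
        return {"echoed": self.message}

task = Task("echo", EchoTask, "hello", {"message": "hi"})
context = {"var": {}, "func": {}, "task": {"echo": {"hello": {}}}}
result = asyncio.run(task.run(context))
assert result.status is TaskStatus.SUCCESS
print(result.result)                    # {'echoed': 'hi', 'message': 'hi'}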

Expand source code
class Task:
    """Task object maintains the running status."""

    def __init__(self, type, runner, name, payload):
        self.type = type
        self.runner = runner
        self.name = name
        self.payload = payload
        self.namespace = ""

    def __repr__(self):
        return (
            f"Task(type={self.type}, "
            f"name={self.name}, "
            f"payload={self.payload})"
        )

    def __str__(self):
        return (
            f"{self.namespace}{self.namespace and ' > ' or ''}"
            f"task.{self.type}.{self.name}"
        )

    def __hash__(self):
        return hash(repr(self))

    def __eq__(self, o):
        return self.type == o.type and self.name == o.name

    def set_namespace(self, namespace):
        self.namespace = namespace

    def should_run(self, context):
        return all(
            hcl2.evaluate(depends_on, context)
            for depends_on in self.payload.get(DEPENDS_ON_KEY, [])
        )

    def should_timeout(self, task, context):
        if "_timeout" not in self.payload:
            return None
        _timeout = hcl2.evaluate(self.payload["_timeout"], context)
        assert isinstance(_timeout, (int, float))
        return _timeout

    def should_rerun(self, task, context):  # pylint: disable=too-many-branches
        if "_retry" not in self.payload:
            return task.run

        _retry = hcl2.evaluate(self.payload.get("_retry", {}), context)
        _retry_params = {}
        stop_after = _retry.get("stop_after", "1 times")
        stopers = []
        for _stop_after in stop_after.split("|"):
            _stop_after = _stop_after.strip()
            if "times" in _stop_after:
                attempts = int(_stop_after.replace("times", "").strip())
                stoper = stop_after_attempt(attempts)
            elif "seconds" in _stop_after:
                seconds = float(_stop_after.replace("seconds", "").strip())
                stoper = stop_after_delay(seconds)
            else:
                raise ValueError(f"invalid _retry.stop_after: {_stop_after}")
            stopers.append(stoper)

        _retry_params["stop"] = reduce(lambda a, b: a | b, stopers)
        _retry_params["wait"] = _retry.get("wait", None)
        _retry_params["reraise"] = True
        return retry(**_retry_params)(task.run)

    def eval_payload(self, context):
        return hcl2.evaluate(
            {k: v for k, v in self.payload.items() if not k.startswith("_")},
            context,
        )

    async def run(self, context):
        """Run a task."""
        if self.type == "hcl2_template":
            context = dict(
                context,
                **hcl2.evaluate(self.payload.get("context", {}), context),
            )

        if not self.should_run(context):
            logger.info('"%s" is canceled due to falsy deps.', str(self))
            return TaskResult(TaskStatus.CANCELED)

        _payload = self.eval_payload(context)
        context["task"][self.type][self.name].update(_payload)

        task_result = TaskResult(TaskStatus.PENDING)

        try:
            logger.info('"%s" is started.', str(self))
            if isinstance(self.runner, Flow):
                task = self.runner.as_task(_payload)
            else:
                task = self.runner(**_payload)

            if isinstance(task, FlowRunTask):
                task.namespace = str(self)

            task_run = self.should_rerun(task, context)
            task_result.result = (
                await asyncio.wait_for(
                    task_run()
                    if inspect.iscoroutinefunction(task.run)
                    else utils.to_thread(task_run),
                    timeout=self.should_timeout(task, context),
                )
                or {}
            )
            task_result.result.update(_payload)
            logger.info('"%s" is successful.', str(self))
        except Exception as err:  # pylint: disable=broad-except
            task_result.exception = err
            logger.info('"%s" is failed.', str(self))
            traceback.print_exc()

        return task_result

Methods

def eval_payload(self, context)
Expand source code
def eval_payload(self, context):
    return hcl2.evaluate(
        {k: v for k, v in self.payload.items() if not k.startswith("_")},
        context,
    )
async def run(self, context)

Run a task.

Expand source code
async def run(self, context):
    """Run a task."""
    if self.type == "hcl2_template":
        context = dict(
            context,
            **hcl2.evaluate(self.payload.get("context", {}), context),
        )

    if not self.should_run(context):
        logger.info('"%s" is canceled due to falsy deps.', str(self))
        return TaskResult(TaskStatus.CANCELED)

    _payload = self.eval_payload(context)
    context["task"][self.type][self.name].update(_payload)

    task_result = TaskResult(TaskStatus.PENDING)

    try:
        logger.info('"%s" is started.', str(self))
        if isinstance(self.runner, Flow):
            task = self.runner.as_task(_payload)
        else:
            task = self.runner(**_payload)

        if isinstance(task, FlowRunTask):
            task.namespace = str(self)

        task_run = self.should_rerun(task, context)
        task_result.result = (
            await asyncio.wait_for(
                task_run()
                if inspect.iscoroutinefunction(task.run)
                else utils.to_thread(task_run),
                timeout=self.should_timeout(task, context),
            )
            or {}
        )
        task_result.result.update(_payload)
        logger.info('"%s" is successful.', str(self))
    except Exception as err:  # pylint: disable=broad-except
        task_result.exception = err
        logger.info('"%s" is failed.', str(self))
        traceback.print_exc()

    return task_result
def set_namespace(self, namespace)
Expand source code
def set_namespace(self, namespace):
    self.namespace = namespace
def should_rerun(self, task, context)
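
Wraps task.run with a tenacity retry policy when the payload has a _retry block. The stop_after string accepts "N times" and/or "M seconds" clauses joined by "|"; a payload sketch (the command key is hypothetical):

payload = {
    "command": "flaky-command",
    "_retry": {
        "stop_after": "3 times | 60 seconds",  # stop after 3 attempts or 60 seconds, whichever comes first
    },
}
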
Expand source code
def should_rerun(self, task, context):  # pylint: disable=too-many-branches
    if "_retry" not in self.payload:
        return task.run

    _retry = hcl2.evaluate(self.payload.get("_retry", {}), context)
    _retry_params = {}
    stop_after = _retry.get("stop_after", "1 times")
    stopers = []
    for _stop_after in stop_after.split("|"):
        _stop_after = _stop_after.strip()
        if "times" in _stop_after:
            attempts = int(_stop_after.replace("times", "").strip())
            stoper = stop_after_attempt(attempts)
        elif "seconds" in _stop_after:
            seconds = float(_stop_after.replace("seconds", "").strip())
            stoper = stop_after_delay(seconds)
        else:
            raise ValueError(f"invalid _retry.stop_after: {_stop_after}")
        stopers.append(stoper)

    _retry_params["stop"] = reduce(lambda a, b: a | b, stopers)
    _retry_params["wait"] = _retry.get("wait", None)
    _retry_params["reraise"] = True
    return retry(**_retry_params)(task.run)
def should_run(self, context)
Expand source code
def should_run(self, context):
    return all(
        hcl2.evaluate(depends_on, context)
        for depends_on in self.payload.get(DEPENDS_ON_KEY, [])
    )
def should_timeout(self, task, context)
Expand source code
def should_timeout(self, task, context):
    if "_timeout" not in self.payload:
        return None
    _timeout = hcl2.evaluate(self.payload["_timeout"], context)
    assert isinstance(_timeout, (int, float))
    return _timeout
class TaskResult (status)

Task execution result.
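
Assigning to result or exception also updates status; a short sketch:

from runflow.core import TaskResult, TaskStatus

ok = TaskResult(TaskStatus.PENDING)
ok.result = {"rows": 3}                 # setter flips status to SUCCESS
assert ok.status is TaskStatus.SUCCESS

failed = TaskResult(TaskStatus.PENDING)
failed.exception = RuntimeError("boom") # setter flips status to FAILED
assert failed.status is TaskStatus.FAILED
# Reading failed.result now raises ValueError chained from the stored exception.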

Expand source code
class TaskResult:
    """Task execution result."""

    def __init__(self, status):
        self.status = status
        self._result = None
        self._exception = None

    @property
    def result(self):
        """Get task result."""
        if self._exception:
            raise ValueError(
                "Task has no result due to a failed run."
            ) from self._exception
        return self._result

    @result.setter
    def result(self, _result):
        """Set task result."""
        self.status = TaskStatus.SUCCESS
        self._result = _result

    @property
    def exception(self):
        """Get task exception."""
        if self._result:
            raise ValueError("Task has no exception due to a successful run.")
        return self._exception

    @exception.setter
    def exception(self, _exception):
        """Set task exception."""
        self.status = TaskStatus.FAILED
        self._exception = _exception

Instance variables

var exception

Get task exception.

Expand source code
@property
def exception(self):
    """Get task exception."""
    if self._result:
        raise ValueError("Task has no exception due to a successful run.")
    return self._exception
var result

Get task result.

Expand source code
@property
def result(self):
    """Get task result."""
    if self._exception:
        raise ValueError(
            "Task has no result due to a failed run."
        ) from self._exception
    return self._result
class TaskStatus (value, names=None, *, module=None, qualname=None, type=None, start=1)

Task execution status.

Expand source code
class TaskStatus(enum.Enum):
    """Task execution status."""

    PENDING = enum.auto()
    SUCCESS = enum.auto()
    FAILED = enum.auto()
    CANCELED = enum.auto()

Ancestors

  • enum.Enum

Class variables

var CANCELED
var FAILED
var PENDING
var SUCCESS