Module runflow.registry
Expand source code
from typing import Dict, Union, Type
from .errors import RunflowReadOnlyError
from .utils import import_module
task_implementations = {
# Core tasks
"bash_run": {
"class": "runflow.contribs.bash:BashRunTask",
},
"flow_run": {
"class": "runflow.core:FlowRunTask",
},
"file_read": {
"class": "runflow.contribs.file:FileReadTask",
},
"file_write": {
"class": "runflow.contribs.file:FileWriteTask",
},
"hcl2_template": {
"class": "runflow.contribs.template:Hcl2TemplateTask",
},
"http_request": {
"class": "runflow.contribs.http:HttpRequestTask",
},
"smtp_send": {
"class": "runflow.contribs.mail:SmtpSendTask",
},
"sql_exec": {
"class": "runflow.contribs.sql:SqlExecTask",
},
"sql_row": {
"class": "runflow.contribs.sql:SqlRowTask",
},
# Community tasks
"docker_run": {
"class": "runflow.community.docker:DockerRunTask",
},
"feed_parse": {
"class": "runflow.community.rss:FeedParseTask",
},
"notion_api_call": {
"class": "runflow.community.notion:NotionApiCallTask",
},
"papermill_execute": {
"class": "runflow.community.papermill:PapermillExecuteTask",
},
"pushbullet_push": {
"class": "runflow.community.pushbullet:PushbulletPushTask",
},
"qrcode_generate": {
"class": "runflow.community.qrcode:QrcodeGenerateTask",
},
"slack_api_call": {
"class": "runflow.community.slack:SlackApiCallTask",
},
"telegram_api_call": {
"class": "runflow.community.telegram:TelegramApiCallTask",
},
}
class ImmutableDict(dict):
"""Dict but immutable.
Getitem is allowed; set/del/clear are disallowed.
"""
def __init__(self, target: Dict[str, dict]):
super().__init__()
self.target = target
def __getitem__(self, item: str):
return self.target[item]
def __delitem__(self, key: str):
raise RunflowReadOnlyError
def __setitem__(self, key: str, value):
raise RunflowReadOnlyError
def clear(self):
raise RunflowReadOnlyError
def __contains__(self, item):
return item in self.target
def __iter__(self):
yield from self.target
_registry: Dict[str, dict] = {}
registry = ImmutableDict(_registry)
def register_task_class(
name: str, cls: Union[str, Type], overwrite: bool = False
):
"""Register a task class with a name."""
if isinstance(cls, str):
if name in task_implementations and not overwrite:
raise ValueError(
f"Task name {name} has been taken in the "
"default task implementation."
)
task_implementations[name] = {"class": cls}
return
if name in registry and not overwrite:
raise ValueError(f"Task name {name} has been taken in the registry.")
if not hasattr(cls, "run"):
raise ValueError("cls is not runnable.")
_registry[name] = {"class": cls}
def get_task_class(name: str):
"""Get the task class by name."""
if name not in registry:
if name not in task_implementations:
raise ValueError(f"Unknown task type: {name}")
impl = import_module(task_implementations[name]["class"])
register_task_class(name, impl)
return registry[name]["class"]
Functions
def get_task_class(name: str)
-
Get the task class by name.
Expand source code
def get_task_class(name: str): """Get the task class by name.""" if name not in registry: if name not in task_implementations: raise ValueError(f"Unknown task type: {name}") impl = import_module(task_implementations[name]["class"]) register_task_class(name, impl) return registry[name]["class"]
def register_task_class(name: str, cls: Union[str, Type], overwrite: bool = False)
-
Register a task class with a name.
Expand source code
def register_task_class( name: str, cls: Union[str, Type], overwrite: bool = False ): """Register a task class with a name.""" if isinstance(cls, str): if name in task_implementations and not overwrite: raise ValueError( f"Task name {name} has been taken in the " "default task implementation." ) task_implementations[name] = {"class": cls} return if name in registry and not overwrite: raise ValueError(f"Task name {name} has been taken in the registry.") if not hasattr(cls, "run"): raise ValueError("cls is not runnable.") _registry[name] = {"class": cls}
Classes
class ImmutableDict (target: Dict[str, dict])
-
Dict but immutable.
Getitem is allowed; set/del/clear are disallowed.
Expand source code
class ImmutableDict(dict): """Dict but immutable. Getitem is allowed; set/del/clear are disallowed. """ def __init__(self, target: Dict[str, dict]): super().__init__() self.target = target def __getitem__(self, item: str): return self.target[item] def __delitem__(self, key: str): raise RunflowReadOnlyError def __setitem__(self, key: str, value): raise RunflowReadOnlyError def clear(self): raise RunflowReadOnlyError def __contains__(self, item): return item in self.target def __iter__(self): yield from self.target
Ancestors
- builtins.dict
Methods
def clear(self)
-
D.clear() -> None. Remove all items from D.
Expand source code
def clear(self): raise RunflowReadOnlyError