Skip to content
Snippets Groups Projects
Commit 2c821f19 authored by Jo Sama's avatar Jo Sama
Browse files

Merge pull request #29 in INT/tofh from support-stateful-tasks to master

* commit 'dc09e9f6':
  Support stateful tasks
parents b76a2ac4 dc09e9f6
No related branches found
No related tags found
No related merge requests found
......@@ -10,6 +10,8 @@ flask:
tasks:
- name: 'my_task_name'
call: 'my_module:my_function'
# stateless is the default:
# task_type: stateless
triggers:
- type: 'tofh.plugins.mq'
select:
......
......@@ -77,38 +77,44 @@ class TaskInfoWrapper(celery.Task):
raise self.retry(exc=e, **retry_opts)
def load_func_tasks(app, tasks):
def load_stateful_tasks(app, tasks):
"""
Load tasks from task list as celery tasks.
Load tasks from task list as stateful celery tasks.
Each task entry point is loaded immediately, and wrapped using
:py:func:`celery.Celery.task`.
:py:func:`celery.Celery.task`, which the entry point is bound to.
See :py:func:`tofh.demo.stateful_task` for an example.
:type app: celery.Celery
:type tasks: list
:param tasks:
A list of :py:class:`tofh.tasks.TaskInfo` objects to load.
"""
"""
for task in tasks:
opts = task_info_to_params(task)
opts['autoretry_for'] = (Exception, )
app.task(**opts)(task.entry_point.resolve())
if task.task_type == tofh.tasks.TaskType.stateful:
opts = task_info_to_params(task)
opts['autoretry_for'] = (Exception, )
app.task(bind=True, **opts)(task.entry_point.resolve())
def load_lazy_tasks(app, tasks):
"""
Load tasks from task list as custom celery tasks.
Load tasks from task list as custom stateless celery tasks.
Each task is wrapped in a :py:class:`TaskInfoWrapper` celery task, and
added to the celery app using :py:func:`celery.Celery.register_task`.
See :py:func:`tofh.demo.echo` for an example.
:type app: celery.Celery
:type tasks: list
:param tasks:
A list of :py:class:`tofh.tasks.TaskInfo` objects to load.
"""
for task in tasks:
app.register_task(TaskInfoWrapper(task))
if task.task_type == tofh.tasks.TaskType.stateless:
app.register_task(TaskInfoWrapper(task))
def autoload_app(config=None):
......@@ -126,7 +132,7 @@ def autoload_app(config=None):
tofh.logging.configure(config)
# load_func_tasks(app, config.tasks.values())
load_stateful_tasks(app, config.tasks.values())
load_lazy_tasks(app, config.tasks.values())
@celery.signals.setup_logging.connect(weak=False)
......
......@@ -6,6 +6,27 @@ import time
logger = logging.getLogger(__name__)
def extract_body_transform(event):
return (event.body, {})
def stateful_task(self, *args, **kwargs):
if getattr(self, 'var', None):
logger.info('self.var is %d', self.var)
else:
logger.info('self.var is unset')
self.var = random.randint(0, 100)
logger.info('self.var has been set to %d', self.var)
def failing_stateful_task(self, *args, **kwargs):
if getattr(self, 'var', None):
logger.info('self.var is %d', self.var)
else:
logger.info('self.var is unset')
self.var = random.randint(0, 100)
logger.info('self.var has been set to %d', self.var)
logger.info('Provoking TypeError')
'a' + 1
def echo(*args, **kwargs):
""" Return all arguments
......
import collections
import enum
import attr
import pkg_resources
@enum.unique
class TaskType(enum.Enum):
stateless = 'stateless'
stateful = 'stateful'
def get_entry_point(name, entry_point):
"""Create an EntryPoint object.
......@@ -69,6 +76,7 @@ class TaskInfo:
name = attr.ib(converter=str)
call = attr.ib(converter=str)
task_type = attr.ib(default=TaskType.stateless, converter=TaskType)
timeout_soft = attr.ib(default=None, converter=convert_none(int))
timeout_hard = attr.ib(default=120, converter=convert_none(int))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment