TaskSpecs that have a 'times' attribute handle it consistently and more flexibly

This commit is contained in:
Samuel Abels
2017-03-16 08:51:14 +01:00
parent 9b1433325d
commit aadb75cc88
4 changed files with 19 additions and 17 deletions

View File

@ -31,7 +31,7 @@ class MultiInstance(TaskSpec):
This task has one or more inputs and may have any number of outputs.
"""
def __init__(self, parent, name, times = None, **kwargs):
def __init__(self, parent, name, times, **kwargs):
"""
Constructor.
@ -39,11 +39,13 @@ class MultiInstance(TaskSpec):
:param parent: A reference to the parent task spec.
:type name: str
:param name: The name of the task spec.
:type times: int
:type times: int or :class:`SpiffWorkflow.operators.Term`
:param times: The number of tasks to create.
:type kwargs: dict
:param kwargs: See :class:`SpiffWorkflow.specs.TaskSpec`.
"""
if times is None:
raise ValueError('times argument is required')
TaskSpec.__init__(self, parent, name, **kwargs)
self.times = times
@ -72,7 +74,7 @@ class MultiInstance(TaskSpec):
output._predict(new_task)
def _get_predicted_outputs(self, my_task):
split_n = my_task._get_internal_data('splits', 1)
split_n = int(valueof(my_task, self.times, 1))
# Predict the outputs.
outputs = []
@ -81,10 +83,8 @@ class MultiInstance(TaskSpec):
return outputs
def _predict_hook(self, my_task):
split_n = valueof(my_task, self.times)
if split_n is None:
return
my_task._set_internal_data(splits = split_n)
split_n = int(valueof(my_task, self.times, 1))
my_task._set_internal_data(splits=split_n)
# Create the outgoing tasks.
outputs = []

View File

@ -45,7 +45,7 @@ class ThreadSplit(TaskSpec):
:param parent: A reference to the parent (usually a workflow).
:type name: string
:param name: A name for the task.
:type times: int or None or :class:`SpiffWorkflow.operators.Term`
:type times: int or :class:`SpiffWorkflow.operators.Term`
:param times: The number of tasks to create.
:type suppress_threadstart_creation: bool
:param suppress_threadstart_creation: Don't create a ThreadStart, because

View File

@ -18,6 +18,7 @@ from __future__ import division
from SpiffWorkflow.Task import Task
from SpiffWorkflow.exceptions import WorkflowException
from SpiffWorkflow.specs.TaskSpec import TaskSpec
from SpiffWorkflow.operators import valueof
class Trigger(TaskSpec):
"""
@ -29,7 +30,7 @@ class Trigger(TaskSpec):
parallel split.
"""
def __init__(self, parent, name, context, times = 1, **kwargs):
def __init__(self, parent, name, context, times=1, **kwargs):
"""
Constructor.
@ -39,7 +40,7 @@ class Trigger(TaskSpec):
:param name: The name of the task spec.
:type context: list(str)
:param context: A list of the names of tasks that are to be triggered.
:type times: int or None
:type times: int or :class:`SpiffWorkflow.operators.Term`
:param times: The number of signals before the trigger fires.
:type kwargs: dict
:param kwargs: See :class:`SpiffWorkflow.specs.TaskSpec`.
@ -77,7 +78,8 @@ class Trigger(TaskSpec):
:rtype: bool
:returns: True on success, False otherwise.
"""
for i in range(self.times + self.queued):
times = int(valueof(my_task, self.times, 1)) + self.queued
for i in range(times):
for task_name in self.context:
task = my_task.workflow.get_task_spec_from_name(task_name)
task._on_trigger(my_task)

View File

@ -180,7 +180,7 @@ class DictionarySerializer(Serializer):
spec = CancelTask(wf_spec,
s_state['name'],
s_state['context'],
times=s_state['times'])
times=self.deserialize_arg(s_state['times']))
self.deserialize_task_spec(wf_spec, s_state, spec=spec)
return spec
@ -298,13 +298,13 @@ class DictionarySerializer(Serializer):
def serialize_multi_instance(self, spec):
s_state = self.serialize_task_spec(spec)
s_state['times'] = spec.times
s_state['times'] = self.serialize_arg(spec.times)
return s_state
def deserialize_multi_instance(self, wf_spec, s_state):
spec = MultiInstance(wf_spec,
s_state['name'],
times=s_state['times'])
times=self.deserialize_arg(s_state['times']))
self.deserialize_task_spec(wf_spec, s_state, spec=spec)
return spec
@ -375,7 +375,7 @@ class DictionarySerializer(Serializer):
def deserialize_thread_split(self, wf_spec, s_state):
spec = ThreadSplit(wf_spec,
s_state['name'],
self.deserialize_arg(s_state['times']),
times=self.deserialize_arg(s_state['times']),
suppress_threadstart_creation=True)
self.deserialize_task_spec(wf_spec, s_state, spec=spec)
return spec
@ -398,7 +398,7 @@ class DictionarySerializer(Serializer):
def serialize_trigger(self, spec):
s_state = self.serialize_task_spec(spec)
s_state['context'] = spec.context
s_state['times'] = spec.times
s_state['times'] = self.serialize_arg(spec.times)
s_state['queued'] = spec.queued
return s_state
@ -406,7 +406,7 @@ class DictionarySerializer(Serializer):
spec = Trigger(wf_spec,
s_state['name'],
s_state['context'],
times=s_state['times'])
self.deserialize_arg(s_state['times']))
self.deserialize_task_spec(wf_spec, s_state, spec=spec)
return spec