mirror of
https://github.com/sartography/SpiffWorkflow.git
synced 2025-08-02 00:59:41 +08:00
Merge pull request #389 from sartography/feature/task-data-reference
make TaskDataReference do something
This commit is contained in:
@ -76,9 +76,9 @@ class BpmnTaskSpec(TaskSpec):
|
||||
if self.io_specification is not None and len(self.io_specification.data_inputs) > 0:
|
||||
data = {}
|
||||
for var in self.io_specification.data_inputs:
|
||||
if var.bpmn_id not in my_task.data:
|
||||
if not var.exists(my_task):
|
||||
raise WorkflowDataException("Missing data input", task=my_task, data_input=var)
|
||||
data[var.bpmn_id] = my_task.data[var.bpmn_id]
|
||||
data[var.bpmn_id] = var.get(my_task)
|
||||
my_task.data = data
|
||||
|
||||
return True
|
||||
@ -88,9 +88,9 @@ class BpmnTaskSpec(TaskSpec):
|
||||
if self.io_specification is not None and len(self.io_specification.data_outputs) > 0:
|
||||
data = {}
|
||||
for var in self.io_specification.data_outputs:
|
||||
if var.bpmn_id not in my_task.data:
|
||||
if not var.exists(my_task):
|
||||
raise WorkflowDataException("Missing data ouput", task=my_task, data_output=var)
|
||||
data[var.bpmn_id] = my_task.data[var.bpmn_id]
|
||||
data[var.bpmn_id] = var.get(my_task)
|
||||
my_task.data = data
|
||||
|
||||
for obj in self.data_output_associations:
|
||||
@ -98,7 +98,7 @@ class BpmnTaskSpec(TaskSpec):
|
||||
|
||||
for obj in self.data_input_associations:
|
||||
# Remove the any copied input variables that might not have already been removed
|
||||
my_task.data.pop(obj.bpmn_id, None)
|
||||
obj.delete(my_task)
|
||||
|
||||
super()._on_complete_hook(my_task)
|
||||
|
||||
|
@ -77,7 +77,18 @@ class DataObject(BpmnDataSpecification):
|
||||
del my_task.data[self.bpmn_id]
|
||||
data_log.info(f'Set workflow variable {self.bpmn_id}', extra=my_task.log_info())
|
||||
|
||||
def delete(self, my_task):
|
||||
my_task.data.pop(self.bpmn_id, None)
|
||||
|
||||
|
||||
class TaskDataReference(BpmnDataSpecification):
|
||||
"""A representation of task data that can be used in a BPMN diagram"""
|
||||
pass
|
||||
|
||||
def get(self, my_task):
|
||||
return my_task.data.get(self.bpmn_id)
|
||||
|
||||
def set(self, my_task, value):
|
||||
my_task.data[self.bpmn_id] = value
|
||||
|
||||
def exists(self, my_task):
|
||||
return self.bpmn_id in my_task.data
|
@ -149,12 +149,12 @@ class MultiInstanceTask(LoopTask):
|
||||
"""This merges child data into this task's data."""
|
||||
|
||||
if self.data_output is not None and self.output_item is not None:
|
||||
if self.output_item.bpmn_id not in child.data:
|
||||
if not self.output_item.exists(child):
|
||||
self.raise_data_exception("Expected an output item", child)
|
||||
item = child.data[self.output_item.bpmn_id]
|
||||
item = self.output_item.get(child)
|
||||
key_or_index = child.internal_data.get('key_or_index')
|
||||
data_output = my_task.data[self.data_output.bpmn_id]
|
||||
data_input = my_task.data[self.data_input.bpmn_id] if self.data_input is not None else None
|
||||
data_output = self.data_output.get(my_task)
|
||||
data_input = self.data_input.get(my_task) if self.data_input is not None else None
|
||||
if key_or_index is not None and (isinstance(data_output, Mapping) or data_input is data_output):
|
||||
data_output[key_or_index] = item
|
||||
else:
|
||||
@ -167,12 +167,12 @@ class MultiInstanceTask(LoopTask):
|
||||
task_spec = my_task.workflow.spec.task_specs[self.task_spec]
|
||||
child = my_task._add_child(task_spec, TaskState.WAITING)
|
||||
child.triggered = True
|
||||
if self.input_item is not None and self.input_item.bpmn_id in my_task.data:
|
||||
if self.input_item is not None and self.input_item.exists(my_task):
|
||||
raise WorkflowDataException(f'Multiinstance input item {self.input_item.bpmn_id} already exists.', my_task)
|
||||
if self.output_item is not None and self.output_item.bpmn_id in my_task.data:
|
||||
if self.output_item is not None and self.output_item.exists(my_task):
|
||||
raise WorkflowDataException(f'Multiinstance output item {self.output_item.bpmn_id} already exists.', my_task)
|
||||
if self.input_item is not None:
|
||||
child.data[self.input_item.bpmn_id] = deepcopy(item)
|
||||
if self.input_item is not None:
|
||||
self.input_item.set(child, deepcopy(item))
|
||||
if key_or_index is not None:
|
||||
child.internal_data['key_or_index'] = key_or_index
|
||||
else:
|
||||
@ -188,19 +188,18 @@ class MultiInstanceTask(LoopTask):
|
||||
|
||||
def init_data_output_with_input_data(self, my_task, input_data):
|
||||
|
||||
name = self.data_output.bpmn_id
|
||||
if name not in my_task.data:
|
||||
if not self.data_output.exists(my_task):
|
||||
if isinstance(input_data, (MutableMapping, MutableSequence)):
|
||||
# We can use the same class if it implements __setitem__
|
||||
my_task.data[name] = input_data.__class__()
|
||||
self.data_output.set(my_task, input_data.__class__())
|
||||
elif isinstance(input_data, Mapping):
|
||||
# If we have a map without __setitem__, use a dict
|
||||
my_task.data[name] = dict()
|
||||
self.data_output.set(my_task, dict())
|
||||
else:
|
||||
# For all other types, we'll append to a list
|
||||
my_task.data[name] = list()
|
||||
self.data_output.set(my_task, list())
|
||||
else:
|
||||
output_data = my_task.data[self.data_output.bpmn_id]
|
||||
output_data = self.data_output.get(my_task)
|
||||
if not isinstance(output_data, (MutableSequence, MutableMapping)):
|
||||
self.raise_data_exception("Only a mutable map (dict) or sequence (list) can be used for output", my_task)
|
||||
if input_data is not output_data and not isinstance(output_data, Mapping) and len(output_data) > 0:
|
||||
@ -209,14 +208,15 @@ class MultiInstanceTask(LoopTask):
|
||||
|
||||
def init_data_output_with_cardinality(self, my_task):
|
||||
|
||||
name = self.data_output.bpmn_id
|
||||
if name not in my_task.data:
|
||||
my_task.data[name] = list()
|
||||
elif not isinstance(my_task.data[name], MutableMapping) and len(my_task.data[name]) > 0:
|
||||
self.raise_data_exception(
|
||||
"If loop cardinality is specificied, the output must be a map (dict) or empty sequence (list)",
|
||||
my_task
|
||||
)
|
||||
if not self.data_output.exists(my_task):
|
||||
self.data_output.set(my_task, list())
|
||||
else:
|
||||
data_output = self.data_output.get(my_task)
|
||||
if not isinstance(data_ouput, MutableMapping) and len(data_output) > 0:
|
||||
self.raise_data_exception(
|
||||
"If loop cardinality is specificied, the output must be a map (dict) or empty sequence (list)",
|
||||
my_task
|
||||
)
|
||||
|
||||
def raise_data_exception(self, message, my_task):
|
||||
raise WorkflowDataException(message, my_task, data_input=self.data_input, data_output=self.data_output)
|
||||
@ -260,7 +260,7 @@ class SequentialMultiInstanceTask(MultiInstanceTask):
|
||||
|
||||
def get_next_input_item(self, my_task):
|
||||
|
||||
input_data = my_task.data[self.data_input.bpmn_id]
|
||||
input_data = self.data_input.get(my_task)
|
||||
remaining = my_task.internal_data.get('remaining')
|
||||
|
||||
if remaining is None:
|
||||
@ -282,9 +282,9 @@ class SequentialMultiInstanceTask(MultiInstanceTask):
|
||||
|
||||
def init_remaining_items(self, my_task):
|
||||
|
||||
if self.data_input.bpmn_id not in my_task.data:
|
||||
if not self.data_input.exists(my_task):
|
||||
self.raise_data_exception("Missing data input for multiinstance task", my_task)
|
||||
input_data = my_task.data[self.data_input.bpmn_id]
|
||||
input_data = self.data_input.get(my_task)
|
||||
|
||||
# This is internal bookkeeping, so we know where we are; we get the actual items when we create the task
|
||||
if isinstance(input_data, Sequence):
|
||||
@ -340,8 +340,8 @@ class ParallelMultiInstanceTask(MultiInstanceTask):
|
||||
|
||||
def create_children(self, my_task):
|
||||
|
||||
data_input = my_task.data[self.data_input.bpmn_id] if self.data_input is not None else None
|
||||
if data_input is not None:
|
||||
if self.data_input is not None:
|
||||
data_input = self.data_input.get(my_task)
|
||||
# We have to preserve the key or index for maps/sequences, in case we're updating in place, or the output is a mapping
|
||||
if isinstance(data_input, Mapping):
|
||||
children = data_input.items()
|
||||
@ -359,7 +359,7 @@ class ParallelMultiInstanceTask(MultiInstanceTask):
|
||||
|
||||
if self.data_output is not None:
|
||||
if self.data_input is not None:
|
||||
self.init_data_output_with_input_data(my_task, my_task.data[self.data_input.bpmn_id])
|
||||
self.init_data_output_with_input_data(my_task, self.data_input.get(my_task))
|
||||
else:
|
||||
self.init_data_output_with_cardinality(my_task)
|
||||
|
||||
|
@ -96,13 +96,13 @@ class CallActivity(SubWorkflowTask):
|
||||
else:
|
||||
# Otherwise copy only task data with the specified names
|
||||
for var in subworkflow.spec.io_specification.data_inputs:
|
||||
if var.bpmn_id not in my_task.data:
|
||||
if not var.exists(my_task):
|
||||
raise WorkflowDataException(
|
||||
"You are missing a required Data Input for a call activity.",
|
||||
task=my_task,
|
||||
data_input=var,
|
||||
)
|
||||
start.data[var.bpmn_id] = my_task.data[var.bpmn_id]
|
||||
var.set(start, var.get(my_task))
|
||||
|
||||
def update_data(self, my_task, subworkflow):
|
||||
|
||||
@ -119,7 +119,7 @@ class CallActivity(SubWorkflowTask):
|
||||
task=my_task,
|
||||
data_output=var,
|
||||
)
|
||||
my_task.data[var.bpmn_id] = end.data[var.bpmn_id]
|
||||
var.set(my_task, var.get(end))
|
||||
|
||||
|
||||
class TransactionSubprocess(SubWorkflowTask):
|
||||
|
@ -54,6 +54,9 @@ class TestDataStore(BpmnDataStoreSpecification):
|
||||
TestDataStore._value = my_task.data[self.bpmn_id]
|
||||
del my_task.data[self.bpmn_id]
|
||||
|
||||
def delete(self, my_task):
|
||||
del my_task.data[self.bpmn_id]
|
||||
|
||||
class TestDataStoreConverter(BpmnConverter):
|
||||
|
||||
def to_dict(self, spec):
|
||||
|
Reference in New Issue
Block a user