viewflow.workflow.flow.
Start
(view, undo_func=None, **kwargs)¶The Start node in a flow.
This node is the initial step of a flow started by a user.
    class MyFlow(flow.Flow):
        start = (
            flow.Start(views.CreateProcessView.as_view(fields=["text"]))
            .Annotation(title=_("New message"))
            .Permission(auto_create=True)
            .Next(this.approve)
        )
        ...
Next
(node, task_data: Callable[[Activation], Dict[str, Any]] | None = None, task_seed: Callable[[Activation], Any] | None = None)¶Next node to activate.
Permission
(permission=None, auto_create=False, obj=None, help_text=None)¶Make task available for users with specific permission.
Accepts a permission name or a callable :: Callable[[Activation], str]:
    .Permission('my_app.can_approve')
    .Permission(lambda activation: 'my_app.department_manager_{}'.format(activation.process.department.pk))
A task-specific permission can be created automatically during migration:
    # Creates the `process_class.can_do_task_process_class` permission
    do_task = View().Permission(auto_create=True)
    # You can specify the permission codename and description right here.
    # The following creates the `process_class.can_execute_task` permission
    do_task = View().Permission('can_execute_task', help_text='Custom text', auto_create=True)
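The string-or-callable convention described above can be illustrated with a minimal, library-free sketch. The helper `resolve_permission` and the stand-in `activation` object are hypothetical names used only for illustration; they are not part of viewflow, and the library's own resolution logic may differ.

```python
from types import SimpleNamespace
from typing import Callable, Union

# A permission spec is either a fixed name or a callable computing one.
PermissionSpec = Union[str, Callable[[object], str]]


def resolve_permission(spec: PermissionSpec, activation: object) -> str:
    """Return the permission name for a plain string or a callable spec."""
    if callable(spec):
        return spec(activation)
    return spec


# Hypothetical stand-in for an activation whose process has a department.
activation = SimpleNamespace(
    process=SimpleNamespace(department=SimpleNamespace(pk=7))
)

static_name = resolve_permission("my_app.can_approve", activation)
dynamic_name = resolve_permission(
    lambda act: "my_app.department_manager_{}".format(act.process.department.pk),
    activation,
)
```

The callable form is useful when the required permission depends on process data, as in the per-department case.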
can_execute
(user, task=None)¶Check whether the user is authorized to start a flow.
This method checks whether the user is authorized to start a flow based on the owner and owner permission attributes.
can_view
(user, task)¶Check if user has a view task detail permission.
detail_view_class
¶alias of DetailTaskView
index_view_class
¶alias of IndexTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.workflow.flow.
StartHandle
(func=None, undo_func=None)¶The Start handle node in a flow.
This node is the initial step of a flow started from code.
    class MyFlow(flow.Flow):
        start = flow.StartHandle(this.on_start_process).Next(this.approve)

        def on_start_process(self, activation, sample=False):
            activation.process.sample = sample
            return activation.process
        ...
    process = MyFlow.start.run(sample=True)
Next
(node, task_data: Callable[[Activation], Dict[str, Any]] | None = None, task_seed: Callable[[Activation], Any] | None = None)¶Next node to activate.
can_view
(user, task)¶Check if user has a view task detail permission.
detail_view_class
¶alias of DetailTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.workflow.flow.
View
(view, undo_func=None, **kwargs)¶Represents a user-interaction node within a flow.
    class MyFlow(flow.Flow):
        ...
        approve = (
            flow.View(views.UpdateProcessView.as_view(fields=["approved"]))
            .Annotation(
                title=_("Approve"),
                description=_("Supervisor approvement"),
                summary_template=_("Message review required"),
                result_template=_(
                    "Message was {{ process.approved|yesno:'Approved,Rejected' }}"
                ),
            )
            .Permission(auto_create=True)
            .Next(this.check_approve)
        )
        ...
Assign
(owner=None, **owner_kwargs)¶Assign task to the User immediately on activation.
Accepts user lookup kwargs or a callable :: Activation -> User:
    .Assign(username='employee')
    .Assign(lambda activation: activation.process.created_by)
Next
(node, task_data: Callable[[Activation], Dict[str, Any]] | None = None, task_seed: Callable[[Activation], Any] | None = None)¶Next node to activate.
Permission
(permission=None, auto_create=False, obj=None, help_text=None)¶Make task available for users with specific permission.
Accepts a permission name or a callable :: Callable[[Activation], str]:
    .Permission('my_app.can_approve')
    .Permission(lambda activation: 'my_app.department_manager_{}'.format(activation.process.department.pk))
A task-specific permission can be created automatically during migration:
    # Creates the `process_class.can_do_task_process_class` permission
    do_task = View().Permission(auto_create=True)
    # You can specify the permission codename and description right here.
    # The following creates the `process_class.can_execute_task` permission
    do_task = View().Permission('can_execute_task', help_text='Custom text', auto_create=True)
assign_view_class
¶alias of AssignTaskView
calc_owner
(activation)¶Determine a user to auto-assign the task.
calc_owner_permission
(activation)¶Determine required permission to assign and execute this task.
calc_owner_permission_obj
(activation)¶Determine required permission to assign and execute this task.
can_assign
(user, task)¶Check if user can assign the task.
can_execute
(user, task)¶Check user permission to execute the task.
can_unassign
(user, task)¶Check if user can unassign the task.
can_view
(user, task)¶Check if user has a view task detail permission.
cancel_view_class
¶alias of CancelTaskView
detail_view_class
¶alias of DetailTaskView
index_view_class
¶alias of UserIndexTaskView
onCreate
(ref)¶Call a function when the task is created:
    class MyFlow(Flow):
        approve = flow.View(...).onCreate(this.on_approve_created)

        def on_approve_created(self, activation):
            # Notify the owner only if the task is already assigned
            if activation.task.owner:
                send_mail(
                    'View task assigned to you', 'Here is the message.',
                    'from@example.com', [activation.task.owner.email]
                )
revive_view_class
¶alias of ReviveTaskView
unassign_view_class
¶alias of UnassignTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.workflow.flow.
Function
(func, **kwargs)¶Represents a callback function executed synchronously as part of a workflow node.
A Function node is used within a flow to execute a callable (e.g., a method) that operates on the process instance. The execution is synchronous, meaning the workflow will wait for the callable to complete before proceeding to the next node.
Usage:
In the following example, a Function node in the MyFlow class runs a logging function immediately after the task has been activated.
    class MyFlow(flow.Flow):
        ...
        log_immediately = flow.Function(this.log).Next(this.end)

        def log(self, activation):
            print(f"Process is in action {activation.process.pk}")
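To make the synchronous behaviour concrete, here is a toy model that does not use viewflow at all. The `run_chain` helper and the `events` list are assumptions made for this sketch; it only shows that each callable returns before the next step starts, which is the property described above.

```python
from types import SimpleNamespace


def run_chain(steps, activation):
    """Call each step in order; a step must return before the next begins."""
    for step in steps:
        step(activation)


events = []


def log(activation):
    # Mirrors the logging callable from the example above.
    events.append(f"Process is in action {activation.process.pk}")


def end(activation):
    events.append("end")


# Hypothetical stand-in for an activation with a process primary key.
run_chain([log, end], SimpleNamespace(process=SimpleNamespace(pk=1)))
```

Because the call is blocking, a slow callable delays the whole flow; long-running work is usually better placed in a background job node.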
cancel_view_class
¶alias of CancelTaskView
detail_view_class
¶alias of DetailTaskView
index_view_class
¶alias of IndexTaskView
revive_view_class
¶alias of ReviveTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.workflow.flow.
Handle
(func=None, undo_func=None, **kwargs)¶Represents a task executed from other parts of the code.
Usage: To define a handle in a flow and run it:
    class MyFlow(flow.Flow):
        ...
        my_handle = flow.Handle().Next(this.join_gate)
    # Elsewhere in the code: look up the pending task, then run it
    task = process.task_set.get(flow_task=MyFlow.my_handle, status=STATUS.NEW)
    MyFlow.my_handle.run(task)
cancel_view_class
¶alias of CancelTaskView
detail_view_class
¶alias of DetailTaskView
index_view_class
¶alias of IndexTaskView
revive_view_class
¶alias of ReviveTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.workflow.flow.
Subprocess
(start_subprocess_task: StartHandle, get_subprocess_kwargs: Callable[[Activation], Dict[str, Any]] | None = None, process_data: Callable[[Activation], Dict[str, Any]] | None = None, task_data: Callable[[Activation], Dict[str, Any]] | None = None, process_seed: Callable[[Activation], Any] | None = None, task_seed: Callable[[Activation], Any] | None = None, **kwargs)¶The Subprocess node in a flow (PRO-only).
This node is used to start a subprocess flow within a parent flow. The subprocess must be completed before the parent flow can proceed.
    class ExampleSubFlow(flow.Flow):
        start = flow.StartHandle(this.start_func).Next(this.task)
        task = flow.Handle(this.task_func).Next(this.end)
        end = flow.End()

        def start_func(self, activation):
            # get access to parent process and data
            activation.process.parent_task.process.data

        def task_func(self, activation):
            pass

    class MainFlowWithSubprocess(flow.Flow):
        start = flow.StartHandle().Next(this.subprocess)
        subprocess = flow.Subprocess(ExampleSubFlow.start).Next(this.end)
        end = flow.End()
cancel_view_class
¶alias of CancelTaskView
detail_view_class
¶alias of DetailTaskView
index_view_class
¶alias of IndexTaskView
revive_view_class
¶alias of ReviveTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.workflow.flow.
NSubprocess
(start_subprocess_task, subitem_source, **kwargs)¶The NSubprocess node in a flow (PRO-only).
This node is used to start multiple instances of a subprocess flow within a parent flow. Each instance processes a different item, and all subprocesses must be completed before the parent flow can proceed.
    class ExampleSubFlow(flow.Flow):
        start = flow.StartHandle(this.start_func).Next(this.task)
        task = flow.Handle(this.task_func).Next(this.end)
        end = flow.End()

        def start_func(self, activation, item=0):
            # instantiated with one of 1, 2, 3, 4 as item
            activation.process.data = item

        def task_func(self, activation):
            activation.process.data += 100

    class MainFlowWithNSubprocess(flow.Flow):
        start = flow.StartHandle().Next(this.nsubprocess)
        nsubprocess = flow.NSubprocess(ExampleSubFlow.start, lambda p: [1, 2, 3, 4]).Next(this.end)
        end = flow.End()
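The fan-out behaviour described above can be sketched without viewflow. Everything in this block (`SubProcess`, `run_nsubprocess`, the `finished` flag) is a hypothetical model of the idea, not the library's implementation: one child is started per item from `subitem_source`, and the parent may proceed only when every child has finished.

```python
from dataclasses import dataclass


@dataclass
class SubProcess:
    """Toy stand-in for a subprocess instance."""
    data: int = 0
    finished: bool = False


def start_func(process, item=0):
    # Each child is started with one item from the source.
    process.data = item


def task_func(process):
    process.data += 100
    process.finished = True


def run_nsubprocess(subitem_source, parent):
    """Start one child per item and report whether the parent may proceed."""
    children = []
    for item in subitem_source(parent):
        child = SubProcess()
        start_func(child, item=item)
        task_func(child)
        children.append(child)
    parent_can_proceed = all(child.finished for child in children)
    return children, parent_can_proceed


children, parent_can_proceed = run_nsubprocess(lambda p: [1, 2, 3, 4], parent=None)
results = [child.data for child in children]
```

In this model the children run one after another; the sketch makes no claim about how the real node schedules them.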
cancel_view_class
¶alias of CancelTaskView
detail_view_class
¶alias of DetailTaskView
index_view_class
¶alias of IndexTaskView
revive_view_class
¶alias of ReviveTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.contrib.celery.
Job
(celery_task, undo_func=None, *args, **kwargs)¶Run a Celery task in the background.
Example:
tasks.py:
    from celery import shared_task
    from viewflow.contrib import celery

    @shared_task
    def sample_task(activation_ref):
        with celery.Job.activate(activation_ref) as activation:
            print(activation.process.message)
    ...
flows.py:
    from viewflow.contrib import celery

    class MyFlow(Flow):
        ...
        task = celery.Job(tasks.sample_task)
        ...
For more information, see the Celery
Delay
(delay)¶Async task execution delay
Eta
(eta_callable)¶Expects callable that would get the task and return datetime for task execution
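As a rough illustration of how a delay and an ETA callable differ, the sketch below computes a planned execution time in plain Python. The function `execution_time` is hypothetical, and the delay is assumed to be a number of seconds, mirroring Celery's `countdown` option; the library may interpret the value differently.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


def execution_time(now, delay=None, eta_callable=None, task=None):
    """Return when a job should run, preferring an explicit ETA."""
    if eta_callable is not None:
        # The callable receives the task and returns a datetime.
        return eta_callable(task)
    if delay is not None:
        # Assumption: delay is given in seconds.
        return now + timedelta(seconds=delay)
    return now


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)

delayed = execution_time(now, delay=600)

# Hypothetical task object carrying its own due date.
task = SimpleNamespace(due=datetime(2024, 1, 2, 9, 0, tzinfo=timezone.utc))
scheduled = execution_time(now, eta_callable=lambda t: t.due, task=task)
```

A delay is relative to the moment the task is activated, while an ETA callable can derive an absolute time from task or process data.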
activation_class
¶alias of JobActivation
cancel_view_class
¶alias of CancelTaskView
detail_view_class
¶alias of CeleryDetailTaskView
index_view_class
¶alias of IndexTaskView
revive_view_class
¶alias of ReviveTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.workflow.flow.
If
(cond, **kwargs)¶The If-gate
    class MyFlow(flow.Flow):
        ...
        check_approve = (
            flow.If(lambda activation: activation.process.approved)
            .Annotation(title=_("Approvement check"))
            .Then(this.send)
            .Else(this.end)
        )
        ...
cancel_view_class
¶alias of CancelTaskView
detail_view_class
¶alias of DetailTaskView
index_view_class
¶alias of IndexTaskView
revive_view_class
¶alias of ReviveTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.workflow.flow.
Switch
(**kwargs)¶Gateway that selects one of the outgoing nodes.
Activates the first node whose condition matches.
Example:
    select_responsible_person = (
        flow.Switch()
        .Case(this.dean_approval, lambda act: act.process.need_dean)
        .Case(this.head_approval, lambda act: act.process.need_head)
        .Default(this.supervisor_approval)
    )
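The first-match rule can be modelled in a few lines of plain Python. The helper `select_branch` and the string node names are assumptions for this sketch only; it shows that cases are checked in declaration order and that the default is used when none matches.

```python
from types import SimpleNamespace


def select_branch(cases, default, activation):
    """Return the first node whose condition holds, else the default."""
    for node, condition in cases:
        if condition(activation):
            return node
    return default


cases = [
    ("dean_approval", lambda act: act.process.need_dean),
    ("head_approval", lambda act: act.process.need_head),
]


def make_activation(need_dean, need_head):
    # Hypothetical stand-in for an activation.
    return SimpleNamespace(
        process=SimpleNamespace(need_dean=need_dean, need_head=need_head)
    )


both = select_branch(cases, "supervisor_approval", make_activation(True, True))
head_only = select_branch(cases, "supervisor_approval", make_activation(False, True))
neither = select_branch(cases, "supervisor_approval", make_activation(False, False))
```

Since only the first match is activated, the order of the `.Case(...)` calls matters when conditions can overlap.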
cancel_view_class
¶alias of CancelTaskView
detail_view_class
¶alias of DetailTaskView
index_view_class
¶alias of IndexTaskView
revive_view_class
¶alias of ReviveTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.workflow.flow.
Split
(**kwargs)¶Represents a parallel split gateway in a workflow, allowing branching into multiple parallel paths.
Example:
    (
        flow.Split()
        .Next(
            this.approve,
            case=lambda act: act.process.approved,
            data_source=lambda activation: [{"sample": "test task 1"}, {"sample": "test task 2"}],
        )
        .Always(this.required)
    )
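One plausible reading of the example above is sketched here in plain Python, under the assumption that `case` guards a branch and `data_source` yields one task per returned item. The function `split_targets` is hypothetical, and the real gateway may behave differently in details.

```python
from types import SimpleNamespace


def split_targets(branches, always, activation):
    """Return (node, task_data) pairs that the split would activate."""
    targets = []
    for node, case, data_source in branches:
        # Skip a conditional branch whose case does not hold.
        if case is not None and not case(activation):
            continue
        if data_source is None:
            targets.append((node, None))
        else:
            # Assumption: one task per item returned by the data source.
            targets.extend((node, data) for data in data_source(activation))
    # Unconditional branches are always activated.
    targets.extend((node, None) for node in always)
    return targets


branches = [
    (
        "approve",
        lambda act: act.process.approved,
        lambda act: [{"sample": "test task 1"}, {"sample": "test task 2"}],
    )
]

approved = SimpleNamespace(process=SimpleNamespace(approved=True))
rejected = SimpleNamespace(process=SimpleNamespace(approved=False))

targets_when_approved = split_targets(branches, ["required"], approved)
targets_when_rejected = split_targets(branches, ["required"], rejected)
```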
detail_view_class
¶alias of DetailTaskView
index_view_class
¶alias of IndexTaskView
revive_view_class
¶alias of ReviveTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.workflow.flow.
SplitFirst
(**kwargs)¶Parallel split; as soon as the first task is completed, the remaining tasks are cancelled.
The SplitFirst class is useful in workflows where you want to start multiple parallel tasks but need only the first one to complete; the rest are cancelled once it finishes.
Example:
    class MyFlow(flow.Flow):
        split_first = flow.SplitFirst().Next(this.task_a).Next(this.task_b)
        task_a = flow.View(views.UserView).Next(this.join)
        task_b = celery.Timer(delay=timedelta(minutes=10)).Next(this.join)
        join = flow.Join()
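The cancellation rule can be pictured with a small status table. The function `finish_first` and the status strings `ACTIVE`, `DONE` and `CANCELED` are hypothetical labels chosen for this sketch, not the library's own constants.

```python
def finish_first(statuses, finished):
    """Mark the finished task as done and cancel tasks that are still active."""
    result = {}
    for name, status in statuses.items():
        if name == finished:
            result[name] = "DONE"
        elif status == "ACTIVE":
            result[name] = "CANCELED"
        else:
            # Tasks that are no longer active keep their status.
            result[name] = status
    return result


after = finish_first({"task_a": "ACTIVE", "task_b": "ACTIVE"}, finished="task_a")
```

In the flow above this would correspond to the user finishing `task_a` before the ten-minute timer in `task_b` fires, so the timer is cancelled.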
detail_view_class
¶alias of DetailTaskView
index_view_class
¶alias of IndexTaskView
revive_view_class
¶alias of ReviveTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.workflow.flow.
Join
(continue_on_condition=None, cancel_active=True, **kwargs)¶
cancel_view_class
¶alias of CancelTaskView
detail_view_class
¶alias of DetailTaskView
index_view_class
¶alias of IndexTaskView
revive_view_class
¶alias of ReviveTaskView
undo_view_class
¶alias of UndoTaskView
viewflow.contrib.celery.
Timer
(delay, task=None, *args, **kwargs)¶Wait for the specified time interval.
flows.py:
    from datetime import timedelta
    from viewflow.contrib import celery

    class MyFlow(Flow):
        ...
        wait = celery.Timer(timedelta(minutes=10)).Next(this.check)
        ...
activation_class
¶alias of TimerActivation
cancel_view_class
¶alias of CancelTaskView
detail_view_class
¶alias of CeleryDetailTaskView
index_view_class
¶alias of IndexTaskView
revive_view_class
¶alias of ReviveTaskView
viewflow.workflow.
Node
(activation_class: type | None = None, **kwargs: Any)¶Base class for a flow task definition.
Annotation
(title: str | None = None, description: str | None = None, summary_template: str | None = None, result_template: str | None = None) -> Node¶Sets the annotation for the node.
Returns the node instance with the updated annotation values.
get_available_actions
(activation: Activation, user: Any) -> Iterator[Tuple[str, str]]¶Returns the available actions for the given user on the current node.
Each action is a tuple of (name, url).
has_view_permission
(user: Any, obj: Any | None = None)¶Determine if the user has permission to view the viewset.
name
: str¶Get the name of the node.
parent
: BaseViewset | None¶Get the parent viewset.
reverse
(viewname: str, args: List[Any] | None = None, kwargs: Dict[str, Any] | None = None, current_app: str | None = None) -> str¶Get the URL for a given viewname, including the namespace.