Built-in Nodes

Start a Flow

class viewflow.workflow.flow.Start(view, undo_func=None, **kwargs)

The Start node in a flow.

This node is used as the initial step of a flow that is started by a user.

class MyFlow(flow.Flow):
    start = (
        flow.Start(views.CreateProcessView.as_view(fields=["text"]))
        .Annotation(title=_("New message"))
        .Permission(auto_create=True)
        .Next(this.approve)
    )

    ...
Next(node, task_data: Callable[[Activation], Dict[str, Any]] | None = None, task_seed: Callable[[Activation], Any] | None = None)

Next node to activate.

Permission(permission=None, auto_create=False, obj=None, help_text=None)

Make task available for users with specific permission.

Accepts a permission name or a callable (Callable[[Activation], str]):

.Permission('my_app.can_approve')
.Permission(lambda activation: 'my_app.department_manager_{}'.format(activation.process.department.pk))

A task-specific permission can be created automatically during migration:

# Creates `process_class.can_do_task_process_class` permission
do_task = View().Permission(auto_create=True)

# You can specify permission codename and description right here
# The following creates `process_class.can_execute_task` permission
do_task = View().Permission('can_execute_task', help_text='Custom text', auto_create=True)
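
The two accepted forms, a fixed permission name or a callable that derives one from the activation, can be pictured with a small stand-alone sketch. It does not import viewflow: `resolve_permission` and the `SimpleNamespace` stand-ins for the activation and process are hypothetical names used only for illustration, and the real lookup may differ.

```python
from types import SimpleNamespace


def resolve_permission(permission, activation):
    """Return a permission name from either a string or a callable.

    Hypothetical helper for illustration; not part of viewflow.
    """
    if callable(permission):
        # A callable receives the activation and returns the permission name.
        return permission(activation)
    return permission


# Stand-in objects mimicking activation.process.department.pk
activation = SimpleNamespace(
    process=SimpleNamespace(department=SimpleNamespace(pk=7))
)

static_name = resolve_permission("my_app.can_approve", activation)
dynamic_name = resolve_permission(
    lambda act: "my_app.department_manager_{}".format(act.process.department.pk),
    activation,
)
```

The callable form is what makes per-object permissions possible: the name is computed only when a concrete task exists.
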
can_execute(user, task=None)

Check whether the user is authorized to start a flow.

This method checks whether the user is authorized to start a flow based on the owner and owner permission attributes.

can_view(user, task)

Check whether the user has permission to view the task detail.

detail_view_class

alias of DetailTaskView

index_view_class

alias of IndexTaskView

undo_view_class

alias of UndoTaskView

class viewflow.workflow.flow.StartHandle(func=None, undo_func=None)

The Start handle node in a flow.

This node is used as the initial step of a flow that is started from code.

class MyFlow(flow.Flow):
    start = flow.StartHandle(this.start_process).Next(this.approve)

    def start_process(self, activation, sample=False):
        activation.process.sample = sample
        return activation.process

    ...

process = MyFlow.start.run(sample=True)
Next(node, task_data: Callable[[Activation], Dict[str, Any]] | None = None, task_seed: Callable[[Activation], Any] | None = None)

Next node to activate.

can_view(user, task)

Check whether the user has permission to view the task detail.

detail_view_class

alias of DetailTaskView

undo_view_class

alias of UndoTaskView

User Task

class viewflow.workflow.flow.View(view, undo_func=None, **kwargs)

Represents a user-interaction node within a flow.

class MyFlow(flow.Flow):
    ...

    approve = (
        flow.View(views.UpdateProcessView.as_view(fields=["approved"]))
        .Annotation(
            title=_("Approve"),
            description=_("Supervisor approvement"),
            summary_template=_("Message review required"),
            result_template=_(
                "Message was {{ process.approved|yesno:'Approved,Rejected' }}"
            ),
        )
        .Permission(auto_create=True)
        .Next(this.check_approve)
    )

    ...
Assign(owner=None, **owner_kwargs)

Assign the task to a user immediately on activation.

Accepts user lookup kwargs or a callable (Activation -> User):

.Assign(username='employee')
.Assign(lambda activation: activation.process.created_by)
Next(node, task_data: Callable[[Activation], Dict[str, Any]] | None = None, task_seed: Callable[[Activation], Any] | None = None)

Next node to activate.

Permission(permission=None, auto_create=False, obj=None, help_text=None)

Make task available for users with specific permission.

Accepts a permission name or a callable (Callable[[Activation], str]):

.Permission('my_app.can_approve')
.Permission(lambda activation: 'my_app.department_manager_{}'.format(activation.process.department.pk))

A task-specific permission can be created automatically during migration:

# Creates `process_class.can_do_task_process_class` permission
do_task = View().Permission(auto_create=True)

# You can specify permission codename and description right here
# The following creates `process_class.can_execute_task` permission
do_task = View().Permission('can_execute_task', help_text='Custom text', auto_create=True)
assign_view_class

alias of AssignTaskView

calc_owner(activation)

Determine a user to auto-assign the task.

calc_owner_permission(activation)

Determine required permission to assign and execute this task.

calc_owner_permission_obj(activation)

Determine the object against which the required permission is checked.

can_assign(user, task)

Check if user can assign the task.

can_execute(user, task)

Check user permission to execute the task.

can_unassign(user, task)

Check if user can unassign the task.

can_view(user, task)

Check whether the user has permission to view the task detail.

cancel_view_class

alias of CancelTaskView

detail_view_class

alias of DetailTaskView

index_view_class

alias of UserIndexTaskView

onCreate(ref)

Call a function when the task is created:

class MyFlow(Flow):
    approve = flow.View(...).onCreate(this.on_approve_created)

    def on_approve_created(self, activation):
        if activation.task.owner:
            send_mail(
                'View task assigned to you','Here is the message.',
                'from@example.com', [activation.task.owner.email]
            )
revive_view_class

alias of ReviveTaskView

unassign_view_class

alias of UnassignTaskView

undo_view_class

alias of UndoTaskView

Script Tasks

class viewflow.workflow.flow.Function(func, **kwargs)

Represents a callback function executed synchronously as part of a workflow node.

A Function node is used within a flow to execute a callable (e.g., a method) that operates on the process instance. The execution is synchronous, meaning the workflow will wait for the callable to complete before proceeding to the next node.

Usage:

In the following example, a Function node in the MyFlow class executes a logging function immediately after the task has been activated.

class MyFlow(flow.Flow):
    ...

    log_immediately = (
        flow.Function(this.log).Next(this.end)
    )

    def log(self, activation):
        print(f"Process is in action {activation.process.pk}")
cancel_view_class

alias of CancelTaskView

detail_view_class

alias of DetailTaskView

index_view_class

alias of IndexTaskView

revive_view_class

alias of ReviveTaskView

undo_view_class

alias of UndoTaskView

class viewflow.workflow.flow.Handle(func=None, undo_func=None, **kwargs)

Represents a task that is executed from other parts of the code.

Usage: define a handle in a flow and run it:

class MyFlow(flow.Flow):
    ...
    my_handle = flow.Handle().Next(this.join_gate)

task = process.task_set.get(flow_task=MyFlow.my_handle, status=STATUS.NEW)
MyFlow.my_handle.run(task)
cancel_view_class

alias of CancelTaskView

detail_view_class

alias of DetailTaskView

index_view_class

alias of IndexTaskView

revive_view_class

alias of ReviveTaskView

undo_view_class

alias of UndoTaskView

class viewflow.workflow.flow.Subprocess(start_subprocess_task: StartHandle, get_subprocess_kwargs: Callable[[Activation], Dict[str, Any]] | None = None, process_data: Callable[[Activation], Dict[str, Any]] | None = None, task_data: Callable[[Activation], Dict[str, Any]] | None = None, process_seed: Callable[[Activation], Any] | None = None, task_seed: Callable[[Activation], Any] | None = None, **kwargs)

The Subprocess node in a flow (PRO-only)

This node is used to start a subprocess flow within a parent flow. The subprocess must be completed before the parent flow can proceed.

class ExampleSubFlow(flow.Flow):
    start = flow.StartHandle(this.start_func).Next(this.task)
    task = flow.Handle(this.task_func).Next(this.end)
    end = flow.End()

    def start_func(self, activation):
        # get access to parent process and data
        activation.process.parent_task.process.data

    def task_func(self, activation):
        pass

class MainFlowWithSubprocess(flow.Flow):
    start = flow.StartHandle().Next(this.subprocess)
    subprocess = flow.Subprocess(ExampleSubFlow.start).Next(this.end)
    end = flow.End()
cancel_view_class

alias of CancelTaskView

detail_view_class

alias of DetailTaskView

index_view_class

alias of IndexTaskView

revive_view_class

alias of ReviveTaskView

undo_view_class

alias of UndoTaskView

class viewflow.workflow.flow.NSubprocess(start_subprocess_task, subitem_source, **kwargs)

The NSubprocess node in a flow (PRO-only)

This node is used to start multiple instances of a subprocess flow within a parent flow. Each instance processes a different item, and all subprocesses must be completed before the parent flow can proceed.

class ExampleSubFlow(flow.Flow):
    start = flow.StartHandle(this.start_func).Next(this.task)
    task = flow.Handle(this.task_func).Next(this.end)
    end = flow.End()

    def start_func(self, activation, item=0):
        # instantiated with one of 1, 2, 3, 4 as item
        activation.process.data = item

    def task_func(self, activation):
        activation.process.data += 100

class MainFlowWithNSubprocess(flow.Flow):
    start = flow.StartHandle().Next(this.nsubprocess)
    nsubprocess = flow.NSubprocess(ExampleSubFlow.start, lambda p: [1, 2, 3, 4]).Next(this.end)
    end = flow.End()
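
The fan-out described above (one subprocess per item, with the parent waiting for all of them) can be modelled in plain Python. This is only a toy under stated assumptions: `run_subprocess` and `run_nsubprocess` are hypothetical names, the subprocesses run sequentially in memory rather than as persisted process instances, and the argument passed to `subitem_source` is represented by a placeholder.

```python
def run_subprocess(item):
    """Mimic ExampleSubFlow: store the item, then add 100 to it."""
    data = item   # corresponds to start_func storing the item
    data += 100   # corresponds to task_func updating the data
    return data


def run_nsubprocess(subitem_source, context):
    """Run one subprocess per item and return once all have finished.

    `context` stands in for whatever object the real node passes to
    subitem_source; that detail is an assumption of this sketch.
    """
    items = subitem_source(context)
    return [run_subprocess(item) for item in items]


# Four items produce four subprocess results before the parent continues.
results = run_nsubprocess(lambda p: [1, 2, 3, 4], context=None)
```
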
cancel_view_class

alias of CancelTaskView

detail_view_class

alias of DetailTaskView

index_view_class

alias of IndexTaskView

revive_view_class

alias of ReviveTaskView

undo_view_class

alias of UndoTaskView

Job

class viewflow.contrib.celery.Job(celery_task, undo_func=None, *args, **kwargs)

Run a Celery task in the background.

Example:

tasks.py:

from celery import shared_task
from viewflow.contrib import celery


@shared_task
def sample_task(activation_ref):
    with celery.Job.activate(activation_ref) as activation:
        print(activation.process.message)

        ...

flows.py:

from viewflow.contrib import celery

class MyFlow(Flow):
    ...
    task = celery.Job(tasks.sample_task)
    ...

For more information, see the Celery

Delay(delay)

Delay before the asynchronous task is executed.

Eta(eta_callable)

Expects a callable that receives the task and returns the datetime at which the task should be executed.

activation_class

alias of JobActivation

cancel_view_class

alias of CancelTaskView

detail_view_class

alias of CeleryDetailTaskView

index_view_class

alias of IndexTaskView

revive_view_class

alias of ReviveTaskView

undo_view_class

alias of UndoTaskView

Gates

class viewflow.workflow.flow.If(cond, **kwargs)

The If-gate.

class MyFlow(flow.Flow):
    ...

    check_approve = (
        flow.If(lambda activation: activation.process.approved)
        .Annotation(title=_("Approvement check"))
        .Then(this.send)
        .Else(this.end)
    )

    ...
cancel_view_class

alias of CancelTaskView

detail_view_class

alias of DetailTaskView

index_view_class

alias of IndexTaskView

revive_view_class

alias of ReviveTaskView

undo_view_class

alias of UndoTaskView

class viewflow.workflow.flow.Switch(**kwargs)

Gateway that selects one of the outgoing nodes.

Activates the first node whose condition matches.

Example:

select_responsible_person = (
    flow.Switch()
    .Case(this.dean_approval, lambda act: act.process.need_dean)
    .Case(this.head_approval, lambda act: act.process.need_head)
    .Default(this.supervisor_approval)
)
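
The first-match rule can be illustrated without viewflow. In the sketch below, `select_branch` is a hypothetical helper, nodes are represented by plain strings, and it is assumed that the default branch is taken only when no case matches.

```python
from types import SimpleNamespace


def select_branch(cases, default, activation):
    """Return the first node whose condition matches, else the default.

    Hypothetical stand-in for the gate's selection rule; not viewflow code.
    """
    for node, condition in cases:
        if condition(activation):
            return node
    return default


cases = [
    ("dean_approval", lambda act: act.process.need_dean),
    ("head_approval", lambda act: act.process.need_head),
]

# Both conditions hold, but only the first matching case is selected.
both = SimpleNamespace(process=SimpleNamespace(need_dean=True, need_head=True))
# No condition holds, so the default branch is used.
neither = SimpleNamespace(process=SimpleNamespace(need_dean=False, need_head=False))

chosen_when_both = select_branch(cases, "supervisor_approval", both)
chosen_when_neither = select_branch(cases, "supervisor_approval", neither)
```

Because evaluation stops at the first match, the order of the Case calls matters when conditions can overlap.
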
cancel_view_class

alias of CancelTaskView

detail_view_class

alias of DetailTaskView

index_view_class

alias of IndexTaskView

revive_view_class

alias of ReviveTaskView

undo_view_class

alias of UndoTaskView

class viewflow.workflow.flow.Split(**kwargs)

Represents a parallel split gateway in a workflow, allowing branching into multiple parallel paths.

Methods:
  • Next(node, case=None, data_source=None): Defines the subsequent node in the workflow.
    • node: The next node to execute.
    • case (optional): A callable that takes an activation and returns True if the node should be activated.
    • data_source (optional): A callable that takes an activation and returns a list of data items, creating an instance of the node for each item, with task.data set to the item.
  • Always(node): A shortcut to define a subsequent node that is always executed.

Example:

split_gate = (
    flow.Split()
    .Next(
        this.approve,
        case=lambda activation: activation.process.approved,
        data_source=lambda activation: [{"sample": "test task 1"}, {"sample": "test task 2"}],
    )
    .Always(this.required)
)
In this example:
  • The approve node is executed multiple times based on the data_source list.
  • The required node is always executed unconditionally in parallel.
Notes:
  • If case is not provided, the node is always activated.
  • If data_source is not provided, the node is created only once.
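
The interaction of case and data_source described above can be modelled as a plain function. This is a toy under stated assumptions, not viewflow code: `expand_split` is a hypothetical name, nodes are plain strings, and each branch is given as a (node, case, data_source) tuple.

```python
from types import SimpleNamespace


def expand_split(branches, activation):
    """List the (node, data) pairs a split would activate.

    Toy model of the rules listed above; not viewflow code.
    """
    activated = []
    for node, case, data_source in branches:
        # Without a case the branch is always taken.
        if case is not None and not case(activation):
            continue
        if data_source is None:
            # Without a data source the node is created once, with no data.
            activated.append((node, None))
        else:
            # One instance per item, each carrying its item as task data.
            activated.extend((node, item) for item in data_source(activation))
    return activated


branches = [
    (
        "approve",
        lambda act: act.process.approved,
        lambda act: [{"sample": "test task 1"}, {"sample": "test task 2"}],
    ),
    ("required", None, None),
]

approved = SimpleNamespace(process=SimpleNamespace(approved=True))
rejected = SimpleNamespace(process=SimpleNamespace(approved=False))

tasks_when_approved = expand_split(branches, approved)
tasks_when_rejected = expand_split(branches, rejected)
```
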
detail_view_class

alias of DetailTaskView

index_view_class

alias of IndexTaskView

revive_view_class

alias of ReviveTaskView

undo_view_class

alias of UndoTaskView

class viewflow.workflow.flow.SplitFirst(**kwargs)

Parallel split in which, as soon as the first task is completed, the remaining tasks are cancelled.

SplitFirst is useful when a workflow starts several tasks in parallel but needs only one of them to complete, for example a user task racing against a timer.

Example:

class MyFlow(flow.Flow):
    split_first = flow.SplitFirst().Next(this.task_a).Next(this.task_b)

    task_a = flow.View(views.UserView.as_view()).Next(this.join)

    task_b = celery.Timer(delay=timedelta(minutes=10)).Next(this.join)

    join = flow.Join()
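
The "first one wins, the rest are cancelled" behaviour has a close analogue in asyncio, which may help in picturing it. The sketch below is an analogy only: viewflow tracks tasks as persisted records rather than coroutines, and `branch` and `run_split_first` are hypothetical names.

```python
import asyncio


async def branch(name, seconds):
    """Simulate a branch that completes after `seconds`."""
    await asyncio.sleep(seconds)
    return name


async def run_split_first():
    # Start both branches in parallel.
    tasks = [
        asyncio.create_task(branch("task_a", 0.01)),
        asyncio.create_task(branch("task_b", 5)),
    ]
    # Resume as soon as any branch finishes.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Cancel whatever is still running and let the cancellation settle.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    winner = done.pop().result()
    cancelled = [task.cancelled() for task in pending]
    return winner, cancelled


winner, cancelled = asyncio.run(run_split_first())
```
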
detail_view_class

alias of DetailTaskView

index_view_class

alias of IndexTaskView

revive_view_class

alias of ReviveTaskView

undo_view_class

alias of UndoTaskView

class viewflow.workflow.flow.Join(continue_on_condition=None, cancel_active=True, **kwargs)
cancel_view_class

alias of CancelTaskView

detail_view_class

alias of DetailTaskView

index_view_class

alias of IndexTaskView

revive_view_class

alias of ReviveTaskView

undo_view_class

alias of UndoTaskView

Timer

class viewflow.contrib.celery.Timer(delay, task=None, *args, **kwargs)

Wait for the specified time interval.

flows.py:

from datetime import timedelta

from viewflow.contrib import celery

class MyFlow(Flow):
    ...
    wait = celery.Timer(timedelta(minutes=10)).Next(this.check)
    ...
activation_class

alias of TimerActivation

cancel_view_class

alias of CancelTaskView

detail_view_class

alias of CeleryDetailTaskView

index_view_class

alias of IndexTaskView

revive_view_class

alias of ReviveTaskView

Base class

class viewflow.workflow.Node(activation_class: type | None = None, **kwargs: Any)

Base class for a flow task definition.

Parameters
  • task_type – Human readable task type
  • activation_class – Activation implementation specific for this node
Annotation(title: str | None = None, description: str | None = None, summary_template: str | None = None, result_template: str | None = None) -> Node

Sets annotation for the node.

Parameters
  • title – The title for the task
  • description – The description for the task
  • summary_template – The template for the task summary
  • result_template – The template for the task result
Returns

The node instance with the updated annotation values
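
Returning the node instance is what allows calls such as .Annotation(...).Next(...) to be chained in a flow definition. The sketch below shows that return-self pattern in isolation. `SketchNode` is a hypothetical stand-in, not viewflow's Node, and the choice to update only the values that were passed is an assumption of the sketch.

```python
class SketchNode:
    """Minimal stand-in showing the return-self pattern; not viewflow's Node."""

    def __init__(self):
        self.title = None
        self.description = None
        self.next_node = None

    def Annotation(self, title=None, description=None):
        # Update only the values that were provided, then return the node.
        if title is not None:
            self.title = title
        if description is not None:
            self.description = description
        return self

    def Next(self, node):
        # Record the following node and return self so chaining can continue.
        self.next_node = node
        return self


end = SketchNode()
approve = SketchNode().Annotation(title="Approve").Next(end)
```
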

get_available_actions(activation: Activation, user: Any) -> Iterator[Tuple[str, str]]

Returns a list of available actions for the given user on the current node.

Returns

A list of available actions as a tuple of (name, url).

Return type

list

has_view_permission(user: Any, obj: Any | None = None)

Determine if the user has permission to view the viewset.

Args:
  • user (User): The user to check permissions for.
  • obj (Optional[Any], optional): The object being viewed.
Returns:
bool: True if the user has view permission, False otherwise.
property name: str

Get the name of the node.

property parent: BaseViewset | None

Get the parent viewset.

Returns:
Optional[BaseViewset]: The parent viewset, if any.
reverse(viewname: str, args: List[Any] | None = None, kwargs: Dict[str, Any] | None = None, current_app: str | None = None) -> str

Get the URL for a given viewname, including the namespace.

Args:
  • viewname (str): The name of the view.
  • args (Optional[List[Any]], optional): Positional arguments for the view.
  • kwargs (Optional[Dict[str, Any]], optional): Keyword arguments for the view.
  • current_app (Optional[str], optional): The current application namespace.
Returns:
str: The URL for the view with the appropriate namespace.