This guide will help you start writing workflows using Viewflow 2.0. It provides the essential details needed to create efficient workflows in your Django applications.
If you’ve just completed the quick start and are ready to design your own workflow, this page is for you. We’ll cover key concepts and practical steps to help you understand and implement workflows seamlessly.
Effective data management is crucial for building robust workflows. In Viewflow 2.0, it’s important to separate your business data from workflow data to maintain clarity and flexibility.
Flow data is stored within a process. Viewflow provides a Process model with a data JsonField to keep workflow-related information. This data can be accessed through viewflow.jsonstore as virtual Django model fields, making it easy to use in forms and admin interfaces.
from viewflow import jsonstore
from viewflow.workflow.models import Process


class HelloWorldProcess(Process):
    # The base Process model has a `data` JSON field that keeps values
    # such as {'approved': True}. viewflow.jsonstore exposes them as
    # virtual Django model fields, available as normal fields for
    # model forms and admin.
    approved = jsonstore.BooleanField(default=False)

    class Meta:
        proxy = True
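To build intuition for how a single JSON column can back several "virtual" fields, here is a minimal plain-Python sketch. It is not Viewflow's implementation: `JsonBackedField` and `FakeProcess` are hypothetical names, and the sketch only assumes that a virtual field reads and writes one key of a `data` dictionary.

```python
class JsonBackedField:
    """Hypothetical descriptor: reads and writes one key of `instance.data`."""

    def __init__(self, default=None):
        self.default = default

    def __set_name__(self, owner, name):
        # The attribute name doubles as the key inside the JSON dict.
        self.key = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        return instance.data.get(self.key, self.default)

    def __set__(self, instance, value):
        instance.data[self.key] = value


class FakeProcess:
    """Stand-in for a model that has a single JSON column called `data`."""

    approved = JsonBackedField(default=False)

    def __init__(self):
        self.data = {}


process = FakeProcess()
default_value = process.approved  # key is absent, so the default is returned
process.approved = True           # stored inside the dict, not in its own column
stored = dict(process.data)
```

Because the value lives in the dictionary, adding another virtual field in this sketch needs no new column, which is the property that makes JSON-backed fields convenient for workflow data.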
Business data should be stored in separate Django models. The viewflow.workflow Process model contains an .artifact generic foreign key field that can be assigned to your business model.
from django.db import models


class MyModel(models.Model):
    # Your business data goes here
    message = models.CharField(max_length=150)


def task_view(request, **kwargs):
    # MyModelForm is assumed to be a ModelForm for MyModel defined elsewhere
    form = MyModelForm(request.POST or None)
    if form.is_valid():
        instance = form.save(commit=True)
        # Keep it as the workflow process artifact
        request.activation.process.artifact = instance
        # additional code here..
Each task in a workflow also has its own data JsonField to store task-specific information. This allows you to manage data relevant to individual tasks separately from the overall process data.
By separating your flow data, business data, and task-specific data, you can ensure better organization and maintainability of your workflows.
You can start a flow interactively through a user interface or programmatically within your code. This involves creating a process model instance that represents the workflow and initializing its state.
You can start a flow interactively using flow.Start. This is typically done through a form that initiates the workflow process. Viewflow provides built-in views such as CreateProcessView and CreateArtifactView to simplify this process. When you use these views, a new process model instance is created and the initial task is executed.
from django.utils.translation import gettext_lazy as _
from viewflow import this
from viewflow.workflow import flow
from viewflow.workflow.flow import views


class MyFlow(flow.Flow):
    start_with_artifact = (
        flow.Start(views.CreateArtifactView.as_view(model=MyModel, fields=['message']))
        .Annotation(title=_("Fill a form to start flow"))
        .Permission("myapp.can_start_request")
        .Next(this.next_task)
    )
Flows can also be started programmatically using flow.StartHandle. This allows you to initiate workflows based on events or conditions within your application. When starting a flow programmatically, you create a new process instance and initialize its state within your code.
from viewflow import this
from viewflow.workflow import flow


class MyFlow(flow.Flow):
    start_noninteractive = flow.StartHandle(this.start_process).Next(this.approve)

    def start_process(self, activation, message=''):
        # `instance` avoids shadowing the built-in name `object`
        instance = MyModel.objects.create(message=message)
        activation.process.artifact = instance
        return activation.process


# Somewhere in your code
process = MyFlow.start_noninteractive.run(message="Hello World")
User tasks are essential parts of a workflow where human interaction is required. In Viewflow 2.0, these tasks are represented as flow.View nodes. You can use built-in views like UpdateProcessView and UpdateArtifactView, or create your own custom views, either function-based or class-based.
class MyFlow(flow.Flow):
    ...

    approve = (
        flow.View(views.UpdateProcessView.as_view(fields=["approved"]))
        .Permission(auto_create=True)
        .Next(this.check_approve)
    )
Non-interactive tasks are workflow steps that do not require user intervention. These tasks can be automated to perform various actions within the workflow.
The flow.Function node executes a function synchronously as soon as the previous task finishes. This is useful for tasks that need to be completed immediately within the same transaction.
from viewflow import this
from viewflow.workflow import flow


class MyFlow(flow.Flow):
    process_class = MyProcess

    start = (
        flow.Start(...)
        .Next(this.process_data)
    )

    process_data = (
        flow.Function(this.process_data_function)
        .Next(this.end)
    )

    def process_data_function(self, activation):
        # Perform some data processing here
        activation.process.sample_text = activation.process.sample_text.upper()
        activation.done()

    end = flow.End()
The flow.Handle node waits to be called from code outside the workflow. This is useful for tasks that depend on external events or processes.
from viewflow import this
from viewflow.workflow import flow


class MyFlow(flow.Flow):
    process_class = MyProcess

    start = (
        flow.Start(...)
        .Next(this.wait_for_external_event)
    )

    wait_for_external_event = (
        flow.Handle(this.handle_external_event)
        .Next(this.end)
    )

    def handle_external_event(self, activation, data):
        # Process the external event data here
        activation.process.sample_text = data['new_text']
        activation.done()

    end = flow.End()


# Somewhere in your code
process = MyFlow.wait_for_external_event.run(
    process=my_process_instance,
    data={'new_text': 'Updated Text'},
)
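The "wait until called" behaviour can be pictured with a small plain-Python model. This is only an illustration, not Viewflow code: `ToyHandle`, its `activate` and `run` methods, and the `processes` dictionary are hypothetical stand-ins for a node, its lifecycle, and the process table.

```python
class ToyHandle:
    """Hypothetical node that stays pending until outside code calls run()."""

    def __init__(self, handler):
        self.handler = handler
        self.pending = set()

    def activate(self, process_id):
        # Reaching the node only marks the process as waiting; nothing runs yet.
        self.pending.add(process_id)

    def run(self, process_id, processes, **kwargs):
        if process_id not in self.pending:
            raise RuntimeError("process is not waiting on this node")
        self.handler(processes[process_id], **kwargs)
        self.pending.discard(process_id)


def handle_external_event(process, data):
    # Copy the externally supplied value into the process data.
    process["sample_text"] = data["new_text"]


processes = {1: {"sample_text": "draft"}}
node = ToyHandle(handle_external_event)

node.activate(1)
waiting_before = 1 in node.pending

# Later, an external event arrives and outside code resumes the process.
node.run(1, processes, data={"new_text": "Updated Text"})
waiting_after = 1 in node.pending
```

The point of the sketch is the ordering: the process sits in a waiting state after the node is reached, and only an explicit call from outside moves it forward.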
The celery.Job node runs a task in the background using Celery. This is ideal for long-running tasks that should not block the workflow.
tasks.py:

from celery import shared_task
# Assumption: Job is importable from viewflow.contrib.celery, the module
# behind the celery.Job node described above.
from viewflow.contrib.celery import Job


@shared_task
def send_hello_world_request(activation_ref):
    with Job.activate(activation_ref) as activation:
        # Perform a long-running task
        result = 'Background Processing Done'

        # No flow locks are established for long-running jobs. Set up your
        # own lock or save carefully to avoid race conditions with other
        # tasks:
        # with activation.flow_class.lock(process_pk):
        #     process.refresh_from_db()
        #     process.sample_text = result
        #     process.save()

        activation.process.sample_text = result
        activation.process.save(update_fields=['sample_text'])
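The warning about race conditions in the task comments can be made concrete with a standard-library sketch. It does not use Viewflow or Celery: `process_data` stands in for a process row shared by several workers, and `process_lock` stands in for whatever per-process lock you set up. The sketch only shows why the whole read-modify-write cycle should happen under one lock.

```python
import threading

process_data = {"counter": 0}    # stand-in for a process row shared by workers
process_lock = threading.Lock()  # stand-in for a per-process flow lock


def long_running_job(increments):
    for _ in range(increments):
        # Hold the lock for the whole read-modify-write cycle so that no
        # other worker can overwrite the value in between.
        with process_lock:
            current = process_data["counter"]
            process_data["counter"] = current + 1


workers = [threading.Thread(target=long_running_job, args=(1000,)) for _ in range(4)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

final_count = process_data["counter"]
```

Saving only the fields the job changed, as the task above does, narrows the window for lost updates but does not replace a lock when several tasks write to the same field.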
See also: Timer.
The core concept of BPMN (Business Process Model and Notation) is the separation of tasks and gates. This separation ensures that all workflow states are design decisions, making it easier to restart flows and analyze business processes. Gates also make flows flexible, allowing tasks and views to be easily rearranged and reused in different flows.
The flow.If gate evaluates a condition and directs the flow to one of two branches based on the result. This is useful for implementing decision points within your workflow.
from viewflow import this
from viewflow.workflow import flow


class MyFlow(flow.Flow):
    ...

    check_condition = (
        flow.If(lambda activation: activation.process.approved)
        .Then(this.approved_task)
        .Else(this.rejected_task)
    )

    ...
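The routing decision made by such a gate can be sketched in plain Python. This is an illustration only, not the library's code: `evaluate_if` and `make_activation` are hypothetical helpers, and the node names are plain strings rather than flow nodes.

```python
from types import SimpleNamespace


def evaluate_if(condition, activation, then_node, else_node):
    """Return the name of the node that should run next."""
    return then_node if condition(activation) else else_node


def make_activation(approved):
    # Mimics the `activation.process.approved` attribute path used in the flow.
    return SimpleNamespace(process=SimpleNamespace(approved=approved))


def is_approved(activation):
    return activation.process.approved


next_when_approved = evaluate_if(
    is_approved, make_activation(True), "approved_task", "rejected_task"
)
next_when_rejected = evaluate_if(
    is_approved, make_activation(False), "approved_task", "rejected_task"
)
```

Keeping the condition a small callable that only reads process state makes the decision easy to test separately from the views around it.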
See also: the Switch documentation for advanced branching options.
The flow.Split gate creates parallel branches of execution. Each branch will run concurrently, allowing multiple tasks to be performed simultaneously. The flow.Join gate synchronizes parallel branches, waiting for all branches to complete before continuing. This is required to ensure that multiple parallel tasks are finished before moving to the next step.
from viewflow import this
from viewflow.workflow import flow


class MyFlow(flow.Flow):
    process_class = MyProcess

    start = (
        flow.Start(...)
        .Next(this.parallel_tasks)
    )

    parallel_tasks = (
        flow.Split()
        .Next(this.task1)
        .Next(this.task2)
    )

    # Chained calls stay on one line (or inside parentheses) to be valid Python
    task1 = flow.View(...).Next(this.join)

    task2 = flow.View(...).Next(this.join)

    join = flow.Join().Next(this.next_step)

    next_step = flow.End()
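The synchronization performed by a join can be modelled in a few lines of plain Python. This sketch is not how Viewflow implements it: `ToyJoin` and `branch_done` are hypothetical names, and the sketch assumes the join knows in advance which branches to expect.

```python
class ToyJoin:
    """Hypothetical join: continues only after every expected branch arrives."""

    def __init__(self, expected_branches):
        self.expected = set(expected_branches)
        self.arrived = set()

    def branch_done(self, name):
        """Record a finished branch; return True once all branches are in."""
        if name not in self.expected:
            raise ValueError(f"unexpected branch: {name}")
        self.arrived.add(name)
        return self.arrived == self.expected


join = ToyJoin(["task1", "task2"])
after_first = join.branch_done("task1")   # one branch is still pending
after_second = join.branch_done("task2")  # every branch has finished
```

The order in which branches finish does not matter in this model; the flow moves on only when the last expected branch reports completion.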
See also: SplitFirst.
PRO-only
For simplification of your flow and better concept separation, consider using flow.Subprocess and flow.NSubprocess to split data processing into different flow classes. These nodes allow you to manage complex workflows by delegating tasks to separate subprocesses, which can be run sequentially or in parallel.
See also: Subprocess, NSubprocess.
Flows end with flow.End. You can use multiple ends to express different outcomes of the flow.
class MyFlow(flow.Flow):
    # previous flow steps...

    check_approve = (
        # The condition must be a callable that receives the activation
        flow.If(lambda activation: activation.process.approved)
        .Then(this.approved)
        .Else(this.rejected)
    )

    approved = flow.End()

    rejected = flow.End()