This guide covers the main concepts for building workflows. If you’ve finished the quick start, this page explains how to design your own flows.
Workflows have two kinds of data: business data and flow data.
Keep them separate. Business data lives in your Django models. Flow data lives in the Process model.
The Process model has a data JSONField. Use viewflow.jsonstore to
expose fields from that JSON:
from viewflow import jsonstore
from viewflow.workflow.models import Process


class HelloWorldProcess(Process):
    # Stored as process.data['approved'], accessible as process.approved
    approved = jsonstore.BooleanField(default=False)

    class Meta:
        proxy = True
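To make the mapping concrete, here is a minimal plain-Python sketch of the idea behind a JSON-backed field: a descriptor that reads and writes a key in the owner's data dict. It is not how viewflow.jsonstore is implemented; JSONBackedField and FakeProcess are hypothetical names used only for illustration.

```python
class JSONBackedField:
    """Hypothetical descriptor that proxies an attribute to a key in instance.data."""

    def __init__(self, default=None):
        self.default = default
        self.name = None

    def __set_name__(self, owner, name):
        # Called at class creation, so the JSON key matches the attribute name.
        self.name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        return instance.data.get(self.name, self.default)

    def __set__(self, instance, value):
        instance.data[self.name] = value


class FakeProcess:
    """Stand-in for a model with a data JSON column (not a Django model)."""

    approved = JSONBackedField(default=False)

    def __init__(self):
        self.data = {}


process = FakeProcess()
print(process.approved)  # False
process.approved = True
print(process.data)      # {'approved': True}
```

The attribute and the dict entry are the same value, which is why a proxy model with such fields needs no extra database columns.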
Store business data in separate models. The Process model has an artifact
generic foreign key for linking to your business objects:
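from django.db import models


class MyModel(models.Model):
    message = models.CharField(max_length=150)


def task_view(request, **kwargs):
    # MyModelForm is assumed to be a ModelForm for MyModel defined elsewhere
    form = MyModelForm(request.POST or None)
    if form.is_valid():
        # Avoid shadowing the built-in name `object`
        artifact = form.save()
        request.activation.process.artifact = artifact
    # additional code here..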
Each task also has a data JSONField for task-specific information. This
keeps data scoped to individual tasks rather than the whole process.
Use flow.Start with a view. Viewflow provides CreateProcessView and
CreateArtifactView:
from django.utils.translation import gettext_lazy as _
from viewflow import this
from viewflow.workflow import flow
from viewflow.workflow.flow import views


class MyFlow(flow.Flow):
    start_with_artifact = (
        flow.Start(views.CreateArtifactView.as_view(model=MyModel, fields=['message']))
        .Annotation(title=_("Fill a form to start flow"))
        .Permission("myapp.can_start_request")
        .Next(this.next_task)
    )
Use flow.StartHandle to start flows from code:
from viewflow import this
from viewflow.workflow import flow


class MyFlow(flow.Flow):
    start_noninteractive = flow.StartHandle(this.start_process).Next(this.approve)

    def start_process(self, activation, message=''):
        artifact = MyModel.objects.create(message=message)
        activation.process.artifact = artifact
        return activation.process


# Start from anywhere in your code
process = MyFlow.start_noninteractive.run(message="Hello World")
User tasks need human input. Use flow.View with built-in views like
UpdateProcessView or UpdateArtifactView:
class MyFlow(flow.Flow):
    ...

    approve = (
        flow.View(views.UpdateProcessView.as_view(fields=["approved"]))
        .Permission(auto_create=True)
        .Next(this.check_approve)
    )
flow.Function runs a function immediately when the previous task finishes, in the same database transaction:
from viewflow import this
from viewflow.workflow import flow


class MyFlow(flow.Flow):
    process_class = MyProcess

    start = (
        flow.Start(...)
        .Next(this.process_data)
    )

    process_data = (
        flow.Function(this.process_data_function)
        .Next(this.end)
    )

    def process_data_function(self, activation):
        activation.process.sample_text = activation.process.sample_text.upper()
        activation.done()

    end = flow.End()
flow.Handle waits for external code to call it. Use this for webhooks or external events:
from viewflow import this
from viewflow.workflow import flow


class MyFlow(flow.Flow):
    process_class = MyProcess

    start = (
        flow.Start(...)
        .Next(this.wait_for_external_event)
    )

    wait_for_external_event = (
        flow.Handle(this.handle_external_event)
        .Next(this.end)
    )

    def handle_external_event(self, activation, data):
        activation.process.sample_text = data['new_text']
        activation.done()

    end = flow.End()


# Call from external code
process = MyFlow.wait_for_external_event.run(
    process=my_process_instance,
    data={'new_text': 'Updated Text'}
)
Background jobs run a task in Celery for long-running work:
# tasks.py
from celery import shared_task
# Job is assumed to live in viewflow.contrib.celery; adjust to your Viewflow version
from viewflow.contrib.celery import Job


@shared_task
def send_hello_world_request(activation_ref):
    with Job.activate(activation_ref) as activation:
        result = 'Background Processing Done'
        # No locks during long jobs - save carefully
        activation.process.sample_text = result
        # Django's Model.save() keyword is update_fields, not updated_fields
        activation.process.save(update_fields=['sample_text'])
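The "save carefully" advice is about lost updates: while an unlocked job runs, other code may change the same row. The sketch below imitates this with a plain dict standing in for the database. The load and save helpers are hypothetical and only mimic the effect of writing selected fields; they are not Django or Viewflow code.

```python
import copy

# A dict standing in for one row of the process table.
database = {"process": {"sample_text": "old", "status": "NEW"}}


def load():
    """Return an independent in-memory copy of the row, like fetching a model instance."""
    return copy.deepcopy(database["process"])


def save(row, update_fields=None):
    """Write all fields back, or only the listed ones when update_fields is given."""
    fields = list(update_fields) if update_fields is not None else list(row)
    for field in fields:
        database["process"][field] = row[field]


job_copy = load()            # the background job loads the row and starts working
web_copy = load()            # meanwhile, another request loads the same row
web_copy["status"] = "DONE"
save(web_copy)               # and commits its change first

job_copy["sample_text"] = "Background Processing Done"
save(job_copy, update_fields=["sample_text"])  # writes only what the job changed

print(database["process"])
# {'sample_text': 'Background Processing Done', 'status': 'DONE'}
```

Had the job written every field back, its stale copy would have reset status to "NEW".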
BPMN separates tasks (do something) from gates (decide what’s next). This makes workflows easier to understand and modify.
flow.If takes one of two paths based on a condition:
from viewflow import this
from viewflow.workflow import flow


class MyFlow(flow.Flow):
    ...

    check_condition = (
        flow.If(lambda activation: activation.process.approved)
        .Then(this.approved_task)
        .Else(this.rejected_task)
    )
    ...
See also
Switch for more than two branches
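The difference between a two-way and a multi-way gate can be sketched in plain Python. This illustrates first-match routing only, assuming cases are checked in order and a default is used when none match. It is not the flow.Switch API; route, the target names, and the amount field are hypothetical.

```python
def route(cases, default, process):
    """Return the target of the first case whose condition holds, else the default."""
    for target, condition in cases:
        if condition(process):
            return target
    return default


# Each case pairs a target task name with a predicate over the process data.
cases = [
    ("manager_approval", lambda p: p["amount"] > 1000),
    ("auto_approve", lambda p: p["amount"] <= 100),
]

print(route(cases, "clerk_approval", {"amount": 5000}))  # manager_approval
print(route(cases, "clerk_approval", {"amount": 50}))    # auto_approve
print(route(cases, "clerk_approval", {"amount": 500}))   # clerk_approval
```

A two-way gate is the special case with a single condition and a default.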
Split creates parallel branches. Join waits for all branches to finish:
from viewflow import this
from viewflow.workflow import flow


class MyFlow(flow.Flow):
    process_class = MyProcess

    start = (
        flow.Start(...)
        .Next(this.parallel_tasks)
    )

    parallel_tasks = (
        flow.Split()
        .Next(this.task1)
        .Next(this.task2)
    )

    # Chained calls that span lines must be wrapped in parentheses
    task1 = (
        flow.View(...)
        .Next(this.join)
    )

    task2 = (
        flow.View(...)
        .Next(this.join)
    )

    join = (
        flow.Join()
        .Next(this.next_step)
    )

    next_step = flow.End()
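The waiting behaviour of a join can be sketched in plain Python: the join records which branches have arrived and lets the flow continue only once all expected branches are in. JoinSketch is a hypothetical illustration of that bookkeeping, not Viewflow's implementation.

```python
class JoinSketch:
    """Hypothetical join that completes only after every expected branch arrives."""

    def __init__(self, expected):
        self.expected = set(expected)
        self.arrived = set()

    def arrive(self, branch):
        """Record a finished branch; return True once all branches are done."""
        if branch not in self.expected:
            raise ValueError(f"unexpected branch: {branch}")
        self.arrived.add(branch)
        return self.arrived == self.expected


join = JoinSketch(["task1", "task2"])
print(join.arrive("task1"))  # False, task2 is still running
print(join.arrive("task2"))  # True, the flow can move on
```

Because arrivals are stored in a set, a branch reporting twice does not complete the join early.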
Use flow.Subprocess and flow.NSubprocess (PRO-only) to break large flows into
smaller, reusable pieces.
Use flow.End to finish a process. You can have multiple end nodes for
different outcomes:
from viewflow import this
from viewflow.workflow import act, flow


class MyFlow(flow.Flow):
    # previous flow steps...

    check_approve = (
        flow.If(act.process.approved)
        .Then(this.approved)
        .Else(this.rejected)
    )

    approved = flow.End()
    rejected = flow.End()