Creating Custom Workflows
This section is aimed at advanced users. Before reading it, make sure you have a good understanding of Workflows, Blueprints, and Plugins.
Introduction to Implementing Workflows
Implementing workflows is similar to implementing plugins in several ways:
- Workflows are also implemented as Python functions.
- A workflow method is, optionally, decorated with @workflow, a decorator from the cloudify.decorators module of the cloudify-plugins-common package.
- Workflow methods should import ctx from cloudify.workflows, which offers access to context data and various system services. Although it resembles the ctx object used by a plugin method, this ctx object is not of the same type.
Example: A typical workflow method’s signature
from cloudify.decorators import workflow
from cloudify.workflows import ctx


@workflow
def my_workflow(**kwargs):
    pass
Workflow parameters are received by the method as named parameters, so any parameters a workflow uses should usually appear in the method signature as well (otherwise they end up under kwargs).
It’s recommended not to give parameters default values in the workflow method’s signature. Instead, provide the default values in the workflow’s parameters declaration in the blueprint. That way, the parameter and its default value are visible to the user both in the blueprint and via CLI commands, and the user can override the default value simply by editing the blueprint, without modifying code. For more information on declaring workflow parameters, refer to the Blueprint Mapping section.
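As a rough illustration of how named parameters and kwargs interact, the sketch below uses a plain Python function. The optional @workflow decorator is left out so that the snippet runs without Cloudify installed, and the function name, parameter names and values are hypothetical. The only point is that declared names bind to the signature, while anything else is collected under kwargs.

```python
def my_workflow(operation, type_name, **kwargs):
    """Hypothetical workflow body: report how the parameters were bound."""
    return {
        'operation': operation,
        'type_name': type_name,
        # Parameters that are not named in the signature end up here.
        'extra': dict(kwargs),
    }


# Parameters as they might arrive for an execution (illustrative values only).
parameters = {
    'operation': 'my_interface.my_task',
    'type_name': 'cloudify.nodes.Root',
    'timeout': 30,
}
bound = my_workflow(**parameters)
print(bound['extra'])
```

Here timeout is not part of the signature, so it is only reachable through kwargs.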
There are two approaches to implementing workflows:
- Standard workflows - workflows which simply use the APIs to execute and manage tasks.
- Graph-based workflows - workflows which use the APIs on top of the Graph framework, a framework which offers a simplified process of scheduling and creating dependencies among tasks, as well as built-in support for some of the common aspects of workflows (e.g. cancellation support).
APIs
The ctx object used by workflow methods is of type CloudifyWorkflowContext. It offers access to context data and various services. Additionally, any node, node instance, relationship or relationship instance objects returned by the context object (or from objects returned by the context object) will be wrapped in types which offer additional context data and services.
For full API reference, refer to the documentation over at cloudify-plugins-common.readthedocs.org.
Blueprint Mapping
As is the case with plugins, it’s the workflow author’s responsibility to write a YAML file (named plugin.yaml by convention) which contains both the workflow mapping and the workflow’s plugin declaration.
Mapping a workflow name to a workflow implementation in the blueprint is done in a similar fashion to the way plugin operations are mapped, i.e. in one of two ways:
- Simple mapping - this should be used to map a workflow name to a workflow implementation which requires no parameters.
Example: Mapping my_workflow to be implemented by the my_workflow_method_name method in the my_workflow_module_name module in the my_workflow_plugin_name plugin
workflows:
  my_workflow: my_workflow_plugin_name.my_workflow_module_name.my_workflow_method_name
- Mapping with parameters - this should be used to map a workflow name to a workflow implementation which uses parameters.
Workflow parameters declaration is done in a similar manner to the way type properties are declared: It’s structured as a schema map, where each entry specifies the parameter schema.
Example: The same mapping, this time with a parameters declaration: a single mandatory parameter named mandatory_parameter, and two optional parameters with default values (one of which is a complex value)
workflows:
  my_workflow:
    mapping: my_workflow_plugin_name.my_workflow_module_name.my_workflow_method_name
    parameters:
      mandatory_parameter:
        description: this parameter is mandatory
      optional_parameter:
        description: this parameter is optional
        default: optional_parameter_default_value
      nested_parameter:
        description: >
          this parameter is also optional,
          its default value has nested values.
        default:
          key1: value1
          key2: value2
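One way to picture how such a declaration turns into the values a workflow method receives is sketched below. This is not Cloudify's implementation: resolve_parameters and the declared dictionary are hypothetical, and the merge rules (user-supplied values win, a missing mandatory value is an error) are an assumption based on the description above.

```python
import copy


def resolve_parameters(declared, supplied):
    """Merge user-supplied values over declared defaults (illustrative only)."""
    resolved = {}
    missing = []
    for name, schema in declared.items():
        if name in supplied:
            resolved[name] = supplied[name]
        elif 'default' in schema:
            # Copy so that nested defaults are never shared between executions.
            resolved[name] = copy.deepcopy(schema['default'])
        else:
            missing.append(name)
    if missing:
        raise ValueError('missing mandatory parameters: {0}'.format(
            ', '.join(sorted(missing))))
    return resolved


# Mirrors the parameters declaration from the example above.
declared = {
    'mandatory_parameter': {'description': 'this parameter is mandatory'},
    'optional_parameter': {'default': 'optional_parameter_default_value'},
    'nested_parameter': {'default': {'key1': 'value1', 'key2': 'value2'}},
}
resolved = resolve_parameters(declared, {'mandatory_parameter': 'some_value'})
print(resolved)
```

Calling resolve_parameters(declared, {}) raises ValueError in this sketch, because mandatory_parameter has no default to fall back on.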
Workflow implementations are considered workflow plugins. As such, they are joined to the blueprint using the exact same mapping that’s used to join regular plugins, e.g.:
plugins:
  my_workflow_plugin_name:
    executor: central_deployment_agent
    source: http://example.com/url/to/plugin.zip
It’s currently impossible to override a workflow mapping in the blueprint.
Cancellation Support
A workflow should have support for graceful (AKA Standard) cancellation. It is up to the workflow author to decide the semantics of graceful in this regard (and document them properly) - One workflow may merely stop the execution, while another may perform a rollback, and so on.
Standard workflows which don’t implement such support will simply ignore the cancellation request and continue executing the workflow. To implement cancellation support for standard workflows, some constructs from the cloudify.workflows.workflows_api module need to be used.
The module is imported as follows:
from cloudify.workflows import api
Then, api.has_cancel_request() can be used to determine whether the workflow execution should be cancelled due to a standard cancellation request. If it returns True, the workflow should take whichever actions its author deems a proper graceful cancellation, and then raise an api.ExecutionCancelled error.
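The polling pattern described above might look like the following minimal sketch. The run_steps helper, its cleanup hook and the FakeApi class are hypothetical. FakeApi only stands in for the api module so that the snippet runs without Cloudify; in a real workflow the imported api module would be passed instead.

```python
class FakeApi(object):
    """Stand-in for cloudify.workflows.api, used only to run this sketch."""

    class ExecutionCancelled(Exception):
        pass

    def __init__(self, cancel_after):
        self._checks = 0
        self._cancel_after = cancel_after

    def has_cancel_request(self):
        # Pretend a cancel request arrives after `cancel_after` checks.
        self._checks += 1
        return self._checks > self._cancel_after


def run_steps(steps, api, cleanup):
    """Run steps one by one, checking for a cancel request before each."""
    done = []
    for step in steps:
        if api.has_cancel_request():
            # "Graceful" is whatever the author decides; here, a cleanup hook
            # receives the results of the steps that already ran.
            cleanup(done)
            raise api.ExecutionCancelled()
        done.append(step())
    return done


cleaned = []
fake_api = FakeApi(cancel_after=2)
try:
    run_steps([lambda: 'a', lambda: 'b', lambda: 'c'], fake_api, cleaned.extend)
except FakeApi.ExecutionCancelled:
    print('cancelled after', cleaned)
```

Checking between steps keeps the cancellation latency bounded by the duration of a single step; how often to check is a design choice left to the workflow author.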
Waiting for a task to end by calling the get method of a WorkflowTaskResult object puts the execution into a blocking mode which responds to cancel requests by raising an api.ExecutionCancelled error. This means that standard workflows which use this method will in fact respond to a cancel request, even if that request was sent before the get method was called.
Furthermore, when a standard workflow’s code has finished running, the execution doesn’t actually end until all tasks that have been launched have completed as well. This is implemented by iterating over all tasks whose get method hasn’t yet been called and calling get on each one. Therefore, if a cancel request was issued and the workflow launched any task whose get method hadn’t been called before the cancel request was received, the workflow will respond to the cancel request during this final waiting phase: it ends with a cancelled status, without waiting for the remaining tasks (if any) to end.
Graph-based workflows have inherent support for graceful cancellation. When such a request is received after the graph’s execute method has been called, the defined behavior is for the workflow execution to simply end. However, any tasks related to the execution which were running at the moment of cancellation will continue to run until they’re over.
Once the graph execution ends, the tasks_graph’s execute method will raise the api.ExecutionCancelled error.
For both types of workflows, it’s of course possible to catch any api.ExecutionCancelled error that has been raised, which makes it possible to perform any sort of cleanup or custom behavior before re-raising the error.
The api.EXECUTION_CANCELLED_RESULT value, which a workflow could previously return to signal that it had been cancelled successfully, is now deprecated. Raise the api.ExecutionCancelled error instead to indicate such an event.
The Graph API now raises an api.ExecutionCancelled error instead of returning the deprecated api.EXECUTION_CANCELLED_RESULT in the event of an execution cancellation. This means that any workflow which performs additional operations after the call to the graph’s execute method should now use a try-finally clause, so that it can perform these operations and still raise the appropriate error once they’re done.
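The try-finally arrangement described above can be sketched as follows. ExecutionCancelled and CancelledGraph are stand-ins defined here so that the snippet runs without Cloudify, and run_graph_then_finalize and its finalize callback are hypothetical names. In a real workflow the graph would come from ctx.graph_mode() and the error would be api.ExecutionCancelled.

```python
class ExecutionCancelled(Exception):
    """Stand-in for api.ExecutionCancelled so the sketch runs on its own."""


class CancelledGraph(object):
    """Stand-in for a tasks graph whose execution gets cancelled."""

    def execute(self):
        raise ExecutionCancelled()


def run_graph_then_finalize(graph, finalize):
    try:
        graph.execute()
    finally:
        # Runs whether execute() returned or raised. If it raised, the
        # error keeps propagating once this block is done.
        finalize()


finalized = []
try:
    run_graph_then_finalize(CancelledGraph(), lambda: finalized.append(True))
except ExecutionCancelled:
    print('finalized before the error propagated:', finalized)
```

Because the error is not swallowed, the execution still ends up reported as cancelled after the additional operations have run.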
Neither standard workflows nor graph-based workflows have any control over force-cancellation requests. Any workflow execution which was issued with a force-cancellation request will be terminated immediately, while any tasks related to the execution which might have been running at the moment of termination will continue to run until they’re over.
Resuming Support
Resuming workflows is a way to continue execution after the execution has failed or has been cancelled, or after a Conductor Manager failure (loss of power, or any other scenario leading to an ungraceful shutdown of the management worker).
Resuming a workflow essentially means running it again, so the workflow author must make sure the workflow is able to cope with being re-run. This is easiest to achieve when the workflow is organized as a tasks graph. Tasks graphs can be stored and restored while keeping information about the state of each operation in the graph.
For convenience, Studio Conductor provides the make_or_get_graph function, which takes care of restoring a tasks graph during a resume, and otherwise calls a function which should create a new tasks graph.
This function also requires a name parameter, so that if the workflow creates multiple tasks graphs, they can be distinguished.
Example: Creating a resumable workflow using a tasks graph
from cloudify.decorators import workflow
from cloudify.workflows.tasks_graph import make_or_get_graph


@workflow(resumable=True)  # declare that this workflow can be resumed
def my_workflow(ctx, parameter=None):
    graph = _my_workflow_graph(ctx, name='workflow', parameter=parameter)
    graph.execute()


@make_or_get_graph
def _my_workflow_graph(ctx, parameter):
    graph = ctx.graph_mode()
    graph.add(some_operation)
    return graph
Note that the workflow must declare that it is resumable by passing the resumable=True keyword to the workflow decorator. If not declared, the workflow will fail when a resume is attempted.
Workflows that do not use the tasks graph can be resumed as well, but the workflow author must implement the resume behavior themselves. The resume workflow context attribute (ctx.resume) indicates whether the workflow is currently being resumed (True) or is being executed for the first time (False).
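A minimal sketch of what hand-written resume handling might look like is shown below. It assumes only that the context exposes a boolean resume attribute, as described above. The run_resumable helper and the completed record are hypothetical: where and how the record of finished steps is persisted between runs is entirely up to the workflow author, and an in-memory set is used here only so that the snippet runs on its own.

```python
from types import SimpleNamespace


def run_resumable(ctx, steps, completed):
    """Run named steps in order, skipping finished ones during a resume.

    `completed` is a hypothetical set-like record that must survive
    between the original run and the resumed run.
    """
    for name, step in steps:
        if ctx.resume and name in completed:
            continue  # already done in the previous run
        step()
        completed.add(name)


calls = []
completed = set()


def failing_step():
    raise RuntimeError('simulated failure')


# First run: step 'a' succeeds, step 'b' fails.
first_ctx = SimpleNamespace(resume=False)
try:
    run_resumable(first_ctx, [('a', lambda: calls.append('a')),
                              ('b', failing_step)], completed)
except RuntimeError:
    pass

# Resumed run: 'a' is skipped, 'b' runs again.
resumed_ctx = SimpleNamespace(resume=True)
run_resumable(resumed_ctx, [('a', lambda: calls.append('a')),
                            ('b', lambda: calls.append('b'))], completed)
print(calls)
```

The same idea works at a finer grain if each step is itself written to be safe to repeat.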
Step by Step Tutorial
In this tutorial we will create from scratch a custom graph-based workflow whose purpose is to execute plugin operations.
The tutorial will offer some guidance and reference about the following:
- Graph framework
  - Adding tasks
  - Creating dependencies between tasks
  - Using the TaskSequence construct
  - Using the forkjoin construct
- Workflow APIs
  - Executing node operations
  - Executing relationship operations
  - Sending events
  - Working with nodes, node instances, relationships and relationship instances
- Workflow parameters
Requirements
Similarly to plugins, workflows require the cloudify-plugins-common package to be able to use the Studio Conductor workflows API and framework.
Implementing the Workflow
We’ll be implementing the workflow one step at a time, where in each step we’ll have a valid, working workflow, but with more features than the one in the previous step.
We could continue improving our workflow and extending its features, but in the scope of this tutorial, this last version of the workflow will be the one we’ll be using throughout the remaining tutorial sections.
Blueprint Mappings
The workflow plugin declaration will look like this:
plugins:
  my_workflow_plugin_name:
    executor: central_deployment_agent
    source: http://example.com/url/to/plugin.zip
The workflow mapping may look like so:
workflows:
  my_workflow:
    mapping: my_workflow_plugin_name.my_workflow_module_name.run_operation
    parameters:
      operation:
        description: the operation to execute
      type_name:
        description: the base type for filtering nodes
        default: cloudify.nodes.Root
      operation_kwargs:
        description: the operation kwargs
        default: {}
      is_node_operation:
        description: >
          is the operation a node operation or
          a relationship operation otherwise
        default: true
This will define a workflow named my_workflow, whose implementation is the run_operation workflow method we coded.
The workflow has four parameters declared:
- The mandatory operation parameter.
- The optional type_name parameter, which defaults to cloudify.nodes.Root (meaning the operation will run on all nodes if this value isn’t overridden).
- The optional operation_kwargs parameter, which defaults to an empty dictionary.
- The optional is_node_operation parameter, which defaults to true.
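To make the role of type_name more concrete, here is a hedged sketch of the filtering step only. It assumes that node objects expose a type_hierarchy list and an instances collection. The select_instances helper and the stand-in nodes are hypothetical, and the snippet does not execute any operations.

```python
from types import SimpleNamespace


def select_instances(nodes, type_name):
    """Return instances of every node whose type hierarchy includes type_name."""
    selected = []
    for node in nodes:
        if type_name in node.type_hierarchy:
            selected.extend(node.instances)
    return selected


# Stand-in nodes; in a workflow these would come from the workflow context.
vm = SimpleNamespace(
    type_hierarchy=['cloudify.nodes.Root', 'cloudify.nodes.Compute'],
    instances=['vm_1', 'vm_2'])
db = SimpleNamespace(
    type_hierarchy=['cloudify.nodes.Root', 'cloudify.nodes.Database'],
    instances=['db_1'])

all_instances = select_instances([vm, db], 'cloudify.nodes.Root')
compute_only = select_instances([vm, db], 'cloudify.nodes.Compute')
print(all_instances, compute_only)
```

With the default cloudify.nodes.Root every node matches, which is why leaving type_name at its default runs the operation on all nodes.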
Packaging the Workflow
Since workflows are joined to the blueprint the same way plugins are, they are also packaged the same way. Refer to the Plugin creation guide for more information.
Advanced Usage
What follows are code snippets showing some of the advanced APIs exposed by the workflow framework.
Task Handlers
Task handlers are callbacks you set on tasks. They get called when a task fails or succeeds.
from cloudify.decorators import workflow
from cloudify.workflows import ctx
from cloudify.workflows import tasks


@workflow
def use_task_handlers(**kwargs):
    graph = ctx.graph_mode()
    node = ctx.get_node('some_node')
    instance = next(node.instances)

    def on_success(task):
        instance.logger.info('Task {0} succeeded!'.format(task.id))
        # HandlerResult.cont() is the default for on_success.
        # If a task handler is defined for a task, it must return
        # a HandlerResult instance
        return tasks.HandlerResult.cont()

    def on_failure(task):
        instance.logger.info('Task {0} failed :('.format(task.id))
        # Handler result may override the default behavior.
        # If for example the task was to be retried,
        # this will cause the framework to ignore the failure
        # and move on. (the default in this case is HandlerResult.fail())
        return tasks.HandlerResult.ignore()

    task = instance.execute_operation('my_interface.my_task')
    task.on_success = on_success
    task.on_failure = on_failure

    graph.add_task(task)
    return graph.execute()
Deployment Modification
Deployment modification changes the data model to add or remove node instances, and returns the modified node instances so that the workflow can operate on them. The built-in scale workflow makes use of this API to scale the number of instances of a node up or down.
from cloudify.decorators import workflow
from cloudify.workflows import ctx


@workflow
def use_modify(**kwargs):
    new_number_of_instances = 12

    node_id = 'webserver_vm'
    node = ctx.get_node(node_id)
    if node.number_of_instances == new_number_of_instances:
        # no change is required
        return

    modification = ctx.deployment.start_modification({
        node.id: {
            'instances': new_number_of_instances
        }
    })

    going_up = node.number_of_instances < new_number_of_instances
    try:
        if going_up:
            # added.node_instances returns all node instances that are
            # affected by increasing a node's number of instances.
            # Some are newly added and have their
            # instance.modification == 'added'.
            # Others are node instances that have new relationships
            # to the added node instances.
            added_and_related = modification.added.node_instances
            for instance in added_and_related:
                if instance.modification == 'added':
                    # do stuff
                    pass
                else:
                    # do other stuff
                    pass
        else:
            # removed.node_instances returns all node instances that are
            # affected by decreasing a node's number of instances.
            # Some are removed and have their
            # instance.modification == 'removed'.
            # Others are node instances whose relationships to the
            # removed node instances will be removed after calling
            # modification.finish().
            removed_and_related = modification.removed.node_instances
            for instance in removed_and_related:
                if instance.modification == 'removed':
                    # do stuff
                    pass
                else:
                    # do other stuff
                    pass
    except:
        # Do stuff to restore the logical state and then
        # call this to restore the storage state
        modification.rollback()
        raise
    else:
        modification.finish()
To use newly-created node instances in the same workflow after the modification has finished, e.g. for running an additional task on them, call ctx.refresh_node_instances() to update the workflow context with the updated node instance list.
Subgraphs (Experimental)
Subgraphs make it easier to model complex workflows by grouping certain operations into their own subgraphs and creating dependencies between different subgraphs. Subgraphs expose both the Task and Graph APIs: they have methods for creating dependencies between tasks (Graph API), and dependencies can be created between them and other tasks, which may themselves be subgraphs (Task API).
The Subgraph API is still in its early stage and it may change in backward incompatible ways in the future.
from cloudify.decorators import workflow
from cloudify.workflows import ctx


@workflow
def use_subgraphs(**kwargs):
    graph = ctx.graph_mode()
    node = ctx.get_node('some_node')
    instance = next(node.instances)

    # A subgraph to create some node instance
    # creating a subgraph also adds it as a task to the graph from
    # which it was created.
    start_subgraph = graph.subgraph('some_start_subgraph')
    start_subgraph_sequence = start_subgraph.sequence()
    start_subgraph_sequence.add([
        instance.execute_operation('my_interface.create'),
        instance.execute_operation('my_interface.configure'),
        instance.execute_operation('my_interface.start')
    ])

    # A subgraph to upgrade some node instance
    upgrade_subgraph = graph.subgraph('some_upgrade_subgraph')
    upgrade_subgraph_sequence = upgrade_subgraph.sequence()
    upgrade_subgraph_sequence.add([
        instance.execute_operation('my_interface.upgrade.step1'),
        instance.execute_operation('my_interface.upgrade.step2'),
        instance.execute_operation('my_interface.upgrade.step3')
    ])

    # Start running operations on the upgrade subgraph
    # only when the start subgraph ended
    graph.add_dependency(upgrade_subgraph, start_subgraph)

    return graph.execute()
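The ordering that such a dependency implies can be illustrated outside of Cloudify with the standard library's graphlib module (Python 3.9 or later). This is a swapped-in illustration of dependency ordering in general, not the workflow framework's scheduler, and the string names merely stand in for the two subgraphs above.

```python
from graphlib import TopologicalSorter

sorter = TopologicalSorter()
# add(node, *predecessors): the upgrade subgraph may only run
# after the start subgraph has finished.
sorter.add('some_upgrade_subgraph', 'some_start_subgraph')

order = list(sorter.static_order())
print(order)
```

As in the workflow example, the dependent item is named first and the item it waits for second.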
Contained Subgraph
Get all node instances that are contained in a node instance. The built-in heal workflow makes use of this API to calculate all node instances that belong to a cloudify.nodes.Compute node that should be healed.
from cloudify.decorators import workflow
from cloudify.workflows import ctx


@workflow
def use_contained_subgraph(**kwargs):
    node = ctx.get_node('some_node')
    instance = next(node.instances)

    # get node instances that are directly contained in the instance
    for contained_instance in instance.contained_instances:
        # do something
        pass

    # get node instances that are recursively contained in the instance
    # (including the instance itself)
    for contained_instance in instance.get_contained_subgraph():
        # do something
        pass
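The difference between direct and recursive containment can be pictured with the small sketch below. It is not the library's implementation: contained_subgraph is a hypothetical helper, the stand-in objects only mimic a contained_instances attribute, and the real method may return a different collection type.

```python
from types import SimpleNamespace


def contained_subgraph(instance):
    """Collect the instance plus everything recursively contained in it."""
    result = [instance]
    for child in instance.contained_instances:
        result.extend(contained_subgraph(child))
    return result


# Stand-in containment chain: vm_1 contains server_1, which contains app_1.
app = SimpleNamespace(id='app_1', contained_instances=[])
server = SimpleNamespace(id='server_1', contained_instances=[app])
vm = SimpleNamespace(id='vm_1', contained_instances=[server])

direct_ids = [i.id for i in vm.contained_instances]
recursive_ids = [i.id for i in contained_subgraph(vm)]
print(direct_ids, recursive_ids)
```

Only server_1 is directly contained in vm_1, whereas the recursive walk also reaches app_1 and includes vm_1 itself.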