Python SDK developer's guide - Features
The Features section of the Temporal Developer's guide provides basic implementation guidance on how to use many of the development features available to Workflows and Activities in the Temporal Platform.
This guide is a work in progress. Some sections may be incomplete or missing for some languages. Information may change at any time.
If you can't find what you are looking for in the Developer's guide, it could be in older docs for SDKs.
In this section you can find the following:
- How to develop Signals
- How to develop Queries
- How to start a Child Workflow Execution
- How to start a Temporal Cron Job
- How to use Continue-As-New
- How to set Workflow timeouts & retries
- How to set Activity timeouts & retries
- How to Heartbeat an Activity
- How to Asynchronously complete an Activity
- How to register Namespaces
- How to use custom payload conversion
Signals
A Signal is a message sent to a running Workflow Execution.
Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.
A Signal has a name and can have arguments.
- The name, also called a Signal type, is a string.
- The arguments must be serializable.
To define a Signal, set the Signal decorator @workflow.signal on the Signal function inside your Workflow.
Customize name
Non-dynamic methods can only have positional arguments. Temporal suggests taking a single argument that is an object or data class of fields that can be added to as needed.
Return values from Signal methods are ignored.
You can have a name parameter to customize the Signal's name, otherwise it defaults to the unqualified method __name__.
from temporalio import workflow
# ...
# ...
    @workflow.signal
    async def submit_greeting(self, name: str) -> None:
        await self._pending_greetings.put(name)
    @workflow.signal
    def exit(self) -> None:
# ...
    @workflow.signal(name="Custom Signal Name")
    async def custom_signal(self, name: str) -> None:
        await self._pending_greetings.put(name)
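The advice above about taking a single data class argument can be illustrated without a running Worker. The following is a minimal plain-Python sketch: `GreetingSignalInput` and its fields are hypothetical names, and unpacking a dict is only a simplified stand-in for how a payload is decoded into the class. It suggests why a field added later with a default value keeps older senders working.

```python
from dataclasses import dataclass


@dataclass
class GreetingSignalInput:
    # Hypothetical Signal argument: one object instead of several positional args.
    name: str
    # Field added later; the default keeps payloads from older senders valid.
    language: str = "en"


# Payload shaped like what an older sender would produce (no "language" key).
old_payload = {"name": "User 1"}
# Payload from a newer sender that knows about the added field.
new_payload = {"name": "User 2", "language": "fr"}

old_input = GreetingSignalInput(**old_payload)
new_input = GreetingSignalInput(**new_payload)
print(old_input)
print(new_input)
```

Had `language` been added as a second positional argument instead, every existing sender would need to change at the same time.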
Workflows listen for Signals by the Signal's name.
To send a Signal to the Workflow, use the signal method from the WorkflowHandle class.
from temporalio.client import Client
# ...
# ...
    await handle.signal(GreetingWorkflow.submit_greeting, "User 1")
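The Signal handlers shown earlier only enqueue their input and leave the real work to the main run method. The sketch below imitates that division of labor with plain asyncio and no Temporal decorators, so it can run on its own. `GreetingLoop` is a hypothetical class, and using `None` as an exit marker is an assumption made for this demo rather than something taken from the guide's sample.

```python
import asyncio
from typing import List


class GreetingLoop:
    """Plain-asyncio stand-in for a Workflow class (no Temporal decorators)."""

    def __init__(self) -> None:
        self._pending_greetings = asyncio.Queue()

    async def submit_greeting(self, name: str) -> None:
        # Signal-style handler: record the input and return nothing.
        await self._pending_greetings.put(name)

    def exit(self) -> None:
        # Signal-style handler: enqueue a marker that tells run() to finish.
        self._pending_greetings.put_nowait(None)

    async def run(self) -> List[str]:
        greetings: List[str] = []
        while True:
            name = await self._pending_greetings.get()
            if name is None:
                return greetings
            greetings.append(f"Hello, {name}")


async def main() -> List[str]:
    loop = GreetingLoop()
    runner = asyncio.create_task(loop.run())
    await loop.submit_greeting("User 1")
    await loop.submit_greeting("User 2")
    loop.exit()
    return await runner


result = asyncio.run(main())
print(result)
```

Keeping handlers this small means they never block, and all ordering decisions stay in one place.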
Send Signal from Client
When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.
To send a Signal from the Client, use the signal() function on the Workflow handle.
To get the Workflow handle, you can use any of the following options.
- Use the get_workflow_handle() method.
- Use the get_workflow_handle_for() method to get a type-safe Workflow handle by its Workflow Id.
- Use the start_workflow() method to start a Workflow and return its handle.
from temporalio.client import Client
# ...
# ...
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        GreetingWorkflow.run,
        id="your-greeting-workflow",
        task_queue="signal-tq",
    )
    await handle.signal(GreetingWorkflow.submit_greeting, "User 1")
Send Signal from Workflow
A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
Use get_external_workflow_handle_for to get a typed Workflow handle to an existing Workflow by its identifier.
Use get_external_workflow_handle when you don't know the type of the other Workflow.
The Workflow Type passed is only for type annotations and not for validation.
# ...
@workflow.defn
class WorkflowB:
    @workflow.run
    async def run(self) -> None:
        handle = workflow.get_external_workflow_handle_for(WorkflowA.run, "workflow-a")
        await handle.signal(WorkflowA.your_signal, "signal argument")
Signal-With-Start
Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.
If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.
To send a Signal-With-Start in Python, use the start_workflow() method and pass the start_signal argument with the name of your Signal.
from temporalio.client import Client
# ...
# ...
async def main():
    client = await Client.connect("localhost:7233")
    await client.start_workflow(
        GreetingWorkflow.run,
        id="your-signal-with-start-workflow",
        task_queue="signal-tq",
        start_signal="submit_greeting",
        start_signal_args=["User Signal with Start"],
    )
Queries
A Query is a synchronous operation that is used to get the state of a Workflow Execution.
Define Query
A Query has a name and can have arguments.
- The name, also called a Query type, is a string.
- The arguments must be serializable.
To define a Query, set the Query decorator @workflow.query on the Query function inside your Workflow.
Customize names
You can have a name parameter to customize the Query's name, otherwise it defaults to the unqualified method __name__.
You can either set the name or the dynamic parameter in a Query's decorator, but not both.
# ...
    @workflow.query
    def greeting(self) -> str:
        return self._greeting
Handle Query
Queries are handled by your Workflow.
Don’t include any logic that causes Command generation within a Query handler (such as executing Activities).
Including such logic causes unexpected behavior.
To send a Query to the Workflow, use the query method from the WorkflowHandle class.
# ...
    result = await handle.query(GreetingWorkflow.greeting)
Send Query
Queries are sent from a Temporal Client.
To send a Query to a Workflow Execution from Client code, use the query() method on the Workflow handle.
# ...
    result = await handle.query(GreetingWorkflow.greeting)
Workflow timeouts
Each Workflow timeout controls the maximum duration of a different aspect of a Workflow Execution.
Workflow timeouts are set when starting the Workflow Execution.
- Workflow Execution Timeout: restricts the maximum amount of time that a Workflow Execution can be executing (have an Open status), including retries and any usage of Continue-As-New.
- Workflow Run Timeout: restricts the maximum amount of time that a single Workflow Run can last.
- Workflow Task Timeout: restricts the maximum amount of time that a Worker can execute a Workflow Task.
Set the timeout from either the start_workflow() or execute_workflow() asynchronous methods.
Available timeouts are:
- execution_timeout
- run_timeout
- task_timeout
handle = await client.start_workflow(
    "your-workflow-name",
    "some arg",
    id="your-workflow-id",
    task_queue="your-task-queue",
    start_signal="your-signal-name",
    # Set Workflow Timeout duration
    execution_timeout=timedelta(seconds=2),
    # run_timeout=timedelta(seconds=2),
    # task_timeout=timedelta(seconds=2),
)
result = await client.execute_workflow(
    "your-workflow-name",
    "some arg",
    id="your-workflow-id",
    task_queue="your-task-queue",
    start_signal="your-signal-name",
    # Set Workflow Timeout duration
    execution_timeout=timedelta(seconds=2),
    # run_timeout=timedelta(seconds=2),
    # task_timeout=timedelta(seconds=2),
)
Workflow retries
A Retry Policy can work in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Use a Retry Policy to retry a Workflow Execution in the event of a failure.
Workflow Executions do not retry by default, and Retry Policies should be used with Workflow Executions only in certain situations.
Set the Retry Policy from either the start_workflow() or execute_workflow() asynchronous methods.
handle = await client.start_workflow(
    "your-workflow-name",
    "some arg",
    id="your-workflow-id",
    task_queue="your-task-queue",
    start_signal="your-signal-name",
    retry_policy=RetryPolicy(maximum_interval=timedelta(seconds=2)),
)
result = await client.execute_workflow(
    "your-workflow-name",
    "some arg",
    id="your-workflow-id",
    task_queue="your-task-queue",
    start_signal="your-signal-name",
    retry_policy=RetryPolicy(maximum_interval=timedelta(seconds=2)),
)
Activity timeouts
Each Activity timeout controls the maximum duration of a different aspect of an Activity Execution.
The following timeouts are available in the Activity Options.
- Schedule-To-Close Timeout: the maximum amount of time allowed for the overall Activity Execution.
- Start-To-Close Timeout: the maximum time allowed for a single Activity Task Execution.
- Schedule-To-Start Timeout: the maximum amount of time that is allowed from when an Activity Task is scheduled to when a Worker starts that Activity Task.
An Activity Execution must have either the Start-To-Close or the Schedule-To-Close Timeout set.
Activity options are set as keyword arguments after the Activity arguments.
Available timeouts are:
- schedule_to_close_timeout
- schedule_to_start_timeout
- start_to_close_timeout
@workflow.defn
class YourWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            your_activity,
            name,
            schedule_to_close_timeout=timedelta(seconds=5),
            # schedule_to_start_timeout=timedelta(seconds=5),
            # start_to_close_timeout=timedelta(seconds=5),
        )
Activity retries
A Retry Policy works in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Activity Executions are automatically associated with a default Retry Policy if a custom one is not provided.
To create an Activity Retry Policy in Python, pass a RetryPolicy instance as the retry_policy argument of the start_activity() or execute_activity() function.
The following example sets the maximum interval to 2 seconds.
await workflow.execute_activity(
    your_activity,
    name,
    start_to_close_timeout=timedelta(seconds=10),
    retry_policy=RetryPolicy(maximum_interval=timedelta(seconds=2)),
)
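To see what a maximum interval does to the waits between attempts, the sketch below computes them with plain arithmetic. It is an illustration, not SDK code: `retry_intervals` is a hypothetical helper, and the 1-second initial interval, 2.0 backoff coefficient, and 100-second cap used as defaults are assumptions for this example.

```python
from datetime import timedelta
from typing import List


def retry_intervals(
    retries: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: timedelta = timedelta(seconds=100),
) -> List[timedelta]:
    """Return the wait before each of the first `retries` retries."""
    waits = []
    for retry in range(retries):
        # Grow the wait exponentially, then cap it at the maximum interval.
        wait = initial_interval * (backoff_coefficient ** retry)
        waits.append(min(wait, maximum_interval))
    return waits


waits = retry_intervals(4, maximum_interval=timedelta(seconds=2))
seconds = [w.total_seconds() for w in waits]
print(seconds)  # [1.0, 2.0, 2.0, 2.0]
```

With the cap at 2 seconds, the exponential growth stops after the second retry, so every later retry waits the same amount.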
Activity Heartbeats
An Activity Heartbeat is a ping from the Worker Process that is executing the Activity to the Temporal Cluster.
Each Heartbeat informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed.
If the Cluster does not receive a Heartbeat within a Heartbeat Timeout time period, the Activity will be considered failed and another Activity Task Execution may be scheduled according to the Retry Policy.
Heartbeats may not always be sent to the Cluster; they may be throttled by the Worker.
Activity Cancellations are delivered to Activities from the Cluster when they Heartbeat. Activities that don't Heartbeat can't receive a Cancellation. Heartbeat throttling may lead to Cancellation getting delivered later than expected.
Heartbeats can contain a details field describing the Activity's current progress.
If an Activity gets retried, the Activity can access the details from the last Heartbeat that was sent to the Cluster.
To Heartbeat an Activity Execution in Python, use the heartbeat() API.
@activity.defn
async def your_activity_definition() -> str:
    activity.heartbeat("heartbeat details!")
In addition to obtaining cancellation information, Heartbeats also support detail data that persists on the server for retrieval during Activity retry.
If an Activity calls heartbeat(123, 456) and then fails and is retried, activity.info().heartbeat_details returns an iterable containing 123 and 456 on the next attempt.
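Heartbeat details are most useful as a checkpoint, which the following plain-Python simulation tries to show. Nothing here calls the SDK: the `recorded` list stands in for the details stored by the last heartbeat, and `process_items`, `SimulatedCrash`, and `crash_at` are hypothetical names introduced for the demo.

```python
from typing import List, Optional, Sequence


class SimulatedCrash(Exception):
    """Raised to imitate a failure part-way through the Activity."""


def process_items(
    items: Sequence[str],
    heartbeat_details: Sequence[int],
    recorded: List[int],
    crash_at: Optional[int] = None,
) -> List[str]:
    # Resume after the last index reported by the previous attempt, if any.
    start = heartbeat_details[0] + 1 if heartbeat_details else 0
    done = []
    for index in range(start, len(items)):
        if index == crash_at:
            raise SimulatedCrash(f"failed before item {index}")
        done.append(items[index].upper())
        # Stand-in for a heartbeat call: keep only the latest details.
        recorded[:] = [index]
    return done


items = ["a", "b", "c", "d"]
recorded: List[int] = []

try:
    process_items(items, heartbeat_details=[], recorded=recorded, crash_at=2)
except SimulatedCrash:
    pass

# The retry receives the details from the last recorded heartbeat.
second_attempt = process_items(items, heartbeat_details=list(recorded), recorded=recorded)
print(second_attempt)  # ['C', 'D']
```

The second attempt skips the two items that were already reported, which is the behavior the details field is meant to enable.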
Heartbeat Timeout
A Heartbeat Timeout works in conjunction with Activity Heartbeats.
heartbeat_timeout is a keyword argument of the start_activity() function that sets the maximum time between Activity Heartbeats.
workflow.start_activity(
    activity="your-activity",
    schedule_to_close_timeout=timedelta(seconds=5),
    heartbeat_timeout=timedelta(seconds=1),
)
execute_activity() is a shortcut for start_activity() that waits on its result.
To get just the handle to wait and cancel separately, use start_activity(). execute_activity() should be used in most cases unless advanced task capabilities are needed.
workflow.execute_activity(
    "your-activity",
    name,
    schedule_to_close_timeout=timedelta(seconds=5),
    heartbeat_timeout=timedelta(seconds=1),
)
Asynchronous Activity Completion
Asynchronous Activity Completion enables the Activity Function to return without the Activity Execution completing.
There are three steps to follow:
- The Activity provides the external system with identifying information needed to complete the Activity Execution. Identifying information can be a Task Token, or a combination of Namespace, Workflow Id, and Activity Id.
- The Activity Function completes in a way that identifies it as waiting to be completed by an external system.
- The Temporal Client is used to Heartbeat and complete the Activity.
To mark an Activity as completing asynchronously, do the following inside the Activity.
# Capture token for later completion
captured_token = activity.info().task_token
activity.raise_complete_async()
To update an Activity outside the Activity, use the get_async_activity_handle() method to get the handle of the Activity.
handle = my_client.get_async_activity_handle(task_token=captured_token)
Then, on that handle, you can call the heartbeat, complete, fail, or report_cancellation method to update the Activity.
await handle.complete("Completion value.")
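The three steps above can be pictured with a small in-memory simulation. This is a conceptual sketch only: `FakeCluster` is a hypothetical stand-in for the service that tracks pending Activities, the token is a random hex string, and no Temporal API is called.

```python
import asyncio
import uuid
from typing import Dict


class FakeCluster:
    """In-memory stand-in for the service that tracks pending Activities."""

    def __init__(self) -> None:
        self._pending: Dict[str, asyncio.Future] = {}

    def register(self) -> str:
        # Step 1: issue identifying information (a token) for the Activity.
        token = uuid.uuid4().hex
        self._pending[token] = asyncio.get_running_loop().create_future()
        return token

    def complete(self, token: str, value: str) -> None:
        # Step 3: an external system supplies the final result by token.
        self._pending[token].set_result(value)

    async def result(self, token: str) -> str:
        return await self._pending[token]


async def main() -> str:
    cluster = FakeCluster()
    token = cluster.register()
    # Step 2: the Activity function would return here without a result.
    # Shortly afterwards, another part of the system completes it by token.
    asyncio.get_running_loop().call_later(
        0.01, cluster.complete, token, "Completion value."
    )
    return await cluster.result(token)


outcome = asyncio.run(main())
print(outcome)
```

The point of the token is that the code completing the work needs no reference to the code that started it.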
Cancel an Activity
Canceling an Activity from within a Workflow requires that the Activity Execution sends Heartbeats and sets a Heartbeat Timeout.
If the Heartbeat is not invoked, the Activity cannot receive a cancellation request.
When any non-immediate Activity is executed, the Activity Execution should send Heartbeats and set a Heartbeat Timeout to ensure that the server knows it is still working.
When an Activity is canceled, an error is raised in the Activity at the next available opportunity.
If cleanup logic needs to be performed, it can be done in a finally clause or inside a caught cancel error.
However, for the Activity to appear canceled the exception needs to be re-raised.
Unlike regular Activities, Local Activities can be canceled even if they don't send Heartbeats.
Local Activities are handled locally, and all the information needed to handle the cancellation logic is available in the same Worker process.
To cancel an Activity from a Workflow Execution, call the cancel() method on the Activity handle that is returned from start_activity().
@activity.defn
async def cancellable_activity(input: ComposeArgsInput) -> NoReturn:
    try:
        while True:
            print("Heartbeating cancel activity")
            await asyncio.sleep(0.5)
            activity.heartbeat("some details")
    except asyncio.CancelledError:
        print("Activity cancelled")
        raise
@activity.defn
async def run_activity(input: ComposeArgsInput):
    print("Executing activity")
    return input.arg1 + input.arg2
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, input: ComposeArgsInput) -> None:
        # Start the Activity without awaiting it so the handle can be canceled later.
        activity_handle = workflow.start_activity(
            cancellable_activity,
            ComposeArgsInput(input.arg1, input.arg2),
            start_to_close_timeout=timedelta(minutes=5),
            heartbeat_timeout=timedelta(seconds=30),
        )

        await asyncio.sleep(3)
        activity_handle.cancel()
The Activity handle is a Python task.
By calling cancel(), you're essentially requesting the task to be canceled.
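Because the handle behaves like a Python task, ordinary asyncio cancellation semantics are a useful mental model. The sketch below uses only the standard library and no Temporal code; `long_running` and the `events` list are hypothetical names. It shows that cancel() is a request that takes effect at the next await, and that re-raising after cleanup is what makes the task count as canceled.

```python
import asyncio
from typing import List

events: List[str] = []


async def long_running() -> None:
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # Cleanup goes here; re-raise so the task is reported as cancelled.
        events.append("cleanup")
        raise


async def main() -> bool:
    task = asyncio.create_task(long_running())
    await asyncio.sleep(0.03)
    # cancel() only requests cancellation; it takes effect at the next await.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("cancelled")
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(events, was_cancelled)
```

If the except block swallowed the error instead of re-raising it, the task would finish normally and `task.cancelled()` would be False.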
Child Workflows
A Child Workflow Execution is a Workflow Execution that is scheduled from within another Workflow using a Child Workflow API.
When using a Child Workflow API, Child Workflow related Events (StartChildWorkflowExecutionInitiated, ChildWorkflowExecutionStarted, ChildWorkflowExecutionCompleted, etc.) are logged in the Workflow Execution Event History.
Always block progress until the ChildWorkflowExecutionStarted Event is logged to the Event History to ensure the Child Workflow Execution has started.
After that, Child Workflow Executions may be abandoned using the Abandon Parent Close Policy set in the Child Workflow Options.
To be sure that the Child Workflow Execution has started, first call the Child Workflow Execution method on the Child Workflow future, which returns a different future.
Then wait for the value of that second future (an object that acts as a proxy for a result that is initially unknown); it resolves once the Child Workflow Execution has spawned.
To spawn a Child Workflow Execution in Python, use the execute_child_workflow() function. execute_child_workflow() starts the Child Workflow and waits for completion.
await workflow.execute_child_workflow(MyWorkflow.run, "my child arg", id="my-child-id")
Alternatively, use the start_child_workflow() function to start a Child Workflow and return its handle. This is useful if you want to do something after it has only started, or to get the workflow/run ID, or to be able to signal it while running. To wait for completion, simply await the handle. execute_child_workflow() is a helper function for start_child_workflow() + await handle.
await workflow.start_child_workflow(MyWorkflow.run, "my child arg", id="my-child-id")
Parent Close Policy
A Parent Close Policy determines what happens to a Child Workflow Execution if its Parent changes to a Closed status (Completed, Failed, or Timed Out).
The default Parent Close Policy option is set to terminate the Child Workflow Execution.
Set the parent_close_policy parameter inside the start_child_workflow function or the execute_child_workflow() function to specify the behavior of the Child Workflow when the Parent Workflow closes.
async def run(self, name: str) -> str:
    return await workflow.execute_child_workflow(
        ComposeGreeting.run,
        ComposeGreetingInput("Hello", name),
        id="hello-child-workflow-workflow-child-id",
        parent_close_policy=workflow.ParentClosePolicy.TERMINATE,
    )
execute_child_workflow() is a shortcut function for temporalio.workflow.start_child_workflow() plus handle.result().
Continue-As-New
Continue-As-New enables a Workflow Execution to close successfully and create a new Workflow Execution in a single atomic operation if the number of Events in the Event History is becoming too large. The Workflow Execution spawned from the use of Continue-As-New has the same Workflow Id, a new Run Id, and a fresh Event History and is passed all the appropriate parameters.
To Continue-As-New in Python, call the continue_as_new() function from inside your Workflow, which will stop the Workflow immediately and Continue-As-New.
workflow.continue_as_new("your-workflow-arg")
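The idea of closing one run and carrying state into a fresh one can be simulated in plain Python. In this sketch, `ContinueAsNew`, `run_once`, `drive`, and the threshold of 3 are all hypothetical and chosen for the demo; a list stands in for the Event History, and none of it calls the SDK.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class ContinueAsNew:
    """Marker returned by a run that wants a fresh run with this state."""

    next_index: int


MAX_EVENTS_PER_RUN = 3  # hypothetical threshold chosen for the demo


def run_once(
    items: List[str], start_index: int, history: List[str]
) -> Union[ContinueAsNew, str]:
    for index in range(start_index, len(items)):
        if len(history) >= MAX_EVENTS_PER_RUN:
            # History is getting long: hand the remaining work to a new run.
            return ContinueAsNew(next_index=index)
        history.append(f"processed {items[index]}")
    return "done"


def drive(items: List[str]) -> List[int]:
    """Start fresh runs until one finishes; return each run's history size."""
    history_sizes = []
    outcome: Union[ContinueAsNew, str] = ContinueAsNew(next_index=0)
    while isinstance(outcome, ContinueAsNew):
        history: List[str] = []  # every run starts with an empty history
        outcome = run_once(items, outcome.next_index, history)
        history_sizes.append(len(history))
    return history_sizes


sizes = drive([f"item-{n}" for n in range(7)])
print(sizes)  # [3, 3, 1]
```

Seven items end up spread over three runs, and no single run's history grows past the threshold.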
Timers
A Workflow can set a durable timer for a fixed time period.
In some SDKs, the function is called sleep(), and in others, it's called timer().
A Workflow can sleep for months.
Timers are persisted, so even if your Worker or Temporal Cluster is down when the time period completes, as soon as your Worker and Cluster are back up, the sleep() call will resolve and your code will continue executing.
Sleeping is a resource-light operation: it does not tie up the process, and you can run millions of Timers off a single Worker.
To set a Timer in Python, call the asyncio.sleep() function and pass the duration in seconds you want to wait before continuing.
await asyncio.sleep(5)
Schedule a Workflow
Scheduling Workflows is a crucial aspect of any automation process, especially when dealing with time-sensitive tasks. By scheduling a Workflow, you can automate repetitive tasks, reduce the need for manual intervention, and ensure timely execution of your business processes.
Use any of the following actions to Schedule a Workflow Execution and take control over your automation process.
Create
The create action enables you to create a new Schedule. When you create a new Schedule, a unique Schedule ID is generated, which you can use to reference the Schedule in other Schedule commands.
To create a Scheduled Workflow Execution in Python, use the create_schedule() asynchronous method on the Client.
Then pass the Schedule ID and the Schedule object to the method to create a Scheduled Workflow Execution.
Set the action parameter to ScheduleActionStartWorkflow to start a Workflow Execution.
Optionally, you can set the spec parameter to ScheduleSpec to specify the schedule or set the intervals parameter to ScheduleIntervalSpec to specify the interval.
Other options include: cron_expressions, skip, start_at, and jitter.
# ...
async def main():
    client = await Client.connect("localhost:7233")
    await client.create_schedule(
        "workflow-schedule-id",
        Schedule(
            action=ScheduleActionStartWorkflow(
                YourSchedulesWorkflow.run,
                "my schedule arg",
                id="schedules-workflow-id",
                task_queue="schedules-task-queue",
            ),
            spec=ScheduleSpec(
                intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
            ),
            state=ScheduleState(note="Here's a note on my Schedule."),
        ),
    )
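To get a feel for when an interval-based Schedule would take its next Action, the sketch below does the arithmetic by hand. It assumes that interval times are aligned to the Unix epoch, so matching times have the form epoch + n * every + offset; treat that alignment as an assumption of this example. `next_interval_time` is a hypothetical helper, not an SDK function.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def next_interval_time(
    after: datetime, every: timedelta, offset: timedelta = timedelta(0)
) -> datetime:
    """Return the first time strictly after `after` of the form
    epoch + n * every + offset."""
    elapsed = after - EPOCH - offset
    # Whole periods completed so far, plus one to move to the next boundary.
    periods = elapsed // every + 1
    return EPOCH + offset + periods * every


now = datetime(2024, 1, 1, 12, 3, 30, tzinfo=timezone.utc)
upcoming = next_interval_time(now, every=timedelta(minutes=2))
print(upcoming.isoformat())  # 2024-01-01T12:04:00+00:00
```

Under that assumption, a two-minute interval fires on even minutes regardless of when the Schedule was created.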
Backfill
The backfill action executes Actions ahead of their specified time range. This command is useful when you need to execute a missed or delayed Action, or when you want to test the Workflow before its scheduled time.
To Backfill a Scheduled Workflow Execution in Python, use the backfill() asynchronous method on the Schedule Handle.
# ...
async def main():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle(
        "workflow-schedule-id",
    )
    now = datetime.utcnow()
    await handle.backfill(
        ScheduleBackfill(
            start_at=now - timedelta(minutes=10),
            end_at=now - timedelta(minutes=9),
            overlap=ScheduleOverlapPolicy.ALLOW_ALL,
        ),
    )
Delete
The delete action enables you to delete a Schedule. When you delete a Schedule, it does not affect any Workflows that were started by the Schedule.
To delete a Scheduled Workflow Execution in Python, use the delete() asynchronous method on the Schedule Handle.
async def main():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle(
        "workflow-schedule-id",
    )
    await handle.delete()
Describe
The describe action shows the current Schedule configuration, including information about past, current, and future Workflow Runs. This command is helpful when you want to get a detailed view of the Schedule and its associated Workflow Runs.
To describe a Scheduled Workflow Execution in Python, use the describe() asynchronous method on the Schedule Handle. You can get a complete list of the attributes of the Scheduled Workflow Execution from the ScheduleDescription class.
# ...
async def main():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle(
        "workflow-schedule-id",
    )
    desc = await handle.describe()
    print(f"Returns the note: {desc.schedule.state.note}")
List
The list action lists all the available Schedules. This command is useful when you want to view a list of all the Schedules and their respective Schedule IDs.
To list all schedules, use the list_schedules() asynchronous method on the Client. If a schedule is added or deleted, it may not be available in the list immediately.
# ...
async def main() -> None:
    client = await Client.connect("localhost:7233")
    async for schedule in await client.list_schedules():
        print(f"List Schedule Info: {schedule.info}.")
Pause
The pause action enables you to pause and unpause a Schedule. When you pause a Schedule, all the future Workflow Runs associated with the Schedule are temporarily stopped. This command is useful when you want to temporarily halt a Workflow due to maintenance or any other reason.
To pause a Scheduled Workflow Execution in Python, use the pause() asynchronous method on the Schedule Handle.
You can pass a note to the pause() method to provide a reason for pausing the schedule.
# ...
async def main():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle(
        "workflow-schedule-id",
    )
    await handle.pause(note="Pausing the schedule for now")
Trigger
The trigger action triggers an immediate action with a given Schedule. By default, this action is subject to the Overlap Policy of the Schedule. This command is helpful when you want to execute a Workflow outside of its scheduled time.
To trigger a Scheduled Workflow Execution in Python, use the trigger() asynchronous method on the Schedule Handle.
# ...
async def main():
    client = await Client.connect("localhost:7233")
    handle = client.get_schedule_handle(
        "workflow-schedule-id",
    )
    await handle.trigger()
Update
The update action enables you to update an existing Schedule. This command is useful when you need to modify the Schedule's configuration, such as changing the start time, end time, or interval.
Create a function that takes ScheduleUpdateInput and returns ScheduleUpdate.
To update a Schedule, use a callback to build the update from the description.
The following example updates the Schedule to use a new argument.
# ...
    async def update_schedule_simple(input: ScheduleUpdateInput) -> ScheduleUpdate:
        schedule_action = input.description.schedule.action
        if isinstance(schedule_action, ScheduleActionStartWorkflow):
            schedule_action.args = ["my new schedule arg"]
        return ScheduleUpdate(schedule=input.description.schedule)
Temporal Cron Jobs
A Temporal Cron Job is the series of Workflow Executions that occur when a Cron Schedule is provided in the call to spawn a Workflow Execution.
A Cron Schedule is provided as an option when the call to spawn a Workflow Execution is made.
You can set each Workflow to repeat on a schedule with the cron_schedule option from either the start_workflow() or execute_workflow() asynchronous methods:
await client.start_workflow(
    "your_workflow_name",
    id="your-workflow-id",
    task_queue="your-task-queue",
    cron_schedule="* * * * *",
)
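A cron_schedule string such as the one above is easier to review once its fields are named. The helper below is a hypothetical illustration that only splits a classic five-field expression; it does not validate ranges or evaluate the schedule, and it is not part of the SDK.

```python
from typing import Dict

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> Dict[str, str]:
    """Split a five-field cron expression into named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


every_minute = describe_cron("* * * * *")
weekday_morning = describe_cron("30 9 * * 1-5")
print(every_minute)
print(weekday_morning)
```

Read this way, "* * * * *" matches every minute, while "30 9 * * 1-5" matches 09:30 on Monday through Friday.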