This is the tenth part in a series of articles. If you’re not familiar with Durable Functions, you should check out the previous articles before reading this one.
In the previous part in this series, we looked at the Asynchronous HTTP API Pattern where a client can poll to see if an orchestration has completed or not.
The monitor pattern is like a mirror image of this: the orchestration itself polls some service at regular intervals. For example, an orchestration could initiate a long-running asynchronous process and then periodically poll to see if the operation is complete. Once the operation is complete, the orchestration can complete (or continue with more operations).
We can create a pause in the execution of an orchestration by calling the CreateTimer method of the DurableOrchestrationContext. This method takes a DateTime specifying when the timer should fire (that is, the time at which the orchestration stops “sleeping” and wakes up), and a CancellationToken.
As an example, suppose we want to allow a client to post a video to be encoded to a different format.
The first client function could look something like:
[FunctionName("MonitorPatternExample_HttpStart")]
public static async Task<HttpResponseMessage> HttpStartV1(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    dynamic data = await req.Content.ReadAsAsync<dynamic>();
    var fileName = data.FileName;
    string instanceId = await starter.StartNewAsync("MonitorPatternExample", fileName);
    return starter.CreateCheckStatusResponse(req, instanceId);
}
For simplicity, we are just taking a filename to be used as a job identifier. This function starts the orchestrator function MonitorPatternExample.
The orchestrator function is where the Monitor Pattern is implemented:
[FunctionName("MonitorPatternExample")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    string fileName = context.GetInput<string>();

    // Start encoding
    await context.CallActivityAsync<string>("MonitorPatternExample_BeginEncode", fileName);

    // We don't want the orchestration to run indefinitely.
    // If the operation has not completed within 30 minutes, end the orchestration.
    var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);

    while (true)
    {
        var operationHasTimedOut = context.CurrentUtcDateTime > operationTimeoutTime;
        if (operationHasTimedOut)
        {
            context.SetCustomStatus("Encoding has timed out, please submit the job again.");
            break;
        }

        var isEncodingComplete = await context.CallActivityAsync<bool>("MonitorPatternExample_IsEncodingComplete", fileName);
        if (isEncodingComplete)
        {
            context.SetCustomStatus("Encoding has completed successfully.");
            break;
        }

        // No timeout yet and the encoding is still in progress, so put the orchestration
        // to sleep and wake it again after a specified interval
        var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
        log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
        await context.CreateTimer(nextCheckTime, CancellationToken.None);
    }
}
This code starts the actual asynchronous/long-running work by calling the MonitorPatternExample_BeginEncode activity.
We then go around a while loop until either the long-running operation completes or a timeout occurs.
To query whether or not the encoding is complete, the MonitorPatternExample_IsEncodingComplete activity is called.
The timeout in this example is fixed at 30 minutes: var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);
The orchestration is put to sleep for 15 seconds before the while loop starts again with the code:
var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
await context.CreateTimer(nextCheckTime, CancellationToken.None);
The orchestration now ends in one of two ways. Either it times out and sets the custom status with context.SetCustomStatus("Encoding has timed out, please submit the job again."); or the encoding completes and it sets context.SetCustomStatus("Encoding has completed successfully.");
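The 15-second interval above is fixed, but nothing in the pattern requires that. As a rough sketch only, the wait could grow after each unsuccessful check so that a slow job is polled less often. PollingSchedule and NextDelay are hypothetical names invented for this illustration; they are not part of Durable Functions.

```csharp
using System;

// Hypothetical helper for illustration only; not part of the Durable Functions API.
public static class PollingSchedule
{
    // Returns the wait before poll number `attempt` (0-based):
    // `initial` doubled once per attempt, never exceeding `max`.
    public static TimeSpan NextDelay(int attempt, TimeSpan initial, TimeSpan max)
    {
        if (attempt < 0) throw new ArgumentOutOfRangeException(nameof(attempt));

        // Cap the exponent so very large attempt numbers cannot overflow
        int exponent = Math.Min(attempt, 20);
        double seconds = initial.TotalSeconds * Math.Pow(2, exponent);

        return seconds >= max.TotalSeconds ? max : TimeSpan.FromSeconds(seconds);
    }
}
```

With an initial delay of 15 seconds and a maximum of 2 minutes, this gives waits of 15, 30, 60 and then 120 seconds. In the orchestrator you would keep an attempt counter in a local variable and pass context.CurrentUtcDateTime.Add(PollingSchedule.NextDelay(attempt, initial, max)) to CreateTimer. The helper depends only on its arguments, so it gives the same answer every time the orchestrator code is replayed.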
The full listing is as follows:
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class MonitorPatternExample
    {
        [FunctionName("MonitorPatternExample_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStartV1(
            [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {
            dynamic data = await req.Content.ReadAsAsync<dynamic>();
            var fileName = data.FileName;
            string instanceId = await starter.StartNewAsync("MonitorPatternExample", fileName);
            return starter.CreateCheckStatusResponse(req, instanceId);
        }

        [FunctionName("MonitorPatternExample")]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
        {
            string fileName = context.GetInput<string>();

            // Start encoding
            await context.CallActivityAsync<string>("MonitorPatternExample_BeginEncode", fileName);

            // We don't want the orchestration to run indefinitely.
            // If the operation has not completed within 30 minutes, end the orchestration.
            var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);

            while (true)
            {
                var operationHasTimedOut = context.CurrentUtcDateTime > operationTimeoutTime;
                if (operationHasTimedOut)
                {
                    context.SetCustomStatus("Encoding has timed out, please submit the job again.");
                    break;
                }

                var isEncodingComplete = await context.CallActivityAsync<bool>("MonitorPatternExample_IsEncodingComplete", fileName);
                if (isEncodingComplete)
                {
                    context.SetCustomStatus("Encoding has completed successfully.");
                    break;
                }

                // No timeout yet and the encoding is still in progress, so put the orchestration
                // to sleep and wake it again after a specified interval
                var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
                log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
                await context.CreateTimer(nextCheckTime, CancellationToken.None);
            }
        }

        [FunctionName("MonitorPatternExample_BeginEncode")]
        public static void BeginEncodeVideo([ActivityTrigger] string fileName, ILogger log)
        {
            // Call an API, start an async process, queue a message, etc.
            log.LogInformation($"************** Starting encoding of {fileName}");
            // This activity returns before the job is complete; its job is just to start the async/long-running operation
        }

        [FunctionName("MonitorPatternExample_IsEncodingComplete")]
        public static bool IsEncodingComplete([ActivityTrigger] string fileName, ILogger log)
        {
            log.LogInformation($"************** Checking if {fileName} encoding is complete...");
            // Here you would make a call to an API, query a database, check blob storage, etc.
            // to check whether the long-running async process is complete.
            // For demo purposes, we'll just signal completion at random (roughly half of the checks)
            bool isComplete = new Random().Next() % 2 == 0;
            log.LogInformation($"************** {fileName} encoding complete: {isComplete}");
            return isComplete;
        }
    }
}
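Because the orchestrator function returns normally in both the timeout and the success case, the custom status is what tells a polling client which of the two happened. The following is a minimal sketch of pulling that value out of the JSON returned by the status query endpoint. It assumes the client uses System.Text.Json, and StatusPayload and ReadCustomStatus are hypothetical names made up for this example.

```csharp
using System;
using System.Text.Json;

// Hypothetical client-side helper for illustration only.
public static class StatusPayload
{
    // Returns the "customStatus" string from a status response body,
    // or null when the field is missing or is not a JSON string.
    public static string ReadCustomStatus(string json)
    {
        using (JsonDocument doc = JsonDocument.Parse(json))
        {
            JsonElement root = doc.RootElement;
            if (root.ValueKind == JsonValueKind.Object
                && root.TryGetProperty("customStatus", out JsonElement status)
                && status.ValueKind == JsonValueKind.String)
            {
                return status.GetString();
            }
            return null;
        }
    }
}
```

A client would call StatusPayload.ReadCustomStatus(body) on each polled response body and could then show the user either "Encoding has completed successfully." or the timeout message.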