Understanding Azure Durable Functions - Part 10 The Monitor Pattern

This is the tenth part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

In the previous part in this series, we looked at the  Asynchronous HTTP API Pattern where a client can poll to see if an orchestration has completed or not.

The monitor pattern is like a mirror image of this, whereby the orchestration polls some service at regular intervals. For example an orchestration could  initiate a long running asynchronous process and then periodically poll to see if the operation is complete. Once the operation is complete the orchestration can complete (or continue with more operations.)

We can create a pause in the execution of an orchestration by calling the CreateTimer method of the DurableOrchestrationContext. This method takes a DateTime specifying how long to “sleep” for, and a CancellationToken.

As an example, suppose we want to allow a client to post a video to be encoding to a different format.

The fist client function could look something like:

[FunctionName("MonitorPatternExample_HttpStart")]
public static async Task<HttpResponseMessage> HttpStartV1(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{            
    dynamic data = await req.Content.ReadAsAsync<dynamic>();
    var fileName = data.FileName;

    string instanceId = await starter.StartNewAsync("MonitorPatternExample", fileName);

    return starter.CreateCheckStatusResponse(req, instanceId);
}

For simplicity, we just taking a filename to be used as a job identifier. This function calls the orchestrator function MonitorPatternExample.

The orchestrator function is where the Monitor Pattern is implemented:

[FunctionName("MonitorPatternExample")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    string fileName = context.GetInput<string>();

    // start encoding
    await context.CallActivityAsync<string>("MonitorPatternExample_BeginEncode", fileName);


    // We don't want the orchestration to run infinitely
    // If the operation has not completed within 30 mins, end the orchestration
    var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);
  
    while (true)
    {
        var operationHasTimedOut = context.CurrentUtcDateTime > operationTimeoutTime;

        if (operationHasTimedOut)
        {
            context.SetCustomStatus("Encoding has timed out, please submit the job again.");
            break;
        }

        var isEncodingComplete = await context.CallActivityAsync<bool>("MonitorPatternExample_IsEncodingComplete", fileName);

        if (isEncodingComplete)
        {
            context.SetCustomStatus("Encoding has completed successfully.");
            break;
        }

        // If no timeout and encoding still being processed we want to put the orchestration to sleep,
        // and awaking it again after a specified interval
        var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
        log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
        await context.CreateTimer(nextCheckTime, CancellationToken.None);
    }
}

This code starts the actual asynchronous/long-running work by calling the MonitorPatternExample_BeginEncode activity.

Then we loop around a while loop until either the long running operation is completes or a timeout occurs.

To query whether or not the encoding is complete, the MonitorPatternExample_IsEncodingComplete activity is called.

The timeout in this example is fixed at 30 mins: var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);

The orchestration is put to sleep for 15 seconds before the while loop starts again with the code:

var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
await context.CreateTimer(nextCheckTime, CancellationToken.None);

Now the orchestration will either complete by timing out with a custom status context.SetCustomStatus("Encoding has timed out, please submit the job again."); or when the encoding has completed context.SetCustomStatus("Encoding has completed successfully."); 

Just a quick reminder that you can start watching my Pluralsight courses with a free trial.

 

The full listing is as follows:

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class MonitorPatternExample
    {
        [FunctionName("MonitorPatternExample_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStartV1(
            [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {            
            dynamic data = await req.Content.ReadAsAsync<dynamic>();
            var fileName = data.FileName;

            string instanceId = await starter.StartNewAsync("MonitorPatternExample", fileName);

            return starter.CreateCheckStatusResponse(req, instanceId);
        }

            [FunctionName("MonitorPatternExample")]
            public static async Task RunOrchestrator(
                [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
            {
                string fileName = context.GetInput<string>();

                // start encoding
                await context.CallActivityAsync<string>("MonitorPatternExample_BeginEncode", fileName);


                // We don't want the orchestration to run infinitely
                // If the operation has not completed within 30 mins, end the orchestration
                var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);
          
                while (true)
                {
                    var operationHasTimedOut = context.CurrentUtcDateTime > operationTimeoutTime;

                    if (operationHasTimedOut)
                    {
                        context.SetCustomStatus("Encoding has timed out, please submit the job again.");
                        break;
                    }

                    var isEncodingComplete = await context.CallActivityAsync<bool>("MonitorPatternExample_IsEncodingComplete", fileName);

                    if (isEncodingComplete)
                    {
                        context.SetCustomStatus("Encoding has completed successfully.");
                        break;
                    }

                    // If no timeout and encoding still being processed we want to put the orchestration to sleep,
                    // and awaking it again after a specified interval
                    var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
                    log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
                    await context.CreateTimer(nextCheckTime, CancellationToken.None);
                }
            }

        [FunctionName("MonitorPatternExample_BeginEncode")]
        public static void BeginEncodeVideo([ActivityTrigger] string fileName, ILogger log)
        {
            // Call API, start an async process, queue a message, etc.
            log.LogInformation($"************** Starting encoding of {fileName}");

            // This activity returns before the job is complete, its job is to just start the async/long running operation
        }


        [FunctionName("MonitorPatternExample_IsEncodingComplete")]
        public static bool IsEncodingComplete([ActivityTrigger] string fileName, ILogger log)
        {
            log.LogInformation($"************** Checking if {fileName} encoding is complete...");
            // Here you would make a call to an API, query a database, check blob storage etc 
            // to check whether the long running asyn process is complete

            // For demo purposes, we'll just signal completion every so often
            bool isComplete = new Random().Next() % 2 == 0;

            log.LogInformation($"************** {fileName} encoding complete: {isComplete}");

            return isComplete;
        }


    }
}

SHARE:

Add comment

Loading