Understanding Azure Durable Functions - Part 12: Sub Orchestrations

This is part twelve in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

Sub-orchestrations are a feature of Durable Functions that allow you to further compose and reuse functions.

Essentially, sub-orchestrations allow you to call an orchestration from within another orchestration. In this way they are similar to activity functions called from within an orchestration and, just like activity functions, they can return a value to the calling (parent) orchestration.

As an example, the following client function starts the orchestration called ProcessMultipleCitiesOrhestrator:

[FunctionName("SubOrchExample_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{

    var data = await req.Content.ReadAsAsync<GreetingsRequest>();

    string instanceId = await starter.StartNewAsync("ProcessMultipleCitiesOrhestrator", data);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

The preceding code is no different from what we’ve seen already in this series. The change comes in the ProcessMultipleCitiesOrhestrator:

[FunctionName("ProcessMultipleCitiesOrhestrator")]
public static async Task<string> ParentOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, 
    ILogger log)
{
    log.LogInformation($"************** ProcessMultipleCitiesOrhestrator ********************");

    GreetingsRequest data = context.GetInput<GreetingsRequest>();


    // Perform all greetings in parallel executing sub-orchestrations
    var greetingsSubOrchestrations = new List<Task>();            
    
    foreach (string city in data.Cities)
    {
        Task greetingSubOrchestration = context.CallSubOrchestratorAsync<string>("ProcessSingleCityOrhestrator", city);
        greetingsSubOrchestrations.Add(greetingSubOrchestration);
    }

    await Task.WhenAll(greetingsSubOrchestrations);

    // When all of the sub-orchestrations have completed, get the results and append them into a single string
    var allGreetings = new StringBuilder();
    foreach (Task<string> greetingSubOrchestration in greetingsSubOrchestrations)
    {
        allGreetings.AppendLine(await greetingSubOrchestration);
    }

    log.LogInformation(allGreetings.ToString());

    return allGreetings.ToString();
}

The main thing to note in the preceding code is the line: Task greetingSubOrchestration = context.CallSubOrchestratorAsync<string>("ProcessSingleCityOrhestrator", city); Here we are calling into another orchestrator function (the sub-orchestration). Each call returns a task representing one instance of the sub-orchestration, so the loop builds up a list of tasks that run in parallel. Task.WhenAll then waits for all of them to complete, and finally we read the return value from each sub-orchestration task and combine them into a single string result.
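The same fan-out can also be written with a typed task list, which avoids the cast in the second foreach loop. The following is only a sketch under a few assumptions: it uses the same Durable Functions 1.x API as the rest of this article, the function name ProcessMultipleCities_TypedSketch is hypothetical, and the GreetingsRequest class is an assumed shape inferred from the JSON posted to the client function.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

// Assumed shape of the request, inferred from the posted JSON.
public class GreetingsRequest
{
    public List<string> Cities { get; set; }
}

public static class SubOrchTypedSketch
{
    // Hypothetical function name; not part of the original sample.
    [FunctionName("ProcessMultipleCities_TypedSketch")]
    public static async Task<string> Run(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        GreetingsRequest data = context.GetInput<GreetingsRequest>();

        // Each call schedules one sub-orchestration and returns a Task<string> for its result.
        List<Task<string>> pending = data.Cities
            .Select(city => context.CallSubOrchestratorAsync<string>("ProcessSingleCityOrhestrator", city))
            .ToList();

        // Task.WhenAll returns the results in the same order as the input tasks.
        string[] greetings = await Task.WhenAll(pending);

        return string.Join("\r\n", greetings);
    }
}
```

Unlike the StringBuilder version, string.Join does not add a trailing line break after the last greeting.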

The ProcessSingleCityOrhestrator function is as follows:

[FunctionName("ProcessSingleCityOrhestrator")]
public static async Task<string> SubOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** ProcessSingleCityOrhestrator method executing ********************");

    var city = context.GetInput<string>();

    string greeting = await context.CallActivityAsync<string>("SubOrchExample_ActivityFunction", city);
    string toUpper = await context.CallActivityAsync<string>("SubOrchExample_ActivityFunction_ToUpper", greeting);
    string withTimestamp = await context.CallActivityAsync<string>("SubOrchExample_ActivityFunction_AddTimestamp", toUpper);

    log.LogInformation(withTimestamp);

    return withTimestamp;
}

Now we have the flexibility to compose and reuse orchestrations. For example, the ProcessSingleCityOrhestrator could be started directly from a different client function if only a single city was being supplied.
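As a rough sketch of that reuse, a second client function could start ProcessSingleCityOrhestrator on its own. The function name SingleCity_HttpStart is hypothetical, and the sketch assumes the city name is sent as the plain-text body of the POST request.

```csharp
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;

public static class SingleCitySketch
{
    // Hypothetical client function; not part of the original sample.
    [FunctionName("SingleCity_HttpStart")]
    public static async Task<HttpResponseMessage> HttpStart(
        [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage req,
        [OrchestrationClient] DurableOrchestrationClient starter)
    {
        // Assumption: the request body is just the city name, e.g. "Perth".
        string city = (await req.Content.ReadAsStringAsync()).Trim();

        // The same orchestrator that the parent calls as a sub-orchestration
        // is started here as a top-level orchestration.
        string instanceId = await starter.StartNewAsync("ProcessSingleCityOrhestrator", city);

        return starter.CreateCheckStatusResponse(req, instanceId);
    }
}
```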

If we call the client function SubOrchExample_HttpStart with the JSON:

{
    "Cities": [
            "London",
            "Tokyo",
            "Perth",
            "Nadi"
    ]
}

Once the orchestration has completed, querying its status returns the following (the output comes from the ProcessMultipleCitiesOrhestrator):

{
    "name": "ProcessMultipleCitiesOrhestrator",
    "instanceId": "c7a0eb03c56a44ab8a767cd8a487c834",
    "runtimeStatus": "Completed",
    "input": {
        "$type": "DurableDemos.SubOrchExample+GreetingsRequest, DurableDemos",
        "Cities": [
            "London",
            "Tokyo",
            "Perth",
            "Nadi"
        ]
    },
    "customStatus": null,
    "output": "HELLO LONDON! [30/10/2019 11:51:53 AM +08:00]\r\nHELLO TOKYO! [30/10/2019 11:51:58 AM +08:00]\r\nHELLO PERTH! [30/10/2019 11:52:01 AM +08:00]\r\nHELLO NADI! [30/10/2019 11:52:00 AM +08:00]\r\n",
    "createdTime": "2019-10-30T03:51:52Z",
    "lastUpdatedTime": "2019-10-30T03:52:01Z"
}

To learn more about sub-orchestrations, check out the docs.

If you like this article or series feel free to share it :)


Learning xUnit .NET Unit Testing the Easy Way

If you’re getting started with .NET or you’ve done some testing but want to know how to put it all together and also learn some additional tools then the new xUnit.net testing path from Pluralsight may be of interest (you can also get started viewing for free with a trial).

The path currently has the following individual courses (including some of my courses) taking you right from the basics of xUnit.net to more advanced techniques:

  • Testing .NET Code with xUnit.net: Getting Started
  • Mocking in .NET Core Unit Tests with Moq: Getting Started
  • Creating Maintainable Contexts for Automated Testing
  • Writing Testable Code
  • Building a Pragmatic Unit Test Suite
  • Improving Unit Tests with Fluent Assertions
  • Better .NET Unit Tests with AutoFixture: Get Started
  • Approval Tests for .NET

If you’re already skilled with xUnit.net you may find some of the other courses in the path useful.


Understanding Azure Durable Functions - Part 11: The Asynchronous Human Interaction Pattern

This is the eleventh part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

The Asynchronous Human Interaction Pattern allows a Durable Functions orchestration to pause at some point during its lifecycle and wait for an external event such as a human to perform some action or make some decision.

[Image: Azure Durable Functions and Twilio working together]

As an example, suppose that a comment can be submitted on a website but before actually appearing it must be moderated by a human. The human moderator can see the comment and then decide whether to approve the comment so it appears on the website or decline the comment in which case it is deleted.

In this scenario, if the human moderator does not approve or decline the comment within a set amount of time then the comment will be escalated to a manager to review.

Azure Durable Functions makes this possible because the orchestration can wait for an external event during its execution.

The code in this article implements the following workflow:

  1. New comment submitted via HTTP
  2. Review/moderation orchestration started
  3. Orchestration sends SMS notification to moderator
  4. Moderator receives SMS that contains 2 links: one to approve and one to decline the comment
  5. Orchestration waits for human moderator to click on one of the 2 links
  6. When human clicks a link, the orchestration resumes and comment is approved or declined
  7. If human does not click link within a set deadline, the comment is escalated to a human manager

Let’s start by defining a class to be HTTP POSTed to the client function:

public class AddCommentRequest
{
    public string UserName { get; set; }
    public string Comment { get; set; }
}

And the client function:

[FunctionName("HumanPatternExample_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    var commentRequest = await req.Content.ReadAsAsync<AddCommentRequest>();

    string instanceId = await starter.StartNewAsync("HumanPatternExample_Orchestrator", (commentRequest: commentRequest, requestUri: req.RequestUri));

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return new HttpResponseMessage(System.Net.HttpStatusCode.Accepted)
    {
        Content = new StringContent("Your comment has been submitted and is awaiting moderator approval.")
    };
}

The preceding client function initiates the HumanPatternExample_Orchestrator function and passes in a tuple containing the comment request and the request URI which will be used later to construct the approve/decline URL links.

We’ll have a look at this orchestrator function in a moment, but first let’s take a look at the activity function that sends the SMS to the moderator:

[FunctionName("HumanPatternExample_RequestApproval")]
[return: TwilioSms(AccountSidSetting = "TwilioAccountSid", AuthTokenSetting = "TwilioAuthToken", From = "%FromPhoneNumber%")]
public static CreateMessageOptions RequestApproval([ActivityTrigger] ModerationRequest moderationRequest, ILogger log)
{            
    log.LogInformation($"Requesting approval for comment: {moderationRequest.CommentRequest.Comment}.");

    // Here we provide some way for a human to know that there is a new comment pending approval.
    // This could be writing to a database representing requests not yet approved for a human
    // to work through, or an SMS message for them to reply to with either APPROVED or NOTAPPROVED
    // or an email for them to reply to etc etc.

    var approversPhoneNumber = new PhoneNumber(Environment.GetEnvironmentVariable("ApproverPhoneNumber", EnvironmentVariableTarget.Process));                        

    var message = new CreateMessageOptions(approversPhoneNumber)
    {
        Body = $"'{moderationRequest.CommentRequest.Comment}' \r\nApprove: {moderationRequest.ApproveRequestUrl} \r\nDecline: {moderationRequest.DeclineRequestUrl}"
    };

    log.LogInformation($"Sending SMS: {message.Body}");

    return message;
}

In the preceding code, the TwilioSms output binding is being used to send an SMS – the SMS will contain links to either approve or decline the comment as the following screenshot shows:

[Screenshot: Azure Functions and Twilio integration]

Also notice that the ActivityTrigger is bound to a ModerationRequest: public static CreateMessageOptions RequestApproval([ActivityTrigger] ModerationRequest moderationRequest, ILogger log) – this class is defined as follows:

public class ModerationRequest
{
    public AddCommentRequest CommentRequest { get; set; }
    public string ApproveRequestUrl { get; set; }
    public string DeclineRequestUrl { get; set; }
}

The orchestrator function is where the main workflow is defined:

[FunctionName("HumanPatternExample_Orchestrator")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** RunOrchestrator method executing ********************");

    // Using tuples but could also define a class for this data
    var (commentRequest, requestUri) = context.GetInput<Tuple<AddCommentRequest,Uri>>();
    var moderationRequest = new ModerationRequest
    {
        CommentRequest = commentRequest,
        ApproveRequestUrl = $"{requestUri.Scheme}://{requestUri.Host}:{requestUri.Port}/api/HumanPatternExample_Approve?id={context.InstanceId}",
        DeclineRequestUrl = $"{requestUri.Scheme}://{requestUri.Host}:{requestUri.Port}/api/HumanPatternExample_Decline?id={context.InstanceId}",
    };
    await context.CallActivityAsync("HumanPatternExample_RequestApproval", moderationRequest);

    // Define a timeout - if the moderator hasn't approved/declined then escalate to someone else, e.g. a manager
    using (var timeout = new CancellationTokenSource())
    {
        DateTime moderationDeadline = context.CurrentUtcDateTime.AddMinutes(5); // probably would be longer in real life

        Task durableTimeout = context.CreateTimer(moderationDeadline, timeout.Token);

        Task<bool> moderatedEvent = context.WaitForExternalEvent<bool>("Moderation");

        if (moderatedEvent == await Task.WhenAny(moderatedEvent, durableTimeout))
        {
            timeout.Cancel();

            bool isApproved = moderatedEvent.Result;

            if (isApproved)
            {
                log.LogInformation($"************** Comment '{commentRequest.Comment}' was approved by a moderator ********************");
                // call an activity to make the comment live on the website, etc.
            }
            else
            {
                log.LogInformation($"************** Comment '{commentRequest.Comment}' was declined by a moderator ********************");
                // call an activity to delete the comment and don't make it live on website, etc.
            }
        }
        else
        {
            log.LogInformation($"************** Comment '{commentRequest.Comment}' was not reviewed by a moderator in time, escalating...  ********************");
            // await context.CallActivityAsync("Escalate"); call an activity to escalate etc.
        }
    }

    log.LogInformation($"************** Orchestration complete ********************");
}

The code may look a little complex at first, so let’s break it down into the more important parts as they relate to the Asynchronous Human Interaction Pattern:

The line await context.CallActivityAsync("HumanPatternExample_RequestApproval", moderationRequest); calls the activity that actually notifies the human in some way that the orchestration is waiting for them.

Two tasks are created: Task durableTimeout = context.CreateTimer(moderationDeadline, timeout.Token); and Task<bool> moderatedEvent = context.WaitForExternalEvent<bool>("Moderation"); The first task represents the deadline/timeout that the moderator has. The second uses the DurableOrchestrationContext.WaitForExternalEvent method to pause the orchestration until an event occurs outside of the orchestration (i.e. the human interaction). Once these 2 tasks are defined, the line if (moderatedEvent == await Task.WhenAny(moderatedEvent, durableTimeout)) checks whether the orchestration is continuing because of the external event or because of the timeout. If the event arrived first, the timer is cancelled with timeout.Cancel() because it is no longer needed.
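The race between the event and the timeout can be tried out in isolation. The following is a plain .NET sketch, not Durable Functions code: Task.Delay stands in for context.CreateTimer and a TaskCompletionSource stands in for the external event. Inside a real orchestrator you should keep using context.CreateTimer rather than Task.Delay. The names RaceSketch and WaitForDecision are made up for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RaceSketch
{
    // Returns "approved", "declined" or "escalated" depending on which task finishes first.
    public static async Task<string> WaitForDecision(Task<bool> moderatedEvent, TimeSpan deadline)
    {
        using (var timeout = new CancellationTokenSource())
        {
            // Stand-in for context.CreateTimer(moderationDeadline, timeout.Token)
            Task timer = Task.Delay(deadline, timeout.Token);

            Task winner = await Task.WhenAny(moderatedEvent, timer);

            if (winner == moderatedEvent)
            {
                // The decision arrived first, so the timer is no longer needed.
                timeout.Cancel();
                return (await moderatedEvent) ? "approved" : "declined";
            }

            // The timer finished first: nobody responded in time.
            return "escalated";
        }
    }

    public static async Task Main()
    {
        // The moderator has already responded.
        var decision = new TaskCompletionSource<bool>();
        decision.SetResult(true);
        Console.WriteLine(await WaitForDecision(decision.Task, TimeSpan.FromSeconds(5)));

        // The moderator never responds.
        var noResponse = new TaskCompletionSource<bool>();
        Console.WriteLine(await WaitForDecision(noResponse.Task, TimeSpan.FromMilliseconds(50)));
    }
}
```

Running Main prints approved followed by escalated.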

So if the orchestration is waiting for an external event, how is that event sent to the orchestration? This is done via the DurableOrchestrationClient.RaiseEventAsync method as the following code shows:

[FunctionName("HumanPatternExample_Approve")]
public static async Task<IActionResult> HumanPatternExample_Approve(
    [HttpTrigger(AuthorizationLevel.Function, "get")]HttpRequest req,
    [OrchestrationClient]DurableOrchestrationClient client,
    ILogger log)
{
    // additional validation/null check code omitted for brevity

    var id = req.Query["id"];

    var status = await client.GetStatusAsync(id);

    if (status.RuntimeStatus == OrchestrationRuntimeStatus.Running)
    {
        await client.RaiseEventAsync(id, "Moderation", true);
        return new OkObjectResult("Comment was approved.");
    }

    return new NotFoundResult();
}

[FunctionName("HumanPatternExample_Decline")]
public static async Task<IActionResult> HumanPatternExample_Decline(
    [HttpTrigger(AuthorizationLevel.Function, "get")]HttpRequest req,
    [OrchestrationClient]DurableOrchestrationClient client,
    ILogger log)
{
    // additional validation/null check code omitted for brevity

    var id = req.Query["id"];

    var status = await client.GetStatusAsync(id);
    if (status.RuntimeStatus == OrchestrationRuntimeStatus.Running)
    {
        await client.RaiseEventAsync(id, "Moderation", false);
        return new OkObjectResult("Comment was declined.");
    }

    return new NotFoundResult();
}

The preceding 2 functions are triggered via HTTP (the links that are sent in the SMS) and raise the “Moderation” event on the orchestration instance identified by the id query parameter, passing either true or false.

The orchestrator is waiting for this event: Task<bool> moderatedEvent = context.WaitForExternalEvent<bool>("Moderation"); If the event is received in the orchestrator, the approval decision is read from the task: bool isApproved = moderatedEvent.Result; The sample code then uses an if statement to either publish the comment to the website or delete it (omitted for brevity).

Let’s take a look at some (simplified) output – first if the human moderator approves the comment:

Executing HTTP request: {
  "requestId": "c300dfdb-553a-4d1a-8685-7eec6e9fc375",
  "method": "POST",
  "uri": "/api/HumanPatternExample_HttpStart"
}
Executing 'HumanPatternExample_HttpStart' (Reason='This function was programmatically called via the host APIs.', Id=2ff016f6-d54e-4489-9497-393322165d24)
Started orchestration with ID = 'c5ea86d46e524641a49ca10d1c04efc5'.
Executing 'HumanPatternExample_Orchestrator' (Reason='', Id=43196ec8-83cd-4a82-9c63-fcc9f13bd114)
************** RunOrchestrator method executing ********************
Executing 'HumanPatternExample_RequestApproval' (Reason='', Id=c98b259c-100a-4730-a1a9-116f5ea11aa1)
Requesting approval for comment: I hate cheese.
Sending SMS: 'I hate cheese'
Approve: http://localhost:7071/api/HumanPatternExample_Approve?id=c5ea86d46e524641a49ca10d1c04efc5
Decline: http://localhost:7071/api/HumanPatternExample_Decline?id=c5ea86d46e524641a49ca10d1c04efc5
Executed 'HumanPatternExample_RequestApproval' (Succeeded, Id=c98b259c-100a-4730-a1a9-116f5ea11aa1)
'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: CreateTimer
'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: WaitForExternalEvent:Moderation

<I 'clicked' the approve link here>

Executing 'HumanPatternExample_Approve' 
Function 'HumanPatternExample_Orchestrator (Orchestrator)' scheduled. Reason: RaiseEvent:Moderation
Function 'HumanPatternExample_Orchestrator (Orchestrator)' received a 'Moderation' event
************** Comment 'I hate cheese' was approved by a moderator ********************
************** Orchestration complete ********************

If we submit another request and wait for 5 minutes (DateTime moderationDeadline = context.CurrentUtcDateTime.AddMinutes(5);) we get the following:

Executing HTTP request: {
  "requestId": "08354276-34b2-4183-8884-9fc92fbac13d",
  "method": "POST",
  "uri": "/api/HumanPatternExample_HttpStart"
}
Executing 'HumanPatternExample_HttpStart' 
Started orchestration with ID = 'e660e4ee29044d8ea9bbbcff0e7e001f'.
Executed 'HumanPatternExample_HttpStart' (Succeeded, Id=6b33326d-b0e5-4fd1-9671-43f176b12928)
Executing 'HumanPatternExample_Orchestrator' (Reason='', Id=e2845e74-9818-4a92-ab29-e85233c208f3)
************** RunOrchestrator method executing ********************
Function 'HumanPatternExample_RequestApproval (Activity)' started.
Executing 'HumanPatternExample_RequestApproval' (Reason='', Id=b25f71e4-810f-4fc4-bb71-0c8f819eccfc)
Requesting approval for comment: I LOOOOVEEEE cheese.
Sending SMS: 'I LOOOOVEEEE cheese'
Approve: http://localhost:7071/api/HumanPatternExample_Approve?id=e660e4ee29044d8ea9bbbcff0e7e001f
Decline: http://localhost:7071/api/HumanPatternExample_Decline?id=e660e4ee29044d8ea9bbbcff0e7e001f
Executed 'HumanPatternExample_RequestApproval' (Succeeded, Id=b25f71e4-810f-4fc4-bb71-0c8f819eccfc)
Function 'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: CreateTimer
Function 'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: WaitForExternalEvent:Moderation
Function 'HumanPatternExample_Orchestrator (Orchestrator)' was resumed by a timer scheduled for '2019-09-05T05:42:05.9852935Z'. State: TimerExpired
************** Comment 'I LOOOOVEEEE cheese' was not reviewed by a moderator in time, escalating...  ********************
************** Orchestration complete ********************
Executed 'HumanPatternExample_Orchestrator' (Succeeded, Id=eff52ac6-a7e8-4d81-afd5-9125bd5a5aaa)

Notice this time the orchestration was “un-paused” because the timer expired and the moderator didn’t respond: Function 'HumanPatternExample_Orchestrator (Orchestrator)' was resumed by a timer scheduled for '2019-09-05T05:42:05.9852935Z'. State: TimerExpired

The sample code in this article does not contain comprehensive error handling or security so you’d want to make sure you have both these in place if you were to implement this kind of workflow.

You can also use the Durable Functions API to send events though you’ll need to expose the system key which you probably won’t want to do – read Part 9: The Asynchronous HTTP API Pattern to learn more.

If you like this article or series be sure to share it :)


Understanding Azure Durable Functions - Part 10: The Monitor Pattern

This is the tenth part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

In the previous part in this series, we looked at the Asynchronous HTTP API Pattern where a client can poll to see if an orchestration has completed or not.

The monitor pattern is like a mirror image of this, whereby the orchestration polls some service at regular intervals. For example, an orchestration could initiate a long-running asynchronous process and then periodically poll to see if the operation is complete. Once the operation is complete, the orchestration can complete (or continue with more operations).

We can create a pause in the execution of an orchestration by calling the CreateTimer method of the DurableOrchestrationContext. This method takes a DateTime specifying when the timer should fire (i.e. when the orchestration should stop “sleeping”), and a CancellationToken.

As an example, suppose we want to allow a client to post a video to be encoded to a different format.

The client function could look something like:

[FunctionName("MonitorPatternExample_HttpStart")]
public static async Task<HttpResponseMessage> HttpStartV1(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{            
    dynamic data = await req.Content.ReadAsAsync<dynamic>();
    var fileName = data.FileName;

    string instanceId = await starter.StartNewAsync("MonitorPatternExample", fileName);

    return starter.CreateCheckStatusResponse(req, instanceId);
}

For simplicity, we’re just taking a filename to be used as a job identifier. This function calls the orchestrator function MonitorPatternExample.

The orchestrator function is where the Monitor Pattern is implemented:

[FunctionName("MonitorPatternExample")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    string fileName = context.GetInput<string>();

    // start encoding
    await context.CallActivityAsync<string>("MonitorPatternExample_BeginEncode", fileName);


    // We don't want the orchestration to run infinitely
    // If the operation has not completed within 30 mins, end the orchestration
    var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);
  
    while (true)
    {
        var operationHasTimedOut = context.CurrentUtcDateTime > operationTimeoutTime;

        if (operationHasTimedOut)
        {
            context.SetCustomStatus("Encoding has timed out, please submit the job again.");
            break;
        }

        var isEncodingComplete = await context.CallActivityAsync<bool>("MonitorPatternExample_IsEncodingComplete", fileName);

        if (isEncodingComplete)
        {
            context.SetCustomStatus("Encoding has completed successfully.");
            break;
        }

        // If no timeout and encoding still being processed we want to put the orchestration to sleep,
        // and awaking it again after a specified interval
        var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
        log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
        await context.CreateTimer(nextCheckTime, CancellationToken.None);
    }
}

This code starts the actual asynchronous/long-running work by calling the MonitorPatternExample_BeginEncode activity.

Then we go around a while loop until either the long-running operation completes or a timeout occurs.

To query whether or not the encoding is complete, the MonitorPatternExample_IsEncodingComplete activity is called.

The timeout in this example is fixed at 30 mins: var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);

The orchestration is put to sleep for 15 seconds before the while loop starts again with the code:

var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
await context.CreateTimer(nextCheckTime, CancellationToken.None);

Now the orchestration will complete in one of two ways: by timing out, in which case the custom status is set with context.SetCustomStatus("Encoding has timed out, please submit the job again."); or when the encoding has completed, in which case it is set with context.SetCustomStatus("Encoding has completed successfully.");
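One detail of this loop is that it always sleeps for the full 15 seconds, so the timeout may only be noticed a little after the 30 minute deadline has passed. If that matters, the wake-up time could be capped at the deadline. The helper below is a hypothetical sketch (the names MonitorTiming and NextCheckTime are not part of the original sample) and is plain C#, so it can be tried outside of an orchestration.

```csharp
using System;

public static class MonitorTiming
{
    // Returns the next time the orchestration should wake up:
    // now + interval, but never later than the overall deadline.
    public static DateTime NextCheckTime(DateTime nowUtc, DateTime deadlineUtc, TimeSpan interval)
    {
        DateTime next = nowUtc.Add(interval);
        return next < deadlineUtc ? next : deadlineUtc;
    }

    public static void Main()
    {
        var now = new DateTime(2019, 10, 30, 3, 0, 0, DateTimeKind.Utc);
        var deadline = now.AddSeconds(10);

        // The deadline is closer than the 15 second interval, so the deadline is returned.
        Console.WriteLine(NextCheckTime(now, deadline, TimeSpan.FromSeconds(15)) == deadline); // True
    }
}
```

In the orchestrator this would be called with context.CurrentUtcDateTime and operationTimeoutTime, and the timeout check at the top of the loop would then probably want to use >= rather than >, since the timer can fire exactly at the deadline.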

The full listing is as follows:

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class MonitorPatternExample
    {
        [FunctionName("MonitorPatternExample_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStartV1(
            [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {            
            dynamic data = await req.Content.ReadAsAsync<dynamic>();
            var fileName = data.FileName;

            string instanceId = await starter.StartNewAsync("MonitorPatternExample", fileName);

            return starter.CreateCheckStatusResponse(req, instanceId);
        }

        [FunctionName("MonitorPatternExample")]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
        {
            string fileName = context.GetInput<string>();

            // start encoding
            await context.CallActivityAsync<string>("MonitorPatternExample_BeginEncode", fileName);

            // We don't want the orchestration to run infinitely
            // If the operation has not completed within 30 mins, end the orchestration
            var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);

            while (true)
            {
                var operationHasTimedOut = context.CurrentUtcDateTime > operationTimeoutTime;

                if (operationHasTimedOut)
                {
                    context.SetCustomStatus("Encoding has timed out, please submit the job again.");
                    break;
                }

                var isEncodingComplete = await context.CallActivityAsync<bool>("MonitorPatternExample_IsEncodingComplete", fileName);

                if (isEncodingComplete)
                {
                    context.SetCustomStatus("Encoding has completed successfully.");
                    break;
                }

                // If no timeout and encoding still being processed we want to put the orchestration to sleep,
                // and awaking it again after a specified interval
                var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
                log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
                await context.CreateTimer(nextCheckTime, CancellationToken.None);
            }
        }

        [FunctionName("MonitorPatternExample_BeginEncode")]
        public static void BeginEncodeVideo([ActivityTrigger] string fileName, ILogger log)
        {
            // Call API, start an async process, queue a message, etc.
            log.LogInformation($"************** Starting encoding of {fileName}");

            // This activity returns before the job is complete, its job is to just start the async/long running operation
        }


        [FunctionName("MonitorPatternExample_IsEncodingComplete")]
        public static bool IsEncodingComplete([ActivityTrigger] string fileName, ILogger log)
        {
            log.LogInformation($"************** Checking if {fileName} encoding is complete...");
            // Here you would make a call to an API, query a database, check blob storage etc 
            // to check whether the long running async process is complete

            // For demo purposes, we'll just signal completion every so often
            bool isComplete = new Random().Next() % 2 == 0;

            log.LogInformation($"************** {fileName} encoding complete: {isComplete}");

            return isComplete;
        }


    }
}


Understanding Azure Durable Functions - Part 9: The Asynchronous HTTP API Pattern

This is the ninth part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

If your orchestration takes a while to execute, you may not want the end client (for example a web app that triggers the orchestration via an HTTP call) to wait around for a response. Instead you may want to provide the client with a way of querying (polling) if the long-running process is complete. In a previous article in this series we looked at how to get results from orchestrations. In this article we’ll dig into this in a bit more detail.

The Asynchronous HTTP API Pattern means the client calls an HTTP API which does not return the end result, but rather returns a way of checking for the completion of the task, for example by being provided with a status URL. This pattern may also be referred to as the Polling Consumer Pattern.
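From the client’s side, polling could look something like the following sketch. It assumes the status URL returns JSON containing a runtimeStatus property (as in the status output shown in other parts of this series) and treats Completed, Failed, Terminated and Canceled as final states. The StatusPoller class and its method names are hypothetical, and a real client would also want a maximum number of attempts and some error handling.

```csharp
using System;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

public static class StatusPoller
{
    // True when the orchestration will not make any further progress.
    public static bool IsTerminal(string runtimeStatus) =>
        runtimeStatus == "Completed" || runtimeStatus == "Failed" ||
        runtimeStatus == "Terminated" || runtimeStatus == "Canceled";

    // Pulls the runtimeStatus value out of the status JSON.
    public static string ReadRuntimeStatus(string statusJson)
    {
        using (JsonDocument doc = JsonDocument.Parse(statusJson))
        {
            return doc.RootElement.GetProperty("runtimeStatus").GetString();
        }
    }

    // Polls the status URL at a fixed interval until a final state is reached,
    // then returns the final status JSON.
    public static async Task<string> PollUntilDoneAsync(HttpClient http, string statusUrl, TimeSpan interval)
    {
        while (true)
        {
            string body = await http.GetStringAsync(statusUrl);

            if (IsTerminal(ReadRuntimeStatus(body)))
            {
                return body;
            }

            await Task.Delay(interval);
        }
    }
}
```

Task.Delay is fine here because this code runs in the client application, not inside an orchestrator function.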

Recall from this previous article that when the client HTTP function is called, it returns some body content with management URLs for the orchestration instance that was started, for example:

{
    "id": "1bf95a9fa5084745bce24363e9ee781b",
    "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA=="
}

Notice that this response provides the client with a lot of information, including the URLs to terminate the orchestration, purge the history, etc. Notice the response also includes the key: code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==

In the Azure Portal, this is the durabletask_extension key for the function app. With this key the client can perform admin/management operations on orchestration instances using the API, including getting results from arbitrary orchestrations, terminating running orchestrations, etc.

The response also contains headers, including one called Location that also points to the check status URL, e.g. http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==

If we look at the client function, this information is generated with the line: return starter.CreateCheckStatusResponse(req, instanceId);

[FunctionName("AsyncApiPatternExample_HttpStartV1")]
public static async Task<HttpResponseMessage> HttpStartV1(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    string instanceId = await starter.StartNewAsync("AsyncApiPatternExample", null);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

You probably do not want to expose the durabletask_extension key to clients as this will allow them to perform operations they should not have access to. Instead we can modify the client function as follows:

[FunctionName("AsyncApiPatternExample_HttpStartV2")]
public static async Task<HttpResponseMessage> HttpStartV2(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    string instanceId = await starter.StartNewAsync("AsyncApiPatternExample", null);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    // Create the URL to allow the client to check status of a request (excluding the function key in the code querystring)
    // Authority (rather than Host) keeps a non-default port, e.g. localhost:7071 in local development
    string checkStatusUrl = string.Format("{0}://{1}/api/AsyncApiPatternExample_Status?id={2}", req.RequestUri.Scheme, req.RequestUri.Authority, instanceId);

    // Create the response and add headers
    var response = new HttpResponseMessage()
    {
        StatusCode = System.Net.HttpStatusCode.Accepted,                
        Content = new StringContent(checkStatusUrl),                
    };
    response.Headers.Add("Location", checkStatusUrl);
    response.Headers.Add("Retry-After", "10");

    return response;
}

In this new version of the client function, we control what is passed back to the client and don’t include any sensitive management URLs or keys. A response from calling this function would look like:

Response body (text): https://localhost:7071/api/AsyncApiPatternExample_Status?id=d69a847230e5411ca57659723cb14c55
Response status: 202 Accepted
Response Headers:
Location = https://localhost:7071/api/AsyncApiPatternExample_Status?id=d69a847230e5411ca57659723cb14c55
Retry-After = 10
etc.

The client can then GET the status URL (+ the function key): https://localhost:7071/api/AsyncApiPatternExample_Status?id=d69a847230e5411ca57659723cb14c55&code=XXXXXXXXXX

This will return:

{
    "currentStatus": "Running",
    "result": null
}

And once the orchestration has completed, this will return:

{
    "currentStatus": "Completed",
    "result": "Hello London!"
}
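
From the client’s side, the polling loop could look something like the following. This is only a sketch and not part of the original sample: the StatusPoller class and its method names are hypothetical, it assumes a runtime where System.Text.Json is available, and it assumes the status function returns the currentStatus/result JSON shape shown above.

```csharp
using System;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical client-side helper (not part of the Durable Functions API).
public static class StatusPoller
{
    // Reads the "currentStatus" property from the JSON returned by the status function.
    public static string ReadCurrentStatus(string json)
    {
        using (JsonDocument doc = JsonDocument.Parse(json))
        {
            return doc.RootElement.GetProperty("currentStatus").GetString();
        }
    }

    // GETs statusUrl until the orchestration is no longer "Running" or "Pending",
    // or until maxAttempts requests have been made. Returns the last body received.
    // Note: GetStringAsync throws HttpRequestException on a non-success status code.
    public static async Task<string> PollUntilDoneAsync(
        HttpClient http, string statusUrl, TimeSpan delay, int maxAttempts)
    {
        string body = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            body = await http.GetStringAsync(statusUrl);
            string status = ReadCurrentStatus(body);
            if (status != "Running" && status != "Pending")
            {
                return body;
            }

            // Wait before asking again so the function app is not hammered with requests.
            await Task.Delay(delay);
        }

        return body;
    }
}
```

A client might pass TimeSpan.FromSeconds(10) as the delay to match the Retry-After header returned by the client function, and would append the function key (the code querystring parameter) to the status URL before polling.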

The actual status function looks like the following:


[FunctionName("AsyncApiPatternExample_Status")]
public static async Task<IActionResult> Status(
  [HttpTrigger(AuthorizationLevel.Function, "get")]HttpRequest req,
  [OrchestrationClient]DurableOrchestrationClient orchestrationClient,
  ILogger log)
{
    var orchestrationInstanceId = req.Query["id"];

    if (string.IsNullOrWhiteSpace(orchestrationInstanceId))
    {
        return new NotFoundResult();
    }

    // Get the status for the passed in instanceId
    DurableOrchestrationStatus status = await orchestrationClient.GetStatusAsync(orchestrationInstanceId);

    if (status is null)
    {
        return new NotFoundResult();
    }

    
    var shortStatus = new
    {
        currentStatus = status.RuntimeStatus.ToString(),
        result = status.Output
    };

    return new OkObjectResult(shortStatus);
    //  We could also expand this and check status.RuntimeStatus and for example return a 202 if processing is still underway
}

The key thing in the preceding code is the call: DurableOrchestrationStatus status = await orchestrationClient.GetStatusAsync(orchestrationInstanceId); This allows the status to be obtained for the orchestration id that was passed in as a querystring parameter.

The output to the client is chosen in the anonymous object shortStatus. Now the client does not get sensitive information returned such as the management URLs and keys. A client could however still retrieve the status/result from orchestrations started by other clients. If you want more fine-grained control/authentication you should check out the other options in the documentation.
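
The comment at the end of the status function mentions returning a 202 while processing is still underway. One way to sketch that decision is a small mapping from the runtime status name to an HTTP status code. The StatusCodeMapper class below is hypothetical, and treating every finished state (including failures) as a 200 is just one possible design choice, since the body still carries currentStatus.

```csharp
// Hypothetical helper: maps the string form of the orchestration runtime status
// (e.g. status.RuntimeStatus.ToString()) to an HTTP status code.
public static class StatusCodeMapper
{
    public static int ForRuntimeStatus(string runtimeStatus)
    {
        switch (runtimeStatus)
        {
            // Work has been accepted but has not finished yet.
            case "Pending":
            case "Running":
            case "ContinuedAsNew":
                return 202;

            // Completed, Failed, Terminated, etc.: the orchestration is no longer
            // in progress, so the client can stop polling and inspect the body.
            default:
                return 200;
        }
    }
}
```

In the status function this could be applied by returning new ObjectResult(shortStatus) { StatusCode = StatusCodeMapper.ForRuntimeStatus(status.RuntimeStatus.ToString()) } instead of OkObjectResult.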

Understanding Azure Durable Functions - Part 8: The Fan Out/Fan In Pattern

This is the eighth part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

In the previous article we saw the function chaining pattern where the output from one activity function is passed as the input to the next activity function to form a processing pipeline.

If you have a workload that you can split up into discrete chunks of data, you can parallelize the processing of those chunks to reduce the time it takes to complete the total workload. The fan out/fan in pattern can be used to do this.

This pattern essentially means running multiple instances of the activity function at the same time. The “fan out” part is the splitting up of the data into multiple chunks and then calling the activity function multiple times, passing in these chunks. The fanning out process invokes multiple instances of the activity function.

When each chunk has been processed, the “fan in” takes place: the results from each activity function instance are collected and aggregated into a single final result.

This pattern is only really useful if you can “chunk” the workload in a meaningful way for splitting up to be processed in parallel.

As an example, suppose we allow the client to specify a number of greetings to generate:

public class Greeting
{
    public string CityName { get; set; }
    public string Message { get; set; }
}

public class GreetingsRequest
{
    public List<Greeting> Greetings { get; set; }
}

Now an HTTP client function can be created that accepts some JSON and then starts the orchestrator:

[FunctionName("FanOutIn_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    var data = await req.Content.ReadAsAsync<GreetingsRequest>();

    string instanceId = await starter.StartNewAsync("FanOutInOrchestrator", data);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

At this point nothing is really different; the fan out/in is specified in the orchestrator function:

[FunctionName("FanOutInOrchestrator")]
public static async Task<string> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** RunOrchestrator method executing ********************");

    GreetingsRequest greetingsRequest = context.GetInput<GreetingsRequest>();

    // Fanning out
    log.LogInformation($"************** Fanning out ********************");
    var parallelActivities = new List<Task<string>>();
    foreach (var greeting in greetingsRequest.Greetings)
    {
        // Start a new activity function and capture the task reference
        Task<string> task = context.CallActivityAsync<string>("FanOutIn_ActivityFunction", greeting);

        // Store the task reference for later
        parallelActivities.Add(task);
    }

    // Wait until all the activity functions have done their work
    log.LogInformation($"************** 'Waiting' for parallel results ********************");
    await Task.WhenAll(parallelActivities);
    log.LogInformation($"************** All activity functions complete ********************");

    // Now that all parallel activity functions have completed,
    // fan in AKA aggregate the results, in this case into a single
    // string using a StringBuilder
    log.LogInformation($"************** fanning in ********************");
    var sb = new StringBuilder();
    foreach (var completedParallelActivity in parallelActivities)
    {
        sb.AppendLine(completedParallelActivity.Result);
    }

    return sb.ToString();
}

The preceding code is the orchestrator function that handles the fan out/in; I’ve added comments to illustrate what’s going on. Essentially each Greeting is treated as a “chunk” to be processed in parallel. Each chunk is passed to an instance of the FanOutIn_ActivityFunction. However, rather than awaiting each CallActivityAsync call immediately, the task is stored in the parallelActivities list. Once all activities have completed, the fan in can happen, which just aggregates all the results into a single string containing all the greetings.
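
The same start-everything-then-await-together shape can be tried out with ordinary .NET tasks, outside of Durable Functions. The following is only an illustrative sketch: FanOutInSketch and its methods are hypothetical names, and Task.Delay stands in for the work an activity function would do. In a real orchestrator the work must still go through context.CallActivityAsync as shown above.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

// Hypothetical stand-alone illustration of fan out/fan in using plain tasks.
public static class FanOutInSketch
{
    // Stand-in for an activity function: waits for the given delay, then returns a greeting.
    public static async Task<string> SayHelloAsync(string message, string city, TimeSpan delay)
    {
        await Task.Delay(delay);
        return $"{message} {city}";
    }

    public static async Task<string> RunAsync(
        IEnumerable<(string Message, string City)> greetings, TimeSpan delay)
    {
        // Fan out: start every task and keep the reference, without awaiting it yet.
        var tasks = new List<Task<string>>();
        foreach (var greeting in greetings)
        {
            tasks.Add(SayHelloAsync(greeting.Message, greeting.City, delay));
        }

        // Wait for all tasks; the results array is in the same order as the tasks list.
        string[] results = await Task.WhenAll(tasks);

        // Fan in: aggregate the individual results into a single string.
        var sb = new StringBuilder();
        foreach (string result in results)
        {
            sb.Append(result).Append('\n');
        }

        return sb.ToString();
    }
}
```

Because the delays overlap, the total time is close to one delay rather than the sum of all of them, which is the same effect seen in the orchestration timings.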

The activity function is defined as:

[FunctionName("FanOutIn_ActivityFunction")]
public static string SayHello([ActivityTrigger] Greeting greeting, ILogger log)
{            
    // simulate longer processing delay to demonstrate parallelism
    Thread.Sleep(15000); 

    return $"{greeting.Message} {greeting.CityName}";
}

If we run this, we get the following (simplified) output:

Executing 'FanOutIn_HttpStart' 
Executing 'FanOutInOrchestrator'
************** RunOrchestrator method executing ********************
************** Fanning out ********************
Function 'FanOutIn_ActivityFunction (Activity)' scheduled. 
Function 'FanOutIn_ActivityFunction (Activity)' scheduled. 
Function 'FanOutIn_ActivityFunction (Activity)' started. 
Function 'FanOutIn_ActivityFunction (Activity)' started. 
Executing 'FanOutIn_ActivityFunction' (Reason='', Id=9a33abd6-4594-4285-bbcd-0e428cf15d76)
Executing 'FanOutIn_ActivityFunction' (Reason='', Id=e3afbcb2-1f90-4f3f-a638-3983ea8db1a7)
Executed 'FanOutIn_ActivityFunction' (Succeeded, Id=9a33abd6-4594-4285-bbcd-0e428cf15d76)
Executed 'FanOutIn_ActivityFunction' (Succeeded, Id=e3afbcb2-1f90-4f3f-a638-3983ea8db1a7)
Function 'FanOutIn_ActivityFunction (Activity)' completed. 
Function 'FanOutIn_ActivityFunction (Activity)' completed. 
************** 'Waiting' for parallel results ********************
************** All activity functions complete ********************
************** fanning in ********************
Executed 'FanOutInOrchestrator' (Succeeded, Id=fba76372-758f-433c-af22-299a3b38dc5a)

Recall in the activity function there is a 15 second delay:

[FunctionName("FanOutIn_ActivityFunction")]
public static string SayHello([ActivityTrigger] Greeting greeting, ILogger log)
{            
    // simulate longer processing delay to demonstrate parallelism
    Thread.Sleep(15000); 

    return $"{greeting.Message} {greeting.CityName}";
}

If we look at the timings (below), notice that the createdTime and lastUpdatedTime are not 30 seconds apart but rather about 15 seconds apart (04:06:36 to 04:06:52). This is because the 2 activities ran in parallel:

{
    "name": "FanOutInOrchestrator",
    "instanceId": "5704559dc4d94e26998ead2f47ea9821",
    "runtimeStatus": "Completed",
    "input": {
        "$type": "DurableDemos.FanOutInPatternExample+GreetingsRequest, DurableDemos",
        "Greetings": [
            {
                "$type": "DurableDemos.FanOutInPatternExample+Greeting, DurableDemos",
                "CityName": "New York",
                "Message": "Yo"
            },
            {
                "$type": "DurableDemos.FanOutInPatternExample+Greeting, DurableDemos",
                "CityName": "London",
                "Message": "Good day"
            }
        ]
    },
    "customStatus": null,
    "output": "Yo New York\r\nGood day London\r\n",
    "createdTime": "2019-08-21T04:06:36Z",
    "lastUpdatedTime": "2019-08-21T04:06:52Z"
}

Also note in the preceding status result the output is the aggregated “fanned-in” result: “Yo New York\r\nGood day London\r\n”.

Just as with the Function Chaining pattern discussed in the previous article, you could implement the fan out/fan in pattern without Durable Functions, but you would have to manage the complexity yourself: knowing when all the parallel activity functions have completed, and then performing the fan in/aggregation, could both be quite complex to implement manually.

Understanding Azure Durable Functions - Part 7: The Function Chaining Pattern

This is the seventh part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

There are a number of patterns that Durable Functions make easier to implement; we’ll look at some more later in this series of articles.

One common scenario is the requirement to create a “pipeline” of processing where the output from one Azure Function feeds into the next function in the chain/pipeline. This pattern can be implemented without Durable Functions, for example by manually setting up different queues to pass work down the chain. One downside to this manual approach is that it’s sometimes not immediately obvious which functions are involved in the pipeline. Function chaining with Durable Functions makes the chain/pipeline easy to understand because the entire pipeline is represented in code.

To implement the function chaining pattern, you simply call one activity function and pass in the output from the previous activity function as its input.

As an example, the following orchestrator function chains 3 activity functions together:

[FunctionName("ChainPatternExample")]
public static async Task<string> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** RunOrchestrator method executing ********************");            

    string greeting = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction", "London");
    string toUpper = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_ToUpper", greeting);
    string withTimestamp = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_AddTimestamp", toUpper);

    log.LogInformation(withTimestamp);
    return withTimestamp;
}

In the preceding code, the result of the function (e.g. in the greeting variable) is passed in as data to the next activity function: await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_ToUpper", greeting);

This example is fairly simple, but you could add conditional logic to only call an activity based on the result of the previous function. In this way you can build up more complex pipelines which, using the manual (non Durable Functions) approach, would be even harder to reason about. At least with Durable Functions, the entire flow (even if it has conditional logic) can be easily understood.
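
To give a feel for what such conditional logic might look like, here is a minimal sketch using ordinary async methods in place of activity functions. The ConditionalChainSketch class, the maxLengthToUpper parameter and the length-based rule are all made up for illustration; in a real orchestrator each step would be a context.CallActivityAsync call as in the example above.

```csharp
using System.Threading.Tasks;

// Hypothetical stand-alone illustration of a chain with a conditional step.
public static class ConditionalChainSketch
{
    // Stand-ins for the activity functions in the chain.
    public static Task<string> SayHelloAsync(string name) =>
        Task.FromResult($"Hello {name}!");

    public static Task<string> ToUpperAsync(string s) =>
        Task.FromResult(s.ToUpperInvariant());

    public static async Task<string> RunAsync(string city, int maxLengthToUpper)
    {
        // Step 1: always runs.
        string greeting = await SayHelloAsync(city);

        // Step 2: only runs if the result of step 1 is short enough (an arbitrary example rule).
        if (greeting.Length <= maxLengthToUpper)
        {
            greeting = await ToUpperAsync(greeting);
        }

        return greeting;
    }
}
```

The whole flow, including the branch, can still be read top to bottom in one method, which is the readability benefit described above.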

The complete listing is as follows (for simplicity we’re not taking in any input in the client function):

using System;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class ChainPatternExample
    {
        [FunctionName("ChainPatternExample")]
        public static async Task<string> RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
        {
            log.LogInformation($"************** RunOrchestrator method executing ********************");            

            string greeting = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction", "London");
            string toUpper = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_ToUpper", greeting);
            string withTimestamp = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_AddTimestamp", toUpper);

            log.LogInformation(withTimestamp);
            return withTimestamp;
        }

        [FunctionName("ChainPatternExample_ActivityFunction")]
        public static string SayHello([ActivityTrigger] string name, ILogger log)
        {            
            return $"Hello {name}!";
        }

        [FunctionName("ChainPatternExample_ActivityFunction_ToUpper")]
        public static string ToUpper([ActivityTrigger] string s, ILogger log)
        {
            return s.ToUpperInvariant();
        }

        [FunctionName("ChainPatternExample_ActivityFunction_AddTimestamp")]
        public static string AddTimeStamp([ActivityTrigger] string s, ILogger log)
        {            
            return $"{s} [{DateTimeOffset.Now}]";
        }

        [FunctionName("ChainPatternExample_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {
            string instanceId = await starter.StartNewAsync("ChainPatternExample", null);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}

Understanding Azure Durable Functions - Part 6: Activity Functions with Additional Input Bindings

This is the sixth part in a series of articles.

Up until this point in this series, the activity function has received its data from the calling orchestration function, for example passing in a Greeting instance from the orchestrator:

public class Greeting
{
    public string CityName { get; set; }
    public string Message { get; set; }
}

[FunctionName("DataExample2_ActivityFunction")]
public static string SayHello([ActivityTrigger] Greeting greeting, ILogger log)
{            
    log.LogInformation($"Saying '{greeting.Message}' to {greeting.CityName}.");
    return $"{greeting.Message} {greeting.CityName}";
}

Because activity functions are just like regular Azure functions, we can also add input bindings:

[FunctionName("DataExample3_ActivityFunction")]
public static string SayHello(
    [ActivityTrigger] Greeting greeting, 
    [Blob("cities/{data.CityId}.txt")] string city,
    ILogger log)
{            
    log.LogInformation($"Saying '{greeting.Message}' to {city}.");
    return $"{greeting.Message} {city}";
}

In the preceding code, in addition to the activity trigger, the activity function also makes use of a blob storage input binding. When this activity function executes, the contents of a blob will be read in and the text contained therein used for the city name. Notice the binding syntax [Blob("cities/{data.CityId}.txt")] string city. This will look for a property on the incoming data called CityId. The blob container that will be read from is fixed and is called cities. This works because the Greeting class has been modified as follows:

public class Greeting
{
    public string CityId { get; set; }
    public string Message { get; set; }
}

The following is a complete listing of these functions:

using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class DataExample3
    {
        public class Greeting
        {
            public string CityId { get; set; }
            public string Message { get; set; }
        }

        public class GreetingsRequest
        {
            public List<Greeting> Greetings { get; set; }
        }

        [FunctionName("DataExample3_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {
            var data = await req.Content.ReadAsAsync<GreetingsRequest>();

            string instanceId = await starter.StartNewAsync("DataExample3", data);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }

        [FunctionName("DataExample3")]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
        {
            log.LogInformation($"************** RunOrchestrator method executing ********************");

            GreetingsRequest data = context.GetInput<GreetingsRequest>();

            foreach (var greeting in data.Greetings)
            {
                await context.CallActivityAsync<string>("DataExample3_ActivityFunction", greeting);
            }            
        }

        [FunctionName("DataExample3_ActivityFunction")]
        public static string SayHello(
            [ActivityTrigger] Greeting greeting, 
            [Blob("cities/{data.CityId}.txt")] string city,
            ILogger log)
        {            
            log.LogInformation($"Saying '{greeting.Message}' to {city}.");
            return $"{greeting.Message} {city}";
        }
    }
}

Now the following JSON can be posted to the client function (http://localhost:7071/api/DataExample3_HttpStart on my local dev environment):

{
    "Greetings": [{
            "CityId": "42",
            "Message": "Yo!"
        },
        {
            "CityId": "100",
            "Message": "Good day"
        }
    ]
}

In blob storage, the cities container holds a couple of blobs named 42.txt and 100.txt; these match the IDs being passed in the preceding JSON.

Blobs in blob storage

Now when the activity function executes it will read in the corresponding blob which contains the city name as the following (simplified) output shows:

Executing 'DataExample3_HttpStart' (Reason='This function was programmatically called via the host APIs.', Id=b76cb2a3-ef54-48d4-bcd2-5854aedebe62)
Started orchestration with ID = '08e79880b6e844f5a3b2acbe5ba46244'.
Executed 'DataExample3_HttpStart' (Succeeded, Id=b76cb2a3-ef54-48d4-bcd2-5854aedebe62)
Executing 'DataExample3_ActivityFunction' (Reason='', Id=8f965987-d893-4755-b342-30c84c682e70)
Saying 'Yo!' to New York.
Executed 'DataExample3_ActivityFunction' (Succeeded, Id=8f965987-d893-4755-b342-30c84c682e70)
Executing 'DataExample3_ActivityFunction' (Reason='', Id=7b0aaee2-0132-40bd-90df-a8b741db6491)
Saying 'Good day' to London.
Executed 'DataExample3_ActivityFunction' (Succeeded, Id=7b0aaee2-0132-40bd-90df-a8b741db6491)

Notice the messages “Saying 'Yo!' to New York.” and “Saying 'Good day' to London.” – New York and London have been read from blob storage.

You could also use output bindings in the activity function, such as writing to queue storage:

[FunctionName("DataExample3_ActivityFunction")]
public static string SayHello(
    [ActivityTrigger] Greeting greeting, 
    [Blob("cities/{data.CityId}.txt")] string city,
    [Queue("greetings")] out string queueMessage,
    ILogger log)
{            
    log.LogInformation($"Saying '{greeting.Message}' to {city}.");

    var message = $"{greeting.Message} {city}";

    queueMessage = message;

    return message;
}

Now in addition to returning the message to the orchestration, the activity also writes a message to the queue.

Understanding Azure Durable Functions - Part 5: Getting Results from Orchestrations

This is the fifth part in a series of articles.

As we learned earlier in this series, a client function is called that initiates an orchestrator function which in turn calls one or more activity functions.

This process is asynchronous in nature. If the client function is an HTTP function then the HTTP request will complete and return an HTTP 202 Accepted response to the caller. As this response code suggests, the request has been successfully accepted for processing but the processing is not yet complete.

Take the following client function that triggers an orchestration:

[FunctionName("ClientFunction")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    // No input is passed to the orchestration in this example (null).
    string instanceId = await starter.StartNewAsync("OrchestratorFunction", null);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

Notice the line return starter.CreateCheckStatusResponse(req, instanceId); This method creates an HttpResponseMessage which is returned to the caller. This message contains information on how to check the status of the orchestration.

As an example, if the client function is called over HTTP (e.g. in local development: http://localhost:7071/api/ClientFunction) the response will look similar to the following:

{
    "id": "85ee280f20a249089ec30882bd2ea4e2",
    "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA=="
}

This JSON returns the id of the orchestration instance along with a number of URLs that can be used to interact with the orchestration instance.

Checking the Status of a Durable Functions Orchestration

To check the status of an orchestration instance, an HTTP GET can be sent to the following URL:  http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==

Assuming the following orchestrator and activity functions:

[FunctionName("OrchestratorFunction")]
public static async Task<List<string>> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
{
    var outputs = new List<string>();

    // Call the activity function three times in sequence, awaiting each call before starting the next.
    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Tokyo"));
    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Seattle"));
    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "London"));

    // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
    return outputs;
}

[FunctionName("ActivityFunction")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
    Thread.Sleep(5000); // simulate longer processing delay

    log.LogInformation($"Saying hello to {name}.");
    return $"Hello {name}!";
}

If the orchestration is still running, then querying the status URL will return the following:

{
    "name": "OrchestratorFunction",
    "instanceId": "85ee280f20a249089ec30882bd2ea4e2",
    "runtimeStatus": "Running",
    "input": null,
    "customStatus": null,
    "output": null,
    "createdTime": "2019-08-07T03:50:39Z",
    "lastUpdatedTime": "2019-08-07T03:50:39Z"
}

Notice the runtimeStatus of “Running”, meaning that the orchestration is not yet complete. Also notice that output is null.

If we wait for the orchestration to complete and call the URL again we get the following:

{
    "name": "OrchestratorFunction",
    "instanceId": "35d752392e934df994d01951102e50e8",
    "runtimeStatus": "Completed",
    "input": null,
    "customStatus": null,
    "output": [
        "Hello Tokyo!",
        "Hello Seattle!",
        "Hello London!"
    ],
    "createdTime": "2019-08-07T03:50:39Z",
    "lastUpdatedTime": "2019-08-07T03:50:55Z"
}

Notice this time that the runtimeStatus is now “Completed” and the output gives us the results returned from the orchestrator function in the line: return outputs;

In addition to getting the status you can use the other URLs to terminate a running orchestration,  purge history for instances, send event notifications to orchestrations, and replay (rewind) a failed orchestration into a running state (currently in preview). You can check out the complete API reference in the documentation.

Running Durable Functions Synchronously

Currently the client function is starting a new orchestration instance with the code: string instanceId = await starter.StartNewAsync("OrchestratorFunction", null);

In this code, the method StartNewAsync starts the orchestration asynchronously and returns the instance id.

If you want the orchestration to run synchronously, and for the client function to wait around until a result is available, the (rather verbose) WaitForCompletionOrCreateCheckStatusResponseAsync method can be used as the following code demonstrates:

[FunctionName("ClientFunctionSync")]
public static async Task<HttpResponseMessage> HttpStartSync(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    string instanceId = await starter.StartNewAsync("OrchestratorFunction", null);
    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    var timeout = TimeSpan.FromSeconds(20);
    var retryInterval = TimeSpan.FromSeconds(1); // How often to check the orchestration instance for completion

    return await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(req, instanceId, timeout, retryInterval);
}

If we call this modified HTTP client function, the HTTP request will complete after approximately 15 seconds with a 200 OK status code and the following response:

[
    "Hello Tokyo!",
    "Hello Seattle!",
    "Hello London!"
]

The reason the response takes 15 seconds is that the activity function has a 5 second delay in it (Thread.Sleep(5000); // simulate longer processing delay) and the orchestrator is calling this function 3 times.

If we reduce the timeout to 5 seconds (var timeout = TimeSpan.FromSeconds(5);) and call the client HTTP function again, the orchestration does not finish within the timeout, so we once again get a 202 Accepted with the following body:

{
    "id": "8f88192ecfb3440199e572e93c478906",
    "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8f88192ecfb3440199e572e93c478906?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8f88192ecfb3440199e572e93c478906/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8f88192ecfb3440199e572e93c478906/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8f88192ecfb3440199e572e93c478906/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8f88192ecfb3440199e572e93c478906?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA=="
}

Now the client can use the status URL to poll for completion as before.
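
As a rough illustration of that polling, the following sketch repeatedly fetches the status JSON and stops once runtimeStatus is no longer Pending, Running, or ContinuedAsNew. The StatusPoller class, its method name, and the fetchStatusJson delegate are hypothetical names introduced here, and System.Text.Json is assumed to be available; this is not part of the Durable Functions API.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;

public static class StatusPoller
{
    // Hypothetical helper: calls fetchStatusJson until the orchestration
    // reports a status other than Pending/Running/ContinuedAsNew, or until
    // maxAttempts is reached. Returns the last runtimeStatus that was seen.
    public static async Task<string> WaitForTerminalStatusAsync(
        Func<Task<string>> fetchStatusJson, TimeSpan interval, int maxAttempts)
    {
        string status = null;

        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            using (JsonDocument doc = JsonDocument.Parse(await fetchStatusJson()))
            {
                status = doc.RootElement.GetProperty("runtimeStatus").GetString();
            }

            if (status != "Pending" && status != "Running" && status != "ContinuedAsNew")
            {
                return status;
            }

            // Wait before asking again so the status endpoint is not hammered.
            await Task.Delay(interval);
        }

        return status;
    }
}
```

A caller could pass something like () => httpClient.GetStringAsync(statusQueryGetUri) as the delegate, where statusQueryGetUri is the value returned in the 202 response body.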

Make sure that the end client that makes the initial HTTP call to start the orchestration doesn’t have an HTTP timeout that is shorter than the timeout specified in the call to WaitForCompletionOrCreateCheckStatusResponseAsync (plus some extra time for the overhead of starting the orchestration, etc.); otherwise the initial call may time out on the client side before a response is returned.
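
For a .NET caller, one way to respect this is to derive the client timeout from the server-side wait. The sketch below assumes the 20 second wait used earlier and an arbitrary 10 second allowance for startup overhead; the SyncClientTimeouts class and both values are illustrative assumptions, not recommendations.

```csharp
using System;
using System.Net.Http;

public static class SyncClientTimeouts
{
    // Assumed to match the timeout passed to
    // WaitForCompletionOrCreateCheckStatusResponseAsync in the client function.
    public static readonly TimeSpan ServerWaitTimeout = TimeSpan.FromSeconds(20);

    // Assumed headroom for starting the orchestration and network latency.
    public static readonly TimeSpan StartupAllowance = TimeSpan.FromSeconds(10);

    // Creates an HttpClient whose timeout is longer than the server-side wait,
    // so the caller does not give up before the function can respond.
    public static HttpClient CreateClient()
    {
        return new HttpClient { Timeout = ServerWaitTimeout + StartupAllowance };
    }
}
```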

Adding Custom Status Information to a Durable Functions Orchestration

In addition to the built-in status information, you can also set custom status information in the orchestrator function.

To do this, use the SetCustomStatus method of the DurableOrchestrationContext, which takes an object. The following is a modified version of the orchestrator function that gives us a rough idea of what percentage of processing has been completed:

[FunctionName("OrchestratorFunction")]
public static async Task<List<string>> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
{
    var outputs = new List<string>();

    context.SetCustomStatus("0% complete");            
    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Tokyo"));
    context.SetCustomStatus("33% complete");

    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Seattle"));
    context.SetCustomStatus("66% complete");

    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "London"));
    context.SetCustomStatus("100% complete");

    // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
    return outputs;
}

If we call the client HTTP function and then periodically call the status URL we get the following:

{
    "name": "OrchestratorFunction",
    "instanceId": "84d370b3833d4563b3cc1b1bab285787",
    "runtimeStatus": "Running",
    "input": null,
    "customStatus": "33% complete",
    "output": null,
    "createdTime": "2019-08-07T05:07:41Z",
    "lastUpdatedTime": "2019-08-07T05:07:56Z"
}

{
    "name": "OrchestratorFunction",
    "instanceId": "84d370b3833d4563b3cc1b1bab285787",
    "runtimeStatus": "Running",
    "input": null,
    "customStatus": "66% complete",
    "output": null,
    "createdTime": "2019-08-07T05:07:41Z",
    "lastUpdatedTime": "2019-08-07T05:08:12Z"
}

{
    "name": "OrchestratorFunction",
    "instanceId": "84d370b3833d4563b3cc1b1bab285787",
    "runtimeStatus": "Completed",
    "input": null,
    "customStatus": "100% complete",
    "output": [
        "Hello Tokyo!",
        "Hello Seattle!",
        "Hello London!"
    ],
    "createdTime": "2019-08-07T05:07:41Z",
    "lastUpdatedTime": "2019-08-07T05:08:27Z"
}

Notice in the preceding statuses that the customStatus has been populated. The custom status can be any JSON-serializable value; for example, you could output an estimated time remaining to be displayed in the end client to give the end user an idea of when the request will be complete.
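
Because SetCustomStatus takes an object, the status does not have to be a string. The following is a minimal sketch of a structured status; the ProgressStatus type, the ProgressStatusFactory helper, and the assumption of a fixed number of seconds per step are all hypothetical and only for illustration.

```csharp
public class ProgressStatus
{
    public int PercentComplete { get; set; }
    public int EstimatedSecondsRemaining { get; set; }
}

public static class ProgressStatusFactory
{
    // Builds a status from the number of completed steps. The calculation
    // only uses its arguments (no clock access), so it returns the same
    // result when the orchestrator code is replayed.
    public static ProgressStatus Create(int completedSteps, int totalSteps, int secondsPerStep)
    {
        return new ProgressStatus
        {
            PercentComplete = completedSteps * 100 / totalSteps,
            EstimatedSecondsRemaining = (totalSteps - completedSteps) * secondsPerStep
        };
    }
}

// Possible use inside the orchestrator, after the first of three activities:
// context.SetCustomStatus(ProgressStatusFactory.Create(1, 3, 5));
```

With a status like this, the customStatus field in the status response would contain a JSON object rather than a plain string.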

Understanding Azure Durable Functions - Part 4: Passing Input To Orchestrations and Activities

This is the fourth part in a series of articles.

In the first part of the series we learned that Durable Functions consist of three types of function: client, orchestrator, and activity functions. Up until this point, the only data we’ve made use of has been hardcoded in the orchestrator function:

await context.CallActivityAsync<string>("ReplayExample_ActivityFunction", "Tokyo");

In the preceding code, the ReplayExample_ActivityFunction is being called and the hardcoded data "Tokyo" is being passed to the activity. In real use it’s more likely that the data won’t be hardcoded but will instead be either passed to the client function for use in the activity, or read in using one of the standard input bindings.

How To Pass Data to a Durable Functions Orchestrator Function

The first step in this scenario is to allow the initiator of the orchestration to provide some data. One way to do this is with an HTTP-triggered Azure Function that allows the caller to provide some JSON data. To represent this data we can create a class:

class SayHelloRequest
{
    public List<string> CityNames { get; set; }
}

The incoming JSON data can now be deserialized into this class:

var data = await req.Content.ReadAsAsync<SayHelloRequest>();

To pass input data to an orchestration, it can be supplied to the StartNewAsync method of the DurableOrchestrationClient:

string instanceId = await starter.StartNewAsync("DataExample", data);

The full listing of the client function is now:

[FunctionName("DataExample_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    var data = await req.Content.ReadAsAsync<SayHelloRequest>();

    string instanceId = await starter.StartNewAsync("DataExample", data);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

Now that we are sending the data, we can make use of it in the orchestrator.

How To Read Data Passed Into an Orchestrator Function

To read data that was passed into the orchestrator, the GetInput method of the DurableOrchestrationContext can be used:

SayHelloRequest data = context.GetInput<SayHelloRequest>();

One thing to note here is that the input data must be JSON serializable.
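
One quick way to check that an input type survives serialization is to round-trip it through JSON yourself. The sketch below uses System.Text.Json purely for illustration; that choice is an assumption and is not necessarily the serializer (or the settings) the Durable Functions runtime uses. The InputRoundTrip helper is a hypothetical name.

```csharp
using System.Collections.Generic;
using System.Text.Json;

public class SayHelloRequest
{
    public List<string> CityNames { get; set; }
}

public static class InputRoundTrip
{
    // Serializes the input to JSON and reads it back. If the copy matches
    // the original, the type is at least round-trippable as JSON.
    public static SayHelloRequest RoundTrip(SayHelloRequest input)
    {
        string json = JsonSerializer.Serialize(input);
        return JsonSerializer.Deserialize<SayHelloRequest>(json);
    }
}
```

Types with public get/set properties and simple collections, like the one here, generally round-trip without trouble; types that hold streams, delegates, or circular references typically do not.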

Now that the data is available in the orchestrator function, it can be passed down to the activity function(s) instead of the hardcoded data we used earlier. In this example the activity function does not need to change.

The full listing for the new version of the orchestrator:

[FunctionName("DataExample")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** RunOrchestrator method executing ********************");

    SayHelloRequest data = context.GetInput<SayHelloRequest>();

    foreach (var city in data.CityNames)
    {
        await context.CallActivityAsync<string>("DataExample_ActivityFunction", city);
    }            
}

Passing Data to an Azure Functions Durable Function Activity

The second parameter in the call to the CallActivityAsync method of the DurableOrchestrationContext allows an object to be passed to an activity function when it is called.

For example, if the activity function is defined as: public static string SayHello([ActivityTrigger] string name, ILogger log) then the orchestrator can pass a single string to this activity when it is called.

To pass more data you could change this from a simple string to a more complex type, as the following modified version shows:

using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class DataExample2
    {
        [FunctionName("DataExample2")]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
        {
            log.LogInformation($"************** RunOrchestrator method executing ********************");

            GreetingsRequest data = context.GetInput<GreetingsRequest>();

            foreach (var greeting in data.Greetings)
            {
                await context.CallActivityAsync<string>("DataExample2_ActivityFunction", greeting);
            }            
        }

        [FunctionName("DataExample2_ActivityFunction")]
        public static string SayHello([ActivityTrigger] Greeting greeting, ILogger log)
        {            
            log.LogInformation($"Saying '{greeting.Message}' to {greeting.CityName}.");
            return $"{greeting.Message} {greeting.CityName}";
        }

        public class Greeting
        {
            public string CityName { get; set; }
            public string Message { get; set; }
        }

        public class GreetingsRequest
        {
            public List<Greeting> Greetings { get; set; }
        }

        [FunctionName("DataExample2_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {
            var data = await req.Content.ReadAsAsync<GreetingsRequest>();

            string instanceId = await starter.StartNewAsync("DataExample2", data);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}

Notice in the preceding code that the activity now accepts a more complex type, a Greeting.
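
For illustration, a request body posted to DataExample2_HttpStart that would deserialize into GreetingsRequest might look like the following. The property names come from the classes above; the city names and messages are made-up sample values.

```json
{
    "Greetings": [
        { "CityName": "Tokyo", "Message": "Hello" },
        { "CityName": "London", "Message": "Good morning" }
    ]
}
```

Each element of the Greetings array is passed to the activity as a single Greeting object.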
