Understanding Azure Durable Functions - Part 7: The Function Chaining Pattern

This is the seventh part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

There are a number of patterns that Durable Functions make easier to implement; we’ll look at more of them later in this series of articles.

One common scenario is the requirement to create a “pipeline” of processing where the output from one Azure Function feeds into the next function in the chain/pipeline. This pattern can be implemented without Durable Functions, for example by manually setting up different queues to pass work down the chain. One downside to this manual approach is that it’s not always immediately obvious which functions are involved in the pipeline. Function chaining with Durable Functions makes the chain/pipeline easy to understand because the entire pipeline is represented in code.

To implement the function chaining pattern, you call one activity function and pass in the output from the previous activity function.

As an example, the following orchestrator function chains 3 activity functions together:

[FunctionName("ChainPatternExample")]
public static async Task<string> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** RunOrchestrator method executing ********************");            

    string greeting = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction", "London");
    string toUpper = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_ToUpper", greeting);
    string withTimestamp = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_AddTimestamp", toUpper);

    log.LogInformation(withTimestamp);
    return withTimestamp;
}

In the preceding code, the result of each activity function (e.g. the greeting variable) is passed in as data to the next activity function: await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_ToUpper", greeting);

This example is fairly simple, but you could add conditional logic to only call an activity based on the result of the previous function. In this way you can build up more complex pipelines, which would be even harder to reason about using the manual (non Durable Functions) approach. At least with Durable Functions, the entire flow (even if it has conditional logic) can be easily understood.
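
As a rough sketch of what that conditional logic might look like, the following orchestrator only calls the timestamp activity when the upper-cased greeting is short enough. The function name ChainPatternExample_Conditional, the NeedsTimestamp helper and the 20 character limit are all made up for illustration; the activity names are the ones used in this article.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class ConditionalChainExample
{
    // Hypothetical rule: only greetings of 20 characters or fewer get a timestamp.
    public static bool NeedsTimestamp(string s) => s != null && s.Length <= 20;

    [FunctionName("ChainPatternExample_Conditional")] // hypothetical function name
    public static async Task<string> RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        string greeting = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction", "London");
        string toUpper = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_ToUpper", greeting);

        // The decision depends only on an activity result, so the same branch
        // is taken each time the orchestrator code runs.
        if (!NeedsTimestamp(toUpper))
        {
            return toUpper;
        }

        return await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_AddTimestamp", toUpper);
    }
}
```

The branch is still plain C# inside the orchestrator, so the whole flow, including the condition, can be read in one place.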

The complete listing is as follows (for simplicity we’re not taking in any input in the client function):

using System;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class ChainPatternExample
    {
        [FunctionName("ChainPatternExample")]
        public static async Task<string> RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
        {
            log.LogInformation($"************** RunOrchestrator method executing ********************");            

            string greeting = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction", "London");
            string toUpper = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_ToUpper", greeting);
            string withTimestamp = await context.CallActivityAsync<string>("ChainPatternExample_ActivityFunction_AddTimestamp", toUpper);

            log.LogInformation(withTimestamp);
            return withTimestamp;
        }

        [FunctionName("ChainPatternExample_ActivityFunction")]
        public static string SayHello([ActivityTrigger] string name, ILogger log)
        {            
            return $"Hello {name}!";
        }

        [FunctionName("ChainPatternExample_ActivityFunction_ToUpper")]
        public static string ToUpper([ActivityTrigger] string s, ILogger log)
        {
            return s.ToUpperInvariant();
        }

        [FunctionName("ChainPatternExample_ActivityFunction_AddTimestamp")]
        public static string AddTimeStamp([ActivityTrigger] string s, ILogger log)
        {            
            return $"{s} [{DateTimeOffset.Now}]";
        }

        [FunctionName("ChainPatternExample_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {
            string instanceId = await starter.StartNewAsync("ChainPatternExample", null);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}

If you want to fill in the gaps in your C# knowledge be sure to check out my C# Tips and Traps training course from Pluralsight – get started with a free trial.

Understanding Azure Durable Functions - Part 6: Activity Functions with Additional Input Bindings

This is the sixth part in a series of articles.

Up until this point in this series, the activity function has received its data from the calling orchestration function, for example passing in a Greeting instance from the orchestrator:

public class Greeting
{
    public string CityName { get; set; }
    public string Message { get; set; }
}

[FunctionName("DataExample2_ActivityFunction")]
public static string SayHello([ActivityTrigger] Greeting greeting, ILogger log)
{            
    log.LogInformation($"Saying '{greeting.Message}' to {greeting.CityName}.");
    return $"{greeting.Message} {greeting.CityName}";
}

Because activity functions are just like regular Azure functions, we can also add input bindings:

[FunctionName("DataExample3_ActivityFunction")]
public static string SayHello(
    [ActivityTrigger] Greeting greeting, 
    [Blob("cities/{data.CityId}.txt")] string city,
    ILogger log)
{            
    log.LogInformation($"Saying '{greeting.Message}' to {city}.");
    return $"{greeting.Message} {city}";
}

In the preceding code, in addition to the activity trigger, the activity function also makes use of a blob storage input binding. When this activity function executes, the contents of a blob will be read in and the text contained therein used for the city name. Notice the binding syntax [Blob("cities/{data.CityId}.txt")] string city. This will look for a property on the incoming data called CityId. The blob container that will be read from is fixed and is called cities. This works because the Greeting class has been modified as follows:

public class Greeting
{
    public string CityId { get; set; }
    public string Message { get; set; }
}

The following is a complete listing of these functions:

using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class DataExample3
    {
        public class Greeting
        {
            public string CityId { get; set; }
            public string Message { get; set; }
        }

        public class GreetingsRequest
        {
            public List<Greeting> Greetings { get; set; }
        }

        [FunctionName("DataExample3_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {
            var data = await req.Content.ReadAsAsync<GreetingsRequest>();

            string instanceId = await starter.StartNewAsync("DataExample3", data);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }

        [FunctionName("DataExample3")]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
        {
            log.LogInformation($"************** RunOrchestrator method executing ********************");

            GreetingsRequest data = context.GetInput<GreetingsRequest>();

            foreach (var greeting in data.Greetings)
            {
                await context.CallActivityAsync<string>("DataExample3_ActivityFunction", greeting);
            }            
        }

        [FunctionName("DataExample3_ActivityFunction")]
        public static string SayHello(
            [ActivityTrigger] Greeting greeting, 
            [Blob("cities/{data.CityId}.txt")] string city,
            ILogger log)
        {            
            log.LogInformation($"Saying '{greeting.Message}' to {city}.");
            return $"{greeting.Message} {city}";
        }
    }
}

Now the following JSON can be posted to the client function (http://localhost:7071/api/DataExample3_HttpStart on my local dev environment):

{
    "Greetings": [{
            "CityId": "42",
            "Message": "Yo!"
        },
        {
            "CityId": "100",
            "Message": "Good day"
        }
    ]
}

In blob storage there are two blobs, 42.txt and 100.txt, whose names match the IDs being passed in the preceding JSON.

[Screenshot: blobs in blob storage]

Now when the activity function executes, it will read in the corresponding blob, which contains the city name, as the following (simplified) output shows:

Executing 'DataExample3_HttpStart' (Reason='This function was programmatically called via the host APIs.', Id=b76cb2a3-ef54-48d4-bcd2-5854aedebe62)
Started orchestration with ID = '08e79880b6e844f5a3b2acbe5ba46244'.
Executed 'DataExample3_HttpStart' (Succeeded, Id=b76cb2a3-ef54-48d4-bcd2-5854aedebe62)
Executing 'DataExample3_ActivityFunction' (Reason='', Id=8f965987-d893-4755-b342-30c84c682e70)
Saying 'Yo!' to New York.
Executed 'DataExample3_ActivityFunction' (Succeeded, Id=8f965987-d893-4755-b342-30c84c682e70)
Executing 'DataExample3_ActivityFunction' (Reason='', Id=7b0aaee2-0132-40bd-90df-a8b741db6491)
Saying 'Good day' to London.
Executed 'DataExample3_ActivityFunction' (Succeeded, Id=7b0aaee2-0132-40bd-90df-a8b741db6491)

Notice the messages “Saying 'Yo!' to New York.” and “Saying 'Good day' to London.” – New York and London have been read from blob storage.

You could also use output bindings in the activity function, such as writing to queue storage:

[FunctionName("DataExample3_ActivityFunction")]
public static string SayHello(
    [ActivityTrigger] Greeting greeting, 
    [Blob("cities/{data.CityId}.txt")] string city,
    [Queue("greetings")] out string queueMessage,
    ILogger log)
{            
    log.LogInformation($"Saying '{greeting.Message}' to {city}.");

    var message = $"{greeting.Message} {city}";

    queueMessage = message;

    return message;
}

Now in addition to returning the message to the orchestration, the activity also writes a message to the queue.

Understanding Azure Durable Functions - Part 5: Getting Results from Orchestrations

This is the fifth part in a series of articles.

As we learned earlier in this series, a client function is called that initiates an orchestrator function which in turn calls one or more activity functions.

This process is asynchronous in nature. If the client function is an HTTP function then the HTTP request will complete and return an HTTP 202 Accepted response to the caller. As this response code suggests, the request has been successfully accepted for processing but the processing is not yet complete.

Take the following client function that triggers an orchestration:

[FunctionName("ClientFunction")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    // No input is passed to the orchestrator in this example (the second argument is null).
    string instanceId = await starter.StartNewAsync("OrchestratorFunction", null);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

Notice the line: return starter.CreateCheckStatusResponse(req, instanceId); This method creates an HttpResponseMessage which is returned to the caller. The message contains information on how to check the status of the orchestration.

As an example, if the client function is called over HTTP (e.g. in local development: http://localhost:7071/api/ClientFunction) the response will look similar to the following:

{
    "id": "85ee280f20a249089ec30882bd2ea4e2",
    "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA=="
}

This JSON contains the id of the orchestration instance along with a number of URLs that can be used to interact with the orchestration instance.

Checking the Status of a Durable Functions Orchestration

To check the status of an orchestration instance, an HTTP GET can be sent to the following URL:  http://localhost:7071/runtime/webhooks/durabletask/instances/85ee280f20a249089ec30882bd2ea4e2?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==

Assuming the following orchestrator and activity functions:

[FunctionName("OrchestratorFunction")]
public static async Task<List<string>> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
{
    var outputs = new List<string>();

    // Call the same activity function three times, one call after another.
    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Tokyo"));
    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Seattle"));
    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "London"));

    // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
    return outputs;
}

[FunctionName("ActivityFunction")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
    Thread.Sleep(5000); // simulate longer processing delay

    log.LogInformation($"Saying hello to {name}.");
    return $"Hello {name}!";
}

If the orchestration is still running, then querying the status URL will return the following:

{
    "name": "OrchestratorFunction",
    "instanceId": "85ee280f20a249089ec30882bd2ea4e2",
    "runtimeStatus": "Running",
    "input": null,
    "customStatus": null,
    "output": null,
    "createdTime": "2019-08-07T03:50:39Z",
    "lastUpdatedTime": "2019-08-07T03:50:39Z"
}

Notice the runtimeStatus of “Running”, meaning that the orchestration is not yet complete. Also notice that output is null.

If we wait for the orchestration to complete and call the URL again we get the following:

{
    "name": "OrchestratorFunction",
    "instanceId": "35d752392e934df994d01951102e50e8",
    "runtimeStatus": "Completed",
    "input": null,
    "customStatus": null,
    "output": [
        "Hello Tokyo!",
        "Hello Seattle!",
        "Hello London!"
    ],
    "createdTime": "2019-08-07T03:50:39Z",
    "lastUpdatedTime": "2019-08-07T03:50:55Z"
}

Notice this time that the runtimeStatus is now “Completed” and the output gives us the results returned from the orchestrator function in the line: return outputs;

In addition to getting the status you can use the other URLs to terminate a running orchestration,  purge history for instances, send event notifications to orchestrations, and replay (rewind) a failed orchestration into a running state (currently in preview). You can check out the complete API reference in the documentation.

Running Durable Functions Synchronously

Currently the client function is starting a new orchestration instance with the code: string instanceId = await starter.StartNewAsync("OrchestratorFunction", null);

In this code, the method StartNewAsync starts the orchestration asynchronously and returns the instance id.

If you want the orchestration to run synchronously, and for the client function to wait around until a result is available, the (rather verbose) WaitForCompletionOrCreateCheckStatusResponseAsync method can be used as the following code demonstrates:

[FunctionName("ClientFunctionSync")]
public static async Task<HttpResponseMessage> HttpStartSync(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    string instanceId = await starter.StartNewAsync("OrchestratorFunction", null);
    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    var timeout = TimeSpan.FromSeconds(20);
    var retryInterval = TimeSpan.FromSeconds(1); // How often to check the orchestration instance for completion

    return await starter.WaitForCompletionOrCreateCheckStatusResponseAsync(req, instanceId, timeout, retryInterval);
}

If we call this modified HTTP client function, the HTTP request will complete after approximately 15 seconds with a 200 OK status code and the following response:

[
    "Hello Tokyo!",
    "Hello Seattle!",
    "Hello London!"
]

The reason the response takes 15 seconds is that the activity function has a 5 second delay in it (Thread.Sleep(5000); // simulate longer processing delay) and the orchestrator is calling this function 3 times.

If we reduce the timeout to 5 seconds (var timeout = TimeSpan.FromSeconds(5);) and call the client HTTP function again, the orchestration cannot finish within the timeout, so we once again get a 202 Accepted with the following returned:

{
    "id": "8f88192ecfb3440199e572e93c478906",
    "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8f88192ecfb3440199e572e93c478906?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8f88192ecfb3440199e572e93c478906/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8f88192ecfb3440199e572e93c478906/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8f88192ecfb3440199e572e93c478906/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/8f88192ecfb3440199e572e93c478906?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA=="
}

Now the client can use the status URL to poll for completion as before.
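
As a minimal sketch of what that polling could look like from a .NET client, the following helper keeps requesting the status URL until the runtimeStatus indicates the orchestration has stopped running. The StatusPoller class, its method names and the maxAttempts parameter are hypothetical, and the sketch assumes System.Text.Json is available to the client (it ships with .NET Core 3.0 and later).

```csharp
using System;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

public static class StatusPoller
{
    // Returns true when the status JSON reports that the orchestration is no longer running.
    public static bool IsFinished(string statusJson)
    {
        using (JsonDocument doc = JsonDocument.Parse(statusJson))
        {
            string status = doc.RootElement.GetProperty("runtimeStatus").GetString();
            return status == "Completed" || status == "Failed"
                || status == "Terminated" || status == "Canceled";
        }
    }

    // Polls the statusQueryGetUri until the orchestration finishes or maxAttempts is used up.
    public static async Task<string> PollUntilFinishedAsync(
        HttpClient client, string statusQueryGetUri, TimeSpan interval, int maxAttempts)
    {
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            string json = await client.GetStringAsync(statusQueryGetUri);
            if (IsFinished(json))
            {
                return json; // the final status document, including any output
            }

            await Task.Delay(interval);
        }

        throw new TimeoutException("The orchestration did not finish within the allowed number of attempts.");
    }
}
```

Capping the number of attempts is a design choice in this sketch so that a client never polls forever if an orchestration gets stuck.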

You will want to make sure that the end client that makes the initial HTTP call to start the orchestration doesn’t have an HTTP timeout that is shorter than the timeout specified in the call to WaitForCompletionOrCreateCheckStatusResponseAsync (plus some extra time for the overhead of starting the orchestration, etc.), otherwise this initial call will always time out on the client side.
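
For example, if the end client is itself a .NET application using HttpClient, one way to keep the two timeouts in step is to derive the client timeout from the server-side wait. This is only a sketch: the StartClientFactory class is hypothetical and the amount of overhead to allow is a guess you would need to tune for your own environment.

```csharp
using System;
using System.Net.Http;

public static class StartClientFactory
{
    // Creates an HttpClient whose timeout is the server-side wait plus an allowance
    // for the overhead of starting the orchestration.
    public static HttpClient Create(TimeSpan serverWaitTimeout, TimeSpan overhead)
    {
        return new HttpClient { Timeout = serverWaitTimeout + overhead };
    }
}

// Usage: the client function above waits up to 20 seconds, so allow 30 seconds on the client.
// HttpClient client = StartClientFactory.Create(TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(10));
```

HttpClient's default timeout is 100 seconds, so this mostly matters when a shorter client timeout has been configured or when the server-side wait is long.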

Adding Custom Status Information to a Durable Functions Orchestration

In addition to the built-in status information, you can also set custom status information in the orchestrator function.

To do this, the SetCustomStatus method of the DurableOrchestrationContext can be used. This method takes an object. The following is a modified version of the orchestrator function that gives us a rough idea of what percentage of processing has been completed:

[FunctionName("OrchestratorFunction")]
public static async Task<List<string>> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context)
{
    var outputs = new List<string>();

    context.SetCustomStatus("0% complete");            
    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Tokyo"));
    context.SetCustomStatus("33% complete");

    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Seattle"));
    context.SetCustomStatus("66% complete");

    outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "London"));
    context.SetCustomStatus("100% complete");

    // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
    return outputs;
}

If we call the client HTTP function and then periodically call the status URL we get the following:

{
    "name": "OrchestratorFunction",
    "instanceId": "84d370b3833d4563b3cc1b1bab285787",
    "runtimeStatus": "Running",
    "input": null,
    "customStatus": "33% complete",
    "output": null,
    "createdTime": "2019-08-07T05:07:41Z",
    "lastUpdatedTime": "2019-08-07T05:07:56Z"
}

{
    "name": "OrchestratorFunction",
    "instanceId": "84d370b3833d4563b3cc1b1bab285787",
    "runtimeStatus": "Running",
    "input": null,
    "customStatus": "66% complete",
    "output": null,
    "createdTime": "2019-08-07T05:07:41Z",
    "lastUpdatedTime": "2019-08-07T05:08:12Z"
}

{
    "name": "OrchestratorFunction",
    "instanceId": "84d370b3833d4563b3cc1b1bab285787",
    "runtimeStatus": "Completed",
    "input": null,
    "customStatus": "100% complete",
    "output": [
        "Hello Tokyo!",
        "Hello Seattle!",
        "Hello London!"
    ],
    "createdTime": "2019-08-07T05:07:41Z",
    "lastUpdatedTime": "2019-08-07T05:08:27Z"
}

Notice in the preceding statuses that the customStatus has been populated. You can output anything in this, for example you could output an estimated time remaining to be displayed in the end client to give the end user an idea of when the request will be complete.
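
Because SetCustomStatus takes an object, the status does not have to be a string. As a rough sketch of the estimated time remaining idea, the hypothetical ProgressStatus class below carries both a percentage and a remaining time estimate. The class, its property names and the fixed seconds-per-step estimate are assumptions made for illustration.

```csharp
public class ProgressStatus
{
    public int PercentComplete { get; set; }
    public int EstimatedSecondsRemaining { get; set; }

    // Builds a status from the number of completed steps and a fixed per-step estimate.
    // No clock is read here, so the result is the same every time the orchestrator code runs.
    public static ProgressStatus Create(int completedSteps, int totalSteps, int estimatedSecondsPerStep)
    {
        return new ProgressStatus
        {
            PercentComplete = completedSteps * 100 / totalSteps, // integer division: 1 of 3 gives 33
            EstimatedSecondsRemaining = (totalSteps - completedSteps) * estimatedSecondsPerStep
        };
    }
}

// In the orchestrator, after the first of the three activities has completed:
// context.SetCustomStatus(ProgressStatus.Create(1, 3, 5));
```

With a status like this, the customStatus field in the status response should contain an object rather than a plain string, which the end client can then read to display progress.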

Understanding Azure Durable Functions - Part 4: Passing Input To Orchestrations and Activities

This is the fourth part in a series of articles.

In the first part of the series we learned that Durable Functions consist of three types of function: client, orchestrator, and activity functions. Up until this point, the only data we’ve made use of has been hardcoded in the orchestrator function:

await context.CallActivityAsync<string>("ReplayExample_ActivityFunction", "Tokyo");

In the preceding code, the ReplayExample_ActivityFunction is being called and the hardcoded data "Tokyo" is being passed to the activity. It’s more likely in real use that data won’t be hardcoded but will instead be either passed to the client function for use in the activity, or read in using one of the standard input bindings.

How To Pass Data to a Durable Functions Orchestrator Function

The first step in this scenario is to allow the initiator of the orchestration to provide some data. One way to do this is with an HTTP-triggered Azure Function that allows the caller to provide some JSON data. To represent this data we can create a class:

class SayHelloRequest
{
    public List<string> CityNames { get; set; }
}

The incoming JSON data can now be deserialized into this class:

var data = await req.Content.ReadAsAsync<SayHelloRequest>();

To pass input data to an orchestration, it can be supplied to the StartNewAsync method of the DurableOrchestrationClient:

string instanceId = await starter.StartNewAsync("DataExample", data);

The full listing of the client function is now:

[FunctionName("DataExample_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    var data = await req.Content.ReadAsAsync<SayHelloRequest>();

    string instanceId = await starter.StartNewAsync("DataExample", data);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

Now we are sending the data, we can make use of it in the orchestrator.

How To Read Data Passed Into an Orchestrator Function

To read data that was passed into the orchestrator, the GetInput method of the DurableOrchestrationContext can be used:

SayHelloRequest data = context.GetInput<SayHelloRequest>();

One thing to note here is that the input data must be JSON serializable.
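
A quick way to get a feel for what “JSON serializable” means is to round-trip an instance through a JSON serializer and check that nothing is lost. The sketch below uses System.Text.Json purely to stay dependency-free; the Durable Functions extension does its own serialization, so treat this as a sanity check rather than a guarantee. The RoundTripCheck class is hypothetical.

```csharp
using System.Collections.Generic;
using System.Text.Json;

public class SayHelloRequest
{
    public List<string> CityNames { get; set; }
}

public static class RoundTripCheck
{
    // Serializes the request to JSON and reads it back into a new instance.
    public static SayHelloRequest RoundTrip(SayHelloRequest input)
    {
        string json = JsonSerializer.Serialize(input);
        return JsonSerializer.Deserialize<SayHelloRequest>(json);
    }
}
```

Simple classes with public get/set properties, like SayHelloRequest, survive this kind of round trip; types holding things like open streams or delegates generally do not.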

Now the data is available in the orchestrator function, it can be passed down to the activity function(s) instead of the hardcoded data we used earlier. In this example the activity function does not need to change.

The full listing for the new version of the orchestrator:

[FunctionName("DataExample")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** RunOrchestrator method executing ********************");

    SayHelloRequest data = context.GetInput<SayHelloRequest>();

    foreach (var city in data.CityNames)
    {
        await context.CallActivityAsync<string>("DataExample_ActivityFunction", city);
    }            
}

Passing Data to an Azure Functions Durable Function Activity

The second parameter in the call to the CallActivityAsync method of the DurableOrchestrationContext allows an object to be passed to an activity function when it is called.

For example if the activity function is defined as: public static string SayHello([ActivityTrigger] string name, ILogger log) then the orchestrator can pass a single string to this activity when it is called.

To pass more data you could change this from a simple string to a more complex type, as the following modified version shows:

using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class DataExample2
    {
        [FunctionName("DataExample2")]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
        {
            log.LogInformation($"************** RunOrchestrator method executing ********************");

            GreetingsRequest data = context.GetInput<GreetingsRequest>();

            foreach (var greeting in data.Greetings)
            {
                await context.CallActivityAsync<string>("DataExample2_ActivityFunction", greeting);
            }            
        }

        [FunctionName("DataExample2_ActivityFunction")]
        public static string SayHello([ActivityTrigger] Greeting greeting, ILogger log)
        {            
            log.LogInformation($"Saying '{greeting.Message}' to {greeting.CityName}.");
            return $"{greeting.Message} {greeting.CityName}";
        }

        public class Greeting
        {
            public string CityName { get; set; }
            public string Message { get; set; }
        }

        public class GreetingsRequest
        {
            public List<Greeting> Greetings { get; set; }
        }

        [FunctionName("DataExample2_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {
            var data = await req.Content.ReadAsAsync<GreetingsRequest>();

            string instanceId = await starter.StartNewAsync("DataExample2", data);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}

Notice in the preceding code that the activity now accepts a more complex type, a Greeting.

Understanding Azure Durable Functions - Part 3: What Is Durability?

This is the third part in a series of articles.

Durable Functions make it easier to organize (orchestrate) multiple individual Azure Functions working together.

In addition to simplifying this orchestration, Durable Functions (as the name suggests) provide a level of “durability”. So what is this durability and how does it work?

Durable Functions provide “reliable execution”. What this means is that the framework takes care of a number of things for us. One of these things is the management of orchestration history.

"Orchestrator functions and activity functions may be running on different VMs within a data center, and those VMs or the underlying networking infrastructure is not 100% reliable. In spite of this, Durable Functions ensures reliable execution of orchestrations." [Microsoft]

One important thing to note is that this “durability” is not meant to automatically retry operations or execute compensation logic. Later in the series we’ll look at error handling.

Behind the Scenes

To better explain how this works, let’s take the following functions:

using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class Function1
    {
        [FunctionName("OrchestratorFunction")]
        public static async Task<List<string>> RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context)
        {
            var outputs = new List<string>();

            // Call the same activity function three times, one call after another.
            outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Tokyo"));
            outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Seattle"));
            outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "London"));

            // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
            return outputs;
        }

        [FunctionName("ActivityFunction")]
        public static string SayHello([ActivityTrigger] string name, ILogger log)
        {
            Thread.Sleep(5000); // simulate longer processing delay

            log.LogInformation($"Saying hello to {name}.");
            return $"Hello {name}!";
        }

        [FunctionName("ClientFunction")]
        public static async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {
            // Function input comes from the request content.
            string instanceId = await starter.StartNewAsync("OrchestratorFunction", null);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}

We can execute the orchestration (as we did in part two of this series) with some PowerShell: $R = Invoke-WebRequest 'http://localhost:7071/api/ReplayExample_HttpStart' -Method 'POST'

If we then take a look in the Azure storage account associated with the function app (in this example the local storage emulator), we can see that a number of storage queues and tables have been created, as the following screenshot shows:

Durable Functions tables and queues

Whilst it is not necessary to understand how Durable Functions works behind the scenes, it is useful to know that these queues and tables exist and are used.

For example, if we open the DurableFunctionsHubInstances table, we see a single row representing the execution of the orchestration we just initiated with PowerShell:

Azure Durable Functions behind the scenes table

Notice in the preceding screenshot that the Name field contains the name of the orchestration we just executed: [FunctionName("ReplayExample")]

If we execute the orchestration again, we get a second row, and so on.

The DurableFunctionsHubHistory table is more complex and has more data per invocation:

DurableFunctionsHubHistory Table

Notice in the previous screenshot that the data in this table keeps track of how far the orchestrator function has progressed, for example which activity functions have been executed so far.

The storage queues behind the scenes are used by the framework to drive function execution.

How Durable Functions Work – Checkpoints

The orchestrator function instance may be removed from memory while it is waiting for activity functions to execute, that is, at each await context.CallActivityAsync call. Because these calls are awaited, the orchestrator function can be paused and unloaded until the activity function completes.

Because orchestrator functions may be removed from memory, there needs to be some way for the framework to keep track of which activity functions have completed and which ones have yet to be executed. The framework does this by creating “checkpoints” and using the DurableFunctionsHubHistory table to store these checkpoints.

“Azure Storage does not provide any transactional guarantees between saving data into table storage and queues. To handle failures, the Durable Functions storage provider uses eventual consistency patterns. These patterns ensure that no data is lost if there is a crash or loss of connectivity in the middle of a checkpoint.” [Microsoft]

Essentially what this means is that the code in the orchestrator function may execute multiple times per invocation (be replayed), and the checkpoint data ensures that an activity call whose result has already been recorded is not executed again during a replay.

At a high level, at each checkpoint (each await) the execution history is saved into table storage and messages are added to storage queues to trigger other functions.
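To make the checkpoint and replay idea more concrete, here is a minimal in-memory sketch. It is not how the Durable Functions framework is actually implemented; every name in it (ReplaySketch, FakeContext, NotYetCompletedException) is hypothetical, and a plain list stands in for the DurableFunctionsHubHistory table. The point is only to show that the orchestrator body is re-run from the top each time, while recorded activity results are read back from the history instead of being executed again.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of checkpoint/replay. NOT the real framework.
public class NotYetCompletedException : Exception
{
    public string PendingInput { get; }
    public NotYetCompletedException(string pendingInput) => PendingInput = pendingInput;
}

public class FakeContext
{
    private readonly List<string> _history; // stands in for the history table
    private int _position;                  // activity calls made so far in this pass

    public FakeContext(List<string> history) => _history = history;

    // Returns the recorded result if this call completed in an earlier pass;
    // otherwise signals that the orchestrator has to pause at this point.
    public string CallActivity(string input)
    {
        if (_position < _history.Count)
            return _history[_position++];
        throw new NotYetCompletedException(input);
    }
}

public static class ReplaySketch
{
    public static int OrchestratorExecutions;
    public static int ActivityExecutions;

    // The "activity": does the actual work.
    static string SayHello(string name)
    {
        ActivityExecutions++;
        return $"Hello {name}!";
    }

    // The "orchestrator": always starts again from the first line.
    static List<string> Orchestrator(FakeContext context)
    {
        OrchestratorExecutions++;
        var outputs = new List<string>();
        foreach (var city in new[] { "Tokyo", "Seattle", "London" })
            outputs.Add(context.CallActivity(city));
        return outputs;
    }

    public static List<string> Run()
    {
        OrchestratorExecutions = 0;
        ActivityExecutions = 0;
        var history = new List<string>();
        while (true)
        {
            try
            {
                return Orchestrator(new FakeContext(history));
            }
            catch (NotYetCompletedException pending)
            {
                // Execute the pending activity once and "checkpoint" its result.
                history.Add(SayHello(pending.PendingInput));
            }
        }
    }
}
```

Calling ReplaySketch.Run() in this sketch runs the orchestrator body 4 times but each activity only once, because the three recorded results are served from the history list on later passes.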

Replay Considerations

Because the code in an orchestrator function can be replayed, there are a few restrictions on the code you write inside them.

Firstly, code in an orchestrator function should be deterministic, that is, if the code is replayed multiple times it should produce the same result. Some examples of non-deterministic code include random number generation, GUID generation, current date/time calls and calls to non-deterministic code/APIs/endpoints.

Code inside orchestrator functions should not block; for example, do not perform I/O operations or call Thread.Sleep.

Code inside orchestrator functions should not call into async code except by way of the DurableOrchestrationContext object that is passed into the orchestrator function, e.g. public static async Task RunOrchestrator([OrchestrationTrigger] DurableOrchestrationContext context). Initiating an activity function via context.CallActivityAsync<string>("ReplayExample_ActivityFunction", "Tokyo"); is fine, but do not use Task.Run, Task.Delay, HttpClient.SendAsync, etc. inside an orchestrator function.

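Inside an orchestrator function, if you need this kind of logic, use the substitute APIs recommended in the Microsoft docs: context.CurrentUtcDateTime instead of DateTime.Now or DateTime.UtcNow, context.NewGuid() instead of Guid.NewGuid(), and context.CreateTimer instead of Thread.Sleep or Task.Delay.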

Note: these restrictions apply to the orchestrator function, not to the activity function(s) or client function.
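As a rough illustration of these restrictions, the following sketch shows an orchestrator that gets the current time, a GUID and a delay from the DurableOrchestrationContext rather than from the usual .NET APIs. The function names (DeterministicExample, DeterministicExample_Activity) are made up for this example, and it assumes the same 1.x version of the Durable Functions extension used elsewhere in this article (context.NewGuid() needs a reasonably recent 1.x release).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class DeterministicExample
{
    // Hypothetical orchestrator, for illustration only.
    [FunctionName("DeterministicExample")]
    public static async Task<string> RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        // Replay-safe substitute for DateTime.UtcNow.
        DateTime startedAt = context.CurrentUtcDateTime;

        // Replay-safe substitute for Guid.NewGuid().
        Guid correlationId = context.NewGuid();

        // Replay-safe substitute for Thread.Sleep / Task.Delay:
        // a durable timer that fires 30 seconds after the recorded start time.
        await context.CreateTimer(startedAt.AddSeconds(30), CancellationToken.None);

        // Anything genuinely non-deterministic (HTTP calls, random numbers, etc.)
        // is pushed into an activity function and its result is awaited here.
        return await context.CallActivityAsync<string>(
            "DeterministicExample_Activity",
            $"{correlationId} started at {startedAt:O}");
    }

    // Hypothetical activity: activity functions are not subject to the replay restrictions.
    [FunctionName("DeterministicExample_Activity")]
    public static string Activity([ActivityTrigger] string message) => message;
}
```

The design idea is that every value the orchestrator depends on either comes from the context (and so is the same on every replay) or comes back from an activity whose result is stored in the history.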

Seeing Replay in Action

As a quick example, we can modify the code in the orchestrator function to add a log message:

[FunctionName("ReplayExample")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** RunOrchestrator method executing ********************");
    
    await context.CallActivityAsync<string>("ReplayExample_ActivityFunction", "Tokyo");            
    await context.CallActivityAsync<string>("ReplayExample_ActivityFunction", "Seattle");
    await context.CallActivityAsync<string>("ReplayExample_ActivityFunction", "London");
}

Now when the client function is called and the orchestration started, the following simplified log output can be seen:

Executing HTTP request: {
  "requestId": "c7edb6b0-e947-4347-9f0e-fc46a2bdeefe",
  "method": "POST",
  "uri": "/api/ReplayExample_HttpStart"
}
Executing 'ReplayExample_HttpStart' (Reason='This function was programmatically called via the host APIs.', Id=ca981987-5ea3-4c76-8934-09fef757cd6a)
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample (Orchestrator)' scheduled. Reason: NewInstance. IsReplay: False. State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 2.
Started orchestration with ID = '75dce58e3be440e98dcd3c99112c9bbf'.
Executed 'ReplayExample_HttpStart' (Succeeded, Id=ca981987-5ea3-4c76-8934-09fef757cd6a)
Executing 'ReplayExample' (Reason='', Id=fb5e9945-3f9e-4fc6-bda3-9028fa6d8f04)
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample (Orchestrator)' started. IsReplay: False. Input: (16 bytes). State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 3.
************** RunOrchestrator method executing ********************
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample_ActivityFunction (Activity)' scheduled. Reason: ReplayExample. IsReplay: False. State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 4.
Executed 'ReplayExample' (Succeeded, Id=fb5e9945-3f9e-4fc6-bda3-9028fa6d8f04)
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample (Orchestrator)' awaited. IsReplay: False. State: Awaited. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 5.
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample_ActivityFunction (Activity)' started. IsReplay: False. Input: (36 bytes). State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 6.
Executing 'ReplayExample_ActivityFunction' (Reason='', Id=e93e770c-a15b-47b5-b8e3-e0db081cf44b)
Saying hello to Tokyo.
Executed 'ReplayExample_ActivityFunction' (Succeeded, Id=e93e770c-a15b-47b5-b8e3-e0db081cf44b)
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample_ActivityFunction (Activity)' completed. ContinuedAsNew: False. IsReplay: False. Output: (56 bytes). State: Completed. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 7.
Executing 'ReplayExample' (Reason='', Id=0542a8de-d9e5-46d0-a5a5-d94ea5040202)
************** RunOrchestrator method executing ********************
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample_ActivityFunction (Activity)' scheduled. Reason: ReplayExample. IsReplay: False. State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 8.
Executed 'ReplayExample' (Succeeded, Id=0542a8de-d9e5-46d0-a5a5-d94ea5040202)
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample (Orchestrator)' awaited. IsReplay: False. State: Awaited. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 9.
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample_ActivityFunction (Activity)' started. IsReplay: False. Input: (44 bytes). State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 10.
Executing 'ReplayExample_ActivityFunction' (Reason='', Id=9b408473-3224-4968-8d7b-1b1ec56d9359)
Saying hello to Seattle.
Executed 'ReplayExample_ActivityFunction' (Succeeded, Id=9b408473-3224-4968-8d7b-1b1ec56d9359)
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample_ActivityFunction (Activity)' completed. ContinuedAsNew: False. IsReplay: False. Output: (64 bytes). State: Completed. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 11.
Executing 'ReplayExample' (Reason='', Id=040e7026-724c-4b07-9e4f-46ed52278785)
************** RunOrchestrator method executing ********************
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample_ActivityFunction (Activity)' scheduled. Reason: ReplayExample. IsReplay: False. State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 12.
Executed 'ReplayExample' (Succeeded, Id=040e7026-724c-4b07-9e4f-46ed52278785)
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample (Orchestrator)' awaited. IsReplay: False. State: Awaited. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 13.
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample_ActivityFunction (Activity)' started. IsReplay: False. Input: (40 bytes). State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 14.
Executing 'ReplayExample_ActivityFunction' (Reason='', Id=03a0b729-d8e5-4e42-8b7a-28ea974bb1a6)
Saying hello to London.
Executed 'ReplayExample_ActivityFunction' (Succeeded, Id=03a0b729-d8e5-4e42-8b7a-28ea974bb1a6)
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample_ActivityFunction (Activity)' completed. ContinuedAsNew: False. IsReplay: False. Output: (60 bytes). State: Completed. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 15.
Executing 'ReplayExample' (Reason='', Id=02f307bc-c63c-4803-80f8-acfae72c3577)
************** RunOrchestrator method executing ********************
75dce58e3be440e98dcd3c99112c9bbf: Function 'ReplayExample (Orchestrator)' completed. ContinuedAsNew: False. IsReplay: False. Output: (null). State: Completed. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 16.
Executed 'ReplayExample' (Succeeded, Id=02f307bc-c63c-4803-80f8-acfae72c3577)

Notice in the preceding log output that there are 4 instances of the log message ************** RunOrchestrator method executing ********************

The final instance of the message occurs before the Function 'ReplayExample (Orchestrator)' completed message, so even though we only have 3 activity calls, the orchestrator function method itself was executed 4 times: once for the initial run, and once more each time an activity completed and the orchestrator was replayed from the start.
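If the repeated log message is unwanted, one option is to check the IsReplaying property on the context before logging. The following is a minimal sketch of that idea applied to the orchestrator above, not a recommendation from the original example; the class name ReplayLoggingExample is hypothetical.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class ReplayLoggingExample
{
    [FunctionName("ReplayExample")]
    public static async Task RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
    {
        // IsReplaying is true while the framework is re-running orchestrator code
        // whose results are already recorded in the history, so this message is
        // only written when the code is not being replayed.
        if (!context.IsReplaying)
        {
            log.LogInformation("************** RunOrchestrator method executing (not a replay) ********************");
        }

        await context.CallActivityAsync<string>("ReplayExample_ActivityFunction", "Tokyo");
        await context.CallActivityAsync<string>("ReplayExample_ActivityFunction", "Seattle");
        await context.CallActivityAsync<string>("ReplayExample_ActivityFunction", "London");
    }
}
```

The orchestrator method is still executed multiple times; the check only suppresses the log output during the replayed passes.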

If you want to fill in the gaps in your C# knowledge be sure to check out my C# Tips and Traps training course from Pluralsight – get started with a free trial.


Understanding Azure Durable Functions - Part 2: Creating Your First Durable Function

This is the second part in a series of articles.

Before creating durable functions it’s important to understand the logical types of functions involved. There are essentially 3 logical types of functions:

  • Client function: the entry point function, called by the end client to start the workflow, e.g. an HTTP triggered function
  • Orchestrator function: defines the workflow and what activity functions to call
  • Activity function: the function(s) that do the actual work/processing

When you create a durable function in Visual Studio, the template creates each of these 3 functions for you as a starting point.

Setup Development Environment

The first thing to do is set up your development environment:

  • Install Visual Studio 2019 (e.g. the free community version) – when installing, remember to install the Azure development workload as this enables functions development
  • Install and check that the Azure storage emulator is running – this allows you to run/test functions locally without deploying to Azure in the cloud

Create Azure Functions Project

Next, open Visual Studio 2019 and create a new Azure Functions project as the following screenshot shows:

Creating a new Azure Functions project in Visual Studio 2019

Once the project is created, you add individual functions to it.

At this point you should also manage NuGet packages for the project and update any packages to the latest versions.

Add a Function

Right click the new project and choose Add –> New Azure Function.

Adding a new Azure Function in Visual Studio 2019

Give the function a name (or leave it as the default “Function1.cs”) and click OK. This will open the function template chooser:

Azure Functions template chooser

Select Durable Functions Orchestration, and click OK.

This will create the following starter code:

using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;

namespace DontCodeTiredDemosV2.Durables
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<List<string>> RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context)
        {
            var outputs = new List<string>();

            // Replace "hello" with the name of your Durable Activity Function.
            outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Tokyo"));
            outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "Seattle"));
            outputs.Add(await context.CallActivityAsync<string>("Function1_Hello", "London"));

            // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
            return outputs;
        }

        [FunctionName("Function1_Hello")]
        public static string SayHello([ActivityTrigger] string name, ILogger log)
        {
            log.LogInformation($"Saying hello to {name}.");
            return $"Hello {name}!";
        }

        [FunctionName("Function1_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {
            // Function input comes from the request content.
            string instanceId = await starter.StartNewAsync("Function1", null);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}

Notice in the preceding code the 3 types of function: client, orchestrator, and activity.

We can make this a bit clearer by renaming a few things:

using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class Function1
    {
        [FunctionName("OrchestratorFunction")]
        public static async Task<List<string>> RunOrchestrator(
            [OrchestrationTrigger] DurableOrchestrationContext context)
        {
            var outputs = new List<string>();

            // Replace "hello" with the name of your Durable Activity Function.
            outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Tokyo"));
            outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "Seattle"));
            outputs.Add(await context.CallActivityAsync<string>("ActivityFunction", "London"));

            // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
            return outputs;
        }

        [FunctionName("ActivityFunction")]
        public static string SayHello([ActivityTrigger] string name, ILogger log)
        {
            log.LogInformation($"Saying hello to {name}.");
            return $"Hello {name}!";
        }

        [FunctionName("ClientFunction")]
        public static async Task<HttpResponseMessage> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {
            // Function input comes from the request content.
            string instanceId = await starter.StartNewAsync("OrchestratorFunction", null);

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return starter.CreateCheckStatusResponse(req, instanceId);
        }
    }
}

There are 3 Azure Functions in this single Function1 class.

First, the “ClientFunction” is what starts the workflow. In this example it’s triggered by an HTTP call from the client, but you could use any trigger here, for example a message on a queue or a timer. When this function is called, it doesn’t do any processing itself but rather creates an instance of the workflow that is defined in the "OrchestratorFunction". The line string instanceId = await starter.StartNewAsync("OrchestratorFunction", null); is what kicks off the workflow: the first argument is a string naming the orchestration to start, and the second argument (in this example null) is any input that needs to be passed to the orchestrator. The final line return starter.CreateCheckStatusResponse(req, instanceId); returns an HttpResponseMessage to the HTTP caller.

The second function "OrchestratorFunction" is what defines the activity functions that will comprise the workflow. In this function the CallActivityAsync method defines what activities get executed as part of the orchestration, in this example the same activity "ActivityFunction" is called 3 times. The CallActivityAsync method takes 2 parameters: the first is a string naming the activity function to execute, and the second is any data to be passed to the activity function; in this case hardcoded strings "Tokyo", "Seattle", and "London". Once these activities have completed execution, the result will be returned – a list: ["Hello Tokyo!", "Hello Seattle!", "Hello London!"].

The third function "ActivityFunction" is where the actual work/processing takes place.
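The example above passes null as the orchestrator input. As a hedged sketch of how input could flow through all three function types, the following variation reads a city name from the HTTP request body, passes it to the orchestrator, and reads it back with context.GetInput. The InputExample names are hypothetical and are not part of the generated template.

```csharp
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

public static class InputExample
{
    // Client function: reads the request body and starts the orchestration with it.
    [FunctionName("InputExample_HttpStart")]
    public static async Task<HttpResponseMessage> HttpStart(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequestMessage req,
        [OrchestrationClient] DurableOrchestrationClient starter,
        ILogger log)
    {
        // Assumption: the request body is a plain city name such as "Tokyo".
        string city = await req.Content.ReadAsStringAsync();

        // The second argument is the input handed to the orchestrator.
        string instanceId = await starter.StartNewAsync("InputExample", city);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

        return starter.CreateCheckStatusResponse(req, instanceId);
    }

    // Orchestrator function: retrieves the input and forwards it to the activity.
    [FunctionName("InputExample")]
    public static async Task<string> RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        string city = context.GetInput<string>();
        return await context.CallActivityAsync<string>("InputExample_SayHello", city);
    }

    // Activity function: does the actual work.
    [FunctionName("InputExample_SayHello")]
    public static string SayHello([ActivityTrigger] string name) => $"Hello {name}!";
}
```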

Testing Durable Functions Locally

The project can be run locally by hitting F5 in Visual Studio. This will start the local functions runtime:

                  %%%%%%
                 %%%%%%
            @   %%%%%%    @
          @@   %%%%%%      @@
       @@@    %%%%%%%%%%%    @@@
     @@      %%%%%%%%%%        @@
       @@         %%%%       @@
         @@      %%%       @@
           @@    %%      @@
                %%
                %

Azure Functions Core Tools (2.7.1373 Commit hash: cd9bfca26f9c7fe06ce245f5bf69bc6486a685dd)
Function Runtime Version: 2.0.12507.0
[9/07/2019 3:29:16 AM] Starting Rpc Initialization Service.
[9/07/2019 3:29:16 AM] Initializing RpcServer
[9/07/2019 3:29:16 AM] Building host: startup suppressed:False, configuration suppressed: False
[9/07/2019 3:29:17 AM] Initializing extension with the following settings: Initializing extension with the following settings:
[9/07/2019 3:29:17 AM] AzureStorageConnectionStringName: , MaxConcurrentActivityFunctions: 80, MaxConcurrentOrchestratorFunctions: 80, PartitionCount: 4, ControlQueueBatchSize: 32, ControlQueueVisibilityTimeout: 00:05:00, WorkItemQueueVisibilityTimeout: 00:05:00, ExtendedSessionsEnabled: False, EventGridTopicEndpoint: , NotificationUrl: http://localhost:7071/runtime/webhooks/durabletask, TrackingStoreConnectionStringName: , MaxQueuePollingInterval: 00:00:30, LogReplayEvents: False. InstanceId: . Function: . HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 0.
[9/07/2019 3:29:17 AM] Initializing Host.
[9/07/2019 3:29:17 AM] Host initialization: ConsecutiveErrors=0, StartupCount=1
[9/07/2019 3:29:17 AM] LoggerFilterOptions
[9/07/2019 3:29:17 AM] {
[9/07/2019 3:29:17 AM]   "MinLevel": "None",
[9/07/2019 3:29:17 AM]   "Rules": [
[9/07/2019 3:29:17 AM]     {
[9/07/2019 3:29:17 AM]       "ProviderName": null,
[9/07/2019 3:29:17 AM]       "CategoryName": null,
[9/07/2019 3:29:17 AM]       "LogLevel": null,
[9/07/2019 3:29:17 AM]       "Filter": "<AddFilter>b__0"
[9/07/2019 3:29:17 AM]     },
[9/07/2019 3:29:17 AM]     {
[9/07/2019 3:29:17 AM]       "ProviderName": "Microsoft.Azure.WebJobs.Script.WebHost.Diagnostics.SystemLoggerProvider",
[9/07/2019 3:29:17 AM]       "CategoryName": null,
[9/07/2019 3:29:17 AM]       "LogLevel": "None",
[9/07/2019 3:29:17 AM]       "Filter": null
[9/07/2019 3:29:17 AM]     },
[9/07/2019 3:29:17 AM]     {
[9/07/2019 3:29:17 AM]       "ProviderName": "Microsoft.Azure.WebJobs.Script.WebHost.Diagnostics.SystemLoggerProvider",
[9/07/2019 3:29:17 AM]       "CategoryName": null,
[9/07/2019 3:29:17 AM]       "LogLevel": null,
[9/07/2019 3:29:17 AM]       "Filter": "<AddFilter>b__0"
[9/07/2019 3:29:17 AM]     }
[9/07/2019 3:29:17 AM]   ]
[9/07/2019 3:29:17 AM] }
[9/07/2019 3:29:17 AM] FunctionResultAggregatorOptions
[9/07/2019 3:29:17 AM] {
[9/07/2019 3:29:17 AM]   "BatchSize": 1000,
[9/07/2019 3:29:17 AM]   "FlushTimeout": "00:00:30",
[9/07/2019 3:29:17 AM]   "IsEnabled": true
[9/07/2019 3:29:17 AM] }
[9/07/2019 3:29:17 AM] SingletonOptions
[9/07/2019 3:29:17 AM] {
[9/07/2019 3:29:17 AM]   "LockPeriod": "00:00:15",
[9/07/2019 3:29:17 AM]   "ListenerLockPeriod": "00:00:15",
[9/07/2019 3:29:17 AM]   "LockAcquisitionTimeout": "10675199.02:48:05.4775807",
[9/07/2019 3:29:17 AM]   "LockAcquisitionPollingInterval": "00:00:05",
[9/07/2019 3:29:17 AM]   "ListenerLockRecoveryPollingInterval": "00:01:00"
[9/07/2019 3:29:17 AM] }
[9/07/2019 3:29:17 AM] Starting JobHost
[9/07/2019 3:29:17 AM] Starting Host (HostId=desktopkghqug8-1671102379, InstanceId=015cba37-1f46-41a1-b3c1-19f341c4d3d9, Version=2.0.12507.0, ProcessId=18728, AppDomainId=1, InDebugMode=False, InDiagnosticMode=False, FunctionsExtensionVersion=)
[9/07/2019 3:29:17 AM] Loading functions metadata
[9/07/2019 3:29:17 AM] 3 functions loaded
[9/07/2019 3:29:17 AM] Generating 3 job function(s)
[9/07/2019 3:29:17 AM] Found the following functions:
[9/07/2019 3:29:17 AM] DurableDemos.Function1.SayHello
[9/07/2019 3:29:17 AM] DurableDemos.Function1.HttpStart
[9/07/2019 3:29:17 AM] DurableDemos.Function1.RunOrchestrator
[9/07/2019 3:29:17 AM]
[9/07/2019 3:29:17 AM] Host initialized (221ms)
[9/07/2019 3:29:18 AM] Starting task hub worker. InstanceId: . Function: . HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 1.
[9/07/2019 3:29:18 AM] Host started (655ms)
[9/07/2019 3:29:18 AM] Job host started
Hosting environment: Production
Content root path: C:\Users\Admin\OneDrive\Documents\dct\19\DontCodeTiredDemosV2\DontCodeTiredDemosV2\DurableDemos\bin\Debug\netcoreapp2.1
Now listening on: http://0.0.0.0:7071
Application started. Press Ctrl+C to shut down.

Http Functions:

        ClientFunction: [GET,POST] http://localhost:7071/api/ClientFunction

[9/07/2019 3:29:23 AM] Host lock lease acquired by instance ID '000000000000000000000000E72C9561'.

Now to start an instance of the workflow, the following PowerShell can be used:

$R = Invoke-WebRequest 'http://localhost:7071/api/ClientFunction' -Method 'POST'

This will result in the following rather verbose output:

[9/07/2019 3:30:55 AM] Executing HTTP request: {
[9/07/2019 3:30:55 AM]   "requestId": "36d9f77f-1ceb-43ec-aa1d-5702b42a8e15",
[9/07/2019 3:30:55 AM]   "method": "POST",
[9/07/2019 3:30:55 AM]   "uri": "/api/ClientFunction"
[9/07/2019 3:30:55 AM] }
[9/07/2019 3:30:55 AM] Executing 'ClientFunction' (Reason='This function was programmatically called via the host APIs.', Id=a014a4ae-77ff-46c7-b812-344bd442da38)
[9/07/2019 3:30:55 AM] f5a38610c07a4c90815f2936451628b8: Function 'OrchestratorFunction (Orchestrator)' scheduled. Reason: NewInstance. IsReplay: False. State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 2.
[9/07/2019 3:30:56 AM] Started orchestration with ID = 'f5a38610c07a4c90815f2936451628b8'.
[9/07/2019 3:30:56 AM] Executed 'ClientFunction' (Succeeded, Id=a014a4ae-77ff-46c7-b812-344bd442da38)
[9/07/2019 3:30:56 AM] Executing 'OrchestratorFunction' (Reason='', Id=d6263f09-2372-4bfb-9473-70f03874cfee)
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'OrchestratorFunction (Orchestrator)' started. IsReplay: False. Input: (16 bytes). State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 3.
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'ActivityFunction (Activity)' scheduled. Reason: OrchestratorFunction. IsReplay: False. State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 4.
[9/07/2019 3:30:56 AM] Executed 'OrchestratorFunction' (Succeeded, Id=d6263f09-2372-4bfb-9473-70f03874cfee)
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'OrchestratorFunction (Orchestrator)' awaited. IsReplay: False. State: Awaited. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 5.
[9/07/2019 3:30:56 AM] Executed HTTP request: {
[9/07/2019 3:30:56 AM]   "requestId": "36d9f77f-1ceb-43ec-aa1d-5702b42a8e15",
[9/07/2019 3:30:56 AM]   "method": "POST",
[9/07/2019 3:30:56 AM]   "uri": "/api/ClientFunction",
[9/07/2019 3:30:56 AM]   "identities": [
[9/07/2019 3:30:56 AM]     {
[9/07/2019 3:30:56 AM]       "type": "WebJobsAuthLevel",
[9/07/2019 3:30:56 AM]       "level": "Admin"
[9/07/2019 3:30:56 AM]     }
[9/07/2019 3:30:56 AM]   ],
[9/07/2019 3:30:56 AM]   "status": 202,
[9/07/2019 3:30:56 AM]   "duration": 699
[9/07/2019 3:30:56 AM] }
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'ActivityFunction (Activity)' started. IsReplay: False. Input: (36 bytes). State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 6.
[9/07/2019 3:30:56 AM] Executing 'ActivityFunction' (Reason='', Id=d35e9667-f77b-4328-aff9-4ecbc3b66e89)
[9/07/2019 3:30:56 AM] Saying hello to Tokyo.
[9/07/2019 3:30:56 AM] Executed 'ActivityFunction' (Succeeded, Id=d35e9667-f77b-4328-aff9-4ecbc3b66e89)
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'ActivityFunction (Activity)' completed. ContinuedAsNew: False. IsReplay: False. Output: (56 bytes). State: Completed. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 7.
[9/07/2019 3:30:56 AM] Executing 'OrchestratorFunction' (Reason='', Id=5cc451d2-dd5b-4cb5-b10a-02e7bca71a08)
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'ActivityFunction (Activity)' scheduled. Reason: OrchestratorFunction. IsReplay: False. State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 8.
[9/07/2019 3:30:56 AM] Executed 'OrchestratorFunction' (Succeeded, Id=5cc451d2-dd5b-4cb5-b10a-02e7bca71a08)
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'OrchestratorFunction (Orchestrator)' awaited. IsReplay: False. State: Awaited. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 9.
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'ActivityFunction (Activity)' started. IsReplay: False. Input: (44 bytes). State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 10.
[9/07/2019 3:30:56 AM] Executing 'ActivityFunction' (Reason='', Id=719c797b-9ee1-4167-972c-c0b0c4dd886c)
[9/07/2019 3:30:56 AM] Saying hello to Seattle.
[9/07/2019 3:30:56 AM] Executed 'ActivityFunction' (Succeeded, Id=719c797b-9ee1-4167-972c-c0b0c4dd886c)
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'ActivityFunction (Activity)' completed. ContinuedAsNew: False. IsReplay: False. Output: (64 bytes). State: Completed. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 11.
[9/07/2019 3:30:56 AM] Executing 'OrchestratorFunction' (Reason='', Id=0b115432-1d9d-43af-b5da-3e3607b808ac)
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'ActivityFunction (Activity)' scheduled. Reason: OrchestratorFunction. IsReplay: False. State: Scheduled. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 12.
[9/07/2019 3:30:56 AM] Executed 'OrchestratorFunction' (Succeeded, Id=0b115432-1d9d-43af-b5da-3e3607b808ac)
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'OrchestratorFunction (Orchestrator)' awaited. IsReplay: False. State: Awaited. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 13.
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'ActivityFunction (Activity)' started. IsReplay: False. Input: (40 bytes). State: Started. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 14.
[9/07/2019 3:30:56 AM] Executing 'ActivityFunction' (Reason='', Id=2cbd8e65-3e1d-4fbb-8d92-afe4b7e6a012)
[9/07/2019 3:30:56 AM] Saying hello to London.
[9/07/2019 3:30:56 AM] Executed 'ActivityFunction' (Succeeded, Id=2cbd8e65-3e1d-4fbb-8d92-afe4b7e6a012)
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'ActivityFunction (Activity)' completed. ContinuedAsNew: False. IsReplay: False. Output: (60 bytes). State: Completed. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 15.
[9/07/2019 3:30:56 AM] Executing 'OrchestratorFunction' (Reason='', Id=10215167-9de6-4197-bc65-1d819b8471cb)
[9/07/2019 3:30:56 AM] f5a38610c07a4c90815f2936451628b8: Function 'OrchestratorFunction (Orchestrator)' completed. ContinuedAsNew: False. IsReplay: False. Output: (196 bytes). State: Completed. HubName: DurableFunctionsHub. AppName: . SlotName: . ExtensionVersion: 1.8.2. SequenceNumber: 16.
[9/07/2019 3:30:56 AM] Executed 'OrchestratorFunction' (Succeeded, Id=10215167-9de6-4197-bc65-1d819b8471cb)

A few key things to notice:

Executing 'ClientFunction' – this is PowerShell calling the HTTP trigger function.

Started orchestration with ID = 'f5a38610c07a4c90815f2936451628b8' – the HTTP client function has started an instance of the orchestration.

And 3 occurrences of Executing 'ActivityFunction'… – the 3 activity calls defined in the orchestrator function.

If we modify the "ActivityFunction" to introduce a simulated processing time:

[FunctionName("ActivityFunction")]
public static string SayHello([ActivityTrigger] string name, ILogger log)
{
    Thread.Sleep(5000); // simulate longer processing delay

    log.LogInformation($"Saying hello to {name}.");
    return $"Hello {name}!";
}

And if we now run the project again and once again make the request from PowerShell, the client function returns a result to PowerShell “immediately”:

StatusCode        : 202
StatusDescription : Accepted
Content           : {"id":"5ed815a8fe3d497993266d49213a7c09","statusQueryGetUri":"http://localhost:7071/runtime/webhook
                    s/durabletask/instances/5ed815a8fe3d497993266d49213a7c09?taskHub=DurableFunctionsHub&connection=Sto
                    ra...
RawContent        : HTTP/1.1 202 Accepted
                    Retry-After: 10
                    Content-Length: 1232
                    Content-Type: application/json; charset=utf-8
                    Date: Tue, 09 Jul 2019 03:38:46 GMT
                    Location: http://localhost:7071/runtime/webhooks/durab...
Forms             : {}
Headers           : {[Retry-After, 10], [Content-Length, 1232], [Content-Type, application/json; charset=utf-8],
                    [Date, Tue, 09 Jul 2019 03:38:46 GMT]...}
Images            : {}
InputFields       : {}
Links             : {}
ParsedHtml        : mshtml.HTMLDocumentClass
RawContentLength  : 1232

So even though the HTTP request is completed (with a 202 Accepted HTTP code), the orchestration is still running.
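
One way to observe this is to poll the statusQueryGetUri from the 202 response until the orchestration reports a terminal state. The following is a minimal sketch rather than part of the original sample: the class and method names are made up for illustration, and it assumes the status payload contains a runtimeStatus property with values such as Running, Completed, Failed, or Terminated.

```csharp
using System;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical helper class, not part of the Durable Functions SDK.
public static class OrchestrationStatusPoller
{
    // Returns true once the status JSON reports a terminal state.
    public static bool IsFinished(string statusJson)
    {
        using JsonDocument doc = JsonDocument.Parse(statusJson);
        string status = doc.RootElement.GetProperty("runtimeStatus").GetString();
        return status == "Completed" || status == "Failed" || status == "Terminated";
    }

    // Polls the status URL (the statusQueryGetUri value) until the orchestration finishes.
    // Note: GetStringAsync throws HttpRequestException for non-success status codes.
    public static async Task<string> WaitForCompletionAsync(
        HttpClient http, string statusQueryGetUri, TimeSpan pollInterval)
    {
        while (true)
        {
            string statusJson = await http.GetStringAsync(statusQueryGetUri);
            if (IsFinished(statusJson))
            {
                return statusJson;
            }

            await Task.Delay(pollInterval);
        }
    }
}
```

With the 5 second delay in the activity function, a loop like this would be expected to see a non-terminal status for a while before the final result appears.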

Later in this series of articles we’ll dig into more detail about what is going on behind the scenes.

If you want to fill in the gaps in your C# knowledge be sure to check out my C# Tips and Traps training course from Pluralsight – get started with a free trial.


Understanding Azure Durable Functions - Part 1: Overview

This is the first part in a series of articles.

Durable Functions are built on top of the basic Azure Functions platform and provide the ability to define how multiple individual functions can be set up to work together.

You can accomplish a lot with just the basic Azure Functions, so the durable extensions are not necessarily required to implement your solutions. However, if you find yourself needing to make multiple functions work together in some kind of workflow, then durable functions may be able to simplify things and, as a bonus, allow you to document in code how the functions interact.

Using Multiple Functions

Just as you would keep classes and methods functionally cohesive (i.e. doing one thing or a small number of related things), so should your Azure Functions be “specialized”. There are a number of good reasons for breaking down a problem into multiple individual functions, such as:

  • Auto-scaling of functions rather than auto-scaling one massive “god” function
  • Maintainability: individual functions doing one thing are easier to understand/bug fix/test
  • Reusability/composability: smaller units (functions) of logic could be reused in multiple workflows
  • Time-outs: if an individual function execution exceeds a timeout it will be stopped before finishing

Even if you are not going to use durable functions the above points still make sense.

Common Patterns That Durable Functions Can Help With

There are a number of common logical/architectural/workflow patterns that durable functions can help to orchestrate such as:

  • Chained Functions: the output of one function triggers the next function in the chain (aka functions pipeline)
  • Fan out, fan in: Split data into “chunks” (fanning out), process chunks in parallel, aggregate results (fanning in)
  • Asynchronous HTTP APIs: Co-ordinate HTTP request with other services, client polls endpoint to check if work complete
  • Monitors: recurring process to perform clean-up, monitor (call) APIs to wait for completion, etc.
  • Human interaction: wait for human to make a decision at some part during the workflow

Implementing the preceding patterns without the use of durable functions may prove difficult, complex, or error-prone due to the need to manually manage checkpoints (where are we in the process?) in addition to other implementation details.

What Do Durable Functions Provide?

When using durable functions, there are a number of implementation details that are taken care of for you such as:

  • Maintaining the execution position of the workflow
  • Deciding when to execute the next function
  • Replaying actions
  • Workflow monitoring/diagnostics
  • Workflow state storage
  • Scalability

Durable functions also provide the ability to specify the workflow (“orchestration”) in code rather than using, for example, a visual flowchart-style tool.

In the next part of this series we’ll see how to create a basic durable orchestration in C#.


Accessing Cosmos DB JSON Properties in Azure Functions with Dynamic C#

This is the eighth part in a series of articles.

When working with the Cosmos DB Microsoft.Azure.Documents.Document class, if you need to get custom properties from the document you can use the GetPropertyValue method, as we saw in part six of this series and as reproduced here:

[FunctionName("PizzaDriverLocationUpdated1")]
public static void RunOperation1([CosmosDBTrigger(
    databaseName: "pizza",
    collectionName: "driver",
    LeaseCollectionName = "PizzaDriverLocationUpdated1",
    CreateLeaseCollectionIfNotExists = true,
    ConnectionStringSetting = "pizzaConnection")] IReadOnlyList<Document> modifiedDrivers,
    ILogger log)
{
    if (modifiedDrivers != null)
    {
        foreach (var modifiedDriver in modifiedDrivers)
        {
            var driverName = modifiedDriver.GetPropertyValue<string>("Name");

            log.LogInformation($"Running operation 1 for driver {modifiedDriver.Id} {driverName}");
        }
    }
}

In the preceding code, the Azure Function is triggered from new/updated documents and the data that needs processing is passed to the function by way of the IReadOnlyList<Document> modifiedDrivers parameter. If you have multiple functions that work with a document type, you may end up with many duplicated GetPropertyValue calls, such as repeatedly getting the driver’s name: var driverName = modifiedDriver.GetPropertyValue<string>("Name"); Also notice the use of the magic string “Name” to refer to the document property to retrieve. This is not type safe and will return null at runtime if there is no property with that name.

Another option to simplify the code and remove the magic string is to use dynamic as the following code demonstrates:

[FunctionName("PizzaDriverLocationUpdated2")]
public static void RunOperation2([CosmosDBTrigger(
    databaseName: "pizza",
    collectionName: "driver",
    LeaseCollectionName = "PizzaDriverLocationUpdated2",
    CreateLeaseCollectionIfNotExists = true,
    ConnectionStringSetting = "pizzaConnection")] IReadOnlyList<dynamic> modifiedDrivers,
    ILogger log)
{
    if (modifiedDrivers != null)
    {
        foreach (var modifiedDriver in modifiedDrivers)
        {
            var driverName = modifiedDriver.Name;

            log.LogInformation($"Running operation 2 for driver {modifiedDriver.Id} {driverName}");
        }
    }
}

Notice in the preceding code that the binding has been changed from IReadOnlyList<Document> modifiedDrivers to IReadOnlyList<dynamic> modifiedDrivers and the code has changed from var driverName = modifiedDriver.GetPropertyValue<string>("Name"); to var driverName = modifiedDriver.Name;

While this removes the magic string, it is still not type safe and an incorrectly spelled property name will not cause an error at compile time. Furthermore, if the property does not exist, rather than returning null, an exception will be thrown in your function at runtime.
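
The general runtime behaviour of dynamic can be tried out without Cosmos DB at all. The following sketch uses ExpandoObject as a stand-in for a document. This is an assumption made purely for illustration: the objects supplied by the trigger binding are a different type, and the class and method names here are hypothetical.

```csharp
using System;
using System.Dynamic;
using Microsoft.CSharp.RuntimeBinder;

public static class DynamicAccessDemo
{
    // Builds a stand-in "driver document" with Id and Name members.
    public static dynamic CreateDriver()
    {
        dynamic driver = new ExpandoObject();
        driver.Id = "1";
        driver.Name = "Amrit";
        return driver;
    }

    // Returns true if reading the misspelled member "Nmae" fails at runtime.
    public static bool MisspelledPropertyThrows(dynamic driver)
    {
        try
        {
            var name = driver.Nmae; // compiles fine: member lookup is deferred until runtime
            return false;
        }
        catch (RuntimeBinderException)
        {
            return true;
        }
    }
}
```

The misspelling only surfaces when the line executes, which is why tests that exercise every dynamic property access are worth having.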

If you’re not familiar with dynamic C# be sure to check out my Dynamic C# Fundamentals Pluralsight course.

You can start watching with a Pluralsight free trial.


How to Schedule Cosmos DB Data Processing With Azure Functions

This is the seventh part in a series of articles.

You can perform scheduled/batch processing of Azure Cosmos DB data by making use of timer triggers in Azure Functions.

Timer triggers allow you to set up a function to execute periodically based on a set schedule.

For example, the following attribute will cause the function to be executed once per day at 22:00 hours (the six fields of the expression are second, minute, hour, day, month, and day of week):

[TimerTrigger("0 0 22 * * *")]

As an example, in this series we’ve been using the domain of pizza delivery. Suppose that once per day the manager wanted an SMS with the total sales for the day.

To accomplish this we can combine a timer trigger, a Cosmos DB input binding, and a Twilio SMS output binding. In this example, the Cosmos DB input binding is bound to an instance of DocumentClient. This allows us to perform more complex queries against Cosmos DB as we saw in part 2 of this series.

Once we have a DocumentClient instance, we can use LINQ to query Cosmos DB:

DateTime startOfToday = DateTime.Today;
DateTime endOfToday = startOfToday.AddDays(1).AddTicks(-1); 

decimal totalValueOfTodaysOrders = 
    client.CreateDocumentQuery<Order>(ordersCollectionUri, options)
         .Where(order => order.OrderDate >= startOfToday && order.OrderDate <= endOfToday)
         .Sum(order => order.OrderTotal);

The preceding query gets the total value of orders that have today’s date, where order documents look like the following:

[
    {
        "id": "3",
        "StoreId": 2,
        "OrderTotal": 10.25,
        "OrderDate": "2019-06-06T17:17:17.7251173Z",
        "_rid": "Vg08AKOQeVQBAAAAAAAAAA==",
        "_self": "dbs/Vg08AA==/colls/Vg08AKOQeVQ=/docs/Vg08AKOQeVQBAAAAAAAAAA==/",
        "_etag": "\"00000000-0000-0000-1c2d-b2a092fd01d5\"",
        "_attachments": "attachments/",
        "_ts": 1559801067
    },
    {
        "id": "4",
        "StoreId": 2,
        "OrderTotal": 10.25,
        "OrderDate": "2019-06-06T18:18:18.7251173Z",
        "_rid": "Vg08AKOQeVQCAAAAAAAAAA==",
        "_self": "dbs/Vg08AA==/colls/Vg08AKOQeVQ=/docs/Vg08AKOQeVQCAAAAAAAAAA==/",
        "_etag": "\"00000000-0000-0000-1c37-77f607ea01d5\"",
        "_attachments": "attachments/",
        "_ts": 1559805263
    },
    {
        "id": "1",
        "StoreId": 1,
        "OrderTotal": 100,
        "OrderDate": "2019-06-06T14:14:14.7251173Z",
        "_rid": "Vg08AKOQeVQBAAAAAAAACA==",
        "_self": "dbs/Vg08AA==/colls/Vg08AKOQeVQ=/docs/Vg08AKOQeVQBAAAAAAAACA==/",
        "_etag": "\"00000000-0000-0000-1c2d-a87985f501d5\"",
        "_attachments": "attachments/",
        "_ts": 1559801050
    },
    {
        "id": "2",
        "StoreId": 1,
        "OrderTotal": 25.87,
        "OrderDate": "2019-06-06T16:16:16.7251173Z",
        "_rid": "Vg08AKOQeVQCAAAAAAAACA==",
        "_self": "dbs/Vg08AA==/colls/Vg08AKOQeVQ=/docs/Vg08AKOQeVQCAAAAAAAACA==/",
        "_etag": "\"00000000-0000-0000-1c2d-aef57e8c01d5\"",
        "_attachments": "attachments/",
        "_ts": 1559801061
    }
]
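
The date-range filter can be tried out in memory before pointing it at Cosmos DB. The sketch below is an illustration only: the Order class is a guess at the shape used in this series (only OrderDate and OrderTotal are needed), the DailySalesCalculator name is hypothetical, and the day is passed in explicitly rather than taken from DateTime.Today so that the result is repeatable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed shape of the order document; only the properties used by the query.
public class Order
{
    public DateTime OrderDate { get; set; }
    public decimal OrderTotal { get; set; }
}

public static class DailySalesCalculator
{
    // Sums OrderTotal for orders whose OrderDate falls within the given day.
    public static decimal TotalForDay(IEnumerable<Order> orders, DateTime day)
    {
        DateTime startOfDay = day.Date;
        DateTime endOfDay = startOfDay.AddDays(1).AddTicks(-1); // last tick of the day

        return orders
            .Where(order => order.OrderDate >= startOfDay && order.OrderDate <= endOfDay)
            .Sum(order => order.OrderTotal);
    }

    // The four sample orders shown above (fractional seconds omitted).
    public static List<Order> SampleOrders() => new List<Order>
    {
        new Order { OrderDate = new DateTime(2019, 6, 6, 17, 17, 17, DateTimeKind.Utc), OrderTotal = 10.25m },
        new Order { OrderDate = new DateTime(2019, 6, 6, 18, 18, 18, DateTimeKind.Utc), OrderTotal = 10.25m },
        new Order { OrderDate = new DateTime(2019, 6, 6, 14, 14, 14, DateTimeKind.Utc), OrderTotal = 100m },
        new Order { OrderDate = new DateTime(2019, 6, 6, 16, 16, 16, DateTimeKind.Utc), OrderTotal = 25.87m }
    };
}
```

One thing to keep in mind is that DateTime.Today is local time, whereas the sample OrderDate values are in UTC (the trailing Z), so depending on the time zone of the function host the day boundaries may need to be computed in UTC instead.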

Once the total order value has been calculated, a Twilio SMS can be created and written to the Twilio output binding.

The complete listing is as follows:

using System;
using System.Linq;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Twilio.Rest.Api.V2010.Account;
using Twilio.Types;

namespace DontCodeTiredDemosV2.CosmosDemos
{
    public static class DailySales
    {
        [FunctionName("DailySales")]
        public static void Run(
            [TimerTrigger("0 0 22 * * *")]TimerInfo myTimer,            
            [CosmosDB(ConnectionStringSetting = "pizzaConnection")] DocumentClient client,
            [TwilioSms(AccountSidSetting = "TwilioAccountSid", AuthTokenSetting = "TwilioAuthToken", From = "%TwilioFromNumber%")
                ] out CreateMessageOptions messageToSend,
            ILogger log)
        {
            log.LogInformation($"C# Timer trigger function executed at: {DateTime.Now}");

            Uri ordersCollectionUri = UriFactory.CreateDocumentCollectionUri(databaseId: "pizza", collectionId: "orders");

            var options = new FeedOptions { EnableCrossPartitionQuery = true }; // Enable cross partition query

            DateTime startOfToday = DateTime.Today;
            DateTime endOfToday = startOfToday.AddDays(1).AddTicks(-1); 

            decimal totalValueOfTodaysOrders = 
                client.CreateDocumentQuery<Order>(ordersCollectionUri, options)
                      .Where(order => order.OrderDate >= startOfToday && order.OrderDate <= endOfToday)
                      .Sum(order => order.OrderTotal);


            var messageText = $"Total sales for today: {totalValueOfTodaysOrders}";

            log.LogInformation(messageText);

            var managersMobileNumber = new PhoneNumber(Environment.GetEnvironmentVariable("ManagersMobileNumber"));

            var mobileMessage = new CreateMessageOptions(managersMobileNumber)
            {
                Body = messageText
            };

            messageToSend = mobileMessage;
        }
    }
}

In the preceding code, notice the From = "%TwilioFromNumber%" element of the [TwilioSms] binding. The %% means that the from number will be read from configuration, e.g. local.settings.json in the development environment. Similarly, notice that the phone number the SMS is sent to is also read from configuration: var managersMobileNumber = new PhoneNumber(Environment.GetEnvironmentVariable("ManagersMobileNumber"));

Now once per day at 10pm the function will run and send an SMS to the manager with the day’s sales figures.


Executing Multiple Azure Functions When Azure Cosmos DB Documents Are Created or Modified

This is the sixth part in a series of articles.

Sometimes you may want more than one Azure Function to execute when a document is changed or inserted in Cosmos DB.

You could just use one function that performs multiple logical operations on the changed document but there are some things to consider when doing this:

  • What if the function throws an exception during the first logical operation? (operation 2 may not be executed).
  • What scaling do you want? You/Azure won’t be able to scale the 2 logical operations independently.
  • How long will the function execute for if performing multiple operations in a single function? Will you risk function timeouts?
  • How will you monitor operations when they are all contained in a single function?
  • How will you update the code/fix bugs? You will have to update the entire function even if the bug is only related to one operation.
  • How will you write automated tests? They will be more complex if there are multiple operations in a single function.

In some cases you may decide the preceding points don’t matter, but if they do you will need to split the operations into multiple separate Azure Functions.

As an example, the following function contains two logical operations in a single function:

[FunctionName("PizzaDriverLocationUpdated")]
public static void RunMultipleOperations([CosmosDBTrigger(
    databaseName: "pizza",
    collectionName: "driver",
    ConnectionStringSetting = "pizzaConnection")] IReadOnlyList<Document> modifiedDrivers,
    ILogger log)
{
    if (modifiedDrivers != null)
    {
        foreach (var modifiedDriver in modifiedDrivers)
        {
            var driverName = modifiedDriver.GetPropertyValue<string>("Name");

            // Simulate running logical operation 1
            log.LogInformation($"Running operation 1 for driver {modifiedDriver.Id} {driverName}");

            // Simulate running logical operation 2
            log.LogInformation($"Running operation 2 for driver {modifiedDriver.Id} {driverName}");
        }
    }
}

The preceding function could be separated into two separate functions, each one containing only a single logical operation:

[FunctionName("PizzaDriverLocationUpdated1")]
public static void RunOperation1([CosmosDBTrigger(
    databaseName: "pizza",
    collectionName: "driver",
    ConnectionStringSetting = "pizzaConnection")] IReadOnlyList<Document> modifiedDrivers,
    ILogger log)
{
    if (modifiedDrivers != null)
    {
        foreach (var modifiedDriver in modifiedDrivers)
        {
            var driverName = modifiedDriver.GetPropertyValue<string>("Name");

            log.LogInformation($"Running operation 1 for driver {modifiedDriver.Id} {driverName}");
        }
    }
}

[FunctionName("PizzaDriverLocationUpdated2")]
public static void RunOperation2([CosmosDBTrigger(
    databaseName: "pizza",
    collectionName: "driver",
    ConnectionStringSetting = "pizzaConnection")] IReadOnlyList<Document> modifiedDrivers,
    ILogger log)
{
    if (modifiedDrivers != null)
    {
        foreach (var modifiedDriver in modifiedDrivers)
        {
            var driverName = modifiedDriver.GetPropertyValue<string>("Name");

            log.LogInformation($"Running operation 2 for driver {modifiedDriver.Id} {driverName}");
        }
    }
}

If you try to run the function app (for example in the local development functions runtime) you will see errors such as the following:

Unhealthiness detected in the operation AcquireLease for localhost_...==_...=..1 Owner='626d5aec...' Continuation="49" Timestamp(local)=...
Unhealthiness detected in the operation AcquireLease for localhost_...==_...=..0 Owner='626d5aec... Continuation="586" Timestamp(local)=...

If you then make updates/inserts you may see that only one of the two functions is executed, rather than both of them. This is due to change feed leases.

Understanding Azure Cosmos DB Change Feed Leases

The Azure Functions Cosmos DB trigger knows when documents are changed/inserted by way of the Cosmos DB change feed.

The change feed at a simple level listens for changes made in a collection and allows these changes to be passed to other processes (such as Azure Functions) to do work on.

Without a way to keep track of which changes in the underlying collection have already been “fed” out, there would be no way to know which changed documents have been passed to external process(es). This is where the lease collection comes in.

The lease collection stores a “checkpoint” for an Azure Function that is using the Cosmos DB trigger. Without this checkpoint, the function would not know if it has processed changed documents or not.

When only one function exists for a Cosmos DB collection there is no problem, as only one checkpoint needs to be stored.

When more than one function exists, there needs to be a way to store different checkpoints for different functions.

One way to do this is to use lease prefixes.
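
Why a shared checkpoint causes trouble can be illustrated with a toy in-memory model. This is a conceptual sketch only and not how the change feed or the trigger is actually implemented: the ToyLeaseStore class, its members, and the use of an integer position as the checkpoint are all invented for illustration.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for a lease collection: one checkpoint (a position in the feed) per lease key.
public class ToyLeaseStore
{
    private readonly Dictionary<string, int> _checkpoints = new Dictionary<string, int>();

    // Returns the changes not yet seen under the given lease key, then advances that key's checkpoint.
    public List<string> ReadNewChanges(string leaseKey, IReadOnlyList<string> changeFeed)
    {
        // position stays 0 when the key has no checkpoint yet
        _checkpoints.TryGetValue(leaseKey, out int position);

        var unseen = new List<string>();
        for (int i = position; i < changeFeed.Count; i++)
        {
            unseen.Add(changeFeed[i]);
        }

        _checkpoints[leaseKey] = changeFeed.Count;
        return unseen;
    }
}
```

In this model, if two readers both use the key "shared", the first call returns the new change and the second call returns nothing because the checkpoint has already moved on. If each reader uses its own key (for example its function name), both calls return the change.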

Sharing a Single Lease Collection Across Multiple Azure Functions

To use a single lease collection when you have multiple Azure Functions, you can use the LeaseCollectionPrefix property of the [CosmosDBTrigger] attribute. The value for this property needs to be unique for every function, as the following code demonstrates:

[FunctionName("PizzaDriverLocationUpdated1")]
public static void RunOperation1([CosmosDBTrigger(
    databaseName: "pizza",
    collectionName: "driver",
    LeaseCollectionPrefix = "PizzaDriverLocationUpdated1",
    ConnectionStringSetting = "pizzaConnection")] IReadOnlyList<Document> modifiedDrivers,
    ILogger log)
{
    if (modifiedDrivers != null)
    {
        foreach (var modifiedDriver in modifiedDrivers)
        {
            var driverName = modifiedDriver.GetPropertyValue<string>("Name");

            log.LogInformation($"Running operation 1 for driver {modifiedDriver.Id} {driverName}");
        }
    }
}

[FunctionName("PizzaDriverLocationUpdated2")]
public static void RunOperation2([CosmosDBTrigger(
    databaseName: "pizza",
    collectionName: "driver",
    LeaseCollectionPrefix = "PizzaDriverLocationUpdated2",
    ConnectionStringSetting = "pizzaConnection")] IReadOnlyList<Document> modifiedDrivers,
    ILogger log)
{
    if (modifiedDrivers != null)
    {
        foreach (var modifiedDriver in modifiedDrivers)
        {
            var driverName = modifiedDriver.GetPropertyValue<string>("Name");

            log.LogInformation($"Running operation 2 for driver {modifiedDriver.Id} {driverName}");
        }
    }
}

In the preceding code, notice LeaseCollectionPrefix = "PizzaDriverLocationUpdated1", and LeaseCollectionPrefix = "PizzaDriverLocationUpdated2".

If the function app is run now there is no startup error and changes made to a document trigger both functions:

Executing 'PizzaDriverLocationUpdated2' (Reason='New changes on collection driver at 2019-05-31T02:49:55.6946671Z', Id=b1476848-7f98-4362-a25f-69beb714c379)
Executing 'PizzaDriverLocationUpdated1' (Reason='New changes on collection driver at 2019-05-31T02:49:55.6946679Z', Id=366fa257-3b94-4d41-94f2-2777e0b8249a)
Running operation 2 for driver 1 Amrit
Running operation 1 for driver 1 Amrit
Executed 'PizzaDriverLocationUpdated2' (Succeeded, Id=b1476848-7f98-4362-a25f-69beb714c379)
Executed 'PizzaDriverLocationUpdated1' (Succeeded, Id=366fa257-3b94-4d41-94f2-2777e0b8249a)

If you check the lease collection behind the scenes, you will see the lease prefixes in use, as the following screenshot shows:

[Screenshot: Azure Functions Lease Prefixes]

Using Multiple Azure Cosmos DB Lease Collections with Azure Functions

Rather than sharing a single lease collection, you can instead specify completely separate collections with the LeaseCollectionName property:

[FunctionName("PizzaDriverLocationUpdated1")]
public static void RunOperation1([CosmosDBTrigger(
    databaseName: "pizza",
    collectionName: "driver",
    LeaseCollectionName = "PizzaDriverLocationUpdated1",
    CreateLeaseCollectionIfNotExists = true,
    ConnectionStringSetting = "pizzaConnection")] IReadOnlyList<Document> modifiedDrivers,
    ILogger log)
{
    if (modifiedDrivers != null)
    {
        foreach (var modifiedDriver in modifiedDrivers)
        {
            var driverName = modifiedDriver.GetPropertyValue<string>("Name");

            log.LogInformation($"Running operation 1 for driver {modifiedDriver.Id} {driverName}");
        }
    }
}

[FunctionName("PizzaDriverLocationUpdated2")]
public static void RunOperation2([CosmosDBTrigger(
    databaseName: "pizza",
    collectionName: "driver",
    LeaseCollectionName = "PizzaDriverLocationUpdated2",
    CreateLeaseCollectionIfNotExists = true,
    ConnectionStringSetting = "pizzaConnection")] IReadOnlyList<Document> modifiedDrivers,
    ILogger log)
{
    if (modifiedDrivers != null)
    {
        foreach (var modifiedDriver in modifiedDrivers)
        {
            var driverName = modifiedDriver.GetPropertyValue<string>("Name");

            log.LogInformation($"Running operation 2 for driver {modifiedDriver.Id} {driverName}");
        }
    }
}

Notice in the preceding code LeaseCollectionName = "PizzaDriverLocationUpdated1", and LeaseCollectionName = "PizzaDriverLocationUpdated2". Also notice the CreateLeaseCollectionIfNotExists = true, as its name suggests this will create the lease collections if they don’t already exist.

Running the function app once again and changing a document will result in both functions executing.

Because there are now two separate collections being used for leases, there will be a cost associated with having both of them. Also be sure to read up on RUs for your lease collections. If you are sharing a lease collection and using lease prefixes in particular, keep an eye on the metrics and make sure you are not getting throttled requests on your lease collection(s).
