Writing Azure Functions with Function Monkey: Using Commands Without Handlers

If you’ve read the previous articles on Function Monkey you may be wondering if you always need a command handler. Sometimes you may want to accept a request into the system (for example via HTTP) and that pass that request off for further processing. For example the HTTP data can be accepted and then the data (“command”) can be put on a queue for processing. This allows the potential scale-out of the function that processes queue messages to improve the overall throughput of the system.

Take the following example that allows an invoice to be submitted via HTTP. The submitted invoice is validated before simply being returned from the SubmitInvoiceCommandHandler. The output of the handler gets sent to a storage queue called “invoices”. Then we have a queue storage trigger creating and handling the ProcessInvoiceCommand.

using System.Net.Http;
using System.Threading.Tasks;
using AzureFromTheTrenches.Commanding.Abstractions;
using FluentValidation;
using FunctionMonkey.Abstractions;
using FunctionMonkey.Abstractions.Builders;
using FunctionMonkey.FluentValidation;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace FunctionApp2
{
    public class SubmitInvoiceCommand : ICommand<SubmitInvoiceCommand>
    {
        public string Description { get; set; }
        public decimal Amount { get; set; }
    }

    public class SubmitInvoiceCommandValidator : AbstractValidator<SubmitInvoiceCommand>
    {
        public SubmitInvoiceCommandValidator()
        {
            RuleFor(x => x.Description).NotEmpty();
            RuleFor(x => x.Amount).GreaterThan(0);
        }
    }

    public class SubmitInvoiceCommandHandler : ICommandHandler<SubmitInvoiceCommand, SubmitInvoiceCommand>
    {
        public Task<SubmitInvoiceCommand> ExecuteAsync(SubmitInvoiceCommand command, SubmitInvoiceCommand previousResult)
        {
            // We are not actually "handling" anything here, the handler is just returning the sam command
            return Task.FromResult(command);
        }
    }

    public class ProcessInvoiceCommand : ICommand
    {
        public string Description { get; set; }
        public decimal Amount { get; set; }
    }

    public class ProcessInvoiceCommandHandler : ICommandHandler<ProcessInvoiceCommand>
    {
        private readonly ILogger Log;

        public ProcessInvoiceCommandHandler(ILogger log)
        {
            Log = log;
        }

        public Task ExecuteAsync(ProcessInvoiceCommand command)
        {
            Log.LogInformation($"Processing invoice {command.Description} {command.Amount}");
            return Task.CompletedTask;
        }
    }

    public class FunctionAppConfiguration : IFunctionAppConfiguration
    {
        public void Build(IFunctionHostBuilder builder)
        {
            builder
                .Setup((serviceCollection, commandRegistry) =>
                {
                    serviceCollection.AddTransient<IValidator<SubmitInvoiceCommand>, SubmitInvoiceCommandValidator>();
                    commandRegistry.Register<SubmitInvoiceCommandHandler>();
                    commandRegistry.Register<ProcessInvoiceCommandHandler>();
                })
                .AddFluentValidation()
                .Functions(functions => functions

                    .HttpRoute("v1/SubmitInvoice", route => route
                        .HttpFunction<SubmitInvoiceCommand>(HttpMethod.Post)
                        .OutputTo.StorageQueue("invoices"))

                    .Storage(storage => storage
                        .QueueFunction<ProcessInvoiceCommand>("invoices"))                    
                );
        }
    }
}

If we POST the JSON { "Description" : "NAS",    "Amount" : 1000 } we get the following (abridged) output:

Executing HTTP request: {"method": "POST",  "uri": "/api/v1/SubmitInvoice"}
Executing 'SubmitInvoice' 
Executed 'SubmitInvoice'
Executing 'StqFnProcessInvoice' (Reason='New queue message detected on 'invoices')
Storage queue trigger function StqFnProcessInvoice processed a request.
Processing invoice NAS 1000.0
Executed 'StqFnProcessInvoice' 

At the moment the SubmitInvoiceCommandHandler is not doing anything useful, it’s just passing the command back out so it can be output to queue storage.

With Function Monkey you can do away with the command handler in these cases.

One way to do this is when configuring the function app by adding the NoCommandHandler() option in the build method. This means that the SubmitInvoiceCommandHandler class can be deleted:

using System.Net.Http;
using System.Threading.Tasks;
using AzureFromTheTrenches.Commanding.Abstractions;
using FluentValidation;
using FunctionMonkey.Abstractions;
using FunctionMonkey.Abstractions.Builders;
using FunctionMonkey.FluentValidation;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace FunctionApp2
{
    public class SubmitInvoiceCommand : ICommand<SubmitInvoiceCommand>
    {
        public string Description { get; set; }
        public decimal Amount { get; set; }
    }

    public class SubmitInvoiceCommandValidator : AbstractValidator<SubmitInvoiceCommand>
    {
        public SubmitInvoiceCommandValidator()
        {
            RuleFor(x => x.Description).NotEmpty();
            RuleFor(x => x.Amount).GreaterThan(0);
        }
    }

    public class ProcessInvoiceCommand : ICommand
    {
        public string Description { get; set; }
        public decimal Amount { get; set; }
    }

    public class ProcessInvoiceCommandHandler : ICommandHandler<ProcessInvoiceCommand>
    {
        private readonly ILogger Log;

        public ProcessInvoiceCommandHandler(ILogger log)
        {
            Log = log;
        }

        public Task ExecuteAsync(ProcessInvoiceCommand command)
        {
            Log.LogInformation($"Processing invoice {command.Description} {command.Amount}");
            return Task.CompletedTask;
        }
    }

    public class FunctionAppConfiguration : IFunctionAppConfiguration
    {
        public void Build(IFunctionHostBuilder builder)
        {
            builder
                .Setup((serviceCollection, commandRegistry) =>
                {
                    serviceCollection.AddTransient<IValidator<SubmitInvoiceCommand>, SubmitInvoiceCommandValidator>();
                    commandRegistry.Register<ProcessInvoiceCommandHandler>();
                })
                .AddFluentValidation()
                .Functions(functions => functions

                    .HttpRoute("v1/SubmitInvoice", route => route
                        .HttpFunction<SubmitInvoiceCommand>(HttpMethod.Post)
                        .Options(options => options.NoCommandHandler())
                        .OutputTo.StorageQueue("invoices"))

                    .Storage(storage => storage
                        .QueueFunction<ProcessInvoiceCommand>("invoices"))                    
                );
        }
    }
}

If we submit the same JSON request we get:

Executing HTTP request: {  "method": "POST",  "uri": "/api/v1/SubmitInvoice"}
Executing 'SubmitInvoice' 
Executed 'SubmitInvoice' 
Executing 'StqFnProcessInvoice' 
Storage queue trigger function StqFnProcessInvoice processed a request.
Processing invoice NAS 1000.0
Executed 'StqFnProcessInvoice'

Even though we don’t have an explicit handler now for the SubmitInvoiceCommand, the validation still takes place.

Another option is to implement the marker interface ICommandWithNoHandler in the command and then you don’t need the .NoCommandHandler() option.

Other Function Monkey articles:

SHARE:

Writing Azure Functions with Function Monkey: Validation

Function Monkey is a framework to define Azure Functions in a fluent way as opposed to using binding attributes on function methods.

Other Function Monkey articles:

In addition to offering a different way to define functions, Function Monkey offers features such as validation.

Consider the following setup that generates a greeting:

using System.Net.Http;
using System.Threading.Tasks;
using AzureFromTheTrenches.Commanding.Abstractions;
using FunctionMonkey.Abstractions;
using FunctionMonkey.Abstractions.Builders;

namespace FunctionApp2
{
    public class GenerateGreetingCommand : ICommand<string>
    {
        public string Name { get; set; }
    }

    public class GenerateGreetingHandler : ICommandHandler<GenerateGreetingCommand, string>
    {
        public Task<string> ExecuteAsync(GenerateGreetingCommand command, string previousResult) => Task.FromResult($"Hello {command.Name}");
    }

    public class FunctionAppConfiguration : IFunctionAppConfiguration
    {
        public void Build(IFunctionHostBuilder builder)
        {
            builder
                .Setup((serviceCollection, commandRegistry) =>
                {
                    commandRegistry.Register<GenerateGreetingHandler>();                    
                })
                .Functions(functions => functions
                    .HttpRoute("v1/GenerateGreeting", route => route
                        .HttpFunction<GenerateGreetingCommand>(HttpMethod.Get))
                );
        }
    }  
}

If we run this and send a JSON payload of {"Name": ""} we’ll get back a response of "Hello ".

There is currently no validation on the name in the GenerateGreetingCommand.

To add validation with Function Monkey install the additional package: FunctionMonkey.FluentValidation. This will also install the dependent package FluentValidation.

To add validation to the Name property, we create a new class that inherits from AbstractValidator<T> where T is the command we want to validate, in this case the GenerateGreetingCommand:

public class GenerateGreetingCommandValidator : AbstractValidator<GenerateGreetingCommand>
{
    public GenerateGreetingCommandValidator()
    {
        RuleFor(x => x.Name).NotEmpty();
    }
}

In the constructor we use the FluentValidation syntax to define what validation to perform on the Name property of the command. In the preceding code we are saying name cannot be empty.

Next we need to wire up this new validator by adding the call to AddFluentValidation() and also registering the validator with serviceCollection.AddTransient<IValidator<GenerateGreetingCommand>, GenerateGreetingCommandValidator>();

So the setup now looks like:

public class FunctionAppConfiguration : IFunctionAppConfiguration
{
    public void Build(IFunctionHostBuilder builder)
    {
        builder
            .Setup((serviceCollection, commandRegistry) =>
            {
                serviceCollection.AddTransient<IValidator<GenerateGreetingCommand>, GenerateGreetingCommandValidator>();
                commandRegistry.Register<GenerateGreetingHandler>();                    
            })
            .AddFluentValidation()
            .Functions(functions => functions
                .HttpRoute("v1/GenerateGreeting", route => route
                    .HttpFunction<GenerateGreetingCommand>(HttpMethod.Get))
            );
    }
}

If we run the app again and try and submit an empty name, this time we get the following response:

{
  "errors": [
    {
      "severity": 0,
      "errorCode": "NotEmptyValidator",
      "property": "Name",
      "message": "'Name' must not be empty."
    }
  ],
  "isValid": false
}

If we wanted to enforce minimum and maximum Name length:

public class GenerateGreetingCommandValidator : AbstractValidator<GenerateGreetingCommand>
{
    public GenerateGreetingCommandValidator()
    {
        RuleFor(x => x.Name).NotEmpty()
                            .MinimumLength(5)
                            .MaximumLength(10);
    }
}

Now if we try and submit a name of “Joe”:

{
  "errors": [
    {
      "severity": 0,
      "errorCode": "MinimumLengthValidator",
      "property": "Name",
      "message": "The length of 'Name' must be at least 5 characters. You entered 3 characters."
    }
  ],
  "isValid": false
}

To add some custom validation in the form of an Action:

public class GenerateGreetingCommandValidator : AbstractValidator<GenerateGreetingCommand>
{
    public GenerateGreetingCommandValidator()
    {
        RuleFor(x => x.Name).NotEmpty()
                            .MinimumLength(5)
                            .MaximumLength(10)
                            .Custom((name, context) =>
                                {
                                    if (name == "Jason")
                                    {
                                        context.AddFailure("Jason is not a valid name");
                                    }
                                });

    }
}

Submitting a name of “Jason” now results in:

{
  "errors": [
    {
      "severity": 0,
      "errorCode": null,
      "property": "Name",
      "message": "Jason is not a valid name"
    }
  ],
  "isValid": false
}

We could also go and write units tests for the validator.

The ability to define command validation could also be useful if you had multiple ways for a client to submit requests, for example the same command (and validation) could be triggered from HTTP and a queue for example. In this case you could ensure the same validation is executed regardless of the input “channel”.

SHARE:

Writing Azure Functions with Function Monkey: Dependency Injection

In my continued exploration/experimentation with Function Monkey I thought I’d look at how easy/hard it is to inject dependencies into handlers.

Previous articles: Creating Azure Functions with Function Monkey–First Look and Refactoring an Azure Functions App to use Function Monkey.

If you’ve read the previous articles you’ll know the Function Monkey uses the concept of a command to represent “something that needs doing” and a command handler to “do the thing that needs doing”.

An Azure Function trigger results in the creation of a command, that command is passed to a handler, and the handler can return a result to the caller or an output binding.

Good practice dictates good separation of concerns, etc. so you may want to inject dependencies into your handlers to also make them easier to test.

Let’s start off my defining a dependency to represent the generation of a greeting:

public interface IGreetingGenerator
{
    string GenerateGreeting();
}

And we’ll create a basic implementation:

public class TimeOfDayGreetingGenerator : IGreetingGenerator
{
    public string GenerateGreeting()
    {
        var isAfternoon = DateTime.Now.Hour >= 12;

        if (isAfternoon)
        {
            return "Good afternoon";
        }

        return "Good morning";
    }
}

We could now go and write unit tests for this TimeOfDayGreetingGenerator – however we first have to go and provide a way to deterministically provide a specific date and time.

We’ll create another abstraction represent time so the code becomes:

public interface IGreetingGenerator
{
    string GenerateGreeting();
}

public interface ITime
{
    DateTime Now { get; }
}

public class Time : ITime
{
    public DateTime Now => DateTime.Now;
}

public class TimeOfDayGreetingGenerator : IGreetingGenerator
{
    private readonly ITime Time;

    public TimeOfDayGreetingGenerator(ITime time)
    {
        Time = time;
    }

    public string GenerateGreeting()
    {
        var isAfternoon = Time.Now.Hour >= 12;

        if (isAfternoon)
        {
            return "Good afternoon";
        }

        return "Good morning";
    }
}

And some example tests we could write:

public class TimeOfDayGreetingGeneratorShould
{
    [Fact]        
    public void GenerateMorningGreeting()
    {
        var mockTime = new Mock<ITime>();
        mockTime.Setup(x => x.Now).Returns(new DateTime(2020, 1, 1, 11, 59, 59));
        var sut = new TimeOfDayGreetingGenerator(mockTime.Object);

        var greeting = sut.GenerateGreeting();

        Assert.Equal("Good morning", greeting);
    }

    [Fact]
    public void GenerateAfternoonGreeting()
    {
        var mockTime = new Mock<ITime>();
        mockTime.Setup(x => x.Now).Returns(new DateTime(2020, 1, 1, 13, 0, 0));
        var sut = new TimeOfDayGreetingGenerator(mockTime.Object);

        var greeting = sut.GenerateGreeting();

        Assert.Equal("Good afternoon", greeting);
    }
}

The above tests are using the xUnit.net testing framework and Moq: you can learn how to use both of these by following this Pluralsight skills path that features some of my courses. You can start watching with a free trial.

Next we’ll create a command to represent the requirement to create a greeting for a person:

public class GenerateGreetingCommand : ICommand<string>
{
    public string Name { get; set; }
}

We can now create a handler for this command that also takes an IGreetingGenerator as a constructor dependency:

public class GenerateGreetingHandler : ICommandHandler<GenerateGreetingCommand, string>
{
    private readonly IGreetingGenerator GreetingGenerator;

    public GenerateGreetingHandler(IGreetingGenerator greetingGenerator)
    {
        GreetingGenerator = greetingGenerator;
    }
    public Task<string> ExecuteAsync(GenerateGreetingCommand command, string previousResult)
    {
        return Task.FromResult($"{GreetingGenerator.GenerateGreeting()} {command.Name}");
    }
}

And we can add a test:

public class GenerateGreetingHandlerShould
{
    [Fact]
    public async Task GenerateGreetingWithName()
    {
        var mockGenerator = new Mock<IGreetingGenerator>();
        mockGenerator.Setup(x => x.GenerateGreeting()).Returns("mock greeting");
        var sut = new GenerateGreetingHandler(mockGenerator.Object);
        var command = new GenerateGreetingCommand { Name = "Amrit" };

        var greeting = await sut.ExecuteAsync(command, null);

        Assert.Equal("mock greeting Amrit", greeting);
    }
}

Now we have tested some of the moving parts we can put them all together with Function Monkey (note there are more tests cases we should write but we’ll keep this example short):

public class FunctionAppConfiguration : IFunctionAppConfiguration
{
    public void Build(IFunctionHostBuilder builder)
    {
        builder
            .Setup((serviceCollection, commandRegistry) =>
            {
                commandRegistry.Register<GenerateGreetingHandler>();
            })
            .Functions(functions => functions
                .HttpRoute("v1/GenerateGreeting", route => route
                    .HttpFunction<GenerateGreetingCommand>(HttpMethod.Get))
            );
    }
}

If we try and run this and submit an HTTP request to the function we’ll get the following error:

Error occurred executing command GenerateGreetingCommand
AzureFromTheTrenches.Commanding: Error occurred during command execution. Microsoft.Extensions.DependencyInjection: Unable to resolve service for type 'FunctionApp2.IGreetingGenerator' while attempting to activate 'FunctionApp2.GenerateGreetingHandler'.

This is because we haven’t wired up the dependencies which we can do by adding:

serviceCollection.AddTransient<ITime, Time>();
serviceCollection.AddTransient<IGreetingGenerator, TimeOfDayGreetingGenerator>();

This makes the entire setup look like the following:

public class FunctionAppConfiguration : IFunctionAppConfiguration
{
    public void Build(IFunctionHostBuilder builder)
    {
        builder
            .Setup((serviceCollection, commandRegistry) =>
            {
                serviceCollection.AddTransient<ITime, Time>();
                serviceCollection.AddTransient<IGreetingGenerator, TimeOfDayGreetingGenerator>();
                commandRegistry.Register<GenerateGreetingHandler>();                    
            })
            .Functions(functions => functions
                .HttpRoute("v1/GenerateGreeting", route => route
                    .HttpFunction<GenerateGreetingCommand>(HttpMethod.Get))
            );
    }
}

Running the app now and executing the function with the JSON “{"Name": "Sarah"}” returns "Good afternoon Sarah".

It’s nice that DI is built into Function Monkey and that the registration of dependencies is pretty simple.

SHARE:

Refactoring an Azure Functions App to use Function Monkey

In a previous post I took a first look at the Function Monkey library to define Azure Functions using commands and handlers.

In this post I’m going to try and take an existing functions app and convert it to the Function Monkey approach. I should note up-front that this post is not a criticism of the library itself, like everything it’s a work in progress, I may be misunderstanding some of the features :)

The Starting App

The non-function-monkey app creates the workflow: client—>HTTP function –> queue function –> blob function and looks like the following:

using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace FunctionApp2
{
    public class WordsAdditionRequest
    {
        public IEnumerable<string> Words { get; set; }
    }

    public static class Function1
    {
        [FunctionName("AddWords")]
        public static async Task AddWords(
            [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest request,
            [Queue("WordsToProcess")] IAsyncCollector<string> wordQueue,
            ILogger log)
        {
            // validation/error checking logic omitted for brevity

            log.LogInformation("C# HTTP trigger function processed a request.");

            string requestBody = await new StreamReader(request.Body).ReadToEndAsync();
            WordsAdditionRequest wordsAdditionRequest= JsonConvert.DeserializeObject<WordsAdditionRequest>(requestBody);

            foreach (var word in wordsAdditionRequest.Words)
            {
                log.LogInformation($"Adding word '{word}'");
                await wordQueue.AddAsync(word);
            }
        }

        [FunctionName("ProcessWord")]
        public static void ProcessWord(
            [QueueTrigger("WordsToProcess")] string wordToProcess,
            [Blob("uppercase-words/{rand-guid}")] out string uppercaseWord,
            ILogger log)
        {
            log.LogInformation($"C# Queue trigger for word '{wordToProcess}'");

            uppercaseWord = wordToProcess.ToUpperInvariant();
        }

        [FunctionName("AuditWordWritten")]
        public static void AuditWordWritten(
            [BlobTrigger("uppercase-words/{name}")] string uppercaseWord,
            string name,
            ILogger log)
        {
            log.LogInformation($"C# blob trigger - audit for word '{uppercaseWord}'");
        }
    }
}

Refactoring to Function Monkey

First install the NuGets: FunctionMonkey and FunctionMonkey.Compiler.

The next first step is to create a command, so we’ll change WordsAdditionRequest to:

public class AddWordsCommand: ICommand<int>
{
    public IEnumerable<string> Words { get; set; }
}

So far so good, next we’ll need to create a handler for this command. I need the handler to process the command and return each of the strings so they can be added as separate messages to the queue, so we can fan-out the work:

internal class AddWordsHandler : ICommandHandler<AddWordsCommand, string[]>
{
    public Task<string[]> ExecuteAsync(AddWordsCommand command, string[] previousResult)
    {
        return Task.FromResult(command.Words.ToArray());
    }
}

The next thing is to create the configuration class and wire up the hander as a HTTP-triggered function. The initial attempt looks like this:

public class FunctionAppConfiguration : IFunctionAppConfiguration
{
    public void Build(IFunctionHostBuilder builder)
    {
        builder
            .Setup((serviceCollection, commandRegistry) =>
            {
                commandRegistry.Register<AddWordsHandler>();
            })
            .Functions(functions => functions
                .HttpRoute("v1/AddWords", route => route
                    .HttpFunction<AddWordsCommand>(HttpMethod.Post)
                    .OutputTo.StorageQueue("WordsToProcess")
                )
            );
    }
}

 

Now we have an HTTP function outputting to a storage queue.

If we run this and submit a HTTP request containing the JSON { "words": ["apple", "pear"]} we get 2 messages added to the WordsToProcess queue.

Messages in the queue

What’s nice here is that because the command and handler return string[] Function Monkey has automatically “fanned-out” the data into multiple messages.When I was writing the code I was looking for specific ways to implement this which wasted some time. Really this is a nice intuitive way of handling IEnumerable return values.

So far so good. The next step is to read the messages from the queue, convert them to upper case, and then write out the blobs. This is unfortunately where I ran into some roadblocks.

First I defined command/hander:

public class ConvertToUpperCaseCommand : ICommand<string>
{
    public string Word { get; set; }
}

internal class ConvertToUpperCaseHandler : ICommandHandler<ConvertToUpperCaseCommand, string>
{        
    public Task<string> ExecuteAsync(ConvertToUpperCaseCommand command, string previousResult)
    {
        return Task.FromResult(command.Word.ToUpperInvariant());
    }
}

Then I tried to modify the config:

public class FunctionAppConfiguration : IFunctionAppConfiguration
{
    public void Build(IFunctionHostBuilder builder)
    {
        builder
            .Setup((serviceCollection, commandRegistry) =>
            {
                commandRegistry.Register<AddWordsHandler>();
                commandRegistry.Register<ConvertToUpperCaseHandler>();
            })
            .Functions(functions => functions
                .HttpRoute("v1/AddWords", route => route
                    .HttpFunction<AddWordsCommand>(HttpMethod.Post)
                    .OutputTo.StorageQueue("WordsToProcess")                    
                )
            .Storage(storageFunctions => storageFunctions
                .QueueFunction<ConvertToUpperCaseCommand>("WordsToProcess")
                .OutputTo.StorageBlob // StorageBlob does not exist
            );
    }
}

This is where I ran into a problem, I couldn't find an option to output to blob storage:

Visual Studio screenshot showing no method to output to blob storage

There are output bindings for storage queues and tables but not for blobs. I could of course be misunderstanding how to configure this.

To get around this I’m going to try and mix Function Monkey with traditional attributes by adding the following:

[FunctionName("ProcessWord")]
public static void ProcessWord(
    [QueueTrigger("WordsToProcess")] string wordToProcess,
    [Blob("uppercase-words/{rand-guid}")] out string uppercaseWord,
    ILogger log)
{
    log.LogInformation($"C# Queue trigger for word '{wordToProcess}'");

    uppercaseWord = wordToProcess.ToUpperInvariant();
}

Now running the app and making an HTTP request results in the messages being added to the queue via Function Monkey and then the traditionally-specified ProcessWord function executes and writes to blob storage.

It’s nice that you can mix Function Monkey with the attribute-based function definition, though I don’t know if this is recommended or not and whether not there could by any unintentional side-effects.

The final part is the blob-triggered audit function.

Once again I’ll define a command and handler:

public class AuditCommand : ICommand, IStreamCommand
{
    public Stream Stream { get; set; }

    public string Name { get; set; }
}
internal class AuditHandler : ICommandHandler<AuditCommand>
{
    public Task ExecuteAsync(AuditCommand command)
    {           
        using (StreamReader reader = new StreamReader(command.Stream))
        {
            string name = reader.ReadToEnd();

            // How to log name?
        }

        return Task.CompletedTask;
    }
}

The version of a blob-triggered command allows us to get a Stream representing the blob data (simpler blob commands without streams are also available).

We can now wire this up:

public class FunctionAppConfiguration : IFunctionAppConfiguration
{
    public void Build(IFunctionHostBuilder builder)
    {
        builder
            .Setup((serviceCollection, commandRegistry) =>
            {
                commandRegistry.Register<AddWordsHandler>();
                commandRegistry.Register<AuditHandler>();
            })
            .Functions(functions => functions
                .HttpRoute("v1/AddWords", route => route
                    .HttpFunction<AddWordsCommand>(HttpMethod.Post)
                    .OutputTo.StorageQueue("WordsToProcess"))
                .Storage(storageFunctions => storageFunctions
                .BlobFunction<AuditCommand>("uppercase-words/{name}"))
            );
    }
}

Running this now results in the blob being written and the audit command handler executing. The original hander just output the blob to the log.

I’m not sure how to get access to the log in a handler, so I’m going to just add a constructor that takes an ILogger and see what happens:

internal class AuditHandler : ICommandHandler<AuditCommand>
{
    private readonly ILogger log;

    public AuditHandler(ILogger log)
    {
        this.log = log;
    }
    public Task ExecuteAsync(AuditCommand command)
    {           
        using (StreamReader reader = new StreamReader(command.Stream))
        {
            string name = reader.ReadToEnd();

            log.LogInformation($"C# blob trigger - audit for word '{name}'");
        }

        return Task.CompletedTask;
    }
}

Running this and it just works, the ILogger is injected into the handler and the log message is output: C# blob trigger - audit for word 'APPLE'

There’s a lot more to Function Monkey by the looks of it such as DI, validation, etc.

The final code:

using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using AzureFromTheTrenches.Commanding.Abstractions;
using FunctionMonkey.Abstractions;
using FunctionMonkey.Abstractions.Builders;
using FunctionMonkey.Commanding.Abstractions;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace FunctionApp2
{
    public class AddWordsCommand : ICommand<string[]>
    {
        public IEnumerable<string> Words { get; set; }
    }

    internal class AddWordsHandler : ICommandHandler<AddWordsCommand, string[]>
    {
        public Task<string[]> ExecuteAsync(AddWordsCommand command, string[] previousResult)
        {
            return Task.FromResult(command.Words.ToArray());
        }
    }

    public class AuditCommand : ICommand, IStreamCommand
    {
        public Stream Stream { get; set; }

        public string Name { get; set; }
    }

    internal class AuditHandler : ICommandHandler<AuditCommand>
    {
        private readonly ILogger log;

        public AuditHandler(ILogger log)
        {
            this.log = log;
        }
        public Task ExecuteAsync(AuditCommand command)
        {           
            using (StreamReader reader = new StreamReader(command.Stream))
            {
                string name = reader.ReadToEnd();

                log.LogInformation($"C# blob trigger - audit for word '{name}'");
            }

            return Task.CompletedTask;
        }
    }

    public class FunctionAppConfiguration : IFunctionAppConfiguration
    {
        public void Build(IFunctionHostBuilder builder)
        {
            builder
                .Setup((serviceCollection, commandRegistry) =>
                {
                    commandRegistry.Register<AddWordsHandler>();
                    commandRegistry.Register<AuditHandler>();
                })
                .Functions(functions => functions
                    .HttpRoute("v1/AddWords", route => route
                        .HttpFunction<AddWordsCommand>(HttpMethod.Post)
                        .OutputTo.StorageQueue("WordsToProcess"))
                    .Storage(storageFunctions => storageFunctions
                    .BlobFunction<AuditCommand>("uppercase-words/{name}"))
                );
        }
    }

    public static class Function1
    {
        [FunctionName("ProcessWord")]
        public static void ProcessWord(
            [QueueTrigger("WordsToProcess")] string wordToProcess,
            [Blob("uppercase-words/{rand-guid}")] out string uppercaseWord,
            ILogger log)
        {
            log.LogInformation($"C# Queue trigger for word '{wordToProcess}'");

            uppercaseWord = wordToProcess.ToUpperInvariant();
        }
    }   
}

SHARE:

Creating Azure Functions with Function Monkey–First Look

I’ve had Function Monkey on my to-look-at radar for a little while now so I thought I’d finally get round to looking at it in this post.

As a writing experiment I’m going to “live write” my experience of using it for the first time.

Function Monkey is essentially a tool/library for building an Azure Functions app in a different way from the usual function-methods-with-attributes.

Let’s go!

Setting Up A Project

So the first thing I need to do is create a new Azure Functions project in  Visual Studio – I’m choosing to create an empty functions app.

Next, I’m installing the NuGet packages: FunctionMonkey and FunctionMonkey.Compiler

Defining a Command

Function Monkey uses commands to represent “things the functions can do”.

For example to create a command that represent the addition of 2 numbers I’m adding the following:

using AzureFromTheTrenches.Commanding.Abstractions;

namespace FunctionApp1
{
    public class AddNumbers : ICommand<int>
    {
        public int FirstNumber { get; set; }
        public int SecondNumber { get; set; }
    }
}

The ICommand<int> says that when this command executes it will return an int.

Handling a Command

Now I’ve defined a command, I need to be able to execute it so I need to create a “command handler”, so I’m adding a  new class:

internal class AddNumbersHandler : ICommandHandler<AddNumbers, int>
{
    public Task<int> ExecuteAsync(AddNumbers command, int previousResult)
    {
        return Task.FromResult(command.FirstNumber + command.SecondNumber);
    }
}

I now need a wire to make the handler execute.

Function App Configuration

In function apps you normally create a class and decorate methods with attributes to create functions. With Function Monkey it looks like there’s a fluent API instead, so I’m adding a new class to wire everything up:

public class FunctionAppConfiguration : IFunctionAppConfiguration
{
    public void Build(IFunctionHostBuilder builder)
    {
        builder
            .Setup((serviceCollection, commandRegistry) =>
            {
                commandRegistry.Register<AddNumbersHandler>();
            })
            .Functions(functions => functions
                .HttpRoute("v1/AddNumbers", route => route
                    .HttpFunction<AddNumbers>()
                )
            );
    }
}

This code is wiring up the command and handler that we just created and setting up an HTTP-triggered function.

I’m not sure at first glance how I feel about the readability of this as I’m looking at it, though it is an interesting way to wire things up, I believe you can also wire up dependency injection and other things here which would be nice.

Testing It Out

Ok, let’s run the app and see what happens:

Application started. Press Ctrl+C to shut down.

Http Functions:

        AddNumbers: [GET] http://localhost:7071/api/v1/AddNumbers

[31/01/2020 4:49:07 AM] Host lock lease acquired by instance ID '0000000000000000000000006B155899'.

So it looks like the HTTP-triggered function has been created in the functions runtime with the route “v1/AddNumbers”.

It’s not immediately obvious what HTTP request to make, it’s exposing a GET so let’s try and make a GET request with some JSON to represent the command properties FirstNumber and SecondNumber.

This is the JSON I’m going to try (I’m going to use the Postman app to send the request):

{
    "FirstNumber" : 1,
    "SecondNumber" : 1
}

Sending this JSON results in the value 2 being returned so this part where JSON maps to the command properties is nice and intuitive.

There’s also probably an easy way to restrict this API to POSTs. Without looking at the docs I’m going to see if it’s easy to find/intuitive to modify the code where the function is defined…

So the HTTP method has an overload that takes an params array of HttpMethod, so I just changed the config to:

public class FunctionAppConfiguration : IFunctionAppConfiguration
{
    public void Build(IFunctionHostBuilder builder)
    {
        builder
            .Setup((serviceCollection, commandRegistry) =>
            {
                commandRegistry.Register<AddNumbersHandler>();
            })
            .Functions(functions => functions
                .HttpRoute("v1/AddNumbers", route => route
                    .HttpFunction<AddNumbers>(HttpMethod.Post)
                )
            );
    }
}

Let’s run the app again:

Application started. Press Ctrl+C to shut down.

Http Functions:

        AddNumbers: [POST] http://localhost:7071/api/v1/AddNumbers

[31/01/2020 7:00:38 AM] Host lock lease acquired by instance ID '0000000000000000000000006B155899'.

Now as we can see the URL accepts POSTS.

This is a library that I think warrants further inspection. Let me know in the comments if you’d like to see more posts on this.

One thing to bear in mind when using these kinds of tools is that they do introduce another dependency to the application.

SHARE:

Understanding Azure Durable Functions - Part 12: Sub Orchestrations

This is part twelve in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

Sub-orchestrations are a feature of Durable Functions that allow you to further compose and reuse functions.

Essentially sub-orchestrations allow you to call an orchestration from within another orchestration. In this way they are similar to calling activity functions from within an orchestration and just like activity functions can return a value to the calling (parent) orchestration.

As an example, the following client function starts the orchestration called ProcessMultipleCitiesOrhestrator:

[FunctionName("SubOrchExample_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{

    var data = await req.Content.ReadAsAsync<GreetingsRequest>();

    string instanceId = await starter.StartNewAsync("ProcessMultipleCitiesOrhestrator", data);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

The preceding code is no different from what we’ve seen already in this series, the change comes in the ProcessMultipleCitiesOrhestrator:

[FunctionName("ProcessMultipleCitiesOrhestrator")]
public static async Task<string> ParentOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, 
    ILogger log)
{
    log.LogInformation($"************** ProcessMultipleCitiesOrhestrator ********************");

    GreetingsRequest data = context.GetInput<GreetingsRequest>();


    // Perform all greetings in parallel executing sub-orchestrations
    var greetingsSubOrchestrations = new List<Task>();            
    
    foreach (string city in data.Cities)
    {
        Task greetingSubOrchestration = context.CallSubOrchestratorAsync<string>("ProcessSingleCityOrhestrator", city);
        greetingsSubOrchestrations.Add(greetingSubOrchestration);
    }

    await Task.WhenAll(greetingsSubOrchestrations);

    // When all of the sub orchestrations have competed, get the results and append into a single string
    var allGreetings = new StringBuilder();
    foreach (Task<string> greetingSubOrchestration in greetingsSubOrchestrations)
    {
        allGreetings.AppendLine(await greetingSubOrchestration);
    }

    log.LogInformation(allGreetings.ToString());

    return allGreetings.ToString();
}

The main thing to note in the preceding code is the line: Task greetingSubOrchestration = context.CallSubOrchestratorAsync<string>("ProcessSingleCityOrhestrator", city); Here we are calling into another orchestrator function (the sub-orchestration). We are doing this by creating a list of tasks, each representing an instance of the sub-orchestration, and then executing those tasks, and finally getting the return values from each sub-orchestration task and creating a single string result.

The ProcessSingleCityOrhestrator function is as follows:

[FunctionName("ProcessSingleCityOrhestrator")]
public static async Task<string> SubOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** ProcessSingleCityOrhestrator method executing ********************");

    var city = context.GetInput<string>();

    string greeting = await context.CallActivityAsync<string>("SubOrchExample_ActivityFunction", city);
    string toUpper = await context.CallActivityAsync<string>("SubOrchExample_ActivityFunction_ToUpper", greeting);
    string withTimestamp = await context.CallActivityAsync<string>("SubOrchExample_ActivityFunction_AddTimestamp", toUpper);

    log.LogInformation(withTimestamp);

    return withTimestamp;
}

Now we have the flexibility to compose/reuse, for example the ProcessSingleCityOrhestrator could be called from a different client function if only a single city was being supplied.

If we call the client function SubOrchExample_HttpStart  with the JSON:

{
    "Cities": [
            "London",
            "Tokyo",
            "Perth",
            "Nadi"
    ]
}

We get the following return value (from the ProcessMultipleCitiesOrhestrator):

{
    "name": "ProcessMultipleCitiesOrhestrator",
    "instanceId": "c7a0eb03c56a44ab8a767cd8a487c834",
    "runtimeStatus": "Completed",
    "input": {
        "$type": "DurableDemos.SubOrchExample+GreetingsRequest, DurableDemos",
        "Cities": [
            "London",
            "Tokyo",
            "Perth",
            "Nadi"
        ]
    },
    "customStatus": null,
    "output": "HELLO LONDON! [30/10/2019 11:51:53 AM +08:00]\r\nHELLO TOKYO! [30/10/2019 11:51:58 AM +08:00]\r\nHELLO PERTH! [30/10/2019 11:52:01 AM +08:00]\r\nHELLO NADI! [30/10/2019 11:52:00 AM +08:00]\r\n",
    "createdTime": "2019-10-30T03:51:52Z",
    "lastUpdatedTime": "2019-10-30T03:52:01Z"
}

To learn more about sub-orchestrations, check out the docs.

If you like this article or series feel free to share it :)

SHARE:

Understanding Azure Durable Functions - Part 11: The Asynchronous Human Interaction Pattern

This is the eleventh part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

The Asynchronous Human Interaction Pattern allows a Durable Functions orchestration to pause at some point during its lifecycle and wait for an external event such as a human to perform some action or make some decision.

Azure Functions Durable Functions and Twilio working together

As an example, suppose that a comment can be submitted on a website but before actually appearing it must be moderated by a human. The human moderator can see the comment and then decide whether to approve the comment so it appears on the website or decline the comment in which case it is deleted.

In this scenario, if the human moderator does not approve or decline the comment within a set amount of time then the comment will be escalated to a manager to review.

Azure Durable Functions makes this possible because the orchestration can wait for an external event during its execution.

The code in this article follows the following workflow:

  1. New comment submitted via HTTP
  2. Review/moderation orchestration started
  3. Orchestration sends SMS notification to moderator
  4. Moderator receives SMS that contains 2 links: one to approve and one to decline the comment
  5. Orchestration waits for human moderator to click on one of the 2 links
  6. When human clicks a link, the orchestration resumes and comment is approved or declined
  7. If human does not click link within a set deadline, the comment is escalated to a human manager

Let’s start by defining a class to be HTTP POSTed to the client function:

public class AddCommentRequest
{
    public string UserName { get; set; }
    public string Comment { get; set; }
}

And the client function:

[FunctionName("HumanPatternExample_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    var commentRequest = await req.Content.ReadAsAsync<AddCommentRequest>();

    string instanceId = await starter.StartNewAsync("HumanPatternExample_Orchestrator", (commentRequest: commentRequest, requestUri: req.RequestUri));

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return new HttpResponseMessage(System.Net.HttpStatusCode.Accepted)
    {
        Content = new StringContent("Your comment has been submitted and is awaiting moderator approval.")
    };
}

The preceding client function initiates the HumanPatternExample_Orchestrator function and passes in a tuple containing the comment request and the request URI which will be used later to construct the approve/decline URL links.

We’ll have a look at this orchestrator function in a moment, but first let’s take a look at the activity function that sends the SMS to the moderator:

[FunctionName("HumanPatternExample_RequestApproval")]
[return: TwilioSms(AccountSidSetting = "TwilioAccountSid", AuthTokenSetting = "TwilioAuthToken", From = "%FromPhoneNumber%")]
public static CreateMessageOptions RequestApproval([ActivityTrigger] ModerationRequest moderationRequest, ILogger log)
{            
    log.LogInformation($"Requesting approval for comment: {moderationRequest.CommentRequest.Comment}.");

    // Here we provide some way for a human to know that there is a new comment pending approval.
    // This could be writing to a database representing requests not yet approved for a human
    // to work through, or an SMS message for them to reply to with either APPROVED or NOTAPPROVED
    // or an email for them to reply to etc etc.

    var approversPhoneNumber = new PhoneNumber(Environment.GetEnvironmentVariable("ApproverPhoneNumber", EnvironmentVariableTarget.Process));                        

    var message = new CreateMessageOptions(approversPhoneNumber)
    {
        Body = $"'{moderationRequest.CommentRequest.Comment}' \r\nApprove: {moderationRequest.ApproveRequestUrl} \r\nDecline: {moderationRequest.DeclineRequestUrl}"
    };

    log.LogInformation($"Sending SMS: {message.Body}");

    return message;
}

In the preceding code, the TwilioSms output binding is being used to send an SMS – the SMS will contain links to either approve or decline the comment as the following screenshot shows:

Azure Functions and Twilio integration

Also notice that the ActivityTrigger is bound to a ModerationRequest: public static CreateMessageOptions RequestApproval([ActivityTrigger] ModerationRequest moderationRequest, ILogger log) – this class is defined as follows:

public class ModerationRequest
{
    public AddCommentRequest CommentRequest { get; set; }
    public string ApproveRequestUrl { get; set; }
    public string DeclineRequestUrl { get; set; }
}

The orchestrator function is where the main workflow is defined:

[FunctionName("HumanPatternExample_Orchestrator")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** RunOrchestrator method executing ********************");

    // Using tuples but could also define a class for this data
    var (commentRequest, requestUri) = context.GetInput<Tuple<AddCommentRequest,Uri>>();
    var moderationRequest = new ModerationRequest
    {
        CommentRequest = commentRequest,
        ApproveRequestUrl = $"{requestUri.Scheme}://{requestUri.Host}:{requestUri.Port}/api/HumanPatternExample_Approve?id={context.InstanceId}",
        DeclineRequestUrl = $"{requestUri.Scheme}://{requestUri.Host}:{requestUri.Port}/api/HumanPatternExample_Decline?id={context.InstanceId}",
    };
    await context.CallActivityAsync("HumanPatternExample_RequestApproval", moderationRequest);

    // Define a time out - if the moderator hasn't approved/decline then escalate to someone else, e.g. a manager
    using (var timeout = new CancellationTokenSource())
    {
        DateTime moderationDeadline = context.CurrentUtcDateTime.AddMinutes(5); // probably would be longer in real life

        Task durableTimeout = context.CreateTimer(moderationDeadline, timeout.Token);

        Task<bool> moderatedEvent = context.WaitForExternalEvent<bool>("Moderation");

        if (moderatedEvent == await Task.WhenAny(moderatedEvent, durableTimeout))
        {
            timeout.Cancel();

            bool isApproved = moderatedEvent.Result;

            if (isApproved)
            {
                log.LogInformation($"************** Comment '{commentRequest.Comment}' was approved by a moderator ********************");
                // call an activity to make the comment live on the website, etc.
            }
            else
            {
                log.LogInformation($"************** Comment '{commentRequest.Comment}' was declined by a moderator ********************");
                // call an activity to delete the comment and don't make it live on website, etc.
            }
        }
        else
        {
            log.LogInformation($"************** Comment '{commentRequest.Comment}' was not reviewed by a moderator in time, escalating...  ********************");
            // await context.CallActivityAsync("Escalate"); call an activity to escalate etc.
        }
    }

    log.LogInformation($"************** Orchestration complete ********************");
}

The code may look a little complex at first, let’s break it down into the more important parts as they relate to the Asynchronous Human Interaction Pattern:

The line await context.CallActivityAsync("HumanPatternExample_RequestApproval", moderationRequest); calls the activity that actually notifies the human in some way that the orchestration is waiting for them.

Two tasks are created: Task durableTimeout = context.CreateTimer(moderationDeadline, timeout.Token); and Task<bool> moderatedEvent = context.WaitForExternalEvent<bool>("Moderation"); The first task represents the deadline/timeout that the moderator has. The second uses the DurableOrchestrationContext.WaitForExternalEvent method to pause the orchestration until an event occurs outside of the orchestration (i.e. the human interaction). Once these 2 tasks are defined, the line if (moderatedEvent == await Task.WhenAny(moderatedEvent, durableTimeout)) checks to see if the orchestration is continuing because of an external event or because of the timeout .

So if the orchestration is waiting for an external event, how is that event sent to the orchestration? This is done via the DurableOrchestrationClient.RaiseEventAsync method as the following code shows:

[FunctionName("HumanPatternExample_Approve")]
public static async Task<IActionResult> HumanPatternExample_Approve(
    [HttpTrigger(AuthorizationLevel.Function, "get")]HttpRequest req,
    [OrchestrationClient]DurableOrchestrationClient client,
    ILogger log)
{
    // additional validation/null check code omitted for brevity

    var id = req.Query["id"];

    var status = await client.GetStatusAsync(id);

    if (status.RuntimeStatus == OrchestrationRuntimeStatus.Running)
    {
        await client.RaiseEventAsync(id, "Moderation", true);
        return new OkObjectResult("Comment was approved.");
    }

    return new NotFoundResult();
}

[FunctionName("HumanPatternExample_Decline")]
public static async Task<IActionResult> HumanPatternExample_Decline(
    [HttpTrigger(AuthorizationLevel.Function, "get")]HttpRequest req,
    [OrchestrationClient]DurableOrchestrationClient client,
    ILogger log)
{
    // additional validation/null check code omitted for brevity

    var id = req.Query["id"];

    var status = await client.GetStatusAsync(id);
    if (status.RuntimeStatus == OrchestrationRuntimeStatus.Running)
    {
        await client.RaiseEventAsync(id, "Moderation", false);
        return new OkObjectResult("Comment was declined.");
    }

    return new NotFoundResult();
}

The preceding 2 functions are triggered via HTTP (the links that are sent in the SMS) and raise the “Moderation” event to the orchestration id with either true or false.

The orchestrator is waiting for this event: Task<bool> moderatedEvent = context.WaitForExternalEvent<bool>("Moderation"); If the event is received in the orchestrator,  the approval decision is determined: bool isApproved = moderatedEvent.Result; The sample code then uses an if statement to either publish the comment to the website or delete it (omitted for brevity).

Let’s take a look at some (simplified) output – first if the human moderator approves the comment:

Executing HTTP request: {
  "requestId": "c300dfdb-553a-4d1a-8685-7eec6e9fc375",
  "method": "POST",
  "uri": "/api/HumanPatternExample_HttpStart"
}
Executing 'HumanPatternExample_HttpStart' (Reason='This function was programmatically called via the host APIs.', Id=2ff016f6-d54e-4489-9497-393322165d24)
Started orchestration with ID = 'c5ea86d46e524641a49ca10d1c04efc5'.
Executing 'HumanPatternExample_Orchestrator' (Reason='', Id=43196ec8-83cd-4a82-9c63-fcc9f13bd114)
************** RunOrchestrator method executing ********************
Executing 'HumanPatternExample_RequestApproval' (Reason='', Id=c98b259c-100a-4730-a1a9-116f5ea11aa1)
Requesting approval for comment: I hate cheese.
Sending SMS: 'I hate cheese'
Approve: http://localhost:7071/api/HumanPatternExample_Approve?id=c5ea86d46e524641a49ca10d1c04efc5
Decline: http://localhost:7071/api/HumanPatternExample_Decline?id=c5ea86d46e524641a49ca10d1c04efc5
Executed 'HumanPatternExample_RequestApproval' (Succeeded, Id=c98b259c-100a-4730-a1a9-116f5ea11aa1)
'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: CreateTimer
'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: WaitForExternalEvent:Moderation

<I 'clicked' the approve link here>

Executing 'HumanPatternExample_Approve' 
Function 'HumanPatternExample_Orchestrator (Orchestrator)' scheduled. Reason: RaiseEvent:Moderation
Function 'HumanPatternExample_Orchestrator (Orchestrator)' received a 'Moderation' event
************** Comment 'I hate cheese' was approved by a moderator ********************
************** Orchestration complete ********************

If we submit another request and wait or 5 minutes (DateTime moderationDeadline = context.CurrentUtcDateTime.AddMinutes(5);) we get the following:

Executing HTTP request: {
  "requestId": "08354276-34b2-4183-8884-9fc92fbac13d",
  "method": "POST",
  "uri": "/api/HumanPatternExample_HttpStart"
}
Executing 'HumanPatternExample_HttpStart' 
Started orchestration with ID = 'e660e4ee29044d8ea9bbbcff0e7e001f'.
Executed 'HumanPatternExample_HttpStart' (Succeeded, Id=6b33326d-b0e5-4fd1-9671-43f176b12928)
Executing 'HumanPatternExample_Orchestrator' (Reason='', Id=e2845e74-9818-4a92-ab29-e85233c208f3)
************** RunOrchestrator method executing ********************
Function 'HumanPatternExample_RequestApproval (Activity)' started.
Executing 'HumanPatternExample_RequestApproval' (Reason='', Id=b25f71e4-810f-4fc4-bb71-0c8f819eccfc)
Requesting approval for comment: I LOOOOVEEEE cheese.
Sending SMS: 'I LOOOOVEEEE cheese'
Approve: http://localhost:7071/api/HumanPatternExample_Approve?id=e660e4ee29044d8ea9bbbcff0e7e001f
Decline: http://localhost:7071/api/HumanPatternExample_Decline?id=e660e4ee29044d8ea9bbbcff0e7e001f
Executed 'HumanPatternExample_RequestApproval' (Succeeded, Id=b25f71e4-810f-4fc4-bb71-0c8f819eccfc)
Function 'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: CreateTimer
Function 'HumanPatternExample_Orchestrator (Orchestrator)' is waiting for input. Reason: WaitForExternalEvent:Moderation
Function 'HumanPatternExample_Orchestrator (Orchestrator)' was resumed by a timer scheduled for '2019-09-05T05:42:05.9852935Z'. State: TimerExpired
************** Comment 'I LOOOOVEEEE cheese' was not reviewed by a moderator in time, escalating...  ********************
************** Orchestration complete ********************
Executed 'HumanPatternExample_Orchestrator' (Succeeded, Id=eff52ac6-a7e8-4d81-afd5-9125bd5a5aaa)

Notice this time the orchestration was “un-paused” because the timer expired and the moderator didn’t respond: Function 'HumanPatternExample_Orchestrator (Orchestrator)' was resumed by a timer scheduled for '2019-09-05T05:42:05.9852935Z'. State: TimerExpired

The sample code in this article does not contain comprehensive error handling or security so you’d want to make sure you have both these in place if you were to implement this kind of workflow.

You can also use the Durable Functions API to send events though you’ll need to expose the system key which you probably won’t want to do – read Part 9: The Asynchronous HTTP API Pattern to learn more.

If you like this article or series be sure to share it :)

SHARE:

Understanding Azure Durable Functions - Part 10 The Monitor Pattern

This is the tenth part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

In the previous part in this series, we looked at the  Asynchronous HTTP API Pattern where a client can poll to see if an orchestration has completed or not.

The monitor pattern is like a mirror image of this, whereby the orchestration polls some service at regular intervals. For example an orchestration could  initiate a long running asynchronous process and then periodically poll to see if the operation is complete. Once the operation is complete the orchestration can complete (or continue with more operations.)

We can create a pause in the execution of an orchestration by calling the CreateTimer method of the DurableOrchestrationContext. This method takes a DateTime specifying how long to “sleep” for, and a CancellationToken.

As an example, suppose we want to allow a client to post a video to be encoding to a different format.

The fist client function could look something like:

[FunctionName("MonitorPatternExample_HttpStart")]
public static async Task<HttpResponseMessage> HttpStartV1(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{            
    dynamic data = await req.Content.ReadAsAsync<dynamic>();
    var fileName = data.FileName;

    string instanceId = await starter.StartNewAsync("MonitorPatternExample", fileName);

    return starter.CreateCheckStatusResponse(req, instanceId);
}

For simplicity, we just taking a filename to be used as a job identifier. This function calls the orchestrator function MonitorPatternExample.

The orchestrator function is where the Monitor Pattern is implemented:

[FunctionName("MonitorPatternExample")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    string fileName = context.GetInput<string>();

    // start encoding
    await context.CallActivityAsync<string>("MonitorPatternExample_BeginEncode", fileName);


    // We don't want the orchestration to run infinitely
    // If the operation has not completed within 30 mins, end the orchestration
    var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);
  
    while (true)
    {
        var operationHasTimedOut = context.CurrentUtcDateTime > operationTimeoutTime;

        if (operationHasTimedOut)
        {
            context.SetCustomStatus("Encoding has timed out, please submit the job again.");
            break;
        }

        var isEncodingComplete = await context.CallActivityAsync<bool>("MonitorPatternExample_IsEncodingComplete", fileName);

        if (isEncodingComplete)
        {
            context.SetCustomStatus("Encoding has completed successfully.");
            break;
        }

        // If no timeout and encoding still being processed we want to put the orchestration to sleep,
        // and awaking it again after a specified interval
        var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
        log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
        await context.CreateTimer(nextCheckTime, CancellationToken.None);
    }
}

This code starts the actual asynchronous/long-running work by calling the MonitorPatternExample_BeginEncode activity.

Then we loop around a while loop until either the long running operation is completes or a timeout occurs.

To query whether or not the encoding is complete, the MonitorPatternExample_IsEncodingComplete activity is called.

The timeout in this example is fixed at 30 mins: var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);

The orchestration is put to sleep for 15 seconds before the while loop starts again with the code:

var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
await context.CreateTimer(nextCheckTime, CancellationToken.None);

Now the orchestration will either complete by timing out with a custom status context.SetCustomStatus("Encoding has timed out, please submit the job again."); or when the encoding has completed context.SetCustomStatus("Encoding has completed successfully."); 

Just a quick reminder that you can start watching my Pluralsight courses with a free trial.

 

The full listing is as follows:

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

namespace DurableDemos
{
    public static class MonitorPatternExample
    {
        [FunctionName("MonitorPatternExample_HttpStart")]
        public static async Task<HttpResponseMessage> HttpStartV1(
            [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
            [OrchestrationClient]DurableOrchestrationClient starter,
            ILogger log)
        {            
            dynamic data = await req.Content.ReadAsAsync<dynamic>();
            var fileName = data.FileName;

            string instanceId = await starter.StartNewAsync("MonitorPatternExample", fileName);

            return starter.CreateCheckStatusResponse(req, instanceId);
        }

            [FunctionName("MonitorPatternExample")]
            public static async Task RunOrchestrator(
                [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
            {
                string fileName = context.GetInput<string>();

                // start encoding
                await context.CallActivityAsync<string>("MonitorPatternExample_BeginEncode", fileName);


                // We don't want the orchestration to run infinitely
                // If the operation has not completed within 30 mins, end the orchestration
                var operationTimeoutTime = context.CurrentUtcDateTime.AddMinutes(30);
          
                while (true)
                {
                    var operationHasTimedOut = context.CurrentUtcDateTime > operationTimeoutTime;

                    if (operationHasTimedOut)
                    {
                        context.SetCustomStatus("Encoding has timed out, please submit the job again.");
                        break;
                    }

                    var isEncodingComplete = await context.CallActivityAsync<bool>("MonitorPatternExample_IsEncodingComplete", fileName);

                    if (isEncodingComplete)
                    {
                        context.SetCustomStatus("Encoding has completed successfully.");
                        break;
                    }

                    // If no timeout and encoding still being processed we want to put the orchestration to sleep,
                    // and awaking it again after a specified interval
                    var nextCheckTime = context.CurrentUtcDateTime.AddSeconds(15);
                    log.LogInformation($"************** Sleeping orchestration until {nextCheckTime.ToLongTimeString()}");
                    await context.CreateTimer(nextCheckTime, CancellationToken.None);
                }
            }

        [FunctionName("MonitorPatternExample_BeginEncode")]
        public static void BeginEncodeVideo([ActivityTrigger] string fileName, ILogger log)
        {
            // Call API, start an async process, queue a message, etc.
            log.LogInformation($"************** Starting encoding of {fileName}");

            // This activity returns before the job is complete, its job is to just start the async/long running operation
        }


        [FunctionName("MonitorPatternExample_IsEncodingComplete")]
        public static bool IsEncodingComplete([ActivityTrigger] string fileName, ILogger log)
        {
            log.LogInformation($"************** Checking if {fileName} encoding is complete...");
            // Here you would make a call to an API, query a database, check blob storage etc 
            // to check whether the long running asyn process is complete

            // For demo purposes, we'll just signal completion every so often
            bool isComplete = new Random().Next() % 2 == 0;

            log.LogInformation($"************** {fileName} encoding complete: {isComplete}");

            return isComplete;
        }


    }
}

SHARE:

Understanding Azure Durable Functions - Part 9: The Asynchronous HTTP API Pattern

This is the ninth part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

If your orchestration takes a while to execute, you may not want the end client (for example a web app that triggers the orchestration via an HTTP call) to wait around for a response. Instead you may want to provide the client with a way of querying (polling) if the long-running process is complete. In a previous article in this series we looked at how to get results from orchestrations. In this article we’ll dig into this in a bit more detail.

The Asynchronous HTTP API Pattern means the client calls an HTTP API which does not return the end result, but rather returns a way of checking for the completion of the task, for example by being providing with a status URL. This pattern may also be referred to as the Polling Consumer Pattern.

Recall from this previous article that when the client HTTP function is called, it returns some body content with management URLS for the orchestration instance that was started, for example:

{
    "id": "1bf95a9fa5084745bce24363e9ee781b",
    "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "rewindPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==",
    "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA=="
}

Notice that this information provides the client with a lot of information, including the URLs to terminate the orchestration, purge the history etc. Notice the response also include the key:  code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==

In the Azure Portal, this is the durabletask_extension key for the function app. With this key the client can perform admin/management operations on orchestration instances using the API including getting results from arbitrary orchestrations, terminating running orchestrations, etc..

The response also contains headers, including one called Location that also points to the check status URL, e.g. http://localhost:7071/runtime/webhooks/durabletask/instances/1bf95a9fa5084745bce24363e9ee781b?taskHub=DurableFunctionsHub&connection=Storage&code=ZOBhsHdAnHuXA6s2FMCcmcgW2XLFOVpQ5Hfob5CWYcyi2c5Al0DyjA==

If we look at the client function, this information is generated with the line: return starter.CreateCheckStatusResponse(req, instanceId);

[FunctionName("AsyncApiPatternExample_HttpStartV1")]
public static async Task<HttpResponseMessage> HttpStartV1(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    string instanceId = await starter.StartNewAsync("AsyncApiPatternExample", null);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

You probably do not want to expose the durabletask_extension key to clients as this will allow them to perform operations they should not have access to. Instead we can modify the client function as follows:

[FunctionName("AsyncApiPatternExample_HttpStartV2")]
public static async Task<HttpResponseMessage> HttpStartV2(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    string instanceId = await starter.StartNewAsync("AsyncApiPatternExample", null);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    // Create the URL to allow the client to check status of a request (excluding the function key in the code querystring)
    string checkStatusUrl = string.Format("{0}://{1}/api/AsyncApiPatternExample_Status?id={2}", req.RequestUri.Scheme, req.RequestUri.Host, instanceId);

    // Create the response and add headers
    var response = new HttpResponseMessage()
    {
        StatusCode = System.Net.HttpStatusCode.Accepted,                
        Content = new StringContent(checkStatusUrl),                
    };
    response.Headers.Add("Location", checkStatusUrl);
    response.Headers.Add("Retry-After", "10");

    return response;
}

In this new version of the client function, we control what is passed back to the client, we don’t include any sensitive management URLs/keys, a response from calling this function would look like:

Response body (text): https://localhost:7071/api/AsyncApiPatternExample_Status?id=d69a847230e5411ca57659723cb14c55
Response status: 202Accepted
Response Headers:
Location = https://localhost:7071/api/AsyncApiPatternExample_Status?id=d69a847230e5411ca57659723cb14c55
Retry-After = 10
etc.

The client can then GET the status URL (+ the function key): https://localhost:7071/api/AsyncApiPatternExample_Status?id=d69a847230e5411ca57659723cb14c55&code=XXXXXXXXXX

This will return:

{
    "currentStatus": "Running",
    "result": null
}

And once the orchestration has complete this will return:

{
    "currentStatus": "Completed",
    "result": "Hello London!"
}

The actual status function look like the following:

 

[FunctionName("AsyncApiPatternExample_Status")]
public static async Task<IActionResult> Status(
  [HttpTrigger(AuthorizationLevel.Function, "get")]HttpRequest req,
  [OrchestrationClient]DurableOrchestrationClient orchestrationClient,
  ILogger log)
{
    var orchestrationInstanceId = req.Query["id"];

    if (string.IsNullOrWhiteSpace(orchestrationInstanceId))
    {
        return new NotFoundResult();
    }

    // Get the status for the passed in instanceId
    DurableOrchestrationStatus status = await orchestrationClient.GetStatusAsync(orchestrationInstanceId);

    if (status is null)
    {
        return new NotFoundResult();
    }

    
    var shortStatus = new
    {
        currentStatus = status.RuntimeStatus.ToString(),
        result = status.Output
    };

    return new OkObjectResult(shortStatus);
    //  We could also expand this and check status.RuntimeStatus and for example return a 202 if processing is still underway
}

The key thing in the preceding code is the call: DurableOrchestrationStatus status = await orchestrationClient.GetStatusAsync(orchestrationInstanceId); This allows the status to be obtained for the orchestration id that was passed in as a querystring parameter.

The output to the client is chosen in the anonymous object shortStatus. Now the client does not get sensitive information returned such as the management URLs and keys. A client could however still retrieve the status/result from orchestrations started by other clients.If you want more fine grained control/authentication you should check out the other options in the documentation.

SHARE:

Understanding Azure Durable Functions - Part 8: The Fan Out/Fan In Pattern

This is the eighth part in a series of articles. If you’re not familiar with Durable Functions you should check out the previous articles before reading this.

In the previous article we saw the function chaining pattern where the output from one activity function is passed as the input to the next activity function to form a processing pipeline.

If you have a workload that you can split up into discrete chunks of data, you can parallelize the processing of those chunks to reduce the time it takes to complete the total workload. The fan out/fan in pattern can be used to do this.

This pattern essentially means running multiple instances of the activity function at the same time. The “fan out” part is the splitting up of the data into multiple chunks and then calling the activity function multiple times, passing in these chunks. The fanning out process invokes multiple instances of the activity function.

When each chunk has been processed, the “fan in” takes places and takes the results from each activity function instance and aggregates them into a single final result.

This pattern is only really useful if you can “chunk” the workload in a meaningful way for splitting up to be processed in parallel.

As an example, suppose we allow the client to specify a number of greetings to generate:

public class Greeting
{
    public string CityName { get; set; }
    public string Message { get; set; }
}

public class GreetingsRequest
{
    public List<Greeting> Greetings { get; set; }
}

Now the HTTP client function can be created that allows some JSON to be sent, this then calls the orchestrator:

[FunctionName("FanOutIn_HttpStart")]
public static async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Function, "post")]HttpRequestMessage req,
    [OrchestrationClient]DurableOrchestrationClient starter,
    ILogger log)
{
    var data = await req.Content.ReadAsAsync<GreetingsRequest>();

    string instanceId = await starter.StartNewAsync("FanOutInOrchestrator", data);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

At this point nothing is really different, the fan out/in is specified in the orchestrator function:

[FunctionName("FanOutInOrchestrator")]
public static async Task<string> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** RunOrchestrator method executing ********************");

    GreetingsRequest greetingsRequest = context.GetInput<GreetingsRequest>();

    // Fanning out
    log.LogInformation($"************** Fanning out ********************");
    var parallelActivities = new List<Task<string>>();
    foreach (var greeting in greetingsRequest.Greetings)
    {
        // Start a new activity function and capture the task reference
        Task<string> task = context.CallActivityAsync<string>("FanOutIn_ActivityFunction", greeting);

        // Store the task reference for later
        parallelActivities.Add(task);
    }

    // Wait until all the activity functions have done their work
    log.LogInformation($"************** 'Waiting' for parallel results ********************");
    await Task.WhenAll(parallelActivities);
    log.LogInformation($"************** All activity functions complete ********************");

    // Now that all parallel activity functions have completed,
    // fan in AKA aggregate the results, in this case into a single
    // string using a StringBuilder
    log.LogInformation($"************** fanning in ********************");
    var sb = new StringBuilder();
    foreach (var completedParallelActivity in parallelActivities)
    {
        sb.AppendLine(completedParallelActivity.Result);
    }

    return sb.ToString();
}

The preceding code is the orchestrator function that handles the fan out/in, I’ve added comments to illustrate what’s going on. Essentially each Greeting is being treated as a “chunk” to be processed in parallel. Each chunk is passed to an instance of the FanOutIn_ActivityFunction. However rather than just awaiting the CallActivityAsync call, the task is stored in the parallelActivities list. Now when all activities have completed, the fan in can happen which just aggregates all the results into a single string containing all the greetings.

The activity function is defined as:

[FunctionName("FanOutIn_ActivityFunction")]
public static string SayHello([ActivityTrigger] Greeting greeting, ILogger log)
{            
    // simulate longer processing delay to demonstrate parallelism
    Thread.Sleep(15000); 

    return $"{greeting.Message} {greeting.CityName}";
}

If we run this, we get the following (simplified) output:

Executing 'FanOutIn_HttpStart' 
Executing 'FanOutInOrchestrator'
************** RunOrchestrator method executing ********************
************** Fanning out ********************
Function 'FanOutIn_ActivityFunction (Activity)' scheduled. 
Function 'FanOutIn_ActivityFunction (Activity)' scheduled. 
Function 'FanOutIn_ActivityFunction (Activity)' started. 
Function 'FanOutIn_ActivityFunction (Activity)' started. 
Executing 'FanOutIn_ActivityFunction' (Reason='', Id=9a33abd6-4594-4285-bbcd-0e428cf15d76)
Executing 'FanOutIn_ActivityFunction' (Reason='', Id=e3afbcb2-1f90-4f3f-a638-3983ea8db1a7)
Executed 'FanOutIn_ActivityFunction' (Succeeded, Id=9a33abd6-4594-4285-bbcd-0e428cf15d76)
Executed 'FanOutIn_ActivityFunction' (Succeeded, Id=e3afbcb2-1f90-4f3f-a638-3983ea8db1a7)
Function 'FanOutIn_ActivityFunction (Activity)' completed. 
Function 'FanOutIn_ActivityFunction (Activity)' completed. 
************** 'Waiting' for parallel results ********************
************** All activity functions complete ********************
************** fanning in ********************
Executed 'FanOutInOrchestrator' (Succeeded, Id=fba76372-758f-433c-af22-299a3b38dc5a)

Recall in the activity function there is a 15 second delay:

[FunctionName("FanOutIn_ActivityFunction")]
public static string SayHello([ActivityTrigger] Greeting greeting, ILogger log)
{            
    // simulate longer processing delay to demonstrate parallelism
    Thread.Sleep(15000); 

    return $"{greeting.Message} {greeting.CityName}";
}

If we look at the timings (below) notice that the createdTime and lastUpdatedTime are not 30 seconds apart but rather about 15 seconds apart (04:06:36 to 04:06:52), this is because the 2 activities have been run in parallel at the same time:

{
    "name": "FanOutInOrchestrator",
    "instanceId": "5704559dc4d94e26998ead2f47ea9821",
    "runtimeStatus": "Completed",
    "input": {
        "$type": "DurableDemos.FanOutInPatternExample+GreetingsRequest, DurableDemos",
        "Greetings": [
            {
                "$type": "DurableDemos.FanOutInPatternExample+Greeting, DurableDemos",
                "CityName": "New York",
                "Message": "Yo"
            },
            {
                "$type": "DurableDemos.FanOutInPatternExample+Greeting, DurableDemos",
                "CityName": "London",
                "Message": "Good day"
            }
        ]
    },
    "customStatus": null,
    "output": "Yo New York\r\nGood day London\r\n",
    "createdTime": "2019-08-21T04:06:36Z",
    "lastUpdatedTime": "2019-08-21T04:06:52Z"
}

Also note in the preceding status result the output is the aggregated “fanned-in” result: “Yo New York\r\nGood day London\r\n”.

Just as with the Function Chaining pattern discussed in the previous article, you could implement the fan out/fan in pattern without Durable Functions but you would have to manage the complexity of the process manually, such as knowing when all the parallel activity functions have completed and also the fan in/aggregation could be quite complex to implement manually.

SHARE: