Improving Azure Functions Blob Trigger Performance and Reliability - Part 3: Using Event Grid to Respond to New Blobs

In the previous part of the series we saw how to improve the reliability of responding to new blobs by introducing a queue. This required adding a Storage Queue to the solution, and required that whatever writes new blobs also writes a queue message.

In this article, instead of manually writing messages to a queue on blob creation, we use Event Grid events.

Azure Event Grid has support for Blob Storage, meaning that when a new blob is written, Event Grid will notice this. We can then trigger an Azure Function from this Event Grid event.

This approach can improve the reliability and responsiveness compared to using a simple blob trigger: “Blob storage events are reliably sent to the Event grid service which provides reliable delivery services to your applications through rich retry policies and dead-letter delivery.” [Microsoft]

Creating an Event Grid Triggered Function

The following Azure Function code is a modified version of the code used in the previous article:

public static class ProcessFoodBlobsEventGrid
{
    private static readonly string[] _meats = { "steak", "chicken", "venison" };

    [FunctionName("ProcessFoodBlobsEventGrid")]
    public static void Run(
     [EventGridTrigger]EventGridEvent blobCreatedEvent,
     [Blob("{data.url}")] string foods, // assumes small blob size so using string not stream
     [Blob("{data.url}.vegetarian")] out string vegetarian,
     [Blob("{data.url}.nonvegetarian")] out string nonVegetarian,
     ILogger log)
    {
        log.LogInformation("Processing a blob created event");

        StorageBlobCreatedEventData createdEvent = ((JObject)blobCreatedEvent.Data).ToObject<StorageBlobCreatedEventData>();

        log.LogInformation($"Blob: {createdEvent.Url}");
        log.LogInformation($"Api operation: {createdEvent.Api}");

        vegetarian = null;
        nonVegetarian = null;

        string[] foodLines = foods.Split(new[] { "\r\n", "\n" }, StringSplitOptions.RemoveEmptyEntries);


        foreach (var food in foodLines)
        {
            var isMeat = _meats.Contains(food);

            if (isMeat)
            {
                nonVegetarian += food + Environment.NewLine;
            }
            else
            {
                vegetarian += food + Environment.NewLine;
            }
        }
    }
}

In the preceding code, the [EventGridTrigger]EventGridEvent blobCreatedEvent parameter causes the function to be triggered when an Event Grid event is directed to the function.

The input blob binding [Blob("{data.url}")] string foods uses a binding expression and accesses the data.url property from the JSON data that’s contained in the event (this comes from the event schema for Blob Storage). The 2 output bindings also use the original blob path/name and append .vegetarian or .nonvegetarian. This implementation writes output blobs to the same container as the input blob. You could also use dynamic binding in Azure Functions with imperative runtime bindings to just extract the filename from the blob and write the output blobs to a different container.
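
As a minimal sketch of the "different container" idea, the helper below splits a blob URL of the kind carried in data.url into its container and blob name using only System.Uri. The class and method names are hypothetical, and it assumes a production-style URL in which the first path segment is the container; the local storage emulator puts the account name in the first path segment, which this sketch does not handle.

```csharp
using System;

public static class BlobUrlParser
{
    // Splits a blob URL into (container, blob name).
    // Assumption: the first path segment is the container, as in
    // https://<account>.blob.core.windows.net/<container>/<blob>.
    public static (string Container, string BlobName) Parse(string blobUrl)
    {
        var uri = new Uri(blobUrl);

        // AbsolutePath looks like "/food-in/recipe1.txt"
        string path = uri.AbsolutePath.TrimStart('/');
        int firstSlash = path.IndexOf('/');

        if (firstSlash <= 0 || firstSlash == path.Length - 1)
        {
            throw new ArgumentException(
                "URL does not contain both a container and a blob name.", nameof(blobUrl));
        }

        string container = path.Substring(0, firstSlash);
        string blobName = Uri.UnescapeDataString(path.Substring(firstSlash + 1));

        return (container, blobName);
    }
}
```

The extracted blob name could then be combined with a different container name when building the output path in an imperative binding.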

Creating an Event Subscription for New Blobs

The function needs an event subscription to be created in Azure to recognize when new blobs are written and invoke the function. This can be done by navigating to the storage account (requires storage account v2) in the Azure Portal and clicking the Events link. You can then add a new event subscription as the following screenshot shows (note the Defined Event Types is set to Blob Created):

Creating a new Azure Event Grid Subscription to trigger an Azure Function

You can also specify subject filters to limit the event to a specific container and/or file type as the following screenshot shows:

Configuring Azure Event Grid subscription to filter on blob storage containers
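
To get a feel for what a subject filter will match, it can help to model it locally. Blob storage events carry a subject of the form /blobServices/default/containers/<container>/blobs/<blob>, and the "begins with" and "ends with" filters are applied to that string. The sketch below is only a local approximation with hypothetical names, not Event Grid's implementation, and the case-insensitive comparison is an assumption about the default subscription setting.

```csharp
using System;

public static class SubjectFilterSketch
{
    // Builds the subject string used for a blob event.
    public static string BlobSubject(string container, string blobName) =>
        $"/blobServices/default/containers/{container}/blobs/{blobName}";

    // Local approximation of "Subject Begins With" / "Subject Ends With" matching.
    // An empty filter value is treated as "match everything".
    public static bool Matches(string subject, string beginsWith, string endsWith)
    {
        bool prefixOk = string.IsNullOrEmpty(beginsWith)
            || subject.StartsWith(beginsWith, StringComparison.OrdinalIgnoreCase);

        bool suffixOk = string.IsNullOrEmpty(endsWith)
            || subject.EndsWith(endsWith, StringComparison.OrdinalIgnoreCase);

        return prefixOk && suffixOk;
    }
}
```

For example, a "begins with" value of /blobServices/default/containers/food-in/ combined with an "ends with" value of .txt would match recipe1.txt in the food-in container but not the same blob name in another container.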

You could also specify dead-lettering and retry policies in case the Function App is unable to respond.

Now when a blob is added, the event subscription will notice it and invoke the function.

Ultimately “Use the Event Grid trigger instead of the Blob storage trigger for blob-only storage accounts, for high scale, or to reduce latency.” [Microsoft]


Improving Azure Functions Blob Trigger Performance and Reliability - Part 2: Processing Delays and Missed Blobs

This is the second part of a series of articles.

When you add a new blob, your blob-triggered function may not be triggered immediately: “If the blob container being monitored contains more than 10,000 blobs, the Functions runtime scans log files to watch for new or changed blobs. This process can result in delays. A function might not get triggered until several minutes or longer after the blob is created.” [Microsoft]

Also when scanning log files to find new blobs that need processing, there’s “no guarantee that all events are captured. Under some conditions, logs may be missed.” [Microsoft]

This means that it is possible for some new blobs to be missed and not processed.

Using a Storage Queue to Trigger Processing of New Blobs

One alternative that reduces the likelihood of missed blobs and also improves the responsiveness of blob processing is to use a slightly more complex (but still relatively straightforward) approach.

Essentially this alternative approach has the following workflow:

  1. New blob written to blob storage
  2. Write message to storage queue containing new blob path
  3. Queue-triggered function gets message from step 2
  4. Blob processing occurs

(Note that this alternative approach may not suit all situations depending on how new blobs are making their way into blob storage – who or whatever is writing the blob in step 1 also needs to be able to write a queue message.)

Blob Writing

This approach requires that when a blob is written, a queue message is also written.

As a simple example, this could be from client code as follows:

using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System.IO;
using System.Threading.Tasks;

namespace AddNewBlob
{
    class Program
    {
        static async Task Main(string[] args)
        {
            CloudStorageAccount storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
            CloudBlobClient cloudBlobClient = storageAccount.CreateCloudBlobClient();
            CloudBlobContainer cloudBlobContainer = cloudBlobClient.GetContainerReference("food-in");
            CloudBlockBlob cloudBlockBlob = cloudBlobContainer.GetBlockBlobReference("recipe1.txt");

            await WriteBlob();
            await WriteMessage();

            async Task WriteBlob()
            {
                using (var stream = await cloudBlockBlob.OpenWriteAsync())
                using (var sw = new StreamWriter(stream))
                {
                    await sw.WriteLineAsync("carrot");
                    await sw.WriteLineAsync("steak");
                    await sw.WriteLineAsync("apple");
                }
            }

            async Task WriteMessage()
            {
                var queueClient = storageAccount.CreateCloudQueueClient();
                var queue = queueClient.GetQueueReference("food-in");
                await queue.AddMessageAsync(new Microsoft.WindowsAzure.Storage.Queue.CloudQueueMessage("recipe1.txt"));
            }
        }

        
    }
}

Or perhaps the blob data comes in via an HTTP-triggered function as follows:

public static class AddRecipe
{
    [FunctionName("AddRecipe")]
    [return: Queue("food-in")]
    public static async Task<string> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,            
        ILogger log)
    {
        log.LogInformation("C# HTTP trigger function processed a request.");
        
        string ingredients = await new StreamReader(req.Body).ReadToEndAsync();

        // validation/error code omitted for demo purposes

        var blobName = Guid.NewGuid().ToString();

        await WriteBlob(); // ensure blob is written *before* function returns and add message to the queue

        return blobName; // write to queue


        async Task WriteBlob()
        {
            var account = CloudStorageAccount.DevelopmentStorageAccount; // In real app load this from secure config location
            var blobClient = account.CreateCloudBlobClient();
            var blobContainer = blobClient.GetContainerReference("food-in");
            var cloudBlockBlob = blobContainer.GetBlockBlobReference(blobName);
            await cloudBlockBlob.UploadTextAsync(ingredients);
        }
    }
}

Notice in the preceding code that the blob is written explicitly in code, to ensure that the queue message isn’t added until the blob is definitely available to be processed by the next function in the chain. (See this related GitHub issue).

More Reliable Blob Processing

The next function is where the actual processing of the new blob is carried out. It is, however, triggered from a queue rather than relying on a blob trigger:

public static class ProcessFoodBlobs
{
    private static readonly string[] _meats = { "steak", "chicken", "venison" };      

    [FunctionName("ProcessFoodBlobs")]
    public static void Run(
        [QueueTrigger("food-in")]string newBlobPath, 
        [Blob("food-in/{queueTrigger}")] string foods,
        [Blob("food-out/{queueTrigger}.vegetarian")] out string vegetarian,
        [Blob("food-out/{queueTrigger}.nonvegetarian")] out string nonVegetarian,
        ILogger log)
    {
        vegetarian = null;
        nonVegetarian = null;

        string[] foodLines = foods.Split(new[] {"\r\n", "\n"  }, StringSplitOptions.RemoveEmptyEntries);


        foreach (var food in foodLines)
        {
            var isMeat = _meats.Contains(food);

            if (isMeat)
            {
                nonVegetarian += food + Environment.NewLine;
            }
            else
            {
                vegetarian += food + Environment.NewLine;
            }
        }    
    }
}

In the preceding code we’re making use of automatic input blob binding.

Summary

This approach may offer some benefits, at the cost of some additional complexity, if you have a lot of blobs being written/stored/processed. There are also other considerations to bear in mind, such as what happens if the blob is deleted or changed before the message is picked up off the queue. As with all things, you should consider your own requirements and ensure you do thorough testing, including performance/load/stress testing.
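
One way to soften the "blob deleted before the message is picked up" case is to make the processing logic tolerate missing input. The sketch below pulls the food-sorting logic into a hypothetical FoodSorter helper that returns null outputs when there is nothing to process. It assumes the string input binding supplies null when the blob no longer exists, which is worth verifying for the Functions version in use.

```csharp
using System;
using System.Linq;
using System.Text;

public static class FoodSorter
{
    private static readonly string[] Meats = { "steak", "chicken", "venison" };

    // Returns null for an output when there is nothing to write, mirroring the
    // "out string" bindings in the function, which are left as null in that case.
    public static (string Vegetarian, string NonVegetarian) Sort(string foods)
    {
        // Missing or empty input: nothing to process.
        if (string.IsNullOrWhiteSpace(foods))
        {
            return (null, null);
        }

        var vegetarian = new StringBuilder();
        var nonVegetarian = new StringBuilder();

        string[] foodLines = foods.Split(new[] { "\r\n", "\n" }, StringSplitOptions.RemoveEmptyEntries);

        foreach (var food in foodLines)
        {
            // Append each line to whichever output it belongs to.
            (Meats.Contains(food) ? nonVegetarian : vegetarian).AppendLine(food);
        }

        return (vegetarian.Length > 0 ? vegetarian.ToString() : null,
                nonVegetarian.Length > 0 ? nonVegetarian.ToString() : null);
    }
}
```

The queue-triggered function could then assign the two tuple values to its vegetarian and nonVegetarian output parameters.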


Improving Azure Functions Blob Trigger Performance and Reliability - Part 1: Memory Usage

This is the first part of a series of articles.

When creating blob-triggered Azure Functions there are some memory usage considerations to bear in mind.

“The consumption plan limits a function app on one virtual machine (VM) to 1.5 GB of memory. Memory is used by each concurrently executing function instance and by the Functions runtime itself.” [Microsoft]

A blob-triggered function can execute concurrently and internally uses a queue: “the maximum number of concurrent function invocations is controlled by the queues configuration in host.json. The default settings limit concurrency to 24 invocations. This limit applies separately to each function that uses a blob trigger.” [Microsoft]

So, if you have 1 blob-triggered function in a Function App, with the default concurrency setting of 24, you could have a maximum of 24 (1 * 24) concurrently executing function invocations. (The documentation describes this as per-VM concurrency, so with 2 VMs you could have 48 (2 VMs * 1 * 24) concurrently executing function invocations.)

If you had 3 blob-triggered functions in a Function App (assuming 1 VM) then you could have 72 (3 * 24) concurrently executing function invocations.

Because the consumption plan “limits a function app on one virtual machine (VM) to 1.5 GB of memory”, if you are processing blobs that are non-trivial in size then you may need to consider overall memory usage.
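
The arithmetic above can be captured in a small helper. This is a back-of-the-envelope sketch with hypothetical names; the memory figure simply divides the 1.5 GB limit (taken here as 1536 MB) by the number of concurrent invocations and ignores the memory used by the Functions runtime itself, so the real budget will be lower.

```csharp
using System;

public static class BlobTriggerCapacity
{
    // Upper bound on concurrently executing invocations:
    // VMs * blob-triggered functions * per-function concurrency limit.
    public static int MaxConcurrentInvocations(
        int vmCount, int blobTriggeredFunctions, int perFunctionLimit = 24) =>
        vmCount * blobTriggeredFunctions * perFunctionLimit;

    // Very rough per-invocation memory budget on one VM, in megabytes.
    // Assumption: every invocation runs at once and the runtime uses no memory.
    public static double RoughMemoryPerInvocationMb(
        int blobTriggeredFunctions, int perFunctionLimit = 24, double vmLimitMb = 1536) =>
        vmLimitMb / (blobTriggeredFunctions * perFunctionLimit);
}
```

With one blob-triggered function this gives roughly 64 MB per invocation at full concurrency, and with three functions roughly 21 MB, which is why non-trivial blob sizes deserve attention.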

OutOfMemoryException When Using Azure Functions Blob Trigger

As an example, suppose the following function exists:

public static class BlobPerformanceAndReliability
{
    [FunctionName("BlobPerformanceAndReliability")]
    public static void Run(
        [BlobTrigger("big-blobs/{name}")]string blob, 
        string name, 
        [Blob("big-blobs-out/{name}")] out string foundData,
        ILogger log)
    {
        log.LogInformation($"C# Blob trigger function Processed blob\n Name:{name} \n Size: {blob.Length} Bytes");

        // Code to find and output a specific line
        foundData = "This line will never be reached if out of memory";
    }
}

The preceding function code is triggered by blobs in the big-blobs container. The omitted code towards the end of the function would find a specific line of text in the blob and output it to big-blobs-out.

We can create a large file (approx. 1.8 GB) with the following code in a console app:

using System.IO;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var sw = new StreamWriter(@"c:\temp\bigblob.txt"))
            {
                for (int i = 0; i < 40_000_000; i++)
                {
                    sw.WriteLine("Some line we are not interested in processing");
                }
                sw.WriteLine("Data: 42");
            }
        }
    }
}

The contents of the last line in the file will be set to “Data: 42”.
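
As a quick check on the file size, the sketch below estimates the number of bytes the console app writes. The helper name is hypothetical, and it assumes UTF-8 output without a byte order mark; the line ending is passed in because it differs by platform.

```csharp
using System;
using System.Text;

public static class BigBlobSize
{
    // Estimates the size in bytes of the file written by the console app above.
    // newLine is "\r\n" on Windows and "\n" on most other platforms.
    public static long EstimateBytes(string newLine)
    {
        const string fillerLine = "Some line we are not interested in processing";
        const string lastLine = "Data: 42";
        const long fillerCount = 40_000_000;

        long fillerBytes = Encoding.UTF8.GetByteCount(fillerLine + newLine);
        long lastBytes = Encoding.UTF8.GetByteCount(lastLine + newLine);

        return fillerCount * fillerBytes + lastBytes;
    }
}
```

With Windows line endings each filler line is 47 bytes, giving 1,880,000,010 bytes in total, which is larger than the 1.5 GB limit quoted earlier.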

If we run the function app locally and upload this big file to the Azure Storage Emulator, the function will trigger and will error with: “System.Private.CoreLib: Exception while executing function: BlobPerformanceAndReliability. Microsoft.Azure.WebJobs.Host: One or more errors occurred. (Exception binding parameter 'blob') (Exception binding parameter 'name'). Exception binding parameter 'blob'. System.Private.CoreLib: Exception of type 'System.OutOfMemoryException' was thrown.”.

The reason for this is that when you bind a blob trigger/input to string or byte[], the entire blob is read into memory. If the blob is too big (and/or other concurrently executing function invocations are also processing big files), it will exceed the memory restrictions of the Functions runtime.

Processing Large Blobs with Azure Functions

Instead of binding to string or byte[], you can bind to a Stream. This will not load the entire blob into memory and will allow you to instead process it incrementally.

The function can be re-written as follows:

public static class BlobPerformanceAndReliability
{
    [FunctionName("BlobPerformanceAndReliability")]
    public static void Run(
        [BlobTrigger("big-blobs/{name}")]Stream blob,
        string name,
        [Blob("big-blobs-out/{name}")] out string foundData,
        ILogger log)
    {
        log.LogInformation($"C# Blob trigger function Processed blob\n Name:{name} \n Size: {blob.Length} Bytes");

        // Code to find and output a specific line            

        foundData = null; // Don't write an output blob by default

        string line;

        using (var sr = new StreamReader(blob))
        {                
            while (!sr.EndOfStream)
            {
                line = sr.ReadLine();

                if (line.StartsWith("Data"))
                {
                    foundData = line;
                    break;
                }                    
            }
        }            
    }
}

If you’re not familiar with using streams in .NET, check out my Working with Files and Streams in C# Pluralsight course.

If we force the same blob to be reprocessed with this new function code, there will be no error and the output blob containing “Data: 42” will be seen in the big-blobs-out container.

Another thing to bear in mind when processing large files is that there is a timeout on function execution.

In the next part of this series we’ll look at how to improve the responsiveness of function execution when new blobs are written and also improve the reliability and reduce the chances of blobs being missed.


Handling Errors and Poison Blobs in Azure Functions With Azure Blob Storage Triggers

(This article applies to Azure Functions V2)

An Azure Function can be triggered by new blobs being written (or updated). If an unhandled exception occurs in the function, by default Azure Functions will retry the blob 5 times. This means the function will be triggered again for the same blob up to 5 times. If the same blob causes errors 5 times, no further attempts will be made and the processing of the blob will be “lost”.

Understanding Blob Processing Errors in Azure Functions

When a new (or updated) blob triggers a function, the Azure Functions runtime makes sure that the same blob is not processed twice (if no error occurs in the function execution). To do this the runtime makes use of “blob receipts”. These are stored in the Azure storage account associated with the function app (as defined in the AzureWebJobsStorage Function App settings).

As an example, suppose a new blob (called “followupletterrequest.data”) triggered the following function:

class FollowupLetterRequest
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}

public static class PoisonBlobExampleFunctions
{
    [FunctionName("PoisonBlobExampleFunctions")]
    public static void Run(
        [BlobTrigger("followup-letters/{blobname}.data")]string blobData, 
        string blobname,
        [Blob("followup-letters/{blobname}.txt")] out string letter,
        ILogger log)
    {
        var settings = new JsonSerializerSettings
        {
            MissingMemberHandling = MissingMemberHandling.Error
        };

        // This code assumes blob JSON is valid, if not an exception will be thrown
        var request = JsonConvert.DeserializeObject<FollowupLetterRequest>(blobData, settings);

        string firstName = request.FirstName;
        string lastName = request.LastName;

        letter = RenderFollowUpLetterText(firstName, lastName);
    }
    
    private static string RenderFollowUpLetterText(string firstName, string lastName)
    {
        string simulateLetterText = WaffleEngine.Text(paragraphs: 3, includeHeading: false);

        return $"Dear {firstName} {lastName}\r\n \r\n{simulateLetterText}";
    }
}

After the function runs, in the storage account under a path like “azure-webjobs-hosts/blobreceipts” the blob receipt can be seen. On a development machine using the local storage emulator the full path would be something like: “blobreceipts/desktop/DontCodeTiredDemosV2.PoisonBlobExampleFunctions.Run/"0x8D69224161F4590"/followup-letters/followupletterrequest.data”.

This full path to the blob receipt blob represents:

  • Function Id that the blob triggered (DontCodeTiredDemosV2.PoisonBlobExampleFunctions.Run)
  • Blob Container Name (followup-letters)
  • Name of triggering blob (followupletterrequest.data)
  • Triggering blob version ETag (“0x8D69224161F4590”)
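
The layout of the receipt path can be expressed as a small helper, which may be handy when locating the receipt for a specific blob. This is a sketch inferred only from the example path above; the helper name is hypothetical, and treating the "desktop" segment as a host identifier is an assumption.

```csharp
public static class BlobReceiptPath
{
    // Builds a receipt path in the shape seen in the example above:
    // blobreceipts/<host id>/<function id>/<ETag>/<container>/<blob name>
    // Assumption: the segment after "blobreceipts" identifies the host.
    public static string Build(
        string hostId, string functionId, string eTag, string container, string blobName) =>
        $"blobreceipts/{hostId}/{functionId}/{eTag}/{container}/{blobName}";
}
```

Note that the ETag segment includes its surrounding double quotes, as in the example path.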

 

If we now add another new blob called “followupletterrequest_bad.data” that contains bad data (e.g. a missing JSON property), so that an exception is thrown, a second blob receipt will be generated: “blobreceipts/desktop/DontCodeTiredDemosV2.PoisonBlobExampleFunctions.Run/"0x8D692245985E910"/followup-letters/followupletterrequest_bad.data”.

Because this blob generated an error, after the default number of retries (5) there will be no more attempts to process it.

Manually Forcing a Blob to Be Reprocessed

The documentation states that if the blob receipt is manually deleted, this will force the blob to be reprocessed. This may be suitable for forcing reprocessing of a set of blobs that failed due to some transient error, such as a database or network being temporarily offline. You should obviously take care that reprocessing blobs won’t cause problems such as duplicate orders, emails, etc. or other errors in the system. You may also need to consider what would happen if blobs are retried in a different order and/or interleaved with new blobs being added.

Also, blobs may not be reprocessed immediately. Using the local function runtime development environment, once the blob receipt has been deleted, it seems that the function app needs restarting to cause the blob to be reprocessed (either that or I didn’t wait long enough…). Once deployed to Azure there can be a delay between the blob receipt being deleted and the blob being retried. The following timeline shows the delay between the blob receipt being deleted and retry attempt 1:

2019-02-14 03:40:24.374 <attempt 1 - failure>
2019-02-14 03:40:24.763 <attempt 2 - failure>
2019-02-14 03:40:24.891 <attempt 3 - failure>
2019-02-14 03:40:25.007 <attempt 4 - failure>
2019-02-14 03:40:25.117 <attempt 5 - failure>
<blob receipt deleted>
2019-02-14 04:24:24.327 <retry attempt 1 - failure>
2019-02-14 04:24:25.155 <retry attempt 2 - failure>
2019-02-14 04:24:25.288 <retry attempt 3 - failure>
2019-02-14 04:24:25.455 <retry attempt 4 - failure>
2019-02-14 04:24:25.592 <retry attempt 5 - failure>

Automatically Responding to Blob Failures in Azure Functions

When a blob fails for the last time, information about the failure will be written as a message to a Storage queue called “webjobs-blobtrigger-poison”. The message contains a JSON payload describing the triggering blob that didn’t complete processing successfully, for example:

{
  "Type": "BlobTrigger",
  "FunctionId": "DontCodeTiredDemosV2.PoisonBlobExampleFunctions.Run",
  "BlobType": "BlockBlob",
  "ContainerName": "followup-letters",
  "BlobName": "followupletterrequest_bad.data",
  "ETag": "\"0x8D692245985E910\""
}

The information contained in the JSON can be used to alert support people about the error and take appropriate action as required, such as writing to a support ticket database or sending an email. You could also implement logic to automatically delete the blob receipt to force reprocessing, but you would probably want to track a retry count, otherwise bad data could cause an infinite processing loop. Exactly how you handle failed blob processing will depend on the business scenario.

As an example, the following function monitors the “webjobs-blobtrigger-poison” queue and grabs the information about the failed blob:

[FunctionName("PoisonBlobQueueProcessor")]
public static void PoisonBlobQueueProcessor(
    [QueueTrigger("webjobs-blobtrigger-poison")] string message,
    ILogger log)
{
    var poisonBlobDetails = JsonConvert.DeserializeObject<dynamic>(message);

    log.LogInformation($"Found an unprocessed blob {poisonBlobDetails.ContainerName}/{poisonBlobDetails.BlobName}\r\n");
    
    // Send an email, log a ticket in a fault system, log a CRM issue, etc.            
}
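
If you would rather not use dynamic, the poison message can be deserialized into a small class instead. The sketch below is one possible shape: the class and parser names are hypothetical, the properties are taken from the example JSON payload above, and it uses System.Text.Json rather than the Json.NET calls used elsewhere in this article so that it needs no extra package.

```csharp
using System.Text.Json;

// Mirrors the properties seen in the example poison message payload.
public class PoisonBlobMessage
{
    public string Type { get; set; }
    public string FunctionId { get; set; }
    public string BlobType { get; set; }
    public string ContainerName { get; set; }
    public string BlobName { get; set; }
    public string ETag { get; set; }
}

public static class PoisonBlobMessageParser
{
    // Property names in the payload match the class exactly,
    // so the default (case-sensitive) serializer options are enough.
    public static PoisonBlobMessage Parse(string message) =>
        JsonSerializer.Deserialize<PoisonBlobMessage>(message);
}
```

A typed message makes it easier to pass the container name, blob name and ETag on to whatever alerting or ticketing code handles the failure.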


Getting Blob Metadata When Using Azure Functions Blob Storage Triggers

(This article refers to Azure Functions V2)

Basic Blob Metadata

There are a few basic pieces of metadata that are often useful.

The following code shows a simple example of a blob-triggered Azure Function:

[FunctionName("BlobMetadataExample")]
public static void Run(
    [BlobTrigger("decline-letters/{name}")]Stream myBlob, 
    string name, 
    ILogger log)
{
    log.LogInformation($"Name: {name} Size: {myBlob.Length} Bytes");
}

With the preceding code, if we add a blob called “declineletterrequest.data” to the “decline-letters” container, the function will be triggered with the output: “Name: declineletterrequest.data Size: 50 Bytes”.

Notice that the string name parameter has been automatically populated with the full name of the blob that triggered the function execution.

If you want to get the blob name and blob extension separately you could write the following:

[FunctionName("BlobMetadataExample")]
public static void Run(
    [BlobTrigger("decline-letters/{blobname}.{blobextension}")]Stream myBlob,
    string blobName,
    string blobExtension,
    ILogger log)
{
    log.LogInformation($"Name: {blobName} Extension: {blobExtension} Size: {myBlob.Length} Bytes");
}

If the preceding function executes we get the output: “Name: declineletterrequest Extension: data Size: 50 Bytes”.

In addition to being able to use this simple blob metadata in code, you can also use the elements of the triggering blob name in other bindings:

[FunctionName("BlobMetadataExample")]
public static void Run(
        [BlobTrigger("decline-letters/{blobname}.{blobextension}")]Stream myBlob,
        string blobName,
        string blobExtension,
        [Queue("output-queue-{blobextension}")] out string message,
        ILogger log)
{
    log.LogInformation($"Name: {blobName} Extension: {blobExtension} Size: {myBlob.Length} Bytes");

    message = "Hello world";
}

In the preceding code, the output queue that is written to is dependent on the extension of the triggering blob. If the triggering blob name was “declineletterrequest.bankofmars” then a message will be written to the queue “output-queue-bankofmars”, or if the input blob was called “declineletterrequest.bankofvenus” then a message would be written to the queue “output-queue-bankofvenus”.

You can also do a similar thing by binding an input blob binding to the contents of a triggering queue message.

Advanced Metadata

There are a number of additional metadata items that you can get by simply adding the correct method arguments with the correct names:

[FunctionName("BlobMetadataExample")]
public static void Run(
        [BlobTrigger("decline-letters/{blobname}.{blobextension}")]Stream myBlob,
        string blobName,
        string blobExtension,
        string blobTrigger, // full path to triggering blob
        Uri uri, // blob primary location
        IDictionary<string, string> metaData, // user-defined blob metadata
        BlobProperties properties, // blob system properties, e.g. LastModified
        ILogger log)
{
    log.LogInformation($@"
blobName      {blobName}
blobExtension {blobExtension}
blobTrigger   {blobTrigger}
uri           {uri}
metaData      {metaData.Count}
properties    {properties.Created}");
}

Executing the preceding code will give the following output:

blobName      declineletterrequest
blobExtension data
blobTrigger   decline-letters/declineletterrequest.data
uri           http://127.0.0.1:10000/devstoreaccount1/decline-letters/declineletterrequest.data
metaData      0
properties    12/02/2019 2:15:53 AM +00:00

The BlobProperties give you access to a host of information such as ETag, DeletedTime, ContentEncoding, etc.

You can use this additional metadata in further binding expressions, the following example shows how to bind a blob output name to the ETag of the original triggering blob:

[FunctionName("BlobMetadataExample")]
public static void Run(
    [BlobTrigger("decline-letters/{blobname}.{blobextension}")]Stream myBlob,
    string blobName,
    BlobProperties properties,
    [Blob("decline-letters/{properties.ETag}")] out string message,
    ILogger log)
{
    message = "Hello world";
}

The preceding code would create an output blob with a name such as “0x8D6909193F68C10”.


Dealing With Unprocessed Storage Queue Poison Messages in Azure Functions

If an Azure Function that is triggered by a message on a Storage Queue throws an exception, the message will automatically be returned to the queue and retried again in the future.

In addition to specifying how soon the message will be retried, you can also configure how many times the message will be retried by editing the host.json file. By default a message will be retried 5 times before finally failing. The following host.json specifies that a message should be retried 10 times before finally failing:

{
  "version": "2.0",
  "extensions": {
    "queues": {      
      "maxDequeueCount": 10
    } 
  }  
}

Handling Poison Messages in Azure Functions

Once a message has been retried the maximum number of times, it is considered a poison message: if we kept it on the queue it would “poison” the application/function and cause harm. Poison messages are removed from the queue and placed onto a poison queue.

For example, if the queue that triggers the function is called “input-queue”, poison messages will be moved to a queue called “input-queue-poison”.
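
The two rules just described can be modelled in a few lines. This is only a sketch of the behaviour as described in this article, with hypothetical names, not the runtime's own code.

```csharp
public static class PoisonQueueRules
{
    // Poison messages go to a queue named after the original queue plus "-poison".
    public static string PoisonQueueName(string queueName) => queueName + "-poison";

    // After a failed attempt, a message is treated as poison once it has been
    // dequeued the maximum number of times (5 by default, configurable in host.json).
    public static bool ShouldMoveToPoisonQueue(int dequeueCount, int maxDequeueCount = 5) =>
        dequeueCount >= maxDequeueCount;
}
```

With the host.json shown earlier (maxDequeueCount of 10), a message that has failed five times would still be retried rather than moved.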

Because we know the name of the poison queue, we can process these poison messages somehow. Exactly how you choose to process these messages will depend on the application you are building.

One thing to think about is why the message may have failed:

  • Is the message content itself corrupted?
  • Is the function code itself defective/have a bug?
  • Were the exceptions caused by a transient error in a service the function uses?
  • Etc.

You could have some automated process (function) attempt to resolve the poison messages or forward them to a human to resolve (for example writing the message to a database that a human can query).

Triggering an Azure Function From a Poison Message Queue

As an example, the following function retrieves messages from the “input-queue-poison” queue and writes out to table storage for a human to manually correct somehow:

using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Queue;

namespace DontCodeTiredDemosV2
{
    public class PoisonMessageDetails
    {        
        public string RowKey { get; set; }
        public string PartitionKey { get; set; }
        public string OriginalMessageContent { get; set; }
        public string OriginalMessageId { get; set; }
    }

    public static class HandlePoisonMessages
    {
        [FunctionName("HandlePoisonMessages")]
        [return: Table("HumanInterventionRequired")]
        public static PoisonMessageDetails Run(
            [QueueTrigger("input-queue-poison")]CloudQueueMessage poisonMessage,            
            ILogger log)
        {
            log.LogInformation($"Processing poison message {poisonMessage.Id}");            

            return new PoisonMessageDetails
            {
                RowKey = Guid.NewGuid().ToString(),
                PartitionKey = "input-queue",
                OriginalMessageContent = poisonMessage.AsString,
                OriginalMessageId = poisonMessage.Id
            };                       
        }
    }
}

Once a poison message is processed (for example with the content of “Amrit”) a row will be added to the table as the following screenshot shows:

Azure Table Storage to process poison messages


Specifying How Soon a Storage Queue Message Will Be Retried in an Azure Function

By default, if an exception occurs in an Azure Function that uses a Storage Queue trigger, the message will be returned to the queue and automatically retried again in the future (up to a maximum number of times).

By default, there is no delay in how soon the message can be retried.

public static class MakeUppercase
{
    [FunctionName("MakeUppercase")]
    public static void Run(
        [QueueTrigger("input-queue")]CloudQueueMessage inputQueueItem,
        ILogger log)
    {
        log.LogInformation($"Message Dequeued : {inputQueueItem.DequeueCount} time(s)");
        log.LogInformation($"Message Next Visible : {inputQueueItem.NextVisibleTime}");

        throw new Exception("Forced exception for demonstration purposes.");
    }
}

With the preceding function, when a single message is added to the queue, the following (abbreviated) output will be seen:

[15/01/2019 11:55:49 PM] Executing 'MakeUppercase' (Reason='New queue message detected on 'input-queue'.', Id=44f95504-7a99-4f23-81f2-096f0bd434a2)
[15/01/2019 11:55:49 PM] Message Dequeued : 1 time(s)
[15/01/2019 11:55:49 PM] Message Next Visible : 16/01/2019 12:05:49 AM +00:00
[15/01/2019 11:55:50 PM] Executed 'MakeUppercase' (Failed, Id=44f95504-7a99-4f23-81f2-096f0bd434a2)
[15/01/2019 11:55:50 PM] Executing 'MakeUppercase' (Reason='New queue message detected on 'input-queue'.', Id=bbede56f-e22a-461f-945c-3f3b47114de3)
[15/01/2019 11:55:50 PM] Message Dequeued : 2 time(s)
[15/01/2019 11:55:50 PM] Message Next Visible : 16/01/2019 12:05:50 AM +00:00
[15/01/2019 11:55:50 PM] Executed 'MakeUppercase' (Failed, Id=bbede56f-e22a-461f-945c-3f3b47114de3)
[15/01/2019 11:55:50 PM] Executing 'MakeUppercase' (Reason='New queue message detected on 'input-queue'.', Id=10b7495f-9cfb-4d75-bc31-db68581b8055)
[15/01/2019 11:55:50 PM] Message Dequeued : 3 time(s)
[15/01/2019 11:55:50 PM] Message Next Visible : 16/01/2019 12:05:50 AM +00:00
[15/01/2019 11:55:50 PM] Executed 'MakeUppercase' (Failed, Id=10b7495f-9cfb-4d75-bc31-db68581b8055)
[15/01/2019 11:55:50 PM] Executing 'MakeUppercase' (Reason='New queue message detected on 'input-queue'.', Id=385beb36-80b7-47a5-ba65-b0d04f956cc6)
[15/01/2019 11:55:50 PM] Message Dequeued : 4 time(s)
[15/01/2019 11:55:50 PM] Message Next Visible : 16/01/2019 12:05:50 AM +00:00
[15/01/2019 11:55:51 PM] Executed 'MakeUppercase' (Failed, Id=385beb36-80b7-47a5-ba65-b0d04f956cc6)
[15/01/2019 11:55:51 PM] Executing 'MakeUppercase' (Reason='New queue message detected on 'input-queue'.', Id=f6acfb11-41fb-416e-88b1-e113aa4424f5)
[15/01/2019 11:55:51 PM] Message Dequeued : 5 time(s)
[15/01/2019 11:55:51 PM] Message Next Visible : 16/01/2019 12:05:51 AM +00:00
[15/01/2019 11:55:51 PM] Executed 'MakeUppercase' (Failed, Id=f6acfb11-41fb-416e-88b1-e113aa4424f5)
[15/01/2019 11:55:51 PM] Message has reached MaxDequeueCount of 5. Moving message to queue 'input-queue-poison'.

Notice in the preceding output that there is no delay before the message is retried: all five attempts run within about two seconds.

The next visible time controls when the message will become visible to be consumed again. The default delay before a retry in Azure Functions is 0. You may want to change this default if you want to add some delay between message retries (for example to help prevent message loss* for transient failures).

* Eventually, failed messages will be moved to a poison message queue.

The next visible time can be configured in the host.json file (we are using Azure Functions V2 in this article):

{
  "version": "2.0",
  "extensions": {
    "queues": {
      "visibilityTimeout": "00:00:30"
    }
  }
}

The visibilityTimeout value is a timespan (HH:MM:SS) to wait before a failed message becomes visible again; in the preceding configuration this is 30 seconds. Running again with this new configuration, the following output can be seen:

[16/01/2019 12:13:01 AM] Executing 'MakeUppercase' (Reason='New queue message detected on 'input-queue'.', Id=1f4f7177-4de6-4f4c-98c1-48d318892112)
[16/01/2019 12:13:01 AM] Message Dequeued : 1 time(s)
[16/01/2019 12:13:01 AM] Message Next Visible : 16/01/2019 12:23:00 AM +00:00
[16/01/2019 12:13:01 AM] Executed 'MakeUppercase' (Failed, Id=1f4f7177-4de6-4f4c-98c1-48d318892112)
[16/01/2019 12:13:32 AM] Executing 'MakeUppercase' (Reason='New queue message detected on 'input-queue'.', Id=93e9a90c-dd83-410f-9435-003712f64513)
[16/01/2019 12:13:32 AM] Message Dequeued : 2 time(s)
[16/01/2019 12:13:32 AM] Message Next Visible : 16/01/2019 12:23:32 AM +00:00
[16/01/2019 12:13:33 AM] Executed 'MakeUppercase' (Failed, Id=93e9a90c-dd83-410f-9435-003712f64513)
[16/01/2019 12:14:04 AM] Executing 'MakeUppercase' (Reason='New queue message detected on 'input-queue'.', Id=7ffe157b-7186-4f84-b8eb-02c43a260352)
[16/01/2019 12:14:04 AM] Message Dequeued : 3 time(s)
[16/01/2019 12:14:04 AM] Message Next Visible : 16/01/2019 12:24:04 AM +00:00
[16/01/2019 12:14:04 AM] Executed 'MakeUppercase' (Failed, Id=7ffe157b-7186-4f84-b8eb-02c43a260352)
[16/01/2019 12:14:36 AM] Executing 'MakeUppercase' (Reason='New queue message detected on 'input-queue'.', Id=ba3c186b-8b33-4b4c-b896-724a95fa2b25)
[16/01/2019 12:14:36 AM] Message Dequeued : 4 time(s)
[16/01/2019 12:14:36 AM] Message Next Visible : 16/01/2019 12:24:36 AM +00:00
[16/01/2019 12:14:36 AM] Executed 'MakeUppercase' (Failed, Id=ba3c186b-8b33-4b4c-b896-724a95fa2b25)
[16/01/2019 12:15:07 AM] Executing 'MakeUppercase' (Reason='New queue message detected on 'input-queue'.', Id=3de85cf0-89ad-430f-8219-ebd7b1701d4d)
[16/01/2019 12:15:07 AM] Message Dequeued : 5 time(s)
[16/01/2019 12:15:07 AM] Message Next Visible : 16/01/2019 12:25:07 AM +00:00
[16/01/2019 12:15:07 AM] Executed 'MakeUppercase' (Failed, Id=3de85cf0-89ad-430f-8219-ebd7b1701d4d)
[16/01/2019 12:15:07 AM] Message has reached MaxDequeueCount of 5. Moving message to queue 'input-queue-poison'.

Notice now that the message is retried roughly 30 seconds after each failed attempt (compare the timestamps of successive “Executing” lines).
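
As a rough worked example of what this means for the overall retry window, the following sketch computes when each attempt would start relative to the first one. It is only an approximation under stated assumptions: RetryTimeline and AttemptOffsets are hypothetical names (not part of Azure Functions), and execution time and queue polling delay are ignored, which is why the real output above shows gaps of slightly more than 30 seconds.

```csharp
using System;

public static class RetryTimeline
{
    // Hypothetical helper: returns the approximate offset from the first attempt
    // at which each attempt starts, assuming every attempt fails immediately.
    public static TimeSpan[] AttemptOffsets(TimeSpan visibilityTimeout, int maxDequeueCount)
    {
        var offsets = new TimeSpan[maxDequeueCount];
        for (int attempt = 0; attempt < maxDequeueCount; attempt++)
        {
            // Attempt 0 starts at zero; each later attempt waits one more timeout.
            offsets[attempt] = TimeSpan.FromTicks(visibilityTimeout.Ticks * attempt);
        }
        return offsets;
    }
}
```

With a 30 second timeout and five attempts, the fifth attempt starts about two minutes after the first, which is in line with the output above (12:13:01 AM to 12:15:07 AM).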

Setting a visibility timeout other than zero will not prevent other messages that come into the queue from being processed while waiting for retried messages to become visible again.
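
The maximum of five attempts seen in the output is configurable in the same section of host.json through the maxDequeueCount setting. As a sketch, the following fragment keeps the 30 second visibility timeout and lowers the number of attempts to 3 (the value 3 is purely illustrative):

```json
{
  "version": "2.0",
  "extensions": {
    "queues": {
      "visibilityTimeout": "00:00:30",
      "maxDequeueCount": 3
    }
  }
}
```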

Getting Message Metadata When Using Azure Functions Storage Queue Triggers

When creating an Azure Function that is triggered by incoming messages on a Storage Queue, the type specified for the message parameter can be a simple string as follows:

public static class MakeUppercase
{
    [FunctionName("MakeUppercase")]
    public static void Run(
        [QueueTrigger("%in%")]string inputQueueItem,
        [Queue("%out%")] out string outputQueueItem,
        [Blob("%blobout%")] out string outputBlobItem,
        ILogger log)
    {
        inputQueueItem = inputQueueItem.ToUpperInvariant();
        outputQueueItem = inputQueueItem;
        outputBlobItem = inputQueueItem;
    }
}

In the preceding code, the inputQueueItem represents the content of the message that triggered the function.

If you want additional information about the queue message item itself, rather than use a string you can use CloudQueueMessage. Doing this gives you access to the metadata about the queue message including the following:

  • Message ID
  • Time message was inserted into queue
  • Time the message expires
  • How many times the message has been dequeued (i.e. read off the queue)*
  • Message next visible time
  • Message pop receipt

* A message can be returned to the queue if an exception occurs during execution of the function. Each time the message is dequeued again, the dequeue count is incremented.

In addition to the message metadata, you can still get the message content either as a string or byte array using the AsString or AsBytes properties respectively:

public static class MakeUppercase
{
    [FunctionName("MakeUppercase")]
    public static void Run(
        [QueueTrigger("%in%")]CloudQueueMessage inputQueueItem,
        [Queue("%out%")] out string outputQueueItem,
        [Blob("%blobout%")] out string outputBlobItem,
        ILogger log)
    {
        log.LogInformation($"Message Id: {inputQueueItem.Id}");
        log.LogInformation($"Message Inserted : {inputQueueItem.InsertionTime}");
        log.LogInformation($"Message Expires : {inputQueueItem.ExpirationTime}");
        log.LogInformation($"Message Dequeued : {inputQueueItem.DequeueCount} time(s)");
        log.LogInformation($"Message Next Visible : {inputQueueItem.NextVisibleTime}");
        log.LogInformation($"Message Pop Receipt : {inputQueueItem.PopReceipt}");

        log.LogInformation($"Message content (bytes) : {BitConverter.ToString(inputQueueItem.AsBytes)}");
        log.LogInformation($"Message content (string) : {inputQueueItem.AsString}");

        var inputQueueItemContent = inputQueueItem.AsString;
        inputQueueItemContent = inputQueueItemContent.ToUpperInvariant();
        outputQueueItem = inputQueueItemContent;
        outputBlobItem = inputQueueItemContent;
    }
}
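
One possible use of this metadata is spotting when a message is on its last attempt before it would be moved to the poison queue. The following is a minimal sketch only: DequeueCountCheck and IsFinalAttempt are hypothetical helper names, and the default of 5 is an assumption that matches the default maximum dequeue count in Azure Functions.

```csharp
public static class DequeueCountCheck
{
    // Hypothetical helper: returns true when the dequeue count has reached the
    // maximum, meaning a failure on this attempt would poison the message.
    // The default of 5 is an assumption; pass your own configured maximum if it differs.
    public static bool IsFinalAttempt(int dequeueCount, int maxDequeueCount = 5)
    {
        return dequeueCount >= maxDequeueCount;
    }
}
```

Inside the function this could be called as DequeueCountCheck.IsFinalAttempt(inputQueueItem.DequeueCount), for example to log a warning before the message is given up on.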

Configuring Queue Names and Blob Path Bindings in Azure Functions Configuration

When working with Azure Functions in C# (specifically Azure Functions V2 in this article) you can specify bindings with hard-coded literal values.

For example, the following function has a queue trigger that is reading messages from a queue called “input-queue”, an output queue binding writing messages to “output-queue”, and a blob storage binding to write blobs to “audit/{rand-guid}”:

public static class MakeUppercase
{
    [FunctionName("MakeUppercase")]
    public static void Run(
        [QueueTrigger("input-queue")]string inputQueueItem,
        [Queue("output-queue")] out string outputQueueItem,
        [Blob("audit/{rand-guid}")] out string outputBlobItem,
        ILogger log)
    {
        inputQueueItem = inputQueueItem.ToUpperInvariant();
        outputQueueItem = inputQueueItem;
        outputBlobItem = inputQueueItem;
    }
}

All these binding values in the preceding code are hard coded. If they need to be changed once the Function App is deployed, a new release will be required.

Specifying Azure Function Bindings in Configuration

As an alternative, the %% syntax can be used inside the binding string:

public static class MakeUppercase
{
    [FunctionName("MakeUppercase")]
    public static void Run(
        [QueueTrigger("%in%")]string inputQueueItem,
        [Queue("%out%")] out string outputQueueItem,
        [Blob("%blobout%")] out string outputBlobItem,
        ILogger log)
    {
        inputQueueItem = inputQueueItem.ToUpperInvariant();
        outputQueueItem = inputQueueItem;
        outputBlobItem = inputQueueItem;
    }
}

Notice in the preceding code, parts of the binding configuration strings are specified between %%: "%in%", "%out%", and "%blobout%".

At runtime, these values will be read from configuration instead of being hard coded.

Configuring Bindings at Development Time

When running locally, the configuration values will be read from the local.settings.json file, for example:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "in": "input-queue",
    "out": "output-queue",
    "blobout": "audit/{rand-guid}"
  }
}

Notice the “in”, “out”, and “blobout” configuration elements that map to "%in%", "%out%", and "%blobout%".
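
To make the mapping concrete, here is a small sketch of the idea behind %name% resolution. It is not the Azure Functions runtime's actual implementation: BindingSettingResolver and Resolve are hypothetical names, and the dictionary simply stands in for the configuration values.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

public static class BindingSettingResolver
{
    // Matches a setting name wrapped in percent signs, e.g. %in%.
    private static readonly Regex Token = new Regex("%([^%]+)%");

    // Hypothetical helper: replaces each %name% token with the matching setting value.
    public static string Resolve(string bindingExpression, IReadOnlyDictionary<string, string> settings)
    {
        return Token.Replace(bindingExpression, match =>
        {
            string name = match.Groups[1].Value;
            if (!settings.TryGetValue(name, out string value))
            {
                throw new InvalidOperationException($"No setting named '{name}' was found.");
            }
            return value;
        });
    }
}
```

With the settings shown above, Resolve("%blobout%", settings) would give "audit/{rand-guid}". The {rand-guid} part is a separate binding expression that this sketch deliberately leaves untouched.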

Configuring Bindings in Azure

Once deployed and running in Azure, these settings will need to be present in the Function App Application Settings as the following screenshot demonstrates:

Specifying Azure Function Bindings in application settings

Now if you want to modify the queue names or blob path you can simply change the values in configuration. Note that you may have to restart the Function App for the changes to take effect. You will also need to manage the switch to new queues, blobs, etc., such as deciding what to do if there are still messages in the original input queue after the change.

Azure Functions Continuous Deployment with Azure Pipelines: Part 8 - Using Gates to Run Smoke Tests on Deployed Function Apps

This is the eighth and final part in a series demonstrating how to set up continuous deployment of an Azure Functions App using Azure DevOps build and release pipelines.

Demo app source code is available on GitHub.

In the previous instalment we added functional end-to-end testing to the release pipeline.

As one final check for the release, we can execute a smoke test on the deployed production Function App. This smoke test won’t modify any data and will simply allow us to check that the Function App is responding to HTTP requests. The smoke test function won’t be disabled in production but it will be protected by a function key.

namespace InvestFunctionApp.TestFunctions
{
    public static class SmokeTest
    {
        [FunctionName("SmokeTest")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "post", Route = Testing.TestFunctionRoute + "/smoketest")] HttpRequest req,
            ILogger log)
        {
            log.LogInformation("C# HTTP trigger function processed a request.");

            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            string azureDevopsReleaseUrl = data?.releaseurl ?? "[releaseurl value not supplied in json request]";

            log.LogInformation($"Smoke test for release {azureDevopsReleaseUrl}");

            // We could add extra smoke testing code here, this simple version just allows us to 
            // verify that the deployment succeeded and the Function App is responding to HTTP requests

            return new OkObjectResult("Smoke test successful");
        }
    }
}

The preceding smoke test function returns a 200 OK result and also allows the URL of the Azure DevOps release to be provided, which is written to the logs. This allows ops to see which release a smoke test request came from.

The first step is to add a variable to the release pipeline called “prodSmokeTestKey” with the value being the function key from production for the smoketest function. Once this key is copied from the Azure Portal it can be pasted into the variable value and the padlock icon clicked to mark it as sensitive data:

Adding a sensitive Azure Pipeline variable

“Gates allow automatic collection of health signals from external services, and then promote the release when all the signals are successful at the same time or stop the deployment on timeout.” [Microsoft]

Gates can be evaluated before a stage executes and/or after a stage executes.

“Approvals and gates give you additional control over the start and completion of the deployment pipeline. Each stage in a release pipeline can be configured with pre-deployment and post-deployment conditions that can include waiting for users to manually approve or reject deployments, and checking with other automated systems until specific conditions are verified. In addition, you can configure a manual intervention to pause the deployment pipeline and prompt users to carry out manual tasks, then resume or reject the deployment.” [Microsoft]

We can add a post-deployment gate that will mark the release as unsuccessful if the smoke test function does not respond successfully. To do this we can use the “Invoke Azure Function” task. Other tasks that can be executed as part of a gate include “Invoke REST API”, “Query Azure Monitor Alerts”, and “Query Work Items” tasks.

To add a post-deployment gate to the “Deploy to Production” stage, click the post-deployment conditions icon, enable the Gates switch, and click the + Add button.

Enabling post-deployment gates in Azure Pipelines

Choose “Invoke Azure Function”. Paste in the URL to the production smoke test function and for the function key specify “$(prodSmokeTestKey)” to retrieve the key from the pipeline variable that we set up earlier. Select the method as POST and in the body enter “{ "releaseurl" : "$(Release.ReleaseWebURL)" }”. This is the JSON payload that will be sent to the smoke test function.

Invoke Azure Function Deployment Gate

Hit save and queue a new release.

Once the production deployment is complete, there will be a 5 minute delay before the gates are evaluated for the first time and a 15 minute wait between re-evaluations of gates, which ultimately means a lengthy delay between the deployment and smoke test execution. To improve this you can set “The delay before evaluation” to 1 minute, and in the Evaluation options section at the bottom set “The time between re-evaluation of gates” to 5 minutes and “The timeout after which gates fail” to 6 minutes.

An alternative to using gates in this way (especially if you need multiple gates or have complex smoke testing requirements) is to have another post-production-deployment task/job/stage that calls the smoke test function(s) from another source-controlled test project, in a similar way to how the end-to-end tests were executed against the test Function App.
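
As a sketch of what such a test project might send, the following helper builds the same kind of POST request that the gate issues. SmokeTestRequestFactory and its parameters are hypothetical names. The x-functions-key header is a standard way to pass a function key with a request, and System.Text.Json is assumed to be available (.NET Core 3.0 or later).

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;

public static class SmokeTestRequestFactory
{
    // Hypothetical helper: builds a POST request for the smoke test function,
    // passing the function key in a header and the release URL as a JSON body.
    public static HttpRequestMessage Create(Uri smokeTestUrl, string functionKey, string releaseUrl)
    {
        var request = new HttpRequestMessage(HttpMethod.Post, smokeTestUrl);
        request.Headers.Add("x-functions-key", functionKey);

        // The property name matches the "releaseurl" value read by the smoke test function.
        string json = JsonSerializer.Serialize(new { releaseurl = releaseUrl });
        request.Content = new StringContent(json, Encoding.UTF8, "application/json");
        return request;
    }
}
```

A test could then send the request with HttpClient.SendAsync and assert that the response status code is 200.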

That brings us to the end of this 8 part series. To be notified of future posts subscribe to the blog feed or follow me on Twitter.
