Improving Azure Functions Blob Trigger Performance and Reliability - Part 4: Periodically Checking for Unprocessed Blobs

In this final part of the series we wrap up by briefly discussing some ways to check for blobs that have not been processed correctly.

When using Azure Functions, a timer trigger can be used to execute a function periodically based on a CRON expression. The following code is an example of a timer-triggered function that runs every five minutes:

public static class CheckBlobs
{
    [FunctionName("CheckBlobs")]
    public static void Run(
        [TimerTrigger("0 */5 * * * *")]TimerInfo myTimer, 
        ILogger log)
    {
        log.LogInformation($"C# Timer trigger function executed at: {DateTime.Now}");

        // Blob checking logic here
    }
}

There are a number of ways to check for unprocessed blobs, depending on the solution you are building. Some examples:

  • Check that an output blob exists for every input blob
  • Use a database to keep track of blobs that were uploaded and compare this to actual output blobs
  • If blobs are deleted after they have been processed, check there are no blobs in the container
  • Etc.
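As a rough illustration of the first option, the following sketch compares a list of input blob names with a list of output blob names. It assumes the naming convention used earlier in this series (outputs are named after the input plus a .vegetarian or .nonvegetarian suffix) and treats an input as processed if at least one of its outputs exists. The class and method names are hypothetical, and listing the blobs in each container is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class UnprocessedBlobFinder
{
    // Returns the input blob names that have no matching output blob.
    // An input counts as processed if at least one "<input><suffix>" output exists.
    public static IReadOnlyList<string> FindUnprocessed(
        IEnumerable<string> inputBlobNames,
        IEnumerable<string> outputBlobNames,
        IReadOnlyCollection<string> outputSuffixes)
    {
        // Blob names are case-sensitive, so use an ordinal comparison.
        var outputs = new HashSet<string>(outputBlobNames, StringComparer.Ordinal);

        return inputBlobNames
            .Where(input => !outputSuffixes.Any(suffix => outputs.Contains(input + suffix)))
            .ToList();
    }
}
```

For example, given the inputs recipe1.txt and recipe2.txt and a single output recipe1.txt.vegetarian, only recipe2.txt is reported as unprocessed.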

Some things to bear in mind if implementing this kind of checking include:

  • How often/when to run the function?
  • How long after a blob is written should you give it to be processed normally?
  • Will running this function interfere with any other processing in the system?
  • What if a blob is due to be processed (e.g. message sitting in a queue but not yet processed)? Could this create false positives or cause duplication of processing?
  • How long does the checking function take to execute? Will it take too long as the number of blobs increases and will the function be terminated by the runtime?
  • How/who do you notify of missed blobs (email, SMS, create ticket in CRM/bug system, etc.)?
  • Do you try to perform auto-retry of processing? Again, could this cause duplication, errors, etc.?
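One way to reduce false positives from blobs that are simply still waiting to be processed is to apply a grace period before reporting them. The following is a minimal sketch of that idea; it assumes you already have the write time of each blob that currently looks unprocessed, and the names used are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OverdueBlobFilter
{
    // Keeps only the unprocessed blobs that were written longer ago than the grace period.
    public static IReadOnlyList<string> FindOverdue(
        IReadOnlyDictionary<string, DateTimeOffset> unprocessedBlobWriteTimes,
        DateTimeOffset now,
        TimeSpan gracePeriod)
    {
        return unprocessedBlobWriteTimes
            .Where(blob => now - blob.Value > gracePeriod)
            .Select(blob => blob.Key)
            .OrderBy(name => name, StringComparer.Ordinal) // stable order for reporting
            .ToList();
    }
}
```

Passing the current time in as a parameter (rather than reading the clock inside the method) makes the check easy to test. The right grace period depends on how long normal processing takes in your system.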

You could also use logging/Application Insights to provide you with information. Alternatively, write every incoming blob name to a database and update that record when the blob has been processed; unprocessed blobs can then be found with a simple “IF NOT PROCESSED” query.
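The record-and-update idea can be sketched without a real database. The following hypothetical BlobTracker class uses an in-memory dictionary as a stand-in for a tracking table; in a real system the three methods would map to an insert, an update, and a query.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-in for a tracking table; a real system would use a database.
public class BlobTracker
{
    private readonly Dictionary<string, bool> _processedByBlobName =
        new Dictionary<string, bool>(StringComparer.Ordinal);

    // Call when a blob arrives; does not reset a blob already marked as processed.
    public void RecordIncoming(string blobName)
    {
        if (!_processedByBlobName.ContainsKey(blobName))
        {
            _processedByBlobName[blobName] = false;
        }
    }

    // Call when processing of the blob completes successfully.
    public void MarkProcessed(string blobName) => _processedByBlobName[blobName] = true;

    // The equivalent of querying for records that are not yet marked as processed.
    public IReadOnlyList<string> GetUnprocessed() =>
        _processedByBlobName
            .Where(entry => !entry.Value)
            .Select(entry => entry.Key)
            .OrderBy(name => name, StringComparer.Ordinal)
            .ToList();
}
```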


Improving Azure Functions Blob Trigger Performance and Reliability - Part 3: Using Event Grid to Respond to New Blobs

In the previous part of the series we saw how to improve the reliability of responding to new blobs by introducing a queue. This required adding a Storage Queue to the solution and also required the writer of new blobs to write a queue message.

In this article, instead of manually writing messages to a queue on blob creation, we use Event Grid events.

Azure Event Grid has support for Blob Storage, meaning that when a new blob is written, Event Grid will notice this. We can then trigger an Azure Function from this Event Grid event.

This approach can improve the reliability and responsiveness compared to using a simple blob trigger: “Blob storage events are reliably sent to the Event grid service which provides reliable delivery services to your applications through rich retry policies and dead-letter delivery.” [Microsoft]

Creating an Event Grid Triggered Function

The following Azure Function code is a modified version of the code used in the previous article:

public static class ProcessFoodBlobsEventGrid
{
    private static readonly string[] _meats = { "steak", "chicken", "venison" };

    [FunctionName("ProcessFoodBlobsEventGrid")]
    public static void Run(
     [EventGridTrigger]EventGridEvent blobCreatedEvent,
     [Blob("{data.url}")] string foods, // assumes small blob size so using string not stream
     [Blob("{data.url}.vegetarian")] out string vegetarian,
     [Blob("{data.url}.nonvegetarian")] out string nonVegetarian,
     ILogger log)
    {
        log.LogInformation("Processing a blob created event");

        StorageBlobCreatedEventData createdEvent = ((JObject)blobCreatedEvent.Data).ToObject<StorageBlobCreatedEventData>();

        log.LogInformation($"Blob: {createdEvent.Url}");
        log.LogInformation($"Api operation: {createdEvent.Api}");

        vegetarian = null;
        nonVegetarian = null;

        string[] foodLines = foods.Split(new[] { "\r\n", "\n" }, StringSplitOptions.RemoveEmptyEntries);


        foreach (var food in foodLines)
        {
            var isMeat = _meats.Contains(food);

            if (isMeat)
            {
                nonVegetarian += food + Environment.NewLine;
            }
            else
            {
                vegetarian += food + Environment.NewLine;
            }
        }
    }
}

In the preceding code, the [EventGridTrigger]EventGridEvent blobCreatedEvent parameter causes the function to be triggered when an Event Grid event is directed to the function.

The input blob binding [Blob("{data.url}")] string foods uses a binding expression and accesses the data.url property from the JSON data that’s contained in the event (this comes from the event schema for Blob Storage). The 2 output bindings also use the original blob path/name and append .vegetarian or .nonvegetarian. This implementation writes output blobs to the same container as the input blob. You could also use dynamic binding in Azure Functions with imperative runtime bindings to just extract the filename from the blob and write the output blobs to a different container.
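If you do want to write the output to a different container, you first need to split the blob URL from the event into its container and blob name. The following is a minimal sketch of that step only (not of the imperative binding itself). It assumes the standard https://<account>.blob.core.windows.net/<container>/<blob name> URL format; the class and method names are hypothetical.

```csharp
using System;

public static class BlobUrlParser
{
    // Splits a blob URL into its container name and blob name.
    // Assumes the first path segment is the container, which is not the case
    // for the local storage emulator (its path starts with the account name).
    public static (string Container, string BlobName) Parse(string blobUrl)
    {
        string path = new Uri(blobUrl).AbsolutePath.TrimStart('/');
        int separator = path.IndexOf('/');

        if (separator <= 0 || separator == path.Length - 1)
        {
            throw new ArgumentException(
                "URL does not contain both a container and a blob name.", nameof(blobUrl));
        }

        string container = path.Substring(0, separator);

        // The blob name may itself contain '/' (virtual folders) and escaped characters.
        string blobName = Uri.UnescapeDataString(path.Substring(separator + 1));

        return (container, blobName);
    }
}
```

With the blob name extracted, an output path such as food-out/{blobName}.vegetarian can be built regardless of which container the input blob was written to.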

Creating an Event Subscription for New Blobs

The function needs an event subscription to be created in Azure to recognize when new blobs are written and invoke the function. This can be done by navigating to the storage account (v2) in the Azure Portal and clicking the Events link. You can then add a new event subscription as the following screenshot shows (note that Defined Event Types is set to Blob Created):

[Screenshot: Creating a new Azure Event Grid Subscription to trigger an Azure Function]

You can also specify subject filters to limit the event to a specific container and/or file type as the following screenshot shows:

[Screenshot: Configuring Azure Event Grid subscription to filter on blob storage containers]
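To make the subject filters a little more concrete: the subject of a Blob Storage event has the form /blobServices/default/containers/<container>/blobs/<blob name>, and the subscription can filter on what the subject begins and ends with. The following sketch approximates that matching locally so you can sanity-check filter values before configuring them. The helper names are hypothetical, and the case-insensitive default is an assumption that you should verify against your subscription's settings.

```csharp
using System;

public static class SubjectFilter
{
    // Builds a "subject begins with" value that limits events to a single container.
    // The trailing slash stops "food-in" from also matching "food-in-archive".
    public static string ContainerPrefix(string containerName) =>
        $"/blobServices/default/containers/{containerName}/";

    // Approximates how a begins-with/ends-with subject filter decides whether an event matches.
    // A null filter value is treated as "no filter".
    public static bool Matches(string subject, string beginsWith, string endsWith, bool caseSensitive = false)
    {
        StringComparison comparison = caseSensitive
            ? StringComparison.Ordinal
            : StringComparison.OrdinalIgnoreCase;

        return subject.StartsWith(beginsWith ?? string.Empty, comparison)
            && subject.EndsWith(endsWith ?? string.Empty, comparison);
    }
}
```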

You could also specify dead-lettering and retry policies in case the Function App is unable to respond.

Now when a blob is added, the event subscription will notice it and invoke the function.

Ultimately “Use the Event Grid trigger instead of the Blob storage trigger for blob-only storage accounts, for high scale, or to reduce latency.” [Microsoft]


Improving Azure Functions Blob Trigger Performance and Reliability - Part 2: Processing Delays and Missed Blobs

This is the second part of a series of articles.

When you add a new blob, your blob-triggered function may not be triggered immediately: “If the blob container being monitored contains more than 10,000 blobs, the Functions runtime scans log files to watch for new or changed blobs. This process can result in delays. A function might not get triggered until several minutes or longer after the blob is created.” [Microsoft]

Also when scanning log files to find new blobs that need processing, there’s “no guarantee that all events are captured. Under some conditions, logs may be missed.” [Microsoft]

This means that it is possible for some new blobs to be missed and not processed.

Using a Storage Queue to Trigger Processing of New Blobs

One alternative that reduces the likelihood of missed blobs and also improves the responsiveness of blob processing is a slightly more complex (but still relatively straightforward) approach.

Essentially this alternative approach has the following workflow:

  1. New blob written to blob storage
  2. Write message to storage queue containing new blob path
  3. Queue-triggered function gets message from step 2
  4. Blob processing occurs

(Note that this alternative approach may not suit all situations depending on how new blobs are making their way into blob storage – who or whatever is writing the blob in step 1 also needs to be able to write a queue message.)

Blob Writing

This approach requires that when a blob is written, a queue message is also written.

As a simple example, this could be from client code as follows:

using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System.IO;
using System.Threading.Tasks;

namespace AddNewBlob
{
    class Program
    {
        static async Task Main(string[] args)
        {
            CloudStorageAccount storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
            CloudBlobClient cloudBlobClient = storageAccount.CreateCloudBlobClient();
            CloudBlobContainer cloudBlobContainer = cloudBlobClient.GetContainerReference("food-in");
            CloudBlockBlob cloudBlockBlob = cloudBlobContainer.GetBlockBlobReference("recipe1.txt");

            await WriteBlob();
            await WriteMessage();

            async Task WriteBlob()
            {
                using (var stream = await cloudBlockBlob.OpenWriteAsync())
                using (var sw = new StreamWriter(stream))
                {
                    await sw.WriteLineAsync("carrot");
                    await sw.WriteLineAsync("steak");
                    await sw.WriteLineAsync("apple");
                }
            }

            async Task WriteMessage()
            {
                var queueClient = storageAccount.CreateCloudQueueClient();
                var queue = queueClient.GetQueueReference("food-in");
                await queue.AddMessageAsync(new Microsoft.WindowsAzure.Storage.Queue.CloudQueueMessage("recipe1.txt"));
            }
        }

        
    }
}

Or perhaps the blob data comes in via an HTTP-triggered function as follows:

public static class AddRecipe
{
    [FunctionName("AddRecipe")]
    [return: Queue("food-in")]
    public static async Task<string> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,            
        ILogger log)
    {
        log.LogInformation("C# HTTP trigger function processed a request.");
        
        string ingredients = await new StreamReader(req.Body).ReadToEndAsync();

        // validation/error code omitted for demo purposes

        var blobName = Guid.NewGuid().ToString();

        await WriteBlob(); // ensure blob is written *before* the function returns and the message is added to the queue

        return blobName; // write to queue


        async Task WriteBlob()
        {
            var account = CloudStorageAccount.DevelopmentStorageAccount; // In real app load this from secure config location
            var blobClient = account.CreateCloudBlobClient();
            var blobContainer = blobClient.GetContainerReference("food-in");
            var cloudBlockBlob = blobContainer.GetBlockBlobReference(blobName);
            await cloudBlockBlob.UploadTextAsync(ingredients);
        }
    }
}

Notice in the preceding code that the blob is written explicitly in code to ensure that the queue message isn’t added until the blob is definitely available to be processed by the next function in the chain. (See this related GitHub issue).

More Reliable Blob Processing

The next function is where the actual processing of the new blob is carried out. It is, however, triggered from a queue rather than relying on a blob trigger:

public static class ProcessFoodBlobs
{
    private static readonly string[] _meats = { "steak", "chicken", "venison" };      

    [FunctionName("ProcessFoodBlobs")]
    public static void Run(
        [QueueTrigger("food-in")]string newBlobPath, 
        [Blob("food-in/{queueTrigger}")] string foods,
        [Blob("food-out/{queueTrigger}.vegetarian")] out string vegetarian,
        [Blob("food-out/{queueTrigger}.nonvegetarian")] out string nonVegetarian,
        ILogger log)
    {
        vegetarian = null;
        nonVegetarian = null;

        string[] foodLines = foods.Split(new[] {"\r\n", "\n"  }, StringSplitOptions.RemoveEmptyEntries);


        foreach (var food in foodLines)
        {
            var isMeat = _meats.Contains(food);

            if (isMeat)
            {
                nonVegetarian += food + Environment.NewLine;
            }
            else
            {
                vegetarian += food + Environment.NewLine;
            }
        }    
    }
}

In the preceding code we’re making use of automatic input blob binding.

Summary

This approach may offer some benefits, at the cost of some additional complexity, if you have a lot of blobs being written/stored/processed. There are also other considerations to bear in mind, such as what happens if the blob is deleted or changed before the message is picked up off the queue. As with all things, you should consider your own requirements and ensure you do thorough testing, including performance/load/stress testing.


Improving Azure Functions Blob Trigger Performance and Reliability - Part 1: Memory Usage

This is the first part of a series of articles.

When creating blob-triggered Azure Functions there are some memory usage considerations to bear in mind.

“The consumption plan limits a function app on one virtual machine (VM) to 1.5 GB of memory. Memory is used by each concurrently executing function instance and by the Functions runtime itself.” [Microsoft]

A blob-triggered function can execute concurrently and internally uses a queue: “the maximum number of concurrent function invocations is controlled by the queues configuration in host.json. The default settings limit concurrency to 24 invocations. This limit applies separately to each function that uses a blob trigger.” [Microsoft]

So, if you have 1 blob-triggered function in a Function App, with the default concurrency setting of 24, you could have a maximum of 24 (1 * 24) concurrently executing function invocations. (The documentation describes this as per-VM concurrency, so with 2 VMs you could have 48 (2 * 1 * 24) concurrently executing function invocations.)

If you had 3 blob-triggered functions in a Function App (assuming 1 VM) then you could have 72 (3 * 24) concurrently executing function invocations.

Because the consumption plan “limits a function app on one virtual machine (VM) to 1.5 GB of memory”, if you are processing blobs that are non-trivial in size then you may need to consider overall memory usage.
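To put rough numbers on this, the following sketch works through the arithmetic above. It uses only the figures quoted from the documentation (24 concurrent invocations per blob-triggered function and 1.5 GB per VM); the class and method names are hypothetical, and the per-invocation figure is an upper bound because the Functions runtime itself also uses some of that memory.

```csharp
using System;

public static class BlobTriggerConcurrency
{
    public const int DefaultInvocationsPerFunction = 24;       // default quoted from the documentation
    public const double ConsumptionPlanMemoryMb = 1.5 * 1024;  // 1.5 GB per VM, expressed in MB

    // Maximum concurrent blob-triggered invocations across all VMs.
    public static int MaxConcurrentInvocations(int blobTriggeredFunctions, int vms = 1) =>
        vms * blobTriggeredFunctions * DefaultInvocationsPerFunction;

    // Upper bound on the average memory available to each invocation on one VM
    // when every blob-triggered function is running at full concurrency.
    public static double MemoryPerInvocationMb(int blobTriggeredFunctions) =>
        ConsumptionPlanMemoryMb / (blobTriggeredFunctions * DefaultInvocationsPerFunction);
}
```

With 1 blob-triggered function this gives at most 64 MB per invocation at full concurrency, and with 3 functions roughly 21 MB, which is why binding large blobs to string or byte[] can quickly become a problem.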

OutOfMemoryException When Using Azure Functions Blob Trigger

As an example, suppose the following function exists:

public static class BlobPerformanceAndReliability
{
    [FunctionName("BlobPerformanceAndReliability")]
    public static void Run(
        [BlobTrigger("big-blobs/{name}")]string blob, 
        string name, 
        [Blob("big-blobs-out/{name}")] out string foundData,
        ILogger log)
    {
        log.LogInformation($"C# Blob trigger function Processed blob\n Name:{name} \n Size: {blob.Length} Bytes");

        // Code to find and output a specific line
        foundData = "This line will never be reached if out of memory";
    }
}

The preceding function is triggered by blobs in the big-blobs container. The omitted code towards the end of the function would find a specific line of text in the blob and output it to big-blobs-out.

We can create a large file (approx. 1.8 GB) with the following code in a console app:

using System.IO;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var sw = new StreamWriter(@"c:\temp\bigblob.txt"))
            {
                for (int i = 0; i < 40_000_000; i++)
                {
                    sw.WriteLine("Some line we are not interested in processing");
                }
                sw.WriteLine("Data: 42");
            }
        }
    }
}

The last line in the file will contain “Data: 42”.

If we run the function app locally and upload this big file to the Azure Storage Emulator, the function will trigger and will error with: “System.Private.CoreLib: Exception while executing function: BlobPerformanceAndReliability. Microsoft.Azure.WebJobs.Host: One or more errors occurred. (Exception binding parameter 'blob') (Exception binding parameter 'name'). Exception binding parameter 'blob'. System.Private.CoreLib: Exception of type 'System.OutOfMemoryException' was thrown.”.

The reason for this is that when you bind a blob trigger/input to string or byte[], the entire blob is read into memory. If the blob is too big (and/or other concurrently executing function invocations are also processing big files), it will exceed the memory restrictions of the Functions runtime.

Processing Large Blobs with Azure Functions

Instead of binding to string or byte[], you can bind to a Stream. This will not load the entire blob into memory and will allow you to instead process it incrementally.

The function can be re-written as follows:

public static class BlobPerformanceAndReliability
{
    [FunctionName("BlobPerformanceAndReliability")]
    public static void Run(
        [BlobTrigger("big-blobs/{name}")]Stream blob,
        string name,
        [Blob("big-blobs-out/{name}")] out string foundData,
        ILogger log)
    {
        log.LogInformation($"C# Blob trigger function Processed blob\n Name:{name} \n Size: {blob.Length} Bytes");

        // Code to find and output a specific line            

        foundData = null; // Don't write an output blob by default

        string line;

        using (var sr = new StreamReader(blob))
        {                
            while (!sr.EndOfStream)
            {
                line = sr.ReadLine();

                if (line.StartsWith("Data"))
                {
                    foundData = line;
                    break;
                }                    
            }
        }            
    }
}

If you’re not familiar with using streams in .NET, check out my Working with Files and Streams in C# Pluralsight course.

If we force the same blob to be reprocessed with this new function code, there will be no error and the output blob containing “Data: 42” will be seen in the big-blobs-out container.

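Another thing to bear in mind when processing large files is that there is a timeout on function execution. On the consumption plan the default is 5 minutes, and it can be increased to a maximum of 10 minutes using the functionTimeout setting in host.json.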

In the next part of this series we’ll look at how to improve the responsiveness of function execution when new blobs are written and also improve the reliability and reduce the chances of blobs being missed.
