queuebatch's Introduction

QueueBatch

QueueBatch is an Azure Functions trigger that provides the ability to process Azure Storage Queue messages in batches.

Why?

The Azure Functions trigger for Azure Storage Queues obtains messages in batches but dispatches them as separate tasks, one function call per message. That's fine if a function touches different resources. If a function does some processing and then appends to a single append blob, though, it won't scale up nicely. The problems that might occur are:

  • issuing many IO transactions,
  • failing due to concurrency checks (hot resources),
  • breaching the throughput limit of a partition, etc.

Accessing the same resource with high frequency might simply not work. QueueBatch addresses this by letting you process all the messages from a batch in a single function call, amortizing the cost of accessing other resources.
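To make the amortization argument concrete, the sketch below counts simulated storage transactions for both approaches. CountingAppendBlob is a hypothetical stand-in that only counts append calls; it is not an Azure SDK type and does not talk to storage. Appending per message costs one transaction per message, while building one block for the whole batch costs a single transaction for the same content.

```csharp
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for an append blob: it only records how many append calls were made.
public sealed class CountingAppendBlob
{
    private readonly StringBuilder _content = new StringBuilder();

    public int Transactions { get; private set; }
    public string Content => _content.ToString();

    public void Append(string block)
    {
        Transactions++; // each call models one IO transaction against the same hot resource
        _content.Append(block);
    }
}

public static class AppendStrategies
{
    // Per-message dispatch: one append (one transaction) per message.
    public static void AppendEach(CountingAppendBlob blob, IEnumerable<string> payloads)
    {
        foreach (var payload in payloads)
        {
            blob.Append(payload + "\n");
        }
    }

    // Batch processing: build one block for the whole batch, then append once.
    public static void AppendBatch(CountingAppendBlob blob, IEnumerable<string> payloads)
    {
        var block = new StringBuilder();
        foreach (var payload in payloads)
        {
            block.Append(payload).Append('\n');
        }
        blob.Append(block.ToString());
    }
}
```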

Usage

Basic

To use QueueBatch in your Function application, accept the batch through a parameter of the custom IMessageBatch type and mark it with the QueueBatchTrigger attribute. To resolve the queue name from the configuration file, wrap the setting key in % signs, i.e. %queue-app-setting-key%.

public static void MyFunc([QueueBatchTrigger("myqueue")] IMessageBatch batch)
{
  foreach (var msg in batch.Messages)
  {
    // do something with payload
    DoSomething(msg.Payload);
  }

  // acknowledge processing
  batch.MarkAllAsProcessed();
}
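The %...% convention means the attribute holds the name of an app setting rather than the queue name itself. As an illustration only, the sketch below resolves such a name against a plain dictionary; SettingNameResolver is a hypothetical helper and not how QueueBatch or the WebJobs SDK perform the lookup internally.

```csharp
using System;
using System.Collections.Generic;

public static class SettingNameResolver
{
    // Hypothetical helper: if `name` is wrapped in % signs, treat it as an app setting key
    // and return that setting's value; otherwise `name` already is the literal queue name.
    public static string Resolve(string name, IReadOnlyDictionary<string, string> settings)
    {
        if (name.Length > 2 && name[0] == '%' && name[name.Length - 1] == '%')
        {
            var key = name.Substring(1, name.Length - 2);
            if (settings.TryGetValue(key, out var value))
            {
                return value;
            }
            throw new InvalidOperationException($"App setting '{key}' was not found.");
        }
        return name;
    }
}
```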

With SuccessOrFailAsBatch set to false, you can also acknowledge only some of the messages. The rest will be retried in a similar manner to the regular [QueueTrigger].

public static void MyFunc([QueueBatchTrigger("myqueue", SuccessOrFailAsBatch = false)] IMessageBatch batch)
{
  foreach (var msg in batch.Messages)
  {
    // do something with payload
    if (DoSomething(msg.Payload))
    {
       // mark as processed only if successful
       batch.MarkAsProcessed(msg);
    }
  }
}

Faster queues

QueueBatch provides an alternative client for accessing Azure Storage Queues that is much faster than the one provided by the SDK (up to 20x). It's opt-in: to enable it, set UseFasterQueues to true.

public static void MyFunc([QueueBatchTrigger("myqueue", UseFasterQueues = true)] IMessageBatch batch)
{
  // ...
}

Parallel gets

A single get operation can retrieve no more than 32 messages, so you can request issuing multiple parallel gets. The maximum number of messages in a batch equals 32 * ParallelGets.

public static void MyFunc([QueueBatchTrigger("myqueue", ParallelGets = 2)] IMessageBatch batch)
{
  // ...
}
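For example, ParallelGets = 2 allows up to 32 * 2 = 64 messages per batch. The sketch below shows the general pattern of issuing several gets concurrently and merging their results; fetchAsync is a hypothetical delegate standing in for a single storage call, so this is not QueueBatch's actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelGetSketch
{
    // A single get against an Azure Storage Queue returns at most 32 messages.
    public const int MaxMessagesPerGet = 32;

    public static int MaxBatchSize(int parallelGets) => MaxMessagesPerGet * parallelGets;

    // Issues `parallelGets` concurrent gets and flattens their results into one batch.
    public static async Task<IReadOnlyList<T>> GetBatchAsync<T>(
        Func<int, Task<IReadOnlyList<T>>> fetchAsync, // hypothetical: fetches up to N messages
        int parallelGets)
    {
        var gets = Enumerable.Range(0, parallelGets)
            .Select(_ => fetchAsync(MaxMessagesPerGet));
        var results = await Task.WhenAll(gets);
        return results.SelectMany(r => r).ToList();
    }
}
```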

Empty batches

Sometimes it might be useful to get notifications about empty batches as well. They can be used to do some other work, like compacting your data or sending a notification that there was a run with nothing to process. After each empty batch, the back-off strategy delays the next query for messages even more. To enable calls with empty batches, set the following property:

public static void MyFunc([QueueBatchTrigger("myqueue", RunWithEmptyBatch = true)] IMessageBatch batch)
{
  // now, if no message is retrieved from the queue, MyFunc will still be called
}
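The exact back-off formula isn't documented here. Purely as an illustration, a common choice is capped exponential back-off, where each consecutive empty batch doubles the delay until a maximum is reached. The initial and maximum delays below are assumed parameters, not QueueBatch defaults.

```csharp
using System;

public static class EmptyBatchBackoff
{
    // Illustrative only: QueueBatch's actual back-off strategy may differ.
    // Returns the delay before the next poll after `consecutiveEmptyBatches` empty results:
    // zero when messages were found, otherwise `initial` doubled per extra empty batch, capped at `max`.
    public static TimeSpan NextDelay(int consecutiveEmptyBatches, TimeSpan initial, TimeSpan max)
    {
        if (consecutiveEmptyBatches <= 0)
        {
            return TimeSpan.Zero;
        }

        var delay = initial;
        for (var i = 1; i < consecutiveEmptyBatches && delay < max; i++)
        {
            delay += delay; // double the wait after each additional empty batch
        }
        return delay < max ? delay : max;
    }
}
```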

Connection

Connection is an optional property that specifies the queue's storage account. It's the name of an app setting that contains the storage connection string to use for this binding. When Connection is empty, the default AzureWebJobsStorage connection string is used.

public static void MyFunc([QueueBatchTrigger("myqueue", Connection = "StorageConnectionAppSetting")] IMessageBatch batch)
{
  // ...
}
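The fallback rule described above fits in a few lines. This is a hedged sketch of the documented behavior, not QueueBatch's code: ConnectionResolver is a hypothetical helper, and the getSetting delegate lets you plug in Environment.GetEnvironmentVariable, since Azure Functions exposes app settings as environment variables.

```csharp
using System;

public static class ConnectionResolver
{
    public const string DefaultSetting = "AzureWebJobsStorage";

    // Hypothetical helper: picks the app setting named by `connection`, or AzureWebJobsStorage
    // when it is empty, then reads the connection string through `getSetting`.
    public static string Resolve(string connection, Func<string, string> getSetting)
    {
        var settingName = string.IsNullOrEmpty(connection) ? DefaultSetting : connection;
        var value = getSetting(settingName);
        if (string.IsNullOrEmpty(value))
        {
            throw new InvalidOperationException($"App setting '{settingName}' is missing or empty.");
        }
        return value;
    }
}
```

For example: ConnectionResolver.Resolve("StorageConnectionAppSetting", Environment.GetEnvironmentVariable).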

Licensing

QueueBatch

QueueBatch is licensed under the Apache License 2.0.

Azure WebJobs SDK is licensed under the MIT license. Azure WebJobs SDK sources are used and partially compiled into the QueueBatch distribution, as allowed under its license terms.

Icon

Batch Download designed by Fatahillah from The Noun Project

queuebatch's People

Contributors

azure-pipelines[bot], jeremy001181, krlm, scooletz, simoncropp

queuebatch's Issues

QueueBatchTrigger won't trigger

Thanks for adding batch support for message queues!

I'm trying to get QueueBatchTrigger to fire using WebJobs 3.0.4, but it won't.
If I change it to the default QueueTrigger, it triggers message by message, and yes, I do have messages in my queue.

What am I doing wrong?

Program.cs

static async Task Main(string[] args)
        {
            var builder = new HostBuilder()
                .UseEnvironment("Development")
                .ConfigureWebJobs(b =>
                {
                    b.AddAzureStorageCoreServices()
                    .AddAzureStorage()
                    .AddServiceBus()
                    .AddEventHubs()
                    .AddTimers();
                })
                .ConfigureAppConfiguration(b =>
                {
                    // Adding command line as a configuration source
                    b.AddCommandLine(args);
                })
                .ConfigureLogging((context, b) =>
                {
                    b.SetMinimumLevel(LogLevel.Debug);
                    b.AddConsole();
                })
                .ConfigureServices((context,services) =>
                {
                    // add some sample services to demonstrate job class DI
                    services.Configure<QueuesOptions>(options =>
                    {
                        //maximum number of queue messages that are picked up simultaneously to be executed in parallel (default is 16)
                        options.BatchSize = 15;
                        //Maximum number of retries before a queue message is sent to a poison queue (default is 5)
                        options.MaxDequeueCount = 5;
                        //maximum wait time before polling again when a queue is empty (default is 1 minute).
                        options.MaxPollingInterval = System.TimeSpan.FromSeconds(60);
                    });

                })
                .UseConsoleLifetime();

            var host = builder.Build();
            using (host)
            {
                await host.RunAsync();
            }
        }

ProductQueu.cs

public class ProductQueu
    {
        public static void QueuListener([QueueBatchTrigger("product")]IMessageBatch batch)
        {
            Console.WriteLine(batch);
        }
    }

Idea: generalize the queue lib to allow for plugging in other queue technologies

First, a piece of solid work! Two things:

  1. Looking at the src/QueueBatch abstractions, they don't know anything about Azure Cloud queues. Neither does src/QueueBatch/Impl/Queues/FastCloudQueue.cs (just a quick glance, I suppose there are more). Do you think it's reasonable to make the lib more pluggable, e.g. QueueBatch.Abstractions along with QueueBatch.Azure, QueueBatch.Aws, QueueBatch.RabbitMq etc.? This could make the whole thing a highly performant solution for any queue provider out there.

  2. I'm also wondering whether it's worthwhile, as it could be that the performance improvement trick is strictly tied to Azure-queue-specific behavior. Is it? Well, from what you said in https://github.com/Scooletz/QueueBatch#why it shouldn't be the case.

Just an idea, what do you think?

Number of function executions

Hey @Scooletz

Could be a silly question: let's say my function receives a batch of 32 messages. When it comes to calculating Azure Functions pricing, will it be counted as 32 executions or only one? My guess is the latter; I just want to be sure.
Thanks

Add an option to specify the number of messages to retrieve

I see that the number of messages that are retrieved in one call is set to 32 (or CloudQueueMessage.MaxNumberOfMessagesToPeek, which I assume is 32 also).

Do you see any issue in extending the functionality to make this number configurable?

Clarify usage

I struggled a bit to figure out how to actually get hold of my queued message. Maybe clarify in the docs what to do with a Memory<byte> object (not sure if what I came up with is the best way)? It may be my lack of C# knowledge, but I think a clarification might be useful.

So I'd add a message as usual:

 await batchQueue.AddMessageAsync(new CloudQueueMessage(JsonConvert.SerializeObject(myDTO)));

and then consume it in my trigger

public static async Task RunAsync([QueueTrigger("batch-queue")]MyDTO myDTO, ILogger log)
{
        // use myDTO here ...
}

Using QueueBatchTrigger:

public static Task RunAsync([QueueBatchTrigger("batch-queue")]IMessageBatch batch, ILogger log)
{
        foreach (var msg in batch.Messages)
        {
                MyDTO dto = JsonConvert.DeserializeObject<MyDTO>(System.Text.Encoding.Default.GetString(msg.Payload.ToArray()));
                // use dto here ...
        }
        batch.MarkAllAsProcessed();
        return Task.CompletedTask;
}

is there a better way?
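One possibly simpler option, assuming msg.Payload holds the UTF-8 JSON text written by the sender (as in the AddMessageAsync snippet above), is to deserialize directly from the bytes with System.Text.Json. This avoids the ToArray() copy and the intermediate string, and it does not depend on Encoding.Default, which is UTF-8 on .NET Core but the system ANSI code page on .NET Framework. MyDTO's members and PayloadReader are hypothetical; System.Text.Json ships with .NET Core 3.0+ and is available as a NuGet package for older targets.

```csharp
using System;
using System.Text.Json;

// Hypothetical DTO; replace with your own message contract.
public sealed class MyDTO
{
    public string Name { get; set; }
    public int Quantity { get; set; }
}

public static class PayloadReader
{
    // Deserializes a UTF-8 JSON payload directly from the message bytes, without copying to an array.
    // Memory<byte> (the payload type described above) converts implicitly to ReadOnlyMemory<byte>.
    public static T Read<T>(ReadOnlyMemory<byte> payload) =>
        JsonSerializer.Deserialize<T>(payload.Span);
}
```

Inside the loop this becomes var dto = PayloadReader.Read<MyDTO>(msg.Payload);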

Retrieving and processing multiple batches in parallel

I see in issue #13 that QueueBatch doesn't respect host.json for the maxPollingInterval setting. Does it also ignore the rest of the host.json queue settings?

"visibilityTimeout": "00:00:05",
"batchSize": 16,
"maxDequeueCount": 5,
"newBatchThreshold": 8,
"messageEncoding": "base64"

In particular, I'm interested in setting the number of batches that can be retrieved and processed in parallel. The equivalent setting for single messages is the batchSize parameter. There is also newBatchThreshold, which fetches a new batch when the number of messages being processed drops to a certain threshold. Is there an equivalent for these settings on the attribute, or does QueueBatch already respect these settings from host.json?

Messages in a batch marked as processed get picked up again if others cause exception

Hey

I have a scenario where I handle messages in parallel, so I wrap each message in its own Task, which calls MarkAsProcessed at the end, and then await Task.WhenAll(allTasks).

It works fine on the happy path. However, if one of the tasks throws an exception, the function picks up all the already-processed messages again, which seems wrong to me. So I had a look at the source code and realized it doesn't do batch.Complete in the case where an exception occurred:

catch (Exception ex) when (ex.InnerException != null &&

Do you think this is a valid feature/bug to add/fix? If so, I don't mind creating a PR for it.

Thanks
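A user-side workaround for this scenario, assuming SuccessOrFailAsBatch = false, is to catch exceptions per message so that Task.WhenAll never faults, and then acknowledge only the messages that succeeded. processAsync and markAsProcessed are hypothetical delegates standing in for your handler and batch.MarkAsProcessed. The acknowledgements run sequentially after all tasks complete, so the sketch does not assume MarkAsProcessed is thread-safe.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelBatchProcessing
{
    // Processes all messages concurrently; a failure in one message does not fault the others.
    // Returns the number of messages that failed (and were therefore left unacknowledged).
    public static async Task<int> ProcessAsync<TMessage>(
        IReadOnlyList<TMessage> messages,
        Func<TMessage, Task> processAsync,  // hypothetical: your per-message handler
        Action<TMessage> markAsProcessed)   // hypothetical: e.g. msg => batch.MarkAsProcessed(msg)
    {
        var succeeded = await Task.WhenAll(messages.Select(async msg =>
        {
            try
            {
                await processAsync(msg);
                return true;
            }
            catch (Exception)
            {
                // Not acknowledged, so it will be retried; log the exception in real code.
                return false;
            }
        }));

        // Task.WhenAll preserves input order, so succeeded[i] belongs to messages[i].
        for (var i = 0; i < messages.Count; i++)
        {
            if (succeeded[i])
            {
                markAsProcessed(messages[i]);
            }
        }
        return succeeded.Count(ok => !ok);
    }
}
```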
