# Batching
In some scenarios, high message volume can lead to consumer resource bottlenecks. If a system is publishing thousands of messages per second, and has a consumer that is writing the content of those messages to some type of storage, the storage system might not be optimized for thousands of individual writes per second. It may, however, perform better if writes are performed in batches. For example, receiving one hundred messages and then writing the content of those messages using a single storage operation may be significantly more efficient (and faster).
MassTransit supports receiving messages and delivering those messages to a consumer as a `Batch`.
To create a batch consumer, consume the `Batch<T>` interface, where `T` is the message type. The consumer can then be configured using the container integration, with the batch options specified in a consumer definition. The example below consumes batches of `OrderAudit` events, up to 100 messages at a time, with a one-second time limit and up to 10 concurrent batches.
```csharp
namespace BatchingConsumer
{
    using System.Threading.Tasks;
    using MassTransit;
    using MassTransit.ConsumeConfigurators;
    using MassTransit.Definition;

    public interface OrderAudit
    {
    }

    class OrderAuditConsumer :
        IConsumer<Batch<OrderAudit>>
    {
        public Task Consume(ConsumeContext<Batch<OrderAudit>> context)
        {
            // Each element of the batch is the ConsumeContext of a single OrderAudit message
            for (int i = 0; i < context.Message.Length; i++)
            {
                ConsumeContext<OrderAudit> audit = context.Message[i];

                // Process audit.Message here, for example by adding it to
                // a single bulk write to storage
            }

            return Task.CompletedTask;
        }
    }

    class OrderAuditConsumerDefinition :
        ConsumerDefinition<OrderAuditConsumer>
    {
        public OrderAuditConsumerDefinition()
        {
            // 1000 = 100 messages per batch x 10 concurrent batches
            Endpoint(x => x.PrefetchCount = 1000);
        }

        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
            IConsumerConfigurator<OrderAuditConsumer> consumerConfigurator)
        {
            consumerConfigurator.Options<BatchOptions>(options => options
                .SetMessageLimit(100)      // deliver at most 100 messages per batch
                .SetTimeLimit(1000)        // deliver a partial batch after 1000 ms
                .SetConcurrencyLimit(10)); // allow up to 10 batches in progress at once
        }
    }
}
```
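To make the interaction between the message limit and the time limit easier to reason about, the following self-contained sketch simulates batch formation outside of MassTransit. It is not MassTransit's implementation: `BatchSimulation`, `SimulatedBatch<T>`, and `CompletionReason` are hypothetical names, and the sketch assumes the time limit is measured from the arrival of the first message in a batch. A batch completes as soon as it holds `messageLimit` messages, or when `timeLimit` has elapsed since its first message arrived, whichever comes first.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical completion reasons for this simulation only (not a MassTransit type).
public enum CompletionReason { MessageLimit, TimeLimit }

// A simulated batch: the messages it contains and why it was completed.
public sealed record SimulatedBatch<T>(IReadOnlyList<T> Items, CompletionReason Reason);

public static class BatchSimulation
{
    // Groups time-ordered arrivals into batches. A batch completes when it holds
    // messageLimit messages, or when timeLimit has elapsed since its first message
    // arrived (assumption: the timer starts at the first message), whichever is first.
    public static List<SimulatedBatch<T>> Group<T>(
        IEnumerable<(TimeSpan ArrivedAt, T Item)> arrivals,
        int messageLimit,
        TimeSpan timeLimit)
    {
        if (messageLimit < 1)
            throw new ArgumentOutOfRangeException(nameof(messageLimit));

        var batches = new List<SimulatedBatch<T>>();
        List<T>? current = null;
        TimeSpan deadline = TimeSpan.Zero;

        foreach (var (arrivedAt, item) in arrivals)
        {
            // The open batch's time limit expired before this message arrived,
            // so it was delivered as a partial batch.
            if (current != null && arrivedAt >= deadline)
            {
                batches.Add(new SimulatedBatch<T>(current, CompletionReason.TimeLimit));
                current = null;
            }

            // Start a new batch; its time limit runs from this first message.
            if (current == null)
            {
                current = new List<T>();
                deadline = arrivedAt + timeLimit;
            }

            current.Add(item);

            // The batch is full, so it is delivered immediately.
            if (current.Count == messageLimit)
            {
                batches.Add(new SimulatedBatch<T>(current, CompletionReason.MessageLimit));
                current = null;
            }
        }

        // Any remaining messages are delivered when their time limit expires.
        if (current != null)
            batches.Add(new SimulatedBatch<T>(current, CompletionReason.TimeLimit));

        return batches;
    }
}
```

For example, 250 messages arriving one millisecond apart, with a message limit of 100 and a one-second time limit, produce two full batches completed by the message limit, followed by a batch of 50 completed by the time limit. Five messages arriving 400 ms apart never fill a batch: they form a batch of three and a batch of two, both completed by the time limit.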
Once the consumer has been created, configure the consumer on a receive endpoint (in this case, using the default convention).
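```csharp
namespace BatchingConsumerBus
{
    using System;
    using System.Threading.Tasks;
    using BatchingConsumer;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    public class Program
    {
        public static async Task Main()
        {
            var services = new ServiceCollection();
            services.AddMassTransit(x =>
            {
                // Register the consumer with its definition, which supplies the batch options
                x.AddConsumer<OrderAuditConsumer>(typeof(OrderAuditConsumerDefinition));

                // Configure a receive endpoint for each registered consumer using the default convention
                x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
            });

            var provider = services.BuildServiceProvider();
            var busControl = provider.GetRequiredService<IBusControl>();

            await busControl.StartAsync();
            try
            {
                Console.WriteLine("Press enter to exit");
                await Task.Run(() => Console.ReadLine());
            }
            finally
            {
                await busControl.StopAsync();
            }
        }
    }
}
```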
If automatic receive endpoint configuration is not used, the receive endpoint can be configured explicitly.
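```csharp
namespace BatchingConsumerExplicit
{
    using System;
    using System.Threading.Tasks;
    using BatchingConsumer;
    using MassTransit;

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("audit-service", e =>
                {
                    // 1000 = 100 messages per batch x 10 concurrent batches
                    e.PrefetchCount = 1000;

                    e.Batch<OrderAudit>(b =>
                    {
                        b.MessageLimit = 100;                  // at most 100 messages per batch
                        b.ConcurrencyLimit = 10;               // up to 10 batches in progress at once
                        b.TimeLimit = TimeSpan.FromSeconds(1); // deliver a partial batch after 1 second

                        b.Consumer(() => new OrderAuditConsumer());
                    });
                });
            });

            await busControl.StartAsync();
            try
            {
                Console.WriteLine("Press enter to exit");
                await Task.Run(() => Console.ReadLine());
            }
            finally
            {
                await busControl.StopAsync();
            }
        }
    }
}
```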
# PrefetchCount

Every transport has its own limitations that may constrain the batch size. For instance, Amazon SQS fetches at most ten messages at a time, making ten an optimal batch size. Experiment to find the settings that work best in your environment.

If `PrefetchCount` is lower than the batch message limit, the message limit can never be reached, so every batch completes only when the time limit expires and throughput is bounded by the time limit.
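The RabbitMQ examples above pair a message limit of 100 and a concurrency limit of 10 with a prefetch count of 1000. Assuming every concurrent batch should be able to fill before its time limit expires, and that messages remain unacknowledged until the batch consumer completes, the prefetch count has to cover every message held by in-progress batches at the same time. This is a rule of thumb under those assumptions, not a documented MassTransit requirement:

$$
\text{PrefetchCount} \geq \text{MessageLimit} \times \text{ConcurrencyLimit} = 100 \times 10 = 1000
$$

A lower value still works, but some of the concurrent batches will then complete on the time limit rather than the message limit.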
When using Azure Service Bus, both `PrefetchCount` and `MaxConcurrentCalls` limit how many messages are delivered at once, so both must be configured for batches to fill, as shown below.
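```csharp
namespace BatchingConsumerAzure
{
    using System;
    using System.Threading.Tasks;
    using BatchingConsumer;
    using MassTransit;
    using MassTransit.Azure.ServiceBus.Core;

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
            {
                // The Azure Service Bus host (connection string) must also be configured; omitted here

                cfg.ReceiveEndpoint("audit-service", e =>
                {
                    // Both settings must allow at least MessageLimit messages to be delivered
                    // at once, otherwise batches only complete when the time limit expires
                    e.PrefetchCount = 100;
                    e.MaxConcurrentCalls = 100;

                    e.Batch<OrderAudit>(b =>
                    {
                        b.MessageLimit = 100;
                        b.TimeLimit = TimeSpan.FromSeconds(3);

                        b.Consumer(() => new OrderAuditConsumer());
                    });
                });
            });

            await busControl.StartAsync();
            try
            {
                Console.WriteLine("Press enter to exit");
                await Task.Run(() => Console.ReadLine());
            }
            finally
            {
                await busControl.StopAsync();
            }
        }
    }
}
```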
# Batch Interface
In addition to the messages themselves, the `Batch<T>` interface, shown below, includes the time the first message was received, the time the last message was received, and the completion mode of the batch (whether the message limit or the time limit was reached).
```csharp
namespace MassTransit
{
    using System;
    using System.Collections.Generic;

    /// <summary>
    /// A batch of messages which are delivered to a consumer all at once
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    public interface Batch<out T> :
        IEnumerable<ConsumeContext<T>>
        where T : class
    {
        /// <summary>
        /// How the batch was completed (for example, the message limit or the time limit was reached)
        /// </summary>
        BatchCompletionMode Mode { get; }

        /// <summary>
        /// When the first message in this batch was received
        /// </summary>
        DateTime FirstMessageReceived { get; }

        /// <summary>
        /// When the last message in this batch was received
        /// </summary>
        DateTime LastMessageReceived { get; }

        /// <summary>
        /// Returns the message at the specified index
        /// </summary>
        /// <param name="index">The zero-based index of the message within the batch</param>
        ConsumeContext<T> this[int index] { get; }

        /// <summary>
        /// The number of messages in this batch
        /// </summary>
        int Length { get; }
    }
}
```