# Containers
MassTransit supports several dependency injection containers. Since Microsoft introduced its own container, that container has become the most commonly used.
Optional
MassTransit does not require a container, as demonstrated in the configuration example. If you aren't already using a container, you can get started without having to adopt one. However, when you're ready to use a container, perhaps to deploy your service using the .NET Generic Host, you will likely want to use Microsoft's built-in solution.
Regardless of which container is used, supported containers have a consistent registration syntax used to add consumers, sagas, and activities, as well as configure the bus. Behind the scenes, MassTransit is configuring the container, including container-specific features such as scoped lifecycles, consistently and correctly. Use of the registration syntax has drastically reduced container configuration support questions.
# Consumer Registration
Uses MassTransit.Extensions.DependencyInjection
To configure a bus using RabbitMQ and register the consumers, sagas, and activities to be used by the bus, call the AddMassTransit extension method. If RabbitMQ is not being used, replace the UsingRabbitMq method with the corresponding method for your transport.

    namespace MicrosoftContainer
    {
        using System;
        using System.Threading;
        using System.Threading.Tasks;
        using ContainerConsumers;
        using MassTransit;
        using Microsoft.Extensions.DependencyInjection;

        public class Program
        {
            public static async Task Main()
            {
                var services = new ServiceCollection();

                services.AddMassTransit(x =>
                {
                    x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                    x.SetKebabCaseEndpointNameFormatter();

                    x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
                });

                var provider = services.BuildServiceProvider();

                var busControl = provider.GetRequiredService<IBusControl>();

                // give up if the bus has not started within 10 seconds
                await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
                try
                {
                    Console.WriteLine("Press enter to exit");

                    await Task.Run(() => Console.ReadLine());
                }
                finally
                {
                    await busControl.StopAsync();
                }
            }
        }
    }
The AddConsumer method is one of several methods used to register consumers, some of which are shown below.

    namespace MicrosoftContainerAddConsumer
    {
        using System;
        using System.Threading.Tasks;
        using ContainerConsumers;
        using MassTransit;
        using Microsoft.Extensions.DependencyInjection;

        public class Program
        {
            public static async Task Main()
            {
                var services = new ServiceCollection();

                services.AddMassTransit(x =>
                {
                    // Add a single consumer
                    x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                    // Add a single consumer by type
                    x.AddConsumer(typeof(SubmitOrderConsumer), typeof(SubmitOrderConsumerDefinition));

                    // Add all consumers in the specified assembly
                    x.AddConsumers(typeof(SubmitOrderConsumer).Assembly);

                    // Add all consumers in the namespace containing the specified type
                    x.AddConsumersFromNamespaceContaining<SubmitOrderConsumer>();
                });
            }
        }
    }
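The introduction mentions deploying a service with the .NET Generic Host. As a rough sketch of what that could look like, the registration from the first example can be moved into ConfigureServices so that the host, rather than the program, starts and stops the bus. This assumes the AddMassTransitHostedService extension from the MassTransit.AspNetCore package, which is not covered on this page; treat the package and method name as assumptions to verify against the MassTransit version in use.

```csharp
namespace GenericHostSketch
{
    using System.Threading.Tasks;
    using ContainerConsumers;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;

    public class Program
    {
        public static async Task Main(string[] args)
        {
            await Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    // same registration syntax as the console example
                    services.AddMassTransit(x =>
                    {
                        x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                        x.SetKebabCaseEndpointNameFormatter();

                        x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
                    });

                    // assumption: extension method from the MassTransit.AspNetCore package,
                    // which starts the bus with the host and stops it on shutdown
                    services.AddMassTransitHostedService();
                })
                .Build()
                .RunAsync();
        }
    }
}
```

With this arrangement there is no need to resolve IBusControl or call StartAsync and StopAsync by hand.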
# Consumer Definition
A consumer definition is used to configure the receive endpoint and pipeline behavior for the consumer. When scanning assemblies or namespaces for consumers, consumer definitions are also found and added to the container. The SubmitOrderConsumer and matching definition are shown below.

    namespace ContainerConsumers
    {
        using System;
        using System.Threading.Tasks;
        using ContainerContracts;
        using GreenPipes;
        using MassTransit;
        using MassTransit.ConsumeConfigurators;
        using MassTransit.Definition;
        using Microsoft.Extensions.Logging;

        class SubmitOrderConsumer :
            IConsumer<SubmitOrder>
        {
            ILogger<SubmitOrderConsumer> _logger;

            public SubmitOrderConsumer(ILogger<SubmitOrderConsumer> logger)
            {
                _logger = logger;
            }

            public async Task Consume(ConsumeContext<SubmitOrder> context)
            {
                _logger.LogInformation("Order Submitted: {OrderId}", context.Message.OrderId);

                await context.Publish<OrderSubmitted>(new
                {
                    context.Message.OrderId
                });
            }
        }

        class SubmitOrderConsumerDefinition :
            ConsumerDefinition<SubmitOrderConsumer>
        {
            public SubmitOrderConsumerDefinition()
            {
                // override the default endpoint name
                EndpointName = "order-service";

                // limit the number of messages consumed concurrently
                // this applies to the consumer only, not the endpoint
                ConcurrentMessageLimit = 8;
            }

            protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
                IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator)
            {
                // configure message retry with millisecond intervals
                endpointConfigurator.UseMessageRetry(r => r.Intervals(100,200,500,800,1000));

                // use the outbox to prevent duplicate events from being published
                endpointConfigurator.UseInMemoryOutbox();
            }
        }
    }
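The consumer above imports the ContainerContracts namespace, which this page does not show. A minimal sketch of what those contracts might look like follows. The interface shapes are an assumption inferred from how the consumer uses them; in particular, the Guid type for OrderId is a guess, not something the page states.

```csharp
namespace ContainerContracts
{
    using System;

    // Assumed shape of the command handled by SubmitOrderConsumer.
    public interface SubmitOrder
    {
        // assumption: the order identifier is a Guid
        Guid OrderId { get; }
    }

    // Assumed shape of the event published after the order is submitted.
    public interface OrderSubmitted
    {
        Guid OrderId { get; }
    }
}
```

Read-only interface properties are sufficient here because the consumer publishes OrderSubmitted from an anonymous object rather than constructing a concrete class.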
# Endpoint Definition
To configure the endpoint for a consumer registration, or override the endpoint configuration in the definition, the Endpoint method can be added to the consumer registration. This will create an endpoint definition for the consumer, and register it in the container. This method is available on consumer and saga registrations, with separate execute and compensate endpoint methods for activities.

    namespace MicrosoftContainerAddConsumerEndpoint
    {
        using System;
        using System.Threading.Tasks;
        using ContainerConsumers;
        using MassTransit;
        using Microsoft.Extensions.DependencyInjection;

        public class Program
        {
            public static async Task Main()
            {
                var services = new ServiceCollection();

                services.AddMassTransit(x =>
                {
                    x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition))
                        .Endpoint(e =>
                        {
                            // override the default endpoint name
                            e.Name = "order-service-extreme";

                            // specify the endpoint as temporary (may be non-durable, auto-delete, etc.)
                            e.Temporary = false;

                            // specify an optional concurrent message limit for the consumer
                            e.ConcurrentMessageLimit = 8;

                            // only use if needed, a sensible default is provided, and a reasonable
                            // value is automatically calculated based upon ConcurrentMessageLimit if
                            // the transport supports it.
                            e.PrefetchCount = 16;

                            // set if each service instance should have its own endpoint for the consumer
                            // so that messages fan out to each instance.
                            e.InstanceId = "something-unique";
                        });

                    x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
                });
            }
        }
    }
When the endpoint is configured after the AddConsumer method, the configuration overrides the endpoint configuration in the consumer definition. However, it cannot override the EndpointName if it is specified in the constructor. The order of precedence for endpoint naming, from highest to lowest, is:

1. Specifying EndpointName = "submit-order-extreme" in the constructor, which cannot be overridden.

       x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();

       public SubmitOrderConsumerDefinition()
       {
           EndpointName = "submit-order-extreme";
       }

2. Specifying .Endpoint(x => x.Name = "submit-order-extreme") in the consumer registration, chained to AddConsumer.

       x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()
           .Endpoint(x => x.Name = "submit-order-extreme");

       public SubmitOrderConsumerDefinition()
       {
           Endpoint(x => x.Name = "not used");
       }

3. Specifying Endpoint(x => x.Name = "submit-order-extreme") in the constructor, which creates an endpoint definition.

       x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();

       public SubmitOrderConsumerDefinition()
       {
           Endpoint(x => x.Name = "submit-order-extreme");
       }

4. Unspecified, in which case the endpoint name formatter is used (here, the endpoint name is SubmitOrder using the default formatter).

       x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();

       public SubmitOrderConsumerDefinition()
       {
       }
# Bus Configuration
In the above examples, the bus is configured by the UsingRabbitMq method, which is passed two arguments: context is the registration context, used to configure endpoints, and cfg is the bus factory configurator, used to configure the bus. The above examples use the default conventions to configure the endpoints. Alternatively, endpoints can be explicitly configured. However, when configuring endpoints manually, the ConfigureEndpoints methods should not be used (duplicate endpoints may result).
ConfigureEndpoints uses an IEndpointNameFormatter to generate endpoint names, which by default uses a PascalCase formatter. Two additional endpoint name formatters are included: snake case and kebab case.
For the SubmitOrderConsumer, the endpoint names would be:
Formatter | Name |
---|---|
Default | SubmitOrder |
Snake Case | submit_order |
Kebab Case | submit-order |
All of the included formatters trim the Consumer, Saga, or Activity suffix from the end of the class name. If the consumer name is generic, the generic parameter type is used instead of the generic type.
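To make the naming convention concrete, here is a small standalone sketch that reproduces the three names in the table for a non-generic class name. It is an illustration of the convention only, not MassTransit's implementation; EndpointNameSketch and its methods are hypothetical names, and the generic-type rule mentioned above is not covered.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper that mimics the documented convention; not part of MassTransit.
public static class EndpointNameSketch
{
    static readonly string[] Suffixes = { "Consumer", "Saga", "Activity" };

    // Trim a trailing Consumer, Saga, or Activity suffix from the class name.
    public static string TrimSuffix(string typeName)
    {
        foreach (var suffix in Suffixes)
        {
            if (typeName.Length > suffix.Length && typeName.EndsWith(suffix, StringComparison.Ordinal))
                return typeName.Substring(0, typeName.Length - suffix.Length);
        }

        return typeName;
    }

    // Insert the separator before each upper-case letter that follows a
    // lower-case letter or digit, then lower-case the whole name.
    static string Separate(string name, string separator)
    {
        return Regex.Replace(name, "(?<=[a-z0-9])([A-Z])", separator + "$1").ToLowerInvariant();
    }

    public static string Default(string typeName) => TrimSuffix(typeName);

    public static string Snake(string typeName) => Separate(TrimSuffix(typeName), "_");

    public static string Kebab(string typeName) => Separate(TrimSuffix(typeName), "-");
}

public static class Program
{
    public static void Main()
    {
        Console.WriteLine(EndpointNameSketch.Default("SubmitOrderConsumer")); // SubmitOrder
        Console.WriteLine(EndpointNameSketch.Snake("SubmitOrderConsumer"));   // submit_order
        Console.WriteLine(EndpointNameSketch.Kebab("SubmitOrderConsumer"));   // submit-order
    }
}
```

In a real application the names come from the configured IEndpointNameFormatter, so a sketch like this is useful only for predicting queue names, for example when writing infrastructure scripts.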
Video
Learn about the default conventions as well as how to tailor the naming style to meet your requirements in this short video.
The endpoint name formatter can be set as shown below.

    namespace MicrosoftContainerFormatter
    {
        using System;
        using System.Threading.Tasks;
        using MassTransit;
        using Microsoft.Extensions.DependencyInjection;

        public class Program
        {
            public static async Task Main()
            {
                var services = new ServiceCollection();

                services.AddMassTransit(x =>
                {
                    // choose one formatter; both calls are shown only to
                    // illustrate the available options
                    x.SetKebabCaseEndpointNameFormatter();

                    x.SetSnakeCaseEndpointNameFormatter();

                    x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
                });
            }
        }
    }
The endpoint formatter can also be passed to the ConfigureEndpoints method as shown.

    namespace MicrosoftContainerFormatterInline
    {
        using System;
        using System.Threading.Tasks;
        using MassTransit;
        using MassTransit.Definition;
        using Microsoft.Extensions.DependencyInjection;

        public class Program
        {
            public static async Task Main()
            {
                var services = new ServiceCollection();

                services.AddMassTransit(x =>
                {
                    x.UsingRabbitMq((context, cfg) =>
                    {
                        cfg.ConfigureEndpoints(context, KebabCaseEndpointNameFormatter.Instance);
                    });
                });
            }
        }
    }
To explicitly configure endpoints, use the ConfigureConsumer and/or ConfigureConsumers methods.

    namespace MicrosoftContainerConfigureConsumer
    {
        using System;
        using System.Threading.Tasks;
        using ContainerConsumers;
        using MassTransit;
        using Microsoft.Extensions.DependencyInjection;

        public class Program
        {
            public static async Task Main()
            {
                var services = new ServiceCollection();

                services.AddMassTransit(x =>
                {
                    x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                    x.UsingRabbitMq((context, cfg) =>
                    {
                        cfg.ReceiveEndpoint("order-service", e =>
                        {
                            e.ConfigureConsumer<SubmitOrderConsumer>(context);
                        });
                    });
                });
            }
        }
    }
When using ConfigureConsumer, the EndpointName, PrefetchCount, and Temporary properties of the consumer definition are not used.
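Because those definition properties are ignored on an explicitly configured endpoint, any equivalent settings have to be applied on the receive endpoint itself. The following is a hedged sketch of that idea: the ExplicitEndpointSketch namespace and Registration class are hypothetical, and the prefetch value of 16 is only an illustrative number borrowed from the earlier endpoint example.

```csharp
namespace ExplicitEndpointSketch
{
    using ContainerConsumers;
    using MassTransit;
    using Microsoft.Extensions.DependencyInjection;

    // Hypothetical helper class; only the AddMassTransit body matters here.
    public static class Registration
    {
        public static void Configure(IServiceCollection services)
        {
            services.AddMassTransit(x =>
            {
                x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));

                x.UsingRabbitMq((context, cfg) =>
                {
                    // the queue name is taken from this call, not from the
                    // EndpointName set in the consumer definition
                    cfg.ReceiveEndpoint("order-service", e =>
                    {
                        // set explicitly, since a prefetch count in the
                        // definition is not used on this endpoint
                        e.PrefetchCount = 16;

                        e.ConfigureConsumer<SubmitOrderConsumer>(context);
                    });
                });
            });
        }
    }
}
```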
# Saga Registration
To add a state machine saga, use the AddSagaStateMachine methods. For a consumer saga, use the AddSaga methods.
Important
State machine sagas should be added before class-based sagas, and the class-based saga methods should not be used to add state machine sagas. This may be simplified in the future, but for now, be aware of this registration requirement.

    containerBuilder.AddMassTransit(r =>
    {
        // add a state machine saga, with the in-memory repository
        r.AddSagaStateMachine<OrderStateMachine, OrderState>()
            .InMemoryRepository();

        // add a consumer saga with the in-memory repository
        r.AddSaga<OrderSaga>()
            .InMemoryRepository();

        // add a saga by type, without a repository. The repository should be registered
        // in the container elsewhere
        r.AddSaga(typeof(OrderSaga));

        // add a state machine saga by type, including a saga definition for that saga
        r.AddSagaStateMachine(typeof(OrderStateMachine), typeof(OrderStateDefinition));

        // add all saga state machines in the specified assembly
        r.AddSagaStateMachines(Assembly.GetExecutingAssembly());

        // add all sagas in the specified assembly
        r.AddSagas(Assembly.GetExecutingAssembly());

        // add sagas from the namespace containing the type
        r.AddSagasFromNamespaceContaining<OrderSaga>();
        r.AddSagasFromNamespaceContaining(typeof(OrderSaga));
    });
To add a saga registration and configure the consumer endpoint in the same expression, a definition can automatically be created.

    containerBuilder.AddMassTransit(r =>
    {
        r.AddSagaStateMachine<OrderStateMachine, OrderState>()
            .NHibernateRepository()
            .Endpoint(e =>
            {
                e.Name = "order-state";
                e.ConcurrentMessageLimit = 8;
            });
    });
Supported saga persistence storage engines are documented in the saga documentation section.
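The saga registrations above reference OrderStateMachine and OrderState without showing them. For orientation, a minimal state machine saga might look like the sketch below. It assumes the Automatonymous-based API used by the MassTransit version this page targets, and the SubmitOrder contract, the Submitted state, and the Guid OrderId are all illustrative assumptions rather than types defined on this page.

```csharp
namespace SagaSketch
{
    using System;
    using Automatonymous;

    // hypothetical message contract; OrderId is assumed to be a Guid
    public interface SubmitOrder
    {
        Guid OrderId { get; }
    }

    // the saga instance, holding the state stored by the saga repository
    public class OrderState :
        SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }
        public string CurrentState { get; set; }
    }

    public class OrderStateMachine :
        MassTransitStateMachine<OrderState>
    {
        public OrderStateMachine()
        {
            // store the current state name in the CurrentState property
            InstanceState(x => x.CurrentState);

            // correlate the event to a saga instance using the order id
            Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));

            // a new instance moves to Submitted when SubmitOrder is received
            Initially(
                When(SubmitOrder)
                    .TransitionTo(Submitted));
        }

        public State Submitted { get; private set; }

        public Event<SubmitOrder> SubmitOrder { get; private set; }
    }
}
```

A state machine like this would be registered with AddSagaStateMachine<OrderStateMachine, OrderState>() and paired with one of the repositories, as in the registration examples above.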