# Mediator
MassTransit includes a mediator implementation, with full support for consumers, handlers, and sagas (including saga state machines). MassTransit Mediator runs in-process and in-memory, so no transport is required. For maximum performance, messages are passed by reference rather than being serialized, and control flows directly from the Publish/Send caller to the consumer. If a consumer throws an exception, the Publish/Send method throws and the exception should be handled by the caller.
Mediator
Mediator is a behavioral design pattern in which a mediator encapsulates communication between objects to reduce coupling.
# Configuration
Creating and configuring a mediator is similar to creating a bus, but uses the CreateMediator factory method. Consumers and sagas are configured the same way they would be on a receive endpoint. The example below configures the mediator with a single consumer.

    namespace UsageMediator
    {
        using System;
        using System.Threading.Tasks;
        using UsageConsumer;
        using MassTransit;
        using MassTransit.Mediator;

        public class Program
        {
            public static async Task Main()
            {
                // Create the mediator and register a single consumer
                IMediator mediator = Bus.Factory.CreateMediator(cfg =>
                {
                    cfg.Consumer<SubmitOrderConsumer>();
                });
            }
        }
    }

Once created, the mediator doesn't need to be started or stopped and can be used immediately. IMediator combines several other interfaces into a single interface, including IPublishEndpoint, ISendEndpoint, and IClientFactory.

    namespace MassTransit.Mediator
    {
        public interface IMediator :
            ISendEndpoint,
            IPublishEndpoint,
            IClientFactory,
            IConsumePipeConnector,
            IRequestPipeConnector,
            IConsumeObserverConnector,
            IConsumeMessageObserverConnector
        {
        }
    }

When a command is sent using the Send method, MassTransit dispatches it to the consumer asynchronously. The Send method completes once the Consume method completes. If the consumer throws an exception, it is propagated back to the caller.
Send vs Publish
Send expects the message to be consumed. If there is no consumer configured for the message type, an exception will be thrown.
Publish, on the other hand, does not require the message to be consumed and does not throw an exception if the message isn't consumed. To throw an exception when a published message is not consumed, set the Mandatory property to true on PublishContext.
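The difference between the two methods can be sketched as follows. This is a minimal illustration, not a definitive implementation: the `SubmitOrder` and `OrderSubmitted` contracts and the always-failing consumer are hypothetical, and the `Publish` overload that accepts a callback to set `Mandatory` is assumed to be available as an extension method on `IPublishEndpoint`.

```csharp
namespace UsageMediatorSendPublish
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using MassTransit.Mediator;

    // Hypothetical contracts, declared here only for illustration
    public interface SubmitOrder
    {
        Guid OrderId { get; }
    }

    public interface OrderSubmitted
    {
        Guid OrderId { get; }
    }

    // Hypothetical consumer that always fails, to show exception propagation
    class SubmitOrderConsumer :
        IConsumer<SubmitOrder>
    {
        public Task Consume(ConsumeContext<SubmitOrder> context)
        {
            throw new InvalidOperationException("The order was rejected");
        }
    }

    public class Program
    {
        public static async Task Main()
        {
            IMediator mediator = Bus.Factory.CreateMediator(cfg =>
            {
                cfg.Consumer<SubmitOrderConsumer>();
            });

            try
            {
                // The consumer throws, so the awaited Send throws as well
                await mediator.Send<SubmitOrder>(new { OrderId = NewId.NextGuid() });
            }
            catch (Exception ex)
            {
                Console.WriteLine("Send failed: {0}", ex.Message);
            }

            // No consumer is configured for OrderSubmitted; Publish does not require one
            await mediator.Publish<OrderSubmitted>(new { OrderId = NewId.NextGuid() });

            try
            {
                // Assumed overload: the callback sets Mandatory on the PublishContext
                await mediator.Publish<OrderSubmitted>(new { OrderId = NewId.NextGuid() },
                    x => x.Mandatory = true);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Mandatory publish failed: {0}", ex.Message);
            }
        }
    }
}
```

The exceptions are caught as `Exception` because the exact exception types surfaced to the caller are not stated above.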
# Connect
Consumers can be connected to and disconnected from the mediator at run time, allowing components and services to temporarily consume messages. Use the ConnectConsumer method to connect a consumer. The returned handle can be used to disconnect the consumer.

    namespace UsageMediatorConnect
    {
        using System;
        using System.Threading.Tasks;
        using UsageConsumer;
        using MassTransit;
        using MassTransit.Mediator;

        public class Program
        {
            public static async Task Main()
            {
                // No consumers are registered when the mediator is created
                IMediator mediator = Bus.Factory.CreateMediator(cfg =>
                {
                });

                // Connect a consumer at run time; keep the handle to disconnect it later
                var handle = mediator.ConnectConsumer<SubmitOrderConsumer>();
            }
        }
    }

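As a sketch of the full lifecycle, the example below connects a consumer, sends one message, and then disconnects the consumer through the handle. The `SubmitOrder` contract and the consumer body are hypothetical stand-ins declared locally so the example is self-contained.

```csharp
namespace UsageMediatorDisconnect
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using MassTransit.Mediator;

    // Hypothetical contract, declared here only for illustration
    public interface SubmitOrder
    {
        Guid OrderId { get; }
    }

    // Hypothetical consumer; ConnectConsumer requires a default constructor
    class SubmitOrderConsumer :
        IConsumer<SubmitOrder>
    {
        public Task Consume(ConsumeContext<SubmitOrder> context)
        {
            Console.WriteLine("Consumed order {0}", context.Message.OrderId);
            return Task.CompletedTask;
        }
    }

    public class Program
    {
        public static async Task Main()
        {
            IMediator mediator = Bus.Factory.CreateMediator(cfg =>
            {
            });

            // Temporarily consume SubmitOrder messages
            var handle = mediator.ConnectConsumer<SubmitOrderConsumer>();

            await mediator.Send<SubmitOrder>(new { OrderId = NewId.NextGuid() });

            // Stop consuming; subsequent messages no longer reach this consumer
            handle.Disconnect();
        }
    }
}
```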
# Requests
To send a request using the mediator, a request client can be created from IMediator. The example below configures two consumers and then sends the SubmitOrder command, followed by the GetOrderStatus request.

    namespace UsageMediatorRequest
    {
        using System;
        using System.Threading.Tasks;
        using UsageContracts;
        using UsageConsumer;
        using UsageMediatorConsumer;
        using MassTransit;
        using MassTransit.Mediator;

        public class Program
        {
            public static async Task Main()
            {
                IMediator mediator = Bus.Factory.CreateMediator(cfg =>
                {
                    cfg.Consumer<SubmitOrderConsumer>();
                    cfg.Consumer<OrderStatusConsumer>();
                });

                Guid orderId = NewId.NextGuid();

                // Send the command first
                await mediator.Send<SubmitOrder>(new { OrderId = orderId });

                // Then create a request client and request the order status
                var client = mediator.CreateRequestClient<GetOrderStatus>();

                var response = await client.GetResponse<OrderStatus>(new { OrderId = orderId });

                Console.WriteLine("Order Status: {0}", response.Message.Status);
            }
        }
    }

The OrderStatusConsumer, along with the message contracts, is shown below.

    namespace UsageMediatorConsumer
    {
        using System;
        using System.Threading.Tasks;
        using MassTransit;

        public interface GetOrderStatus
        {
            Guid OrderId { get; }
        }

        public interface OrderStatus
        {
            Guid OrderId { get; }
            string Status { get; }
        }

        class OrderStatusConsumer :
            IConsumer<GetOrderStatus>
        {
            public async Task Consume(ConsumeContext<GetOrderStatus> context)
            {
                // Respond to the request with the order status
                await context.RespondAsync<OrderStatus>(new
                {
                    context.Message.OrderId,
                    Status = "Pending"
                });
            }
        }
    }

Just like Send, the request is executed asynchronously. If an exception occurs, the exception will be propagated back to the caller. If the request times out, or if the request is canceled, the GetResponse method will throw an exception (either a RequestTimeoutException or an OperationCanceledException).
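A minimal sketch of handling these two failure modes follows. The contracts and consumer repeat the ones shown above so the example is self-contained; the five-second timeout and ten-second cancellation window are arbitrary values chosen for illustration.

```csharp
namespace UsageMediatorRequestTimeout
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using MassTransit;
    using MassTransit.Mediator;

    public interface GetOrderStatus
    {
        Guid OrderId { get; }
    }

    public interface OrderStatus
    {
        Guid OrderId { get; }
        string Status { get; }
    }

    class OrderStatusConsumer :
        IConsumer<GetOrderStatus>
    {
        public Task Consume(ConsumeContext<GetOrderStatus> context)
        {
            return context.RespondAsync<OrderStatus>(new
            {
                context.Message.OrderId,
                Status = "Pending"
            });
        }
    }

    public class Program
    {
        public static async Task Main()
        {
            IMediator mediator = Bus.Factory.CreateMediator(cfg =>
            {
                cfg.Consumer<OrderStatusConsumer>();
            });

            // Arbitrary timeout for illustration
            var client = mediator.CreateRequestClient<GetOrderStatus>(RequestTimeout.After(s: 5));

            using (var cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
            {
                try
                {
                    var response = await client.GetResponse<OrderStatus>(
                        new { OrderId = NewId.NextGuid() }, cancellation.Token);

                    Console.WriteLine("Order Status: {0}", response.Message.Status);
                }
                catch (RequestTimeoutException)
                {
                    Console.WriteLine("The request timed out");
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("The request was canceled");
                }
            }
        }
    }
}
```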
# Containers
To configure mediator using a container, use the AddMediator method.

    namespace UsageMediatorContainer
    {
        using System;
        using System.Threading.Tasks;
        using UsageContracts;
        using UsageConsumer;
        using UsageMediatorConsumer;
        using MassTransit;
        using MassTransit.Mediator;
        using Microsoft.Extensions.DependencyInjection;

        public class Program
        {
            public static async Task Main()
            {
                var services = new ServiceCollection();

                // Register the mediator and its consumers in the container
                services.AddMediator(cfg =>
                {
                    cfg.AddConsumer<SubmitOrderConsumer>();
                    cfg.AddConsumer<OrderStatusConsumer>();
                });

                var provider = services.BuildServiceProvider();

                var mediator = provider.GetRequiredService<IMediator>();

                Guid orderId = NewId.NextGuid();

                await mediator.Send<SubmitOrder>(new { OrderId = orderId });

                var client = mediator.CreateRequestClient<GetOrderStatus>();

                var response = await client.GetResponse<OrderStatus>(new { OrderId = orderId });

                Console.WriteLine("Order Status: {0}", response.Message.Status);
            }
        }
    }

Consumers and sagas (including saga repositories) can be added, but routing slip activities are not supported by the mediator. Consumer and saga definitions are supported as well, although certain properties such as EndpointName are ignored. Middleware components, including UseMessageRetry and UseInMemoryOutbox, are fully supported.
# Middleware
MassTransit Mediator is built using the same components used to create a bus, which means all the same middleware components can be configured. The example below configures the Mediator pipeline by adding a scoped send filter.

    namespace UsageMediatorConfigure
    {
        using System;
        using System.Threading.Tasks;
        using GreenPipes;
        using GreenPipes.Internals.Extensions;
        using UsageContracts;
        using UsageConsumer;
        using UsageMediatorConsumer;
        using MassTransit;
        using MassTransit.Mediator;
        using Microsoft.Extensions.DependencyInjection;

        public class ValidateOrderStatusFilter<T> :
            IFilter<SendContext<T>>
            where T : class
        {
            public void Probe(ProbeContext context)
            {
            }

            public Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
            {
                // Reject GetOrderStatus requests that have an empty OrderId
                if (context.Message is GetOrderStatus getOrderStatus && getOrderStatus.OrderId == Guid.Empty)
                    throw new ArgumentException("The OrderId must not be empty");

                return next.Send(context);
            }
        }

        public class Program
        {
            public static async Task Main()
            {
                var services = new ServiceCollection();

                services.AddMediator(cfg =>
                {
                    cfg.AddConsumer<SubmitOrderConsumer>();
                    cfg.AddConsumer<OrderStatusConsumer>();

                    cfg.ConfigureMediator((context, mcfg) =>
                    {
                        // Add the scoped filter to the mediator send pipeline
                        mcfg.UseSendFilter(typeof(ValidateOrderStatusFilter<>), context);
                    });
                });

                var provider = services.BuildServiceProvider();

                var mediator = provider.GetRequiredService<IMediator>();

                Guid orderId = NewId.NextGuid();

                await mediator.Send<SubmitOrder>(new { OrderId = orderId });

                var client = mediator.CreateRequestClient<GetOrderStatus>();

                var response = await client.GetResponse<OrderStatus>(new { OrderId = orderId });

                Console.WriteLine("Order Status: {0}", response.Message.Status);
            }
        }
    }

# HTTP Context Scope
A common question concerns the use of MassTransit's Mediator with ASP.NET Core, specifically the scope created for controllers. When Mediator consumers should use the same scope that was created for the controller, the HttpContextScopeAccessor can be used as shown below.
First, to configure the scope accessor, add the following to the services configuration:

    services.AddHttpContextAccessor();

    services.AddMediator(configurator =>
    {
        configurator.AddConsumer<SampleMessageConsumer>();

        configurator.ConfigureMediator((context, cfg) => cfg.UseHttpContextScopeFilter(context));
    });

The UseHttpContextScopeFilter is an extension method that needs to be added to the project:

    public static class MediatorHttpContextScopeFilterExtensions
    {
        public static void UseHttpContextScopeFilter(this IMediatorConfigurator configurator, IServiceProvider serviceProvider)
        {
            var filter = new HttpContextScopeFilter(serviceProvider.GetRequiredService<IHttpContextAccessor>());

            // Apply the same filter to the publish, send, and consume pipelines
            configurator.ConfigurePublish(x => x.UseFilter(filter));
            configurator.ConfigureSend(x => x.UseFilter(filter));
            configurator.UseFilter(filter);
        }
    }

The extension method uses the HttpContextScopeFilter, shown below, which also needs to be added to the project:

    public class HttpContextScopeFilter :
        IFilter<PublishContext>,
        IFilter<SendContext>,
        IFilter<ConsumeContext>
    {
        private readonly IHttpContextAccessor _httpContextAccessor;

        public HttpContextScopeFilter(IHttpContextAccessor httpContextAccessor)
        {
            _httpContextAccessor = httpContextAccessor;
        }

        private void AddPayload(PipeContext context)
        {
            // Outside of an HTTP request there is no scope to pass along
            if (_httpContextAccessor.HttpContext == null)
                return;

            var serviceProvider = _httpContextAccessor.HttpContext.RequestServices;

            // Attach the request's service provider and a scope that wraps it
            context.GetOrAddPayload(() => serviceProvider);
            context.GetOrAddPayload<IServiceScope>(() => new NoopScope(serviceProvider));
        }

        public Task Send(PublishContext context, IPipe<PublishContext> next)
        {
            AddPayload(context);
            return next.Send(context);
        }

        public Task Send(SendContext context, IPipe<SendContext> next)
        {
            AddPayload(context);
            return next.Send(context);
        }

        public Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
        {
            AddPayload(context);
            return next.Send(context);
        }

        public void Probe(ProbeContext context)
        {
        }

        // The request scope is owned by ASP.NET Core, so disposing this wrapper does nothing
        private class NoopScope :
            IServiceScope
        {
            public NoopScope(IServiceProvider serviceProvider)
            {
                ServiceProvider = serviceProvider;
            }

            public void Dispose()
            {
            }

            public IServiceProvider ServiceProvider { get; }
        }
    }

Once the above have been added, the controller scope will be passed through the mediator send and consume filters so that the controller scope is used for the consumers.
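For context, a controller that sends through the mediator might look like the sketch below. The `SampleMessage` contract, its `Id` property, the route, and the controller itself are all hypothetical; only `SampleMessageConsumer` is named in the configuration above, and it is assumed here to consume `SampleMessage`.

```csharp
namespace UsageMediatorController
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using MassTransit.Mediator;
    using Microsoft.AspNetCore.Mvc;

    // Hypothetical contract, assumed to be handled by SampleMessageConsumer
    public interface SampleMessage
    {
        Guid Id { get; }
    }

    [ApiController]
    [Route("sample")]
    public class SampleController :
        ControllerBase
    {
        readonly IMediator _mediator;

        public SampleController(IMediator mediator)
        {
            _mediator = mediator;
        }

        [HttpPost]
        public async Task<IActionResult> Post()
        {
            // With the scope filter configured, the consumer resolves its
            // dependencies from this request's service scope
            await _mediator.Send<SampleMessage>(new { Id = NewId.NextGuid() });

            return Ok();
        }
    }
}
```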