# Job Consumers
When a message is delivered from the message broker to a consumer instance, the message is locked by the broker. Once the consumer completes, MassTransit will acknowledge the message on the broker, removing it from the queue. While the message is locked, it will not be delivered to another consumer – on any bus instance reading from the same queue (competing consumer pattern). However, if the broker connection is lost the message will be unlocked and redelivered to a new consumer instance. The lock timeout is usually long enough for most message consumers, and this rarely is an issue in practice for consumers that complete quickly.
However, there are plenty of use cases where consumers may run for a longer duration, from minutes to even hours. In these situations, a job consumer may be used to decouple the consumer from the broker. A job consumer is a specialized consumer designed to execute jobs, defined by implementing the IJobConsumer<T>
interface where T
is the job message type. Job consumers may be used for long-running tasks, such as converting a video file, but can really be used for any task. Job consumers have additional requirements, such as a database to store the job messages, manage concurrency and retry, and report job completion or failure.
WARNING
MassTransit includes a job service that keeps track of each job, assigns jobs to service instances, and schedules job retries when necessary. The job service uses three saga state machines and the default configuration uses an in-memory saga repository, which is not durable. When using job consumers for production use cases, configuring durable saga repositories is highly recommended to avoid possible message loss.
Check out the sample project (opens new window) on GitHub, which includes the Entity Framework configuration for the job service state machines.
To use job consumers, a service instance must be configured (see below).
# IJobConsumer
A job consumer implements the IJobConsumer<T>
interface, shown below.
namespace MassTransit
{
using System.Threading.Tasks;
/// <summary>
/// Defines a message consumer which runs a job asynchronously, without waiting, which is monitored by Conductor
/// services, to monitor the job, limit concurrency, etc.
/// </summary>
/// <typeparam name="TJob">The job message type</typeparam>
public interface IJobConsumer<in TJob> :
IConsumer
where TJob : class
{
Task Run(JobContext<TJob> context);
}
}
# Configuration
The example below configures a job consumer on a receive endpoint named using an IEndpointNameFormatter passing the consumer type.
namespace JobSystem.Jobs
{
using System;
public interface ConvertVideo
{
Guid VideoId { get; }
string Format { get; }
}
}
namespace JobSystemConsoleService
{
using System;
using System.Threading;
using System.Threading.Tasks;
using JobSystem.Jobs;
using MassTransit;
using MassTransit.JobService;
using Microsoft.Extensions.DependencyInjection;
public class Program
{
public static async Task Main()
{
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.AddConsumer<ConvertVideoJobConsumer>(cfg =>
{
cfg.Options<JobOptions<ConvertVideo>>(options => options
.SetJobTimeout(TimeSpan.FromMinutes(15))
.SetConcurrentJobLimit(10));
});
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) =>
{
cfg.ServiceInstance(instance =>
{
instance.ConfigureJobServiceEndpoints();
instance.ConfigureEndpoints(context);
});
});
});
var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();
await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token);
try
{
Console.WriteLine("Press enter to exit");
await Task.Run(() => Console.ReadLine());
}
finally
{
await busControl.StopAsync();
}
}
public class ConvertVideoJobConsumer :
IJobConsumer<ConvertVideo>
{
public async Task Run(JobContext<ConvertVideo> context)
{
// simulate converting the video
await Task.Delay(TimeSpan.FromMinutes(3));
}
}
}
}
In this example, the job timeout as well as the number of concurrent jobs allowed is specified using JobOptions<T>
when configuring the consumer. The job options can also be specified using a consumer definition in the same way.
# Client
To submit jobs to the job consumer, use the service client to create a request client as shown, and send the request. The JobId is assigned the RequestId value.
namespace JobSystemClient
{
using System;
using System.Threading;
using System.Threading.Tasks;
using JobSystem.Jobs;
using MassTransit;
using MassTransit.Contracts.JobService;
public class Program
{
public static async Task Main()
{
var busControl = Bus.Factory.CreateUsingRabbitMq();
var source = new CancellationTokenSource(TimeSpan.FromSeconds(10));
await busControl.StartAsync(source.Token);
try
{
var requestClient = busControl.CreateRequestClient<ConvertVideo>();
do
{
string value = await Task.Run(() =>
{
Console.WriteLine("Enter video format (or quit to exit)");
Console.Write("> ");
return Console.ReadLine();
});
if("quit".Equals(value, StringComparison.OrdinalIgnoreCase))
break;
var response = await requestClient.GetResponse<JobSubmissionAccepted>(new
{
VideoId = NewId.NextGuid(),
Format = value
});
}
while (true);
}
finally
{
await busControl.StopAsync();
}
}
}
}
# Job Service Endpoints
The job service saga state machines are configured on their own endpoints, using the configured endpoint name formatter. These endpoints are required on at least one bus instance. Additionally, it is not necessary to configure them on every bus instance. In the example above, the job service endpoint are configured. Another method, ConfigureJobService, is used to configure the job service without configuring the saga state machine endpoints. In situations where there are many bus instances with job consumers, it is suggested that only one or two instances host the job service endpoints to avoid concurrency issues with the sagas repositories – particularly when optimistic locking is used.
To configure a service instance without the job service endpoints, replace ConfigureJobServiceEndpoints with ConfigureJobService.
x.UsingRabbitMq((context, cfg) =>
{
cfg.ServiceInstance(instance =>
{
instance.ConfigureJobService();
instance.ConfigureEndpoints(context);
});
});
For a more detailed example of configuring the job service endpoints, including persistent storage, see the sample mentioned in the warning box above.