Overview
Savvyio.Extensions.RabbitMQ adds concrete RabbitMQ transports to the Savvyio messaging model. It supplies RabbitMqCommandQueue for ICommand work queues and RabbitMqEventBus for IIntegrationEvent publish-subscribe flows, extending the message envelope model from Savvyio.Messaging for the command and event abstractions in Savvyio.Commands and Savvyio.EventDriven.
The package leaves serialization to an injected IMarshaller and transport settings to option types. Construction stays lazy until a send, receive, publish, or subscribe call initializes connectivity, and the functional tests show the transports round-trip plain messages, signed messages, and CloudEvent-based integration event envelopes with both text.json and Newtonsoft-based marshallers.
Key APIs
RabbitMqCommandQueue implements IPointToPointChannel<ICommand> as a named RabbitMQ work queue. SendAsync declares the queue and publishes each envelope with a type header that captures the concrete message type, while ReceiveAsync yields an IAsyncEnumerable<IMessage<ICommand>>, wires AcknowledgeAsync() back to BasicAckAsync, and can acknowledge automatically when RabbitMqCommandQueueOptions.AutoAcknowledge is true; the tests also show two receivers on the same queue splitting a 100-message batch across consumers instead of duplicating it.
RabbitMqCommandQueueOptions configures the queue-specific parts of the transport. It requires QueueName, defaults Durable to true, leaves AutoAcknowledge, Exclusive, and AutoDelete at false, and inherits the shared AmqpUrl and Persistent settings from RabbitMqMessageOptions.
RabbitMqEventBus implements IPublishSubscribeChannel<IIntegrationEvent> on top of a RabbitMQ fanout exchange. PublishAsync declares the configured exchange and publishes a single integration event envelope, while SubscribeAsync declares a server-named queue, binds it to that exchange, resolves the runtime envelope type from the type header, and invokes the supplied handler for each delivered message; the tests show separate subscribers each receive the full published set, including signed and CloudEvent-based envelopes.
RabbitMqEventBusOptions adds the required ExchangeName setting for the publish-subscribe side of the transport. The option surface is otherwise the shared RabbitMQ configuration from RabbitMqMessageOptions, so the exchange name is the stable identifier that subscribers and publishers coordinate around.
RabbitMqMessage is the common base class for both public transports. It owns the injected IMarshaller, performs one-time lazy initialization of the RabbitMQ connection and channel, disposes those resources asynchronously, and exposes GetHealthCheckTargetAsync for health probes that need an IConnection.
RabbitMqMessageOptions holds the shared transport settings. It defaults AmqpUrl to amqp://localhost:5672, exposes Persistent for broker-level message durability, and validates that a non-null AMQP endpoint has been supplied before a transport instance is constructed.
Basic usage
using Codebelt.Extensions.Xunit;
using Savvyio;
using Savvyio.Extensions.RabbitMQ;
using Savvyio.Extensions.RabbitMQ.Commands;
using Savvyio.Extensions.RabbitMQ.EventDriven;
using System;
using System.IO;
using System.Threading.Tasks;
using Xunit;
namespace MyProject.Tests;
public class RabbitMqTransportSetupTests : Test
{
public RabbitMqTransportSetupTests(ITestOutputHelper output) : base(output) { }
[Fact]
public async Task CreateQueueAndBus_WithValidatedOptions_ConstructsLazyRabbitMqTransports()
{
var marshaller = new NoOpMarshaller();
var queueOptions = new RabbitMqCommandQueueOptions { QueueName = "members.commands", Durable = true, Persistent = true };
var busOptions = new RabbitMqEventBusOptions { AmqpUrl = queueOptions.AmqpUrl, ExchangeName = "members.events", Persistent = true };
queueOptions.ValidateOptions();
busOptions.ValidateOptions();
await using var queue = new RabbitMqCommandQueue(marshaller, queueOptions);
await using var bus = new RabbitMqEventBus(marshaller, busOptions);
TestOutput.WriteLine($"Queue '{queueOptions.QueueName}' and exchange '{busOptions.ExchangeName}' are ready for RabbitMQ operations.");
Assert.IsAssignableFrom<RabbitMqMessage>(queue);
Assert.IsAssignableFrom<RabbitMqMessage>(bus);
}
private sealed class NoOpMarshaller : IMarshaller
{
public Stream Serialize<TValue>(TValue value) => Stream.Null;
public Stream Serialize(object value, Type inputType) => Stream.Null;
public TValue Deserialize<TValue>(Stream data) => throw new NotSupportedException();
public object Deserialize(Stream data, Type returnType) => throw new NotSupportedException();
}
}
Use this pattern when you want to assemble RabbitMQ transport instances from validated queue and exchange settings before handing them to a host or composition root. It matters because the package keeps broker configuration explicit in option types while deferring connectivity until an actual send, receive, publish, or subscribe operation begins.
Installation
dotnet add package Savvyio.Extensions.RabbitMQ
Usage guidance
Use this package when your application already models commands or integration events as Savvyio message envelopes and you want RabbitMQ-backed work queues or fanout event delivery without rewriting those contracts. If you need Microsoft DI registration helpers, use Savvyio.Extensions.DependencyInjection.RabbitMQ, and if you do not need Savvyio envelopes or marshaller-based serialization, the lower-level RabbitMQ.Client API is the better fit.
Family packages
- 🏭Savvyio.App
- 📦Savvyio.Commands
- 📦Savvyio.Commands.Messaging
- 📦Savvyio.Core
- 📦Savvyio.Domain
- 📦Savvyio.Domain.EventSourcing
- 📦Savvyio.EventDriven
- 📦Savvyio.EventDriven.Messaging
- 📦Savvyio.Extensions.Dapper
- 📦Savvyio.Extensions.DapperExtensions
- 📦Savvyio.Extensions.DependencyInjection
- 📦Savvyio.Extensions.DependencyInjection.Dapper
- 📦Savvyio.Extensions.DependencyInjection.DapperExtensions
- 📦Savvyio.Extensions.DependencyInjection.Domain
- 📦Savvyio.Extensions.DependencyInjection.EFCore
- 📦Savvyio.Extensions.DependencyInjection.EFCore.Domain
- 📦Savvyio.Extensions.DependencyInjection.EFCore.Domain.EventSourcing
- 📦Savvyio.Extensions.DependencyInjection.NATS
- 📝Savvyio.Extensions.DependencyInjection.Newtonsoft.Json
- 📦Savvyio.Extensions.DependencyInjection.QueueStorage
- 📦Savvyio.Extensions.DependencyInjection.RabbitMQ
- 📦Savvyio.Extensions.DependencyInjection.SimpleQueueService
- 📝Savvyio.Extensions.DependencyInjection.Text.Json
- 📦Savvyio.Extensions.Dispatchers
- 📦Savvyio.Extensions.EFCore
- 📦Savvyio.Extensions.EFCore.Domain
- 📦Savvyio.Extensions.EFCore.Domain.EventSourcing
- 📦Savvyio.Extensions.NATS
- 📝Savvyio.Extensions.Newtonsoft.Json
- 📦Savvyio.Extensions.QueueStorage
- 📦Savvyio.Extensions.SimpleQueueService
- 📝Savvyio.Extensions.Text.Json
- 📦Savvyio.Messaging
- 📦Savvyio.Queries