Codebelt

Savvyio.Extensions.NATS

NATS-backed command queues and event buses for Savvyio message envelopes.

.NET 10.0 / .NET 9.0 | MIT | v5.0.8 | 2,825 downloads

Overview

Savvyio.Extensions.NATS adds two transport implementations to the Savvyio messaging model: NatsCommandQueue for ICommand work queues on NATS JetStream, and NatsEventBus for IIntegrationEvent publish-subscribe on NATS subjects. It builds on the message envelopes from Savvyio.Messaging, so callers publish and receive IMessage<T> values instead of working directly with NATS payloads.

The package leaves serialization to an injected IMarshaller and transport configuration to option types. Its tests show that the NATS transports round-trip plain messages, signed messages, and CloudEvent-based integration event messages, provided the chosen marshaller can serialize those envelopes.

Key APIs

NatsCommandQueue implements IPointToPointChannel<ICommand> on top of JetStream work-queue retention. SendAsync publishes each command envelope with a type header that records the concrete message type, while ReceiveAsync creates or updates the configured stream and consumer, deserializes incoming envelopes, and wires AcknowledgeAsync() on the returned message to a JetStream acknowledgment.

NatsCommandQueueOptions configures the queue-specific parts of the JetStream transport. In addition to the shared NATS subject and server URI, it requires StreamName and ConsumerName and exposes AutoAcknowledge, MaxMessages, Expires, and Heartbeat. MaxMessages is clamped to the range 1 through short.MaxValue (32,767), and Heartbeat is set automatically to 5 seconds when Expires is 30 seconds or more.
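The clamping and heartbeat rules can be modelled in a few lines. The following is a minimal sketch, not the package's source: the class name QueueOptionsSketch, the default values, and the choice to leave Heartbeat untouched when Expires is below 30 seconds are all assumptions made for illustration.

```csharp
using System;

// Hypothetical stand-in that mirrors the rules described for NatsCommandQueueOptions.
// It does not reference the package and is not its actual implementation.
public sealed class QueueOptionsSketch
{
    private int _maxMessages = 1;              // assumed default
    private TimeSpan _expires = TimeSpan.Zero; // assumed default

    public int MaxMessages
    {
        get => _maxMessages;
        // Values outside 1..short.MaxValue are pulled back into range.
        set => _maxMessages = Math.Clamp(value, 1, short.MaxValue);
    }

    public TimeSpan Expires
    {
        get => _expires;
        set
        {
            _expires = value;
            // Long-lived pulls get a 5-second heartbeat automatically.
            if (value >= TimeSpan.FromSeconds(30))
            {
                Heartbeat = TimeSpan.FromSeconds(5);
            }
        }
    }

    // Assumption: shorter Expires values leave Heartbeat as it was.
    public TimeSpan Heartbeat { get; set; } = TimeSpan.Zero;
}
```

With this model, assigning 100000 to MaxMessages stores 32767, and assigning TimeSpan.FromSeconds(45) to Expires sets Heartbeat to 5 seconds.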

NatsEventBus implements IPublishSubscribeChannel<IIntegrationEvent> for NATS core publish-subscribe. PublishAsync serializes one integration event envelope to the configured subject, and SubscribeAsync resolves the concrete message type from the type header before invoking the supplied handler. The protected virtual PublishMessageAsync and SubscribeMessagesAsync hooks let derived classes adapt or isolate the transport boundary.
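Both transports rely on a type header so the receiver can rebuild the concrete message type. The sketch below illustrates that general technique in isolation. It assumes a header key named "type", an assembly-qualified type name as the header value, and System.Text.Json as the serializer; none of these details are confirmed for the package, which delegates serialization to the injected IMarshaller. OrderPlaced and TypeHeaderSketch are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical payload type used only for this illustration.
public sealed record OrderPlaced(string OrderId);

public static class TypeHeaderSketch
{
    // Assumed header key; the package's actual key is not stated in this document.
    public const string TypeHeader = "type";

    // Sender side: record the concrete runtime type next to the serialized payload.
    public static (Dictionary<string, string> Headers, string Payload) Pack(object message)
    {
        var headers = new Dictionary<string, string>
        {
            [TypeHeader] = message.GetType().AssemblyQualifiedName!
        };
        return (headers, JsonSerializer.Serialize(message, message.GetType()));
    }

    // Receiver side: resolve the type from the header, then deserialize into it.
    public static object Unpack(IReadOnlyDictionary<string, string> headers, string payload)
    {
        var type = Type.GetType(headers[TypeHeader], throwOnError: true)!;
        return JsonSerializer.Deserialize(payload, type)!;
    }
}
```

Calling Unpack on the result of Pack(new OrderPlaced("42")) yields an OrderPlaced instance again, even though the receiving code only handles object. That is the property that lets a handler typed against IMessage<IIntegrationEvent> receive the concrete event.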

NatsMessageOptions is the shared configuration base for both transports. It provides NatsUrl, which defaults to nats://127.0.0.1:4222, and it requires a non-empty Subject. NatsMessageOptions is also the full option surface for NatsEventBusOptions, because that derived type adds no members of its own.

NatsMessage is the common base class for both public transports. It owns the NatsClient lifetime, exposes GetHealthCheckTarget() for health probing through INatsConnection, and centralizes the IMarshaller dependency used by the concrete queue and bus implementations.

Basic usage

using Codebelt.Extensions.Xunit;
using NATS.Client.Core;
using Savvyio;
using Savvyio.EventDriven;
using Savvyio.EventDriven.Messaging;
using Savvyio.Extensions.NATS.EventDriven;
using Savvyio.Messaging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Xunit;

namespace MyProject.Tests;

public class NatsEventBusTests : Test
{
    public NatsEventBusTests(ITestOutputHelper output) : base(output) { }

    [Fact]
    public async Task PublishAsync_ConfiguredBus_DeliversMessage()
    {
        var marshaller = new TokenMarshaller();
        await using var bus = new LoopbackNatsEventBus(marshaller, new NatsEventBusOptions { Subject = "members.registered" });
        var received = new TaskCompletionSource<IMessage<IIntegrationEvent>>();

        var subscription = bus.SubscribeAsync((message, token) =>
        {
            received.TrySetResult(message);
            return Task.CompletedTask;
        });

        var published = new MemberRegistered("ada@example.com").ToMessage(new Uri("urn:members"), "member.registered");

        await bus.PublishAsync(published);

        var delivered = await received.Task;
        var payload = Assert.IsType<MemberRegistered>(delivered.Data);

        TestOutput.WriteLine($"{delivered.Type} -> {payload.EmailAddress}");

        Assert.Equal("member.registered", delivered.Type);
        Assert.Equal("ada@example.com", payload.EmailAddress);

        await subscription;
    }
}

internal sealed record MemberRegistered : IntegrationEvent
{
    public MemberRegistered(string emailAddress)
    {
        EmailAddress = emailAddress;
    }

    public string EmailAddress { get; }
}

internal sealed class LoopbackNatsEventBus : NatsEventBus
{
    private readonly Channel<(string Subject, string Payload, NatsHeaders Headers)> _messages = Channel.CreateUnbounded<(string, string, NatsHeaders)>();

    public LoopbackNatsEventBus(IMarshaller marshaller, NatsEventBusOptions options) : base(marshaller, options) { }

    protected override Task PublishMessageAsync(string subject, string message, NatsHeaders headers, CancellationToken cancellationToken)
        => _messages.Writer.WriteAsync((subject, message, headers), cancellationToken).AsTask();

    protected override async IAsyncEnumerable<ReceivedNatsMessage> SubscribeMessagesAsync(string subject, NatsSubOpts options, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // Reads a single published message and then completes, which is enough for this one-message test.
        var envelope = await _messages.Reader.ReadAsync(cancellationToken);
        if (envelope.Subject == subject)
        {
            yield return new ReceivedNatsMessage(envelope.Headers, envelope.Payload);
        }
    }
}

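// Test-only marshaller: keeps each object in memory and passes an opaque token through the transport instead of real serialized bytes.
internal sealed class TokenMarshaller : IMarshaller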
{
    private readonly ConcurrentDictionary<string, object> _store = new();

    public Stream Serialize<TValue>(TValue value) => Serialize(value!, typeof(TValue));

    public Stream Serialize(object value, Type inputType)
    {
        var token = Guid.NewGuid().ToString("N");
        _store[token] = value;
        return new MemoryStream(Encoding.UTF8.GetBytes(token));
    }

    public TValue Deserialize<TValue>(Stream data) => (TValue)Deserialize(data, typeof(TValue));

    public object Deserialize(Stream data, Type returnType)
    {
        using var reader = new StreamReader(data, Encoding.UTF8, leaveOpen: false);
        return _store[reader.ReadToEnd()];
    }
}

Use this pattern when you want to test or customize NatsEventBus behavior without opening a real NATS subscription. It works because the package exposes transport-facing virtual hooks while keeping the public contract in terms of Savvyio message envelopes.

Installation

dotnet add package Savvyio.Extensions.NATS

Usage guidance

Adopt this package when your application already models commands and integration events as Savvyio messages and you want to move those envelopes over NATS without rewriting the surrounding CQRS or event-driven code. If you only need registration helpers for dependency injection, Savvyio.Extensions.DependencyInjection.NATS is the better entry point. If NATS is not your transport of choice, one of the sibling transport packages is a better fit.

Family packages