Classes for sending and listening to a message bus. Uses AMQPNETLITE (AMQP 1. 0 protocol).
- Authorization keys cannot contain '/'. They must be regenerated if they do. AMQPNETLITE does not like that value.
- I found inconsistent behavior if the topic and queue were created using the AzureSB UI. I had success creating the topics, subscriptions, queues using ServiceBusExplorer (https://github.com/paolosalvatori/ServiceBusExplorer/releases)
- Names of queues cannot be single worded. Should be multipart (eg. auth.queue).
- The forward to setting for the topic subscription is not visible in the azure UI. You can use ServiceBusExplorer to set that field.
- for the following configuration settings for the test project with a TestEvent object
"Publisher.Settings": {
"Protocol": "amqps",
"Namespace": "namespace.servicebus.windows.net",
"Policy": "Send",
"Key": "44CharBASE64EncodedNoSlashes",
"AppName": "test.publisher",
"Topic": "topic.",
"Durable": "0"
},
"Receiver.Settings": {
"Protocol": "amqps",
"Namespace": "namespace.servicebus.windows.net",
"Policy": "Listen",
"Key": "44CharBASE64EncodedNoSlashes",
"AppName": "test.receiver",
"Queue": "queue.testReceive",
"Durable": "0"
}
(for test default settings from Service Bus Explorer are fine unless specified below)
- Azure Service Bus Components:
- a queue named queue.TestReceive
- new authorization rule for queue
- claimType = SharedAccessKey
- claimValue = none
- KeyName = "Listen"
- Primary/Secondary Key = 44 Char BASE64 encoded string (33 char unencoded and remember no '/')
- Manage - off
- Send - off
- Listen - on
- new authorization rule for queue
- a topic named topic.TestEvent
- new authorization rule for topic
- claimType = SharedAccessKey
- claimValue = none
- KeyName = "Send"
- Primary/Secondary Key = 44 Char BASE64 encoded string (33 char unencoded and remember no '/')
- Manage - off
- Send - on
- Listen - off
- new authorization rule for topic
- a subscription to topic.TestEvent named subscription.TestEvent
- The "Forward To" setting for this subscription needs to be set to queue.TestReceive
- a queue named queue.TestReceive
What To Do:
- will probably want to set deduplication at the message broker since same messageId will be used
- will need to generate ef migration after adding following to OnModelCreating method in dbcontext class:
- modelBuilder.AddDomainEventOutbox();
- https://github.com/cortside/cortside.webapistarter/blob/outbox/src/Cortside.WebApiStarter.Data/Migrations/20210228035338_DomainEventOutbox.cs
- register IDomainEventOutboxPublisher AND IDomainEventPublisher
- use IDomainEventOutboxPublisher in classes that will publish to the outbox
- SaveChanges after calling PublishAsync or ScheduleAsync -- and make part of transaction or workset in db so that publish becomes atomic with db changes
- OutboxHostedService needs IDomainEventPublisher to actually publish to message broker
- register OutboxHostedService to publish messages from db to broker
- if publishing an entity id in message, might need to add a using around the work with a transaction and call savechanges twice if the entity id is assigned by the db
- add section for configuration:
OutboxHostedService": {
"BatchSize": 10,
"Enabled": true,
"Interval": 5,
"PurgePublished": true
}
CREATE TABLE [dbo].[Outbox] (
[MessageId] nvarchar(36) NOT NULL,
[CorrelationId] nvarchar(36) NULL,
[EventType] nvarchar(250) NOT NULL,
[Topic] nvarchar(100) NOT NULL,
[RoutingKey] nvarchar(100) NOT NULL,
[Body] nvarchar(max) NOT NULL,
[Status] nvarchar(10) NOT NULL,
[CreatedDate] datetime2 NOT NULL,
[ScheduledDate] datetime2 NOT NULL,
[PublishedDate] datetime2 NULL,
[LockId] nvarchar(36) NULL,
CONSTRAINT [PK_Outbox] PRIMARY KEY ([MessageId])
);
CREATE INDEX [IX_ScheduleDate_Status] ON [dbo].[Outbox] ([ScheduledDate], [Status]) INCLUDE ([EventType]);
- DomainEventPublisher change in publish method
- SendAsync to PublishAsync
- overloads that took messageId should now use the overload with EventProperties
- ScheduleMessageAsync to ScheduleAsync
- overrides for both PublishAsync and ScheduleAsync use EventProperties for overrides that allowed for EventType or Topic
- SendAsync to PublishAsync
- namespaces all dropped common
- make sure to check logging overrides for namespaces that might have changed
- namespace for handler interface changed to be in Handlers
- namespace for ReceiverHostedService changed to be in Hosting
- ReceiverHostedServiceSettings.Disabled changed to Enabled
- ReceiverHostedServiceSettings.TimedInterval should be specified in seconds, not milliseconds
- IDomainEventHandler HandleAsync now has return value of HandlerResultEnum
- To keep current functionality, return HandlerResultEnum.Success and let uncaught exceptions trigger HandlerResultEnum.Failure result
- Publisher uses
Logger<DomainEventPublisher>
instead ofLogger<DomainEventComms>
- Reciever uses
Logger<DomainEventReceiver>
instead ofLogger<DomainEventComms>
- ServiceBusPublisherSettings renamed to DomainEventPublisherSettings
- changed Address to Topic
- ServiceBusReceiverSettings renamed to DomainEventReceiverSettings
- changed Address to Queue
- receiverHostedServiceSettings now has property for message type lookup dictionary named MessageTypes
- See E2ETransactionTest for use of transactions for accept/reject/release and publish operations
- Stubs allow integration tests code to publish/subscribe to events without using a real message broker
- To setup stubs in the integration tests setup do the following:
private void RegisterDomainEventPublisher(IServiceCollection services)
{
// Remove the real IDomainEventPublisher
var descriptor = services.SingleOrDefault(d => d.ServiceType == typeof(IDomainEventPublisher));
if (descriptor != null)
{
services.Remove(descriptor);
}
// Remove the real IDomainEventReceiver
descriptor = services.SingleOrDefault(d => d.ServiceType == typeof(IDomainEventReceiver));
if (descriptor != null)
{
services.Remove(descriptor);
}
services.AddSingleton<IStubBroker, ConcurrentQueueBroker>();
services.AddTransient<IDomainEventPublisher, DomainEventPublisherStub>();
services.AddTransient<IDomainEventReceiver, DomainEventReceiverStub>();
}
And then register it during the WebHostBuilder step:
protected override IHostBuilder CreateHostBuilder()
{
// other setup here
// then create the host
return Host.CreateDefaultBuilder()
.ConfigureAppConfiguration(builder =>
{
builder.AddConfiguration(Configuration);
})
.ConfigureWebHostDefaults(webbuilder =>
{
webbuilder
.UseConfiguration(Configuration)
.UseStartup<Startup>()
.UseSerilog(Log.Logger)
.ConfigureTestServices(sc =>
{
RegisterDomainEventPublisher(sc);
// other custom stuff goes here
});
});
}
Once it's been registered, you can inject the IStubBroker into any test where you want to verify messages have been published or consumed:
public class TestClass : IClassFixture<IntegrationTestFactory<Startup>>
{
private readonly IntegrationTestFactory<Startup> fixture;
private readonly HttpClient testServerClient;
private readonly IStubBroker broker;
public TestClass(IntegrationTestFactory<Startup> fixture)
{
this.fixture = fixture;
testServerClient = fixture.CreateClient();
broker = fixture.Services.GetService<IStubBroker>();
}
[Fact]
public void SomeTest()
{
// do something to generate messages
// check that the message was consumed
// it's a good practice to wait for a bit for the async publishing/receiving to happen
await AsyncUtil.WaitUntilAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token, () => !broker.HasItems);
// or
await Task.Delay(10000);
Assert.False(broker.HasItems);
// grab the messages by type
var myMessages = broker.GetAcceptedMessagesByType<MyDomainEventType>(x => x.Id == 1); // grab our specific message from the completed list
Assert.NotNull(myMessages);
}
}
- Stubs allow integration tests code to publish/subscribe to events without using a real message broker
- To setup stubs in the integration tests setup do the following:
private void RegisterDomainEventPublisher(IServiceCollection services)
{
// Remove the real IDomainEventPublisher
var descriptor = services.SingleOrDefault(d => d.ServiceType == typeof(IDomainEventPublisher));
if (descriptor != null)
{
services.Remove(descriptor);
}
// Remove the real IDomainEventReceiver
descriptor = services.SingleOrDefault(d => d.ServiceType == typeof(IDomainEventReceiver));
if (descriptor != null)
{
services.Remove(descriptor);
}
services.AddSingleton<IStubBroker, ConcurrentQueueBroker>();
services.AddTransient<IDomainEventPublisher, DomainEventPublisherStub>();
services.AddTransient<IDomainEventReceiver, DomainEventReceiverStub>();
}
And then register it during the WebHostBuilder step:
protected override IHostBuilder CreateHostBuilder()
{
// other setup here
// then create the host
return Host.CreateDefaultBuilder()
.ConfigureAppConfiguration(builder =>
{
builder.AddConfiguration(Configuration);
})
.ConfigureWebHostDefaults(webbuilder =>
{
webbuilder
.UseConfiguration(Configuration)
.UseStartup<Startup>()
.UseSerilog(Log.Logger)
.ConfigureTestServices(sc =>
{
RegisterDomainEventPublisher(sc);
// other custom stuff goes here
});
});
}
Once it's been registered, you can inject the IStubBroker into any test where you want to verify messages have been published or consumed:
public class TestClass : IClassFixture<IntegrationTestFactory<Startup>>
{
private readonly IntegrationTestFactory<Startup> fixture;
private readonly HttpClient testServerClient;
private readonly IStubBroker broker;
public TestClass(IntegrationTestFactory<Startup> fixture)
{
this.fixture = fixture;
testServerClient = fixture.CreateClient();
broker = fixture.Services.GetService<IStubBroker>();
}
[Fact]
public void SomeTest()
{
// do something to generate messages
// check that the message was consumed
// it's a good practice to wait for a bit for the async publishing/receiving to happen
await AsyncUtil.WaitUntilAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token, () => !broker.HasItems);
// or
await Task.Delay(10000);
Assert.False(broker.HasItems);
// grab the messages by type
var myMessages = broker.GetAcceptedMessagesByType<MyDomainEventType>(x => x.Id == 1); // grab our specific message from the completed list
Assert.NotNull(myMessages);
}
}
- publisher should return published message information -- at least messageId -- would make debugging easier
- allow publisher to be used to publish multiple events withing a using statement without having to create new connection for each publish