Cortside.DomainEvent

Classes for sending and listening to a message bus. Uses AMQPNETLITE (AMQP 1.0 protocol).

Azure ServiceBus

General

  • Authorization keys cannot contain '/'. Regenerate any key that does, because AMQPNETLITE does not handle that character in the key.
  • I found inconsistent behavior when the topic and queue were created using the Azure Service Bus UI. I had success creating the topics, subscriptions, and queues using ServiceBusExplorer (https://github.com/paolosalvatori/ServiceBusExplorer/releases).
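
As a minimal sketch of the key constraint above, the helper below draws random keys until one contains no '/'. The class and method names are hypothetical and not part of Cortside.DomainEvent; the 33-byte length follows the "44 Char BASE64 encoded string" description used later in this README, and the sketch assumes .NET 6 or later for RandomNumberGenerator.GetBytes.

```csharp
using System;
using System.Security.Cryptography;

// Hypothetical helper, not part of Cortside.DomainEvent.
public static class SharedAccessKeyGenerator
{
    // Returns a 44-character Base64 key that contains no '/'.
    public static string Generate()
    {
        while (true)
        {
            // 33 random bytes encode to exactly 44 Base64 characters with no padding.
            byte[] raw = RandomNumberGenerator.GetBytes(33);
            string key = Convert.ToBase64String(raw);
            if (!key.Contains('/'))
            {
                return key;
            }
            // Otherwise draw again; roughly half of all draws contain a '/'.
        }
    }

    // True when the key has the length described in this README and no '/'.
    public static bool IsUsable(string key) => key.Length == 44 && !key.Contains('/');
}

public static class Program
{
    public static void Main()
    {
        string key = SharedAccessKeyGenerator.Generate();
        Console.WriteLine($"{key} usable={SharedAccessKeyGenerator.IsUsable(key)}");
    }
}
```

The generated value can then be pasted in as the Primary/Secondary Key of an authorization rule, for example through ServiceBusExplorer.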

Queues

  • Queue names cannot be a single word. They should be multipart (e.g. auth.queue).

Topic

  • The "Forward To" setting for the topic subscription is not visible in the Azure UI. You can use ServiceBusExplorer to set that field.

Example

  • Given the following configuration settings for the test project with a TestEvent object:
    "Publisher.Settings": {
        "Protocol": "amqps",
        "Namespace": "namespace.servicebus.windows.net",
        "Policy": "Send",
        "Key": "44CharBASE64EncodedNoSlashes",
        "AppName": "test.publisher",
        "Topic": "topic.",
        "Durable": "0"
    },
    "Receiver.Settings": {
        "Protocol": "amqps",
        "Namespace": "namespace.servicebus.windows.net",
        "Policy": "Listen",
        "Key": "44CharBASE64EncodedNoSlashes",
        "AppName": "test.receiver",
        "Queue": "queue.testReceive",
        "Durable": "0"
    }

(For testing, the default settings from Service Bus Explorer are fine unless specified below.)

  • Azure Service Bus Components:
    • a queue named queue.TestReceive
      • new authorization rule for queue
        • claimType = SharedAccessKey
        • claimValue = none
        • KeyName = "Listen"
        • Primary/Secondary Key = 44 Char BASE64 encoded string (33 char unencoded and remember no '/')
        • Manage - off
        • Send - off
        • Listen - on
    • a topic named topic.TestEvent
      • new authorization rule for topic
        • claimType = SharedAccessKey
        • claimValue = none
        • KeyName = "Send"
        • Primary/Secondary Key = 44 Char BASE64 encoded string (33 char unencoded and remember no '/')
        • Manage - off
        • Send - on
        • Listen - off
    • a subscription to topic.TestEvent named subscription.TestEvent
      • The "Forward To" setting for this subscription needs to be set to queue.TestReceive

Outbox Pattern using Cortside.DomainEvent.EntityFramework

What To Do:

  • you will probably want to enable deduplication at the message broker, since a message that is published more than once reuses the same messageId
  • will need to generate ef migration after adding following to OnModelCreating method in dbcontext class:
  • register IDomainEventOutboxPublisher AND IDomainEventPublisher
    • use IDomainEventOutboxPublisher in classes that will publish to the outbox
    • call SaveChanges after calling PublishAsync or ScheduleAsync, and make it part of the same transaction or workset in the db so that the publish becomes atomic with the db changes
    • OutboxHostedService needs IDomainEventPublisher to actually publish to message broker
  • register OutboxHostedService to publish messages from db to broker
  • if the message contains an entity id that is assigned by the db, you might need to wrap the work in a using block with a transaction and call SaveChanges twice: once to get the id, and again after publishing to the outbox
  • add section for configuration:
    "OutboxHostedService": {
        "BatchSize": 10,
        "Enabled": true,
        "Interval": 5,
        "PurgePublished": true
    }
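
The flow in the list above can be illustrated with a small in-memory sketch. This is not the library's implementation: OutboxMessage and OutboxPoller are hypothetical stand-ins, the list stands in for the Outbox table, and the "Queued" and "Published" status values are assumptions. It only shows how BatchSize and PurgePublished might shape one polling pass.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a row in the Outbox table (subset of columns).
public class OutboxMessage
{
    public string MessageId { get; set; } = Guid.NewGuid().ToString();
    public string EventType { get; set; } = "";
    public string Body { get; set; } = "";
    public string Status { get; set; } = "Queued";   // assumed status value
    public DateTime ScheduledDate { get; set; }
    public DateTime? PublishedDate { get; set; }
}

public static class OutboxPoller
{
    // One polling pass: send up to batchSize due messages and mark them published.
    public static int PublishBatch(List<OutboxMessage> outbox, int batchSize, bool purgePublished,
        Action<OutboxMessage> send, DateTime now)
    {
        var due = outbox
            .Where(m => m.Status == "Queued" && m.ScheduledDate <= now)
            .OrderBy(m => m.ScheduledDate)
            .Take(batchSize)
            .ToList();

        foreach (var message in due)
        {
            // If send throws, the row stays queued and is retried with the same MessageId,
            // which is why deduplication at the broker is useful.
            send(message);
            message.Status = "Published";
            message.PublishedDate = now;
        }

        if (purgePublished)
        {
            outbox.RemoveAll(m => m.Status == "Published");
        }
        return due.Count;
    }
}

public static class Program
{
    public static void Main()
    {
        var now = DateTime.UtcNow;
        var outbox = new List<OutboxMessage>();
        for (int i = 0; i < 12; i++)
        {
            outbox.Add(new OutboxMessage { EventType = "TestEvent", Body = i.ToString(), ScheduledDate = now });
        }

        var sent = new List<string>();
        int published = OutboxPoller.PublishBatch(outbox, 10, true, m => sent.Add(m.MessageId), now);
        Console.WriteLine($"{published} published, {outbox.Count} left");   // 10 published, 2 left
    }
}
```

In the real setup the rows are written by IDomainEventOutboxPublisher inside the same SaveChanges as the business data, and OutboxHostedService runs the polling pass on the configured Interval.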

sql to create table if not using migrations

    CREATE TABLE [dbo].[Outbox] (
        [MessageId] nvarchar(36) NOT NULL,
        [CorrelationId] nvarchar(36) NULL,
        [EventType] nvarchar(250) NOT NULL,
        [Topic] nvarchar(100) NOT NULL,
        [RoutingKey] nvarchar(100) NOT NULL,
        [Body] nvarchar(max) NOT NULL,
        [Status] nvarchar(10) NOT NULL,
        [CreatedDate] datetime2 NOT NULL,
        [ScheduledDate] datetime2 NOT NULL,
        [PublishedDate] datetime2 NULL,
        [LockId] nvarchar(36) NULL,
        CONSTRAINT [PK_Outbox] PRIMARY KEY ([MessageId])
    );

    CREATE INDEX [IX_ScheduleDate_Status] ON [dbo].[Outbox] ([ScheduledDate], [Status]) INCLUDE ([EventType]);

migration from cortside.common.domainevent to cortside.domainevent

  • DomainEventPublisher change in publish method
    • SendAsync to PublishAsync
      • overloads that took messageId should now use the overload with EventProperties
    • ScheduleMessageAsync to ScheduleAsync
    • overloads of both PublishAsync and ScheduleAsync that previously allowed overriding EventType or Topic now take EventProperties for those overrides
  • namespaces all dropped common
    • make sure to check logging overrides for namespaces that might have changed
  • namespace for handler interface changed to be in Handlers
  • namespace for ReceiverHostedService changed to be in Hosting
  • ReceiverHostedServiceSettings.Disabled changed to Enabled
  • ReceiverHostedServiceSettings.TimedInterval should be specified in seconds, not milliseconds
  • IDomainEventHandler HandleAsync now has return value of HandlerResultEnum
    • To keep current functionality, return HandlerResultEnum.Success and let uncaught exceptions trigger HandlerResultEnum.Failure result
  • Publisher uses Logger<DomainEventPublisher> instead of Logger<DomainEventComms>
  • Receiver uses Logger<DomainEventReceiver> instead of Logger<DomainEventComms>
  • ServiceBusPublisherSettings renamed to DomainEventPublisherSettings
    • changed Address to Topic
  • ServiceBusReceiverSettings renamed to DomainEventReceiverSettings
    • changed Address to Queue
  • ReceiverHostedServiceSettings now has a property for the message type lookup dictionary, named MessageTypes
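
The HandlerResultEnum note in the list above can be pictured with the following sketch. The types here are simplified stand-ins, not the library's: the enum models only the two values named above, the handler signature is an assumption (the real IDomainEventHandler may take a different message type), and DispatchAsync merely imitates the described behavior of turning an uncaught exception into a Failure result.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in for the library's enum; only the two values named above are modeled.
public enum HandlerResultEnum { Success, Failure }

// Hypothetical event type used only for this sketch.
public class OrderPlaced
{
    public int OrderId { get; set; }
}

// Migrated handler: returns Success and lets exceptions bubble up.
public class OrderPlacedHandler
{
    public Task<HandlerResultEnum> HandleAsync(OrderPlaced message)
    {
        if (message.OrderId <= 0)
        {
            throw new ArgumentException("OrderId must be positive");
        }
        Console.WriteLine($"handled order {message.OrderId}");
        return Task.FromResult(HandlerResultEnum.Success);
    }
}

public static class Program
{
    // Imitates the receiver side: an uncaught handler exception becomes Failure.
    public static async Task<HandlerResultEnum> DispatchAsync(OrderPlacedHandler handler, OrderPlaced message)
    {
        try
        {
            return await handler.HandleAsync(message);
        }
        catch (Exception)
        {
            return HandlerResultEnum.Failure;
        }
    }

    public static async Task Main()
    {
        var handler = new OrderPlacedHandler();
        Console.WriteLine(await DispatchAsync(handler, new OrderPlaced { OrderId = 1 }));   // Success
        Console.WriteLine(await DispatchAsync(handler, new OrderPlaced { OrderId = 0 }));   // Failure
    }
}
```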

Transactions

  • See E2ETransactionTest for use of transactions for accept/reject/release and publish operations

Cortside.DomainEvent.Stubs

  • Stubs allow integration test code to publish/subscribe to events without using a real message broker
  • To set up stubs in the integration test setup, do the following:
        private void RegisterDomainEventPublisher(IServiceCollection services)
        {
            // Remove the real IDomainEventPublisher
            var descriptor = services.SingleOrDefault(d => d.ServiceType == typeof(IDomainEventPublisher));
            if (descriptor != null)
            {
                services.Remove(descriptor);
            }

            // Remove the real IDomainEventReceiver
            descriptor = services.SingleOrDefault(d => d.ServiceType == typeof(IDomainEventReceiver));
            if (descriptor != null)
            {
                services.Remove(descriptor);
            }

            services.AddSingleton<IStubBroker, ConcurrentQueueBroker>();
            services.AddTransient<IDomainEventPublisher, DomainEventPublisherStub>();
            services.AddTransient<IDomainEventReceiver, DomainEventReceiverStub>();
        }

And then register it during the WebHostBuilder step:

        protected override IHostBuilder CreateHostBuilder()
        {
            // other setup here
            // then create the host
            return Host.CreateDefaultBuilder()
                .ConfigureAppConfiguration(builder =>
                {
                    builder.AddConfiguration(Configuration);
                })
                .ConfigureWebHostDefaults(webbuilder =>
                {
                    webbuilder
                    .UseConfiguration(Configuration)
                    .UseStartup<Startup>()
                    .UseSerilog(Log.Logger)
                    .ConfigureTestServices(sc =>
                    {
                        RegisterDomainEventPublisher(sc);
                        // other custom stuff goes here
                    });
                });
        }

Once it's been registered, you can inject the IStubBroker into any test where you want to verify messages have been published or consumed:

    public class TestClass : IClassFixture<IntegrationTestFactory<Startup>>
    {
        private readonly IntegrationTestFactory<Startup> fixture;
        private readonly HttpClient testServerClient;
        private readonly IStubBroker broker;

        public TestClass(IntegrationTestFactory<Startup> fixture)
        {
            this.fixture = fixture;
            testServerClient = fixture.CreateClient();
            broker = fixture.Services.GetService<IStubBroker>();
        }
        
        [Fact]
        public async Task SomeTest()
        {
            // do something to generate messages

            // wait for the async publishing/receiving to finish before asserting:
            // poll until the broker is empty, giving up after 10 seconds
            await AsyncUtil.WaitUntilAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token, () => !broker.HasItems);
            // or, as a cruder alternative, wait a fixed 10 seconds instead
            // await Task.Delay(10000);

            // check that every message was consumed
            Assert.False(broker.HasItems);

            // grab our specific message by type from the accepted (completed) list
            var myMessages = broker.GetAcceptedMessagesByType<MyDomainEventType>(x => x.Id == 1);
            Assert.NotNull(myMessages);
        }
    }

examples

todo:

  • publisher should return published message information -- at least messageId -- would make debugging easier
  • allow publisher to be used to publish multiple events within a using statement without having to create a new connection for each publish