在 Azure 服务总线中发布到一个主题并订阅另一个主题的集成测试不可靠是否存在竞争条件?

Integration Test to publish to a topic and subscribe to another in Azure Service Bus is unreliable is there a race condition?

我正在尝试编写一个集成/验收测试来测试 azure 中的一些代码,问题 ATM 中的代码只是订阅一个主题并发布到另一个主题。

我已经编写了测试,但它并不总是通过,似乎可能存在竞争条件。我已经尝试用几种方式编写它,包括使用 OnMessage 和使用 Receive(我在这里展示的示例)。

当使用 OnMessage 时,测试似乎总是提前退出(大约 30 秒),我想这可能意味着它无论如何都不适合这个测试。

我的查询特别针对我的示例,我假设一旦我创建了对目标主题的订阅,发送给它的任何消息我都可以使用 Receive() 接收,无论该消息到达的时间点意味着什么,如果消息在我调用 Receive() 之前到达目标主题,我仍然可以通过调用 Receive() 读取消息。任何人都可以阐明这一点吗?

    namespace somenamespace {
    [TestClass]
    public class SampleTopicTest
    {
        private static TopicClient topicClient;
        private static SubscriptionClient subClientKoEligible;
        private static SubscriptionClient subClientKoIneligible;

        private static OnMessageOptions options;
        public const string TEST_MESSAGE_SUB = "TestMessageSub";
        private static NamespaceManager namespaceManager;

        private static string topicFleKoEligible;
        private static string topicFleKoIneligible;

        private BrokeredMessage message;

        [ClassInitialize]
        public static void BeforeClass(TestContext testContext)
        {
            //client for publishing messages
            string connectionString = ConfigurationManager.AppSettings["ServiceBusConnectionString"];
            string topicDataReady = ConfigurationManager.AppSettings["DataReadyTopicName"];
            topicClient = TopicClient.CreateFromConnectionString(connectionString, topicDataReady);

            topicFleKoEligible = ConfigurationManager.AppSettings["KnockOutEligibleTopicName"];
            topicFleKoIneligible = ConfigurationManager.AppSettings["KnockOutIneligibleTopicName"];

            //create test subscription to receive messages
            namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);


            if (!namespaceManager.SubscriptionExists(topicFleKoEligible, TEST_MESSAGE_SUB))
            {
                namespaceManager.CreateSubscription(topicFleKoEligible, TEST_MESSAGE_SUB);
            }

            if (!namespaceManager.SubscriptionExists(topicFleKoIneligible, TEST_MESSAGE_SUB))
            {
                namespaceManager.CreateSubscription(topicFleKoIneligible, TEST_MESSAGE_SUB);
            }

            //subscriber client koeligible
            subClientKoEligible = SubscriptionClient.CreateFromConnectionString(connectionString, topicFleKoEligible, TEST_MESSAGE_SUB);

            subClientKoIneligible = SubscriptionClient.CreateFromConnectionString(connectionString, topicFleKoIneligible, TEST_MESSAGE_SUB);

            options = new OnMessageOptions()
            {
                AutoComplete = false,
                AutoRenewTimeout = TimeSpan.FromMinutes(1),

            };
        }

          [TestMethod]
        public void E2EPOCTopicTestLT50()
        {
            Random rnd = new Random();
            string customerId = rnd.Next(1, 49).ToString();

            FurtherLendingCustomer sentCustomer = new FurtherLendingCustomer { CustomerId = customerId };
            BrokeredMessage sentMessage = new BrokeredMessage(sentCustomer.ToJson());           
            sentMessage.CorrelationId = Guid.NewGuid().ToString();
            string messageId = sentMessage.MessageId;
            topicClient.Send(sentMessage);

            Boolean messageRead = false;

            //wait for message to arrive on the ko eligible queue
            while((message = subClientKoEligible.Receive(TimeSpan.FromMinutes(2))) != null){

                //read message
                string messageString = message.GetBody<String>();

                //Serialize
                FurtherLendingCustomer receivedCustomer =  JsonConvert.DeserializeObject<FurtherLendingCustomer>(messageString.Substring(messageString.IndexOf("{")));

                //assertion
                Assert.AreEqual(sentCustomer.CustomerId, receivedCustomer.CustomerId,"verify customer id");

                //pop message
                message.Complete();
                messageRead = true;

                //leave loop after processing one message
                break;
            }
            if (!messageRead)
                Assert.Fail("Didn't receive any message after 2 mins");

        }
    }
}

正如官方文档所述SubscriptionClient.Receive(TimeSpan)

Parameters serverWaitTime TimeSpan

The time span the server waits for receiving a message before it times out.

A Null can be return by this API if operation exceeded the timeout specified, or the operations succeeded but there are no more messages to be received.

根据我的测试,如果一条消息发送到主题,然后在您的特定 serverWaitTime 内传送到您的订阅,那么无论消息是在您调用之前还是之后到达目标主题,您都可以收到消息 Receive.

When using OnMessage the test seemed to always exit prematurely (around 30 seconds), which I guess perhaps means its inappropriate for this test anyway.

[TestMethod]
public void ReceiveMessages()
{
    subClient.OnMessage(msg => {
        System.Diagnostics.Trace.TraceInformation($"{DateTime.Now}:{msg.GetBody<string>()}");
        msg.Complete();
    });
    Task.Delay(TimeSpan.FromMinutes(5)).Wait();
}

对于Subscription​Client.​On​Message, I assumed that it basically a loop invoking Receive. After calling OnMessage, you need to wait for a while and stop this method to exit. Here is a blog about the Event-Driven message programming for windows Azure Service Bus, you could refer to here

此外,我发现您发送消息的 topicClient 和接收消息的 subClientKoEligible 不是针对同一主题路径。