使用 ServiceStack 和 RabbitMQ 发送流

Using ServiceStack and RabbitMQ to send a stream

我正在尝试使用 RabbitMQ 和 Servicestack(使用 .NET Core 的 v1.0.41)发送流。

我的Request实现了ServiceStack.Web.IRequiresRequestStream,在client设置了stream属性,但是到了server端,stream是NULL.

完成回购
服务器代码:

using System;
using System.IO;
using System.Threading.Tasks;
using Funq;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.DependencyInjection;
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;

namespace Server
{
    class Program
    {

        public static void Main(string[] args)
        {
            IWebHost host = new WebHostBuilder()
                .UseServer(new RabbitServer())
                .UseStartup<Startup>()
                .Build();

            host.Run();
        }
    }

    public class RabbitServer : IServer
    {
        public void Dispose(){}

        public void Start<TContext>(IHttpApplication<TContext> application){}

        public IFeatureCollection Features { get; } = new FeatureCollection();
    }

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddLogging();
        }

        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            app.UseServiceStack((AppHostBase)Activator.CreateInstance<AppHost>());
            app.Run((RequestDelegate)(context => (Task)Task.FromResult<int>(0)));
        }

    }

    public class AppHost : AppHostBase
    {
        public AppHost()
            : base("My Test Service", typeof(MyService).GetAssembly())
        {
        }

        public override void Configure(Container container)
        {
            var mqServer = new RabbitMqServer("127.0.0.1");
            container.Register<IMessageService>(mqServer);
            mqServer.RegisterHandler<HelloRequest>(ExecuteMessage);
            mqServer.Start();
        }
    }

    public class MyService : Service
    {
        public HelloResponse Any(HelloRequest request)
        {
            Console.WriteLine($"Stream is null: {request.RequestStream == null}");
            return new HelloResponse { Counter = request.Counter };

        }
    }

    public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
    {
        public int Counter { get; set; }

        public Stream RequestStream { get; set; }
    }

    public class HelloResponse
    {
        public int Counter { get; set; }
    }
}

客户代码:

using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;
using System;
using System.IO;
using System.Text;

namespace Client
{
    class Program
    {

        static void Main(string[] args)
        {
            RabbitMqServer messageService = new RabbitMqServer("127.0.0.1");
            RabbitMqQueueClient mqClient = messageService.MessageFactory.CreateMessageQueueClient() as RabbitMqQueueClient;
            var responseQueueName = mqClient.GetTempQueueName();
            MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes("Hello World!")) { Position = 0 };
            HelloRequest request = new HelloRequest { Counter = 100, RequestStream = ms };  //Counter is just some arbitary extra data
            Guid messageId = Guid.NewGuid();

            mqClient.Publish(QueueNames<HelloRequest>.In, new Message<HelloRequest>(request) { ReplyTo = responseQueueName, Id = messageId });
        }
    }

    public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
    {
        public int Counter { get; set; }
        public Stream RequestStream { get; set; }
    }

    public class HelloResponse
    {
        public int Counter { get; set; }
    }
}

注意:我知道我可以在我的请求对象中使用一个 byte[],但我非常想利用提供的 IRequiresRequestStream 接口,这样我就可以切换回使用 HTTP 而不是 AMQP在将来。

我还应该说,我可能不会使用 servicestack 提供的 RabbitMQ 客户端,因为我正在编写自定义逻辑以从 HTTP 转换为 AMQP,所以我将手动构建 rabbitMQ 请求 - 代码以上仅以最简单的方式演示了我遇到的问题。

我假设这不会直接与 AMQP 一起工作(就像它与 HTTP 一样)——所以我想我需要做一些事情,比如将流序列化为 byte[] 并将其包含在 RabbitMQ 消息中,然后填充 ServiceStack 在服务器上神奇地重新水合的 dto。

所以两个问题真的...
1. 我在正确的轨道上吗?
2. 如果是这样,我如何连接到服务器上的反序列化代码,以便我可以访问原始 RabbitMQ 消息,以便将我的 byte[] 转换回流并在我的 dto 上设置流?

您不能将 IRequiresRequestStream 请求 DTO 发送到 MQ,因为它不是普通的序列化请求 DTO,而是指示 ServiceStack 跳过反序列化请求 DTO,而是注入 HTTP 请求流,以便服务可以改为执行自己的反序列化,这不同于普通的请求 DTO,它是序列化的,可以作为 MQ 消息中的正文发送。

如果您想在 IRequiresRequestStream 服务和可由 MQ 调用的服务之间共享实现,一个选项是仅委托给接受字节的公共服务,例如:

//Called from HTTP
public object Any(HelloStream request) => 
    Any(new HelloBytes { Bytes = request.RequestStream.ReadFully() });

//Called from HTTP or MQ
public object Any(HelloBytes request) 
{
    //= request.Bytes
}