使用 ServiceStack 和 RabbitMQ 发送流
Using ServiceStack and RabbitMQ to send a stream
我正在尝试使用 RabbitMQ 和 Servicestack(使用 .NET Core 的 v1.0.41)发送流。
我的Request实现了ServiceStack.Web.IRequiresRequestStream
,在client设置了stream属性,但是到了server端,stream是NULL
.
完成回购
服务器代码:
using System;
using System.IO;
using System.Threading.Tasks;
using Funq;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.DependencyInjection;
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;
namespace Server
{
class Program
{
public static void Main(string[] args)
{
IWebHost host = new WebHostBuilder()
.UseServer(new RabbitServer())
.UseStartup<Startup>()
.Build();
host.Run();
}
}
public class RabbitServer : IServer
{
public void Dispose(){}
public void Start<TContext>(IHttpApplication<TContext> application){}
public IFeatureCollection Features { get; } = new FeatureCollection();
}
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddLogging();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.UseServiceStack((AppHostBase)Activator.CreateInstance<AppHost>());
app.Run((RequestDelegate)(context => (Task)Task.FromResult<int>(0)));
}
}
public class AppHost : AppHostBase
{
public AppHost()
: base("My Test Service", typeof(MyService).GetAssembly())
{
}
public override void Configure(Container container)
{
var mqServer = new RabbitMqServer("127.0.0.1");
container.Register<IMessageService>(mqServer);
mqServer.RegisterHandler<HelloRequest>(ExecuteMessage);
mqServer.Start();
}
}
public class MyService : Service
{
public HelloResponse Any(HelloRequest request)
{
Console.WriteLine($"Stream is null: {request.RequestStream == null}");
return new HelloResponse { Counter = request.Counter };
}
}
public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
{
public int Counter { get; set; }
public Stream RequestStream { get; set; }
}
public class HelloResponse
{
public int Counter { get; set; }
}
}
客户代码:
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;
using System;
using System.IO;
using System.Text;
namespace Client
{
class Program
{
static void Main(string[] args)
{
RabbitMqServer messageService = new RabbitMqServer("127.0.0.1");
RabbitMqQueueClient mqClient = messageService.MessageFactory.CreateMessageQueueClient() as RabbitMqQueueClient;
var responseQueueName = mqClient.GetTempQueueName();
MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes("Hello World!")) { Position = 0 };
HelloRequest request = new HelloRequest { Counter = 100, RequestStream = ms }; //Counter is just some arbitary extra data
Guid messageId = Guid.NewGuid();
mqClient.Publish(QueueNames<HelloRequest>.In, new Message<HelloRequest>(request) { ReplyTo = responseQueueName, Id = messageId });
}
}
public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
{
public int Counter { get; set; }
public Stream RequestStream { get; set; }
}
public class HelloResponse
{
public int Counter { get; set; }
}
}
注意:我知道我可以在我的请求对象中使用一个 byte[],但我非常想利用提供的 IRequiresRequestStream
接口,这样我就可以切换回使用 HTTP 而不是 AMQP在将来。
我还应该说,我可能不会使用 servicestack 提供的 RabbitMQ 客户端,因为我正在编写自定义逻辑以从 HTTP 转换为 AMQP,所以我将手动构建 rabbitMQ 请求 - 代码以上仅以最简单的方式演示了我遇到的问题。
我假设这不会直接与 AMQP 一起工作(就像它与 HTTP 一样)——所以我想我需要做一些事情,比如将流序列化为 byte[]
并将其包含在 RabbitMQ 消息中,然后填充 ServiceStack 在服务器上神奇地重新水合的 dto。
所以两个问题真的...
1. 我在正确的轨道上吗?
2. 如果是这样,我如何连接到服务器上的反序列化代码,以便我可以访问原始 RabbitMQ 消息,以便将我的 byte[] 转换回流并在我的 dto 上设置流?
您不能将 IRequiresRequestStream
请求 DTO 发送到 MQ,因为它不是普通的序列化请求 DTO,而是指示 ServiceStack 跳过反序列化请求 DTO,而是注入 HTTP 请求流,以便服务可以改为执行自己的反序列化,这不同于普通的请求 DTO,它是序列化的,可以作为 MQ 消息中的正文发送。
如果您想在 IRequiresRequestStream
服务和可由 MQ 调用的服务之间共享实现,一个选项是仅委托给接受字节的公共服务,例如:
//Called from HTTP
public object Any(HelloStream request) =>
Any(new HelloBytes { Bytes = request.RequestStream.ReadFully() });
//Called from HTTP or MQ
public object Any(HelloBytes request)
{
//= request.Bytes
}
我正在尝试使用 RabbitMQ 和 Servicestack(使用 .NET Core 的 v1.0.41)发送流。
我的Request实现了ServiceStack.Web.IRequiresRequestStream
,在client设置了stream属性,但是到了server端,stream是NULL
.
完成回购
服务器代码:
using System;
using System.IO;
using System.Threading.Tasks;
using Funq;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.DependencyInjection;
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;
namespace Server
{
class Program
{
public static void Main(string[] args)
{
IWebHost host = new WebHostBuilder()
.UseServer(new RabbitServer())
.UseStartup<Startup>()
.Build();
host.Run();
}
}
public class RabbitServer : IServer
{
public void Dispose(){}
public void Start<TContext>(IHttpApplication<TContext> application){}
public IFeatureCollection Features { get; } = new FeatureCollection();
}
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddLogging();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.UseServiceStack((AppHostBase)Activator.CreateInstance<AppHost>());
app.Run((RequestDelegate)(context => (Task)Task.FromResult<int>(0)));
}
}
public class AppHost : AppHostBase
{
public AppHost()
: base("My Test Service", typeof(MyService).GetAssembly())
{
}
public override void Configure(Container container)
{
var mqServer = new RabbitMqServer("127.0.0.1");
container.Register<IMessageService>(mqServer);
mqServer.RegisterHandler<HelloRequest>(ExecuteMessage);
mqServer.Start();
}
}
public class MyService : Service
{
public HelloResponse Any(HelloRequest request)
{
Console.WriteLine($"Stream is null: {request.RequestStream == null}");
return new HelloResponse { Counter = request.Counter };
}
}
public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
{
public int Counter { get; set; }
public Stream RequestStream { get; set; }
}
public class HelloResponse
{
public int Counter { get; set; }
}
}
客户代码:
using ServiceStack;
using ServiceStack.Messaging;
using ServiceStack.RabbitMq;
using ServiceStack.Web;
using System;
using System.IO;
using System.Text;
namespace Client
{
class Program
{
static void Main(string[] args)
{
RabbitMqServer messageService = new RabbitMqServer("127.0.0.1");
RabbitMqQueueClient mqClient = messageService.MessageFactory.CreateMessageQueueClient() as RabbitMqQueueClient;
var responseQueueName = mqClient.GetTempQueueName();
MemoryStream ms = new MemoryStream(Encoding.UTF8.GetBytes("Hello World!")) { Position = 0 };
HelloRequest request = new HelloRequest { Counter = 100, RequestStream = ms }; //Counter is just some arbitary extra data
Guid messageId = Guid.NewGuid();
mqClient.Publish(QueueNames<HelloRequest>.In, new Message<HelloRequest>(request) { ReplyTo = responseQueueName, Id = messageId });
}
}
public class HelloRequest : IReturn<HelloResponse>, IRequiresRequestStream
{
public int Counter { get; set; }
public Stream RequestStream { get; set; }
}
public class HelloResponse
{
public int Counter { get; set; }
}
}
注意:我知道我可以在我的请求对象中使用一个 byte[],但我非常想利用提供的 IRequiresRequestStream
接口,这样我就可以切换回使用 HTTP 而不是 AMQP在将来。
我还应该说,我可能不会使用 servicestack 提供的 RabbitMQ 客户端,因为我正在编写自定义逻辑以从 HTTP 转换为 AMQP,所以我将手动构建 rabbitMQ 请求 - 代码以上仅以最简单的方式演示了我遇到的问题。
我假设这不会直接与 AMQP 一起工作(就像它与 HTTP 一样)——所以我想我需要做一些事情,比如将流序列化为 byte[]
并将其包含在 RabbitMQ 消息中,然后填充 ServiceStack 在服务器上神奇地重新水合的 dto。
所以两个问题真的...
1. 我在正确的轨道上吗?
2. 如果是这样,我如何连接到服务器上的反序列化代码,以便我可以访问原始 RabbitMQ 消息,以便将我的 byte[] 转换回流并在我的 dto 上设置流?
您不能将 IRequiresRequestStream
请求 DTO 发送到 MQ,因为它不是普通的序列化请求 DTO,而是指示 ServiceStack 跳过反序列化请求 DTO,而是注入 HTTP 请求流,以便服务可以改为执行自己的反序列化,这不同于普通的请求 DTO,它是序列化的,可以作为 MQ 消息中的正文发送。
如果您想在 IRequiresRequestStream
服务和可由 MQ 调用的服务之间共享实现,一个选项是仅委托给接受字节的公共服务,例如:
//Called from HTTP
public object Any(HelloStream request) =>
Any(new HelloBytes { Bytes = request.RequestStream.ReadFully() });
//Called from HTTP or MQ
public object Any(HelloBytes request)
{
//= request.Bytes
}