在服务器端为 gRPC 流调用 .end() 之前,如何在客户端接收数据
How can I receive data on client side before calling .end() on the server side for a gRPC stream
我目前正在尝试使用 gRPC Node.js API 设置服务器流。为此,当我在服务器端写入客户端立即接收数据事件的流时,我想实现这一点。
目前,如果我只在服务器端调用 write,我不会在客户端收到任何东西。但是,只要我在服务器上调用结束函数,客户端就会收到所有数据事件。
为了测试这一点,我使用了一个无限的 while 循环来在服务器端写入消息。然后客户端收不到消息(数据事件)。相反,如果我使用 for 循环并在调用 end 之后调用 end,则在调用 end 时客户端会收到所有消息(数据事件)。
我的 .proto 文件:
syntax = "proto3";
message ControlMessage {
enum Control {
Undefined = 0;
Start = 1;
Stop = 2;
}
Control control = 1;
}
message ImageMessage {
enum ImageType {
Raw = 0;
Mono8 = 1;
RGB8 = 2;
}
ImageType type = 1;
int32 width = 2;
int32 height = 3;
bytes image = 4;
}
service StartImageTransmission {
rpc Start(ControlMessage) returns (stream ImageMessage);
}
在服务器端,我实现了启动功能并尝试不断地向调用写入消息:
function doStart(call) {
var imgMsg = {type: "Mono8", width: 600, height: 600, image: new ArrayBuffer(600*600)};
//for(var i = 0; i < 10; i++) {
while(true) {
call.write(imgMsg);
console.log("Message sent");
}
call.end();
}
我在服务器中将函数注册为服务:
var server = new grpc.Server();
server.addService(protoDescriptor.StartImageTransmission.service, {Start: doStart});
在客户端,我生成适当的调用并注册数据和结束事件:
var call = client.Start({control: 0});
call.on('data', (imgMessage) => {
console.log('received image message');
});
call.read();
call.on('end', () => {console.log('end');});
我也试过在python写服务器端。在这种情况下,节点客户端会立即接收消息,而不仅仅是在服务器端结束流之后。所以我想这对于用节点 API.
编写的服务器也应该是可能的
看来问题是无休止的 while 循环阻塞了节点中的所有后台任务。一种可能的解决方案是使用 setTimeout
来创建循环。以下代码对我有用:
首先在 gRPC 调用中将 call
对象存储在数组中:
function doStart(call) {
calls.push(call);
}
我使用 setTimeout 发送给所有客户端:
function sendToAllClients() {
calls.forEach((call) => {
call.write(imgMsg);
});
setTimeout(sendToAllClients, 10);
}
setTimeout(sendToAllClients, 10);
有用的 Whosebug 文章:
我能够使用来自 Node.js 的 Writable
.
的 uncork
这是一个例子。伪代码,但来自一个有效的实现:
import * as grpc from '@grpc/grpc-js';
import * as proto from './src/proto/generated/organizations'; // via protoc w/ ts-proto
const OrganizationsGrpcServer: proto.OrganizationsServer = {
async getMany(call: ServerWritableStream<proto.Empty, proto.OrganizationCollection>) {
call.write(proto.OrganizationCollection.fromJSON({ value: [{}] }));
call.uncork();
// do some blocking stuff
call.write(proto.OrganizationCollection.fromJSON({ value: [{}] }));
call.uncork();
// call.end(), or client.close() below, at some point?
},
ping(call, callback) {
callback(null);
}
};
const client = new proto.OrganizationsClient('127.0.0.1:5000', grpc.credentials.createInsecure());
const stream = client.getMany(null);
stream.on('data', data => {
// this cb should run twice
});
export default OrganizationsGrpcServer;
//.proto
service Organizations {
rpc GetMany (google.protobuf.Empty) returns (stream OrganizationCollection) {}
}
message OrganizationCollection {
repeated Organization value = 1;
}
版本:
@grpc/grpc-js
1.4.4
@grpc/proto-loader
0.6.7
ts-proto
1.92.1
npm
8.1.4
node
17
我目前正在尝试使用 gRPC Node.js API 设置服务器流。为此,当我在服务器端写入客户端立即接收数据事件的流时,我想实现这一点。
目前,如果我只在服务器端调用 write,我不会在客户端收到任何东西。但是,只要我在服务器上调用结束函数,客户端就会收到所有数据事件。
为了测试这一点,我使用了一个无限的 while 循环来在服务器端写入消息。然后客户端收不到消息(数据事件)。相反,如果我使用 for 循环并在调用 end 之后调用 end,则在调用 end 时客户端会收到所有消息(数据事件)。
我的 .proto 文件:
syntax = "proto3";
message ControlMessage {
enum Control {
Undefined = 0;
Start = 1;
Stop = 2;
}
Control control = 1;
}
message ImageMessage {
enum ImageType {
Raw = 0;
Mono8 = 1;
RGB8 = 2;
}
ImageType type = 1;
int32 width = 2;
int32 height = 3;
bytes image = 4;
}
service StartImageTransmission {
rpc Start(ControlMessage) returns (stream ImageMessage);
}
在服务器端,我实现了启动功能并尝试不断地向调用写入消息:
function doStart(call) {
var imgMsg = {type: "Mono8", width: 600, height: 600, image: new ArrayBuffer(600*600)};
//for(var i = 0; i < 10; i++) {
while(true) {
call.write(imgMsg);
console.log("Message sent");
}
call.end();
}
我在服务器中将函数注册为服务:
var server = new grpc.Server();
server.addService(protoDescriptor.StartImageTransmission.service, {Start: doStart});
在客户端,我生成适当的调用并注册数据和结束事件:
var call = client.Start({control: 0});
call.on('data', (imgMessage) => {
console.log('received image message');
});
call.read();
call.on('end', () => {console.log('end');});
我也试过在python写服务器端。在这种情况下,节点客户端会立即接收消息,而不仅仅是在服务器端结束流之后。所以我想这对于用节点 API.
编写的服务器也应该是可能的看来问题是无休止的 while 循环阻塞了节点中的所有后台任务。一种可能的解决方案是使用 setTimeout
来创建循环。以下代码对我有用:
首先在 gRPC 调用中将 call
对象存储在数组中:
function doStart(call) {
calls.push(call);
}
我使用 setTimeout 发送给所有客户端:
function sendToAllClients() {
calls.forEach((call) => {
call.write(imgMsg);
});
setTimeout(sendToAllClients, 10);
}
setTimeout(sendToAllClients, 10);
有用的 Whosebug 文章:
我能够使用来自 Node.js 的 Writable
.
uncork
这是一个例子。伪代码,但来自一个有效的实现:
import * as grpc from '@grpc/grpc-js';
import * as proto from './src/proto/generated/organizations'; // via protoc w/ ts-proto
const OrganizationsGrpcServer: proto.OrganizationsServer = {
async getMany(call: ServerWritableStream<proto.Empty, proto.OrganizationCollection>) {
call.write(proto.OrganizationCollection.fromJSON({ value: [{}] }));
call.uncork();
// do some blocking stuff
call.write(proto.OrganizationCollection.fromJSON({ value: [{}] }));
call.uncork();
// call.end(), or client.close() below, at some point?
},
ping(call, callback) {
callback(null);
}
};
const client = new proto.OrganizationsClient('127.0.0.1:5000', grpc.credentials.createInsecure());
const stream = client.getMany(null);
stream.on('data', data => {
// this cb should run twice
});
export default OrganizationsGrpcServer;
//.proto
service Organizations {
rpc GetMany (google.protobuf.Empty) returns (stream OrganizationCollection) {}
}
message OrganizationCollection {
repeated Organization value = 1;
}
版本:
@grpc/grpc-js
1.4.4
@grpc/proto-loader
0.6.7
ts-proto
1.92.1
npm
8.1.4
node
17