Hyperledger Fabric Nodejs SDK Eventhub 已关闭
Hyperledger Fabric Nodejs SDK Eventhub has been shutdown
我已经在 IBM Blockchain 平台上部署了示例 fabcar 网络,并且正在使用此 repository 中的代码与其进行交互。当我单独调用每个函数时,交互效果很好。
当我开发节点服务器并为调用链代码公开 API 时,问题就来了。这样做后,我开始收到错误
Failed to invoke successfully :: Error: There was a problem with the eventhub ::Error: EventHub has been shutdown
奇怪的是,当我同时向 POST 调用发送 3 个请求时,出现了这个错误。但是记录被插入到 IBM 云中。
当我同时发送 100 个请求时,只有大约 20 个被插入到云中,其余的显示此错误和类似这样的附加错误
error: [Orderer.js]: sendBroadcast - on error: "Error: 14 UNAVAILABLE: Connect Failed\n at createStatusError (/home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:64:15)\n at ClientDuplexStream._emitStatusIfDone (/home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:270:19)\n at ClientDuplexStream._readsDone (/home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:236:8)\n at readCallback (/home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:296:12)"
在某些情况下
error: [client-utils.js]: sendPeersProposal - Promise is rejected: Error: 14 UNAVAILABLE: Connect Failed
at new createStatusError (/home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:64:15)
at /home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:583:15
我不明白为什么会出现这些错误。我必须开发一个可以处理负载的高性能应用程序,非常感谢任何帮助和指导。
这是我的 app.js 代码:
'use strict';
//get libraries
const express = require('express');
const bodyParser = require('body-parser');
const request = require('request');
const path = require('path');
// Build the Express application; JSON request bodies are parsed for every route.
const app = express();
const router = express.Router();
app.use(bodyParser.json());

// Helpers that submit transactions to the Fabric network.
const invoke = require('./invokeNetwork.js');

// Choose the listening port: when VCAP_APPLICATION is set, PORT is used as-is;
// otherwise PORT is used when set and 8000 is the fallback.
const port = process.env.VCAP_APPLICATION
  ? process.env.PORT
  : process.env.PORT || 8000;

// Start the HTTP server and log the port it was asked to listen on.
app.listen(port, () => {
  console.log('app running on port: %d', port);
});
/**
 * POST /api/post - create a car record through the chaincode.
 *
 * Expects a JSON body with `carID`, `make`, `model`, `color` and `owner`.
 * Responds with:
 *   - 400 `{ error }` when any of those fields is missing,
 *   - 500 `{ error }` when the invoke result reports status 500 or the
 *     invocation throws,
 *   - 200 `{ message }` otherwise.
 *
 * Every code path sends a response, so a failed invocation can no longer
 * leave the HTTP request hanging.
 */
app.post('/api/post', async function(req, res) {
  try {
    const { carID, make, model, color, owner } = req.body || {};

    // Reject incomplete requests up front instead of forwarding `undefined`
    // arguments to the chaincode.
    const missing = Object.entries({ carID, make, model, color, owner })
      .filter(([, value]) => value == null)
      .map(([field]) => field);
    if (missing.length > 0) {
      res.status(400).send({ error: 'Missing required field(s): ' + missing.join(', ') });
      return;
    }

    const response = await invoke.invokeCreate(carID, make, model, color, owner);
    if (response.status === 500) {
      //return error if error in response
      res.status(500).send({ error: response.message });
    } else {
      //else return success
      res.status(200).send({ message: response.message });
    }
  } catch (err) {
    console.log(err);
    // Always answer the client; guard against a response that was already started.
    if (!res.headersSent) {
      res.status(500).send({ error: err && err.message ? err.message : String(err) });
    }
  }
});
这是 invokeNetwork.js 代码
'use strict';
var Fabric_Client = require('fabric-client');
var path = require('path');
var util = require('util');
var os = require('os');
var fs = require('fs');
var isError = false;
var throwError = "";
//make sure we have the profiles we need
var networkConfig = path.join(__dirname, './config/network-profile.json')
var clientConfig = path.join(__dirname, './config/client-profile.json');
module.exports = {
invokeCreate: async function(carID, make, model, color, owner) {
isError = false;
//try {
checkProfilesExist(networkConfig, clientConfig); //terminates early if they are not found
// load the base network profile
var fabric_client = Fabric_Client.loadFromConfig(path.join(__dirname, './config/network-profile.json'));
// overlay the client profile over the network profile
fabric_client.loadFromConfig(path.join(__dirname, './config/client-profile.json'));
// setup the fabric network - get the channel that was loaded from the network profile
var channel = fabric_client.getChannel('defaultchannel');
var tx_id = null;
//load the user who is going to unteract with the network
fabric_client.initCredentialStores().then(() => {
// get the enrolled user from persistence, this user will sign all requests
return fabric_client.getUserContext('user1', true);
}).then((user_from_store) => {
if (user_from_store && user_from_store.isEnrolled()) {
console.log('Successfully loaded user1 from persistence');
} else {
throw new Error('Failed to get user1.... run registerUserNetwork.js');
}
// get a transaction id object based on the current user assigned to fabric client
tx_id = fabric_client.newTransactionID();
console.log("Assigning transaction_id: ", tx_id._transaction_id);
// createCar chaincode function - requires 5 args, ex: args: ['CAR11', 'Honda', 'Accord', 'Black', 'Dave'],
// changeCarOwner chaincode function - requires 2 args , ex: args: ['CAR11', 'MGK'],
var request = {
chaincodeId: 'fabcar',
fcn: 'createCar',
args: [carID, make, model, color, owner],
txId: tx_id
};
// send the transaction proposal to the endorsing peers
return channel.sendTransactionProposal(request);
}).then((results) => {
var proposalResponses = results[0];
var proposal = results[1];
let isProposalGood = false;
if (proposalResponses && proposalResponses[0].response &&
proposalResponses[0].response.status === 200) {
isProposalGood = true;
console.log('Transaction proposal was good');
} else {
console.error('Transaction proposal was bad');
}
if (isProposalGood) {
console.log(util.format(
'Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s"',
proposalResponses[0].response.status, proposalResponses[0].response.message));
// build up the request for the orderer to have the transaction committed
var request = {
proposalResponses: proposalResponses,
proposal: proposal
};
// set the transaction listener and set a timeout of 30 sec
// if the transaction did not get committed within the timeout period,
// report a TIMEOUT status
var transaction_id_string = tx_id.getTransactionID(); //Get the transaction ID string to be used by the event processing
var promises = [];
var sendPromise = channel.sendTransaction(request);
promises.push(sendPromise); //we want the send transaction first, so that we know where to check status
// get an eventhub once the fabric client has a user assigned. The user
// is required bacause the event registration must be signed
console.error('Getting event hub');
let event_hub = fabric_client.getEventHub('org1-peer1');
// using resolve the promise so that result status may be processed
// under the then clause rather than having the catch clause process
// the status
let txPromise = new Promise((resolve, reject) => {
let handle = setTimeout(() => {
event_hub.disconnect();
resolve({ event_status: 'TIMEOUT' }); //we could use reject(new Error('Trnasaction did not complete within 30 seconds'));
}, 3000);
event_hub.connect();
event_hub.registerTxEvent(transaction_id_string, (tx, code) => {
// this is the callback for transaction event status
// first some clean up of event listener
clearTimeout(handle);
event_hub.unregisterTxEvent(transaction_id_string);
event_hub.disconnect();
// now let the application know what happened
var return_status = { event_status: code, tx_id: transaction_id_string };
if (code !== 'VALID') {
console.error('The transaction was invalid, code = ' + code);
resolve(return_status); // we could use reject(new Error('Problem with the tranaction, event status ::'+code));
} else {
console.log('The transaction has been committed on peer ' + event_hub._ep._endpoint.addr);
resolve(return_status);
}
}, (err) => {
//this is the callback if something goes wrong with the event registration or processing
reject(new Error('There was a problem with the eventhub ::' + err));
throwError += err;
});
});
promises.push(txPromise);
return Promise.all(promises);
} else {
console.error('Failed to send Proposal or receive valid response. Response null or status is not 200. exiting...');
throw new Error('Failed to send Proposal or receive valid response. Response null or status is not 200. exiting...');
}
}).then((results) => {
console.log('Send transaction promise and event listener promise have completed');
// check the results in the order the promises were added to the promise all list
if (results && results[0] && results[0].status === 'SUCCESS') {
console.log('Successfully sent transaction to the orderer.');
} else {
console.error('Failed to order the transaction. Error code: ' + response.status);
throw new exception("Transaction");
}
if (results && results[1] && results[1].event_status === 'VALID') {
console.log('Successfully committed the change to the ledger by the peer');
return true;
} else {
console.log('Transaction failed to be committed to the ledger due to ::' + results[1].event_status);
}
}).catch((err) => {
console.error('Failed to invoke successfully :: ' + err);
isError = true;
throwError += err;
});
console.log(isError);
return {
message: (isError) ? throwError : 'Success Transaction'
}
function checkProfilesExist(networkConfig, clientConfig) {
if (!fs.existsSync(networkConfig)) {
console.log("Error: config file 'network-profile.json' not found.");
console.log("Make sure 'network-profile.json' is copied into the './config' folder.");
process.exit()
}
//make sure we have the client profile we need
if (!fs.existsSync(clientConfig)) {
console.log("Error: config file 'client-profile.json' not found.");
console.log("Make sure 'client-profile.json' is copied into the './config' folder.");
process.exit()
}
}
}
}
//throw new exception("Exceoption thrown");
//return true;
// }
// catch(err) {
// //print and return error
// console.log(err);
// var error = {};
// error.error = err.message;
// return error;
// }
// }
// }
我们一直在研究 Hyperledger Fabric,并尝试做一些性能测试。我遇到了与您完全相同的问题,因此在这里说明我们的情况,希望对您有所帮助。
Eventhub 关闭
Error: There was a problem with the eventhub ::Error: EventHub has been shutdown
问题
我们注意到,发送大量交易时会出现这种情况,使用不稳定的互联网连接时也会出现这种情况。Fabric 文档指出:
The events are ephemeral, such that if a registered listener crashed when the event is published, the listener will miss the event
我认为问题出在 eventhub 试图连接到我们的 API,但在连接中出现了问题。然后这会导致 eventhub 超时(因为它没有收到响应),然后导致 eventhub 关闭。
解决方案
Fabric 文档还给出了以下建议:
- 在区块级别进行监听,这样即使错过了某个区块,也可以回头获取旧区块(更多信息请参见上面提到的 Fabric 文档链接)
- 建立你自己的队列
查看此 pastebin 以获得基本的块侦听器 -> https://pastebin.com/wCd6Ni46
错误:14 UNAVAILABLE
问题
我们注意到,当我们开始发送大量交易时,我们也会以错误结束:14 UNAVAILABLE,但这是在大量交易真正淹没网络之后。
我认为我们可以批量发送 20 个,但是当我们快速连续发送 50 个或更多时,它无法处理。
请记住,至少在 IBP 入门版(Starter)上,每个区块最多只能包含 10 笔交易,因此必须等当前区块生成并提交之后,才能处理下一批交易。在这个过程中,我认为缓冲区会被填满,之后就无法再发送更多交易了。
所以您遇到的这两个错误,原因是排序节点(orderer)或对等节点(peer)被过多的交易压垮了。
解决方案
将交易分批发送,使每一批进入区块,等这些交易得到确认后再发送更多。这样可以给您的对等节点和排序节点留出喘息的空间。
希望这对您有所帮助!
正如 Priyav Shah 所解释的那样,一个完整的修复需要您修改 API 来解决这个问题。
如果您只是在寻找一种短期解决方案来重新启动对等节点上的 eventhub 侦听器,一个简单的解决方法是重新启动对等节点并使其重新加入通道并在其上重新安装链代码。
我已经在 IBM Blockchain 平台上部署了示例 fabcar 网络,并且正在使用此 repository 中的代码与其进行交互。当我单独调用每个函数时,交互效果很好。
当我开发节点服务器并为调用链代码公开 API 时,问题就来了。这样做后,我开始收到错误
Failed to invoke successfully :: Error: There was a problem with the eventhub ::Error: EventHub has been shutdown
奇怪的是,当我同时向 POST 调用发送 3 个请求时,出现了这个错误。但是记录被插入到 IBM 云中。
当我同时发送 100 个请求时,只有大约 20 个被插入到云中,其余的显示此错误和类似这样的附加错误
error: [Orderer.js]: sendBroadcast - on error: "Error: 14 UNAVAILABLE: Connect Failed\n at createStatusError (/home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:64:15)\n at ClientDuplexStream._emitStatusIfDone (/home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:270:19)\n at ClientDuplexStream._readsDone (/home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:236:8)\n at readCallback (/home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:296:12)"
在某些情况下
error: [client-utils.js]: sendPeersProposal - Promise is rejected: Error: 14 UNAVAILABLE: Connect Failed at new createStatusError (/home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:64:15) at /home/trs-laptop-20/Downloads/fabcar-network/node_modules/fabric-client/node_modules/grpc/src/client.js:583:15
我不明白为什么会出现这些错误。我必须开发一个可以处理负载的高性能应用程序,非常感谢任何帮助和指导。
这是我的 app.js 代码:
'use strict';
//get libraries
const express = require('express');
const bodyParser = require('body-parser');
const request = require('request');
const path = require('path');
// Build the Express application; JSON request bodies are parsed for every route.
const app = express();
const router = express.Router();
app.use(bodyParser.json());

// Helpers that submit transactions to the Fabric network.
const invoke = require('./invokeNetwork.js');

// Choose the listening port: when VCAP_APPLICATION is set, PORT is used as-is;
// otherwise PORT is used when set and 8000 is the fallback.
const port = process.env.VCAP_APPLICATION
  ? process.env.PORT
  : process.env.PORT || 8000;

// Start the HTTP server and log the port it was asked to listen on.
app.listen(port, () => {
  console.log('app running on port: %d', port);
});
/**
 * POST /api/post - create a car record through the chaincode.
 *
 * Expects a JSON body with `carID`, `make`, `model`, `color` and `owner`.
 * Responds with:
 *   - 400 `{ error }` when any of those fields is missing,
 *   - 500 `{ error }` when the invoke result reports status 500 or the
 *     invocation throws,
 *   - 200 `{ message }` otherwise.
 *
 * Every code path sends a response, so a failed invocation can no longer
 * leave the HTTP request hanging.
 */
app.post('/api/post', async function(req, res) {
  try {
    const { carID, make, model, color, owner } = req.body || {};

    // Reject incomplete requests up front instead of forwarding `undefined`
    // arguments to the chaincode.
    const missing = Object.entries({ carID, make, model, color, owner })
      .filter(([, value]) => value == null)
      .map(([field]) => field);
    if (missing.length > 0) {
      res.status(400).send({ error: 'Missing required field(s): ' + missing.join(', ') });
      return;
    }

    const response = await invoke.invokeCreate(carID, make, model, color, owner);
    if (response.status === 500) {
      //return error if error in response
      res.status(500).send({ error: response.message });
    } else {
      //else return success
      res.status(200).send({ message: response.message });
    }
  } catch (err) {
    console.log(err);
    // Always answer the client; guard against a response that was already started.
    if (!res.headersSent) {
      res.status(500).send({ error: err && err.message ? err.message : String(err) });
    }
  }
});
这是 invokeNetwork.js 代码
'use strict';
var Fabric_Client = require('fabric-client');
var path = require('path');
var util = require('util');
var os = require('os');
var fs = require('fs');
var isError = false;
var throwError = "";
//make sure we have the profiles we need
var networkConfig = path.join(__dirname, './config/network-profile.json')
var clientConfig = path.join(__dirname, './config/client-profile.json');
module.exports = {
invokeCreate: async function(carID, make, model, color, owner) {
isError = false;
//try {
checkProfilesExist(networkConfig, clientConfig); //terminates early if they are not found
// load the base network profile
var fabric_client = Fabric_Client.loadFromConfig(path.join(__dirname, './config/network-profile.json'));
// overlay the client profile over the network profile
fabric_client.loadFromConfig(path.join(__dirname, './config/client-profile.json'));
// setup the fabric network - get the channel that was loaded from the network profile
var channel = fabric_client.getChannel('defaultchannel');
var tx_id = null;
//load the user who is going to unteract with the network
fabric_client.initCredentialStores().then(() => {
// get the enrolled user from persistence, this user will sign all requests
return fabric_client.getUserContext('user1', true);
}).then((user_from_store) => {
if (user_from_store && user_from_store.isEnrolled()) {
console.log('Successfully loaded user1 from persistence');
} else {
throw new Error('Failed to get user1.... run registerUserNetwork.js');
}
// get a transaction id object based on the current user assigned to fabric client
tx_id = fabric_client.newTransactionID();
console.log("Assigning transaction_id: ", tx_id._transaction_id);
// createCar chaincode function - requires 5 args, ex: args: ['CAR11', 'Honda', 'Accord', 'Black', 'Dave'],
// changeCarOwner chaincode function - requires 2 args , ex: args: ['CAR11', 'MGK'],
var request = {
chaincodeId: 'fabcar',
fcn: 'createCar',
args: [carID, make, model, color, owner],
txId: tx_id
};
// send the transaction proposal to the endorsing peers
return channel.sendTransactionProposal(request);
}).then((results) => {
var proposalResponses = results[0];
var proposal = results[1];
let isProposalGood = false;
if (proposalResponses && proposalResponses[0].response &&
proposalResponses[0].response.status === 200) {
isProposalGood = true;
console.log('Transaction proposal was good');
} else {
console.error('Transaction proposal was bad');
}
if (isProposalGood) {
console.log(util.format(
'Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s"',
proposalResponses[0].response.status, proposalResponses[0].response.message));
// build up the request for the orderer to have the transaction committed
var request = {
proposalResponses: proposalResponses,
proposal: proposal
};
// set the transaction listener and set a timeout of 30 sec
// if the transaction did not get committed within the timeout period,
// report a TIMEOUT status
var transaction_id_string = tx_id.getTransactionID(); //Get the transaction ID string to be used by the event processing
var promises = [];
var sendPromise = channel.sendTransaction(request);
promises.push(sendPromise); //we want the send transaction first, so that we know where to check status
// get an eventhub once the fabric client has a user assigned. The user
// is required bacause the event registration must be signed
console.error('Getting event hub');
let event_hub = fabric_client.getEventHub('org1-peer1');
// using resolve the promise so that result status may be processed
// under the then clause rather than having the catch clause process
// the status
let txPromise = new Promise((resolve, reject) => {
let handle = setTimeout(() => {
event_hub.disconnect();
resolve({ event_status: 'TIMEOUT' }); //we could use reject(new Error('Trnasaction did not complete within 30 seconds'));
}, 3000);
event_hub.connect();
event_hub.registerTxEvent(transaction_id_string, (tx, code) => {
// this is the callback for transaction event status
// first some clean up of event listener
clearTimeout(handle);
event_hub.unregisterTxEvent(transaction_id_string);
event_hub.disconnect();
// now let the application know what happened
var return_status = { event_status: code, tx_id: transaction_id_string };
if (code !== 'VALID') {
console.error('The transaction was invalid, code = ' + code);
resolve(return_status); // we could use reject(new Error('Problem with the tranaction, event status ::'+code));
} else {
console.log('The transaction has been committed on peer ' + event_hub._ep._endpoint.addr);
resolve(return_status);
}
}, (err) => {
//this is the callback if something goes wrong with the event registration or processing
reject(new Error('There was a problem with the eventhub ::' + err));
throwError += err;
});
});
promises.push(txPromise);
return Promise.all(promises);
} else {
console.error('Failed to send Proposal or receive valid response. Response null or status is not 200. exiting...');
throw new Error('Failed to send Proposal or receive valid response. Response null or status is not 200. exiting...');
}
}).then((results) => {
console.log('Send transaction promise and event listener promise have completed');
// check the results in the order the promises were added to the promise all list
if (results && results[0] && results[0].status === 'SUCCESS') {
console.log('Successfully sent transaction to the orderer.');
} else {
console.error('Failed to order the transaction. Error code: ' + response.status);
throw new exception("Transaction");
}
if (results && results[1] && results[1].event_status === 'VALID') {
console.log('Successfully committed the change to the ledger by the peer');
return true;
} else {
console.log('Transaction failed to be committed to the ledger due to ::' + results[1].event_status);
}
}).catch((err) => {
console.error('Failed to invoke successfully :: ' + err);
isError = true;
throwError += err;
});
console.log(isError);
return {
message: (isError) ? throwError : 'Success Transaction'
}
function checkProfilesExist(networkConfig, clientConfig) {
if (!fs.existsSync(networkConfig)) {
console.log("Error: config file 'network-profile.json' not found.");
console.log("Make sure 'network-profile.json' is copied into the './config' folder.");
process.exit()
}
//make sure we have the client profile we need
if (!fs.existsSync(clientConfig)) {
console.log("Error: config file 'client-profile.json' not found.");
console.log("Make sure 'client-profile.json' is copied into the './config' folder.");
process.exit()
}
}
}
}
//throw new exception("Exceoption thrown");
//return true;
// }
// catch(err) {
// //print and return error
// console.log(err);
// var error = {};
// error.error = err.message;
// return error;
// }
// }
// }
我们一直在研究 Hyperledger Fabric,并尝试做一些性能测试。我遇到了与您完全相同的问题,因此在这里说明我们的情况,希望对您有所帮助。
Eventhub 关闭
Error: There was a problem with the eventhub ::Error: EventHub has been shutdown
问题
我们注意到,发送大量交易时会出现这种情况,使用不稳定的互联网连接时也会出现这种情况。Fabric 文档指出:
The events are ephemeral, such that if a registered listener crashed when the event is published, the listener will miss the event
我认为问题出在 eventhub 试图连接到我们的 API,但在连接中出现了问题。然后这会导致 eventhub 超时(因为它没有收到响应),然后导致 eventhub 关闭。
解决方案
Fabric 文档还给出了以下建议:
- 在区块级别进行监听,这样即使错过了某个区块,也可以回头获取旧区块(更多信息请参见上面提到的 Fabric 文档链接)
- 建立你自己的队列
查看此 pastebin 以获得基本的块侦听器 -> https://pastebin.com/wCd6Ni46
错误:14 UNAVAILABLE
问题
我们注意到,当我们开始发送大量交易时,我们也会以错误结束:14 UNAVAILABLE,但这是在大量交易真正淹没网络之后。 我认为我们可以批量发送 20 个,但是当我们快速连续发送 50 个或更多时,它无法处理。
请记住,至少在 IBP 入门版(Starter)上,每个区块最多只能包含 10 笔交易,因此必须等当前区块生成并提交之后,才能处理下一批交易。在这个过程中,我认为缓冲区会被填满,之后就无法再发送更多交易了。
所以您遇到的这两个错误,原因是排序节点(orderer)或对等节点(peer)被过多的交易压垮了。
解决方案
将交易分批发送,使每一批进入区块,等这些交易得到确认后再发送更多。这样可以给您的对等节点和排序节点留出喘息的空间。
希望这对您有所帮助!
正如 Priyav Shah 所解释的那样,一个完整的修复需要您修改 API 来解决这个问题。
如果您只是在寻找一种短期解决方案来重新启动对等节点上的 eventhub 侦听器,一个简单的解决方法是重新启动对等节点并使其重新加入通道并在其上重新安装链代码。