使用 datastax cpp 驱动程序改进 cassandra 数据库中的插入时间
improve insertion time in cassandra database with datastax cpp driver
我有一个简单的 table,我正在使用 datastax cpp 驱动程序填充它。 table 存储在 cassandra 数据库中。 table 是这样创建的:
create table data (
dt_id int PRIMARY KEY,
dt_numbers list<int>,
insertion_time timestamp,
)
因此,对于我要插入的每一行
VALUES (1, [1,2,3,4], now())
问题是我尝试插入100万行σ,时间是35分钟。那个时间在生产环境中被认为是高的。
一些伪代码:
std::map<int, vector<int>> myData;
for( all entries in myData) {
const char* query = "INSERT INTO...";
future = cass_session_execute(session, statement);
cass_future_wait(future);
}
数据存储在一个std::map中,我不关心先写入哪个地图索引,但每个地图条目都应该恰好插入一个。有什么方法可以提高这个程序的性能吗?有例子吗?
我目前使用的代码是一行接一行地写,与其中一个 datastax 示例中使用的代码类似
CassError insert_into_basic(CassSession* session, const char* key, const Basic* basic) {
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture* future = NULL;
const char* query = "INSERT INTO examples.basic (key, bln, flt, dbl, i32, i64) VALUES (?, ?, ?, ?, ?, ?);";
statement = cass_statement_new(query, 6);
cass_statement_bind_string(statement, 0, key);
cass_statement_bind_bool(statement, 1, basic->bln);
cass_statement_bind_float(statement, 2, basic->flt);
cass_statement_bind_double(statement, 3, basic->dbl);
cass_statement_bind_int32(statement, 4, basic->i32);
cass_statement_bind_int64(statement, 5, basic->i64);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);
cass_statement_free(statement);
return rc;
}
您的性能越来越慢,因为您一次只写一行并等待真正写入该行。如果仔细查看文档,您会发现要从 C* 获得最佳性能,您确实需要执行异步写入。在伪代码中它会是这样的:
std::map<int, vector<int>> myData;
std::list<CassFuture> futures;
for( all entries in myData) {
const char* query = "INSERT INTO...";
futures.push_back(cass_session_execute(session, statement));
if (futures.size() > 5000) {
for (all entries in futures) {
cass_future_wait(future);
cass_future_free(future);
}
futures.clear();
}
}
// Wait for the "trailing" futures...
for (all entries in futures) {
cass_future_wait(future);
cass_future_free(future);
}
这样你的性能应该会得到提升。您应该需要调整 5000
以匹配您的 requirements/hardware 规格。当然,还要实施一些策略来处理写入失败。
我有一个简单的 table,我正在使用 datastax cpp 驱动程序填充它。 table 存储在 cassandra 数据库中。 table 是这样创建的:
create table data (
dt_id int PRIMARY KEY,
dt_numbers list<int>,
insertion_time timestamp,
)
因此,对于我要插入的每一行
VALUES (1, [1,2,3,4], now())
问题是我尝试插入100万行σ,时间是35分钟。那个时间在生产环境中被认为是高的。
一些伪代码:
std::map<int, vector<int>> myData;
for( all entries in myData) {
const char* query = "INSERT INTO...";
future = cass_session_execute(session, statement);
cass_future_wait(future);
}
数据存储在一个std::map中,我不关心先写入哪个地图索引,但每个地图条目都应该恰好插入一个。有什么方法可以提高这个程序的性能吗?有例子吗?
我目前使用的代码是一行接一行地写,与其中一个 datastax 示例中使用的代码类似
CassError insert_into_basic(CassSession* session, const char* key, const Basic* basic) {
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture* future = NULL;
const char* query = "INSERT INTO examples.basic (key, bln, flt, dbl, i32, i64) VALUES (?, ?, ?, ?, ?, ?);";
statement = cass_statement_new(query, 6);
cass_statement_bind_string(statement, 0, key);
cass_statement_bind_bool(statement, 1, basic->bln);
cass_statement_bind_float(statement, 2, basic->flt);
cass_statement_bind_double(statement, 3, basic->dbl);
cass_statement_bind_int32(statement, 4, basic->i32);
cass_statement_bind_int64(statement, 5, basic->i64);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);
cass_statement_free(statement);
return rc;
}
您的性能越来越慢,因为您一次只写一行并等待真正写入该行。如果仔细查看文档,您会发现要从 C* 获得最佳性能,您确实需要执行异步写入。在伪代码中它会是这样的:
std::map<int, vector<int>> myData;
std::list<CassFuture> futures;
for( all entries in myData) {
const char* query = "INSERT INTO...";
futures.push_back(cass_session_execute(session, statement));
if (futures.size() > 5000) {
for (all entries in futures) {
cass_future_wait(future);
cass_future_free(future);
}
futures.clear();
}
}
// Wait for the "trailing" futures...
for (all entries in futures) {
cass_future_wait(future);
cass_future_free(future);
}
这样你的性能应该会得到提升。您应该需要调整 5000
以匹配您的 requirements/hardware 规格。当然,还要实施一些策略来处理写入失败。