使用 datastax cpp 驱动程序改进 cassandra 数据库中的插入时间

improve insertion time in cassandra database with datastax cpp driver

我有一个简单的 table,我正在使用 datastax cpp 驱动程序填充它。 table 存储在 cassandra 数据库中。 table 是这样创建的:

create table data (
    dt_id int PRIMARY KEY,
    dt_numbers list<int>,
    insertion_time timestamp,
)

因此,对于我要插入的每一行

VALUES (1, [1,2,3,4], now())

问题是我尝试插入100万行σ,时间是35分钟。那个时间在生产环境中被认为是高的。

一些伪代码:

std::map<int, vector<int>> myData;
for( all entries in myData) {
  const char* query = "INSERT INTO...";
  future = cass_session_execute(session, statement);
  cass_future_wait(future);
} 

数据存储在一个std::map中,我不关心先写入哪个地图索引,但每个地图条目都应该恰好插入一个。有什么方法可以提高这个程序的性能吗?有例子吗?

我目前使用的代码是一行接一行地写,与其中一个 datastax 示例中使用的代码类似

    CassError insert_into_basic(CassSession* session, const char* key, const Basic* basic) {
      CassError rc = CASS_OK;
      CassStatement* statement = NULL;
      CassFuture* future = NULL;
      const char* query = "INSERT INTO examples.basic (key, bln, flt, dbl, i32, i64) VALUES (?, ?, ?, ?, ?, ?);";

      statement = cass_statement_new(query, 6);

      cass_statement_bind_string(statement, 0, key);
      cass_statement_bind_bool(statement, 1, basic->bln);
      cass_statement_bind_float(statement, 2, basic->flt);
      cass_statement_bind_double(statement, 3, basic->dbl);
      cass_statement_bind_int32(statement, 4, basic->i32);
      cass_statement_bind_int64(statement, 5, basic->i64);

      future = cass_session_execute(session, statement);
      cass_future_wait(future);

      rc = cass_future_error_code(future);
      if (rc != CASS_OK) {
        print_error(future);
      }

      cass_future_free(future);
      cass_statement_free(statement);

      return rc;
    }

您的性能越来越慢,因为您一次只写一行并等待真正写入该行。如果仔细查看文档,您会发现要从 C* 获得最佳性能,您确实需要执行异步写入。在伪代码中它会是这样的:

std::map<int, vector<int>> myData;
std::list<CassFuture> futures;
for( all entries in myData) {
  const char* query = "INSERT INTO...";
  futures.push_back(cass_session_execute(session, statement));
  if (futures.size() > 5000) {
    for (all entries in futures) {
       cass_future_wait(future);
       cass_future_free(future);
    }
    futures.clear();
  }
}
// Wait for the "trailing" futures... 
for (all entries in futures) {
   cass_future_wait(future);
   cass_future_free(future);
}

这样你的性能应该会得到提升。您应该需要调整 5000 以匹配您的 requirements/hardware 规格。当然,还要实施一些策略来处理写入失败。