如何将数据从 SQL 服务器迁移到 Cassandra

How to migrate data from SQL Server to Cassandra

我有大量数据要从 SQL 服务器传输到 Cassandra。我已经开发了一些代码来使从 SQL 服务器接收的数据与 cassandra 兼容,但我无法插入它。由于它有很多数据,一段时间后我收到此错误:

我正在使用 nodejs 执行此操作。这是我的代码:

request.on('row', async function (columns) {
    columns.forEach(async function (column) {
        if (column.metadata.colName.toLowerCase() == sqlTimestampCol.toLowerCase()) {
            data.timestamp = column.value;
            data.partitionName = partition_utils.getPartitionNameByDate(column.value);
        } else {
            dataMap[column.metadata.colName] = column.value;
        }
        if (column.value === null) {
            console.log('NULL');
        } else {
            result += column.value + " ";
        }
    });
    data.dataMap = dataMap;

    // INSERT CASSANDRA
    var insertString =
        `INSERT INTO ${cassandraKeyspace}.${cassandraTableName}(${cassandraDateColumn}, month_year_index, dataMap) VALUES (?,?,?)`;

    await cassandraConnection.client.execute(insertString, insertParams, { prepare: true })
        .then(async function () {
            console.log(`Inserted data for ${cassandraTableName} successfully`);
        })
        .catch(async function (error) {
            cassandraConnection.client.shutdown();
            logger.error(error);
        });

    result = "";
    data = {}
});

request.on('doneInProc', function (rowCount, more) {
    console.log(rowCount + ' rows returned');
    cassandraConnection.client.shutdown();
});
connection.execSql(request);

我正在使用 npm tedious 和 datastax cassandra 驱动程序。我为 table 的每一行做了一个简单的 select * from table 和 运行 上面的代码(在 request.on('row') 中),但它对我不起作用。有没有正确的方法来做到这一点?

据我了解,我收到的错误是因为我插入数据的速度比 cassandra 处理速度快。另外,只有当 SQL 查询结束时,新数据才真正插入到 cassandra 中。

您的代码存在问题,您有一个客户端在同一个连接上发送数千个查询。与其说 Cassandra 无法处理请求,还不如说你的客户端和集群之间的单一连接是一个瓶颈。

在您的设置中,每个连接只能处理 2048 个并发请求。达到此限制后,您的应用将无法再发送任何请求。

为了获得最大吞吐量,您需要水平扩展,以便您有多个应用程序实例都在发送查询。例如,如果您有 6 个应用程序实例,那么您可以向您的 Cassandra 集群发送 6 x 2048 个并发请求。

附带说明一下,如果这是将您的数据一次性迁移到 Cassandra,那么我建议将您的数据导出为 CSV 格式并使用 DataStax Bulk Loader (DSBulk). It's a free utility that fully open-source that allows you load/unload data to/from an Apache Cassandra cluster, DSE cluster or Astra 数据库。

有关 DSBulk 的更多信息,请参阅以下内容:

限制运行中请求的数量需要手动流背压处理。

Node.js 驱动程序为此提供实用方法:

https://docs.datastax.com/en/developer/nodejs-driver/4.6/features/concurrent-api/#using-a-fixed-query-and-a-readable-stream.

例如:

const stream = myRowStream.pipe(transformRowToArrayOfParameters);
const result = await executeConcurrent(client, insertString, stream);