如何使用 Google Spanner 为数据导出做多个并行阅读器?

How to do multiple parallel readers for data export using Google Spanner?

建议使用带有时间戳限制的查询来创建用于导出的快照。在 Timestamp Bounds 文档的底部,它指出:

Cloud Spanner continuously garbage collects deleted and overwritten data in the background to reclaim storage space. This process is known as version GC. By default, version GC reclaims versions after they are one hour old. Because of this, Cloud Spanner cannot perform reads at a read timestamp more than one hour in the past.

因此任何导出都需要在一个小时内完成。单个 reader(即 select * from table; 使用时间戳 X)将无法在一小时内导出整个 table。

如何在spanner中实现多个并行的reader?


注意:其中一条评论中提到即将支持 Apache Beam,但看起来它只使用了一个 reader:

/** A simplest read function implementation. Parallelism support is coming. */

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java#L26

有没有一种方法可以使用现有的 API 来实现 Beam 现在需要的并行 reader?或者 Beam 是否需要使用尚未在 google 扳手上发布的东西?

编辑 2018-03-30 - 示例项目已更新为使用 Google Cloud Spanner

提供的 BatchClient

在为 reading/downloading 大量数据发布 BatchClient 后,the example project below 已更新为使用新的批处理客户端而不是标准数据库客户端。该项目背后的基本思想仍然相同:使用标准 jdbc 功能复制数据 to/from Cloud Spanner 和任何其他数据库。以下代码片段将 jdbc 连接设置为批量读取模式:

if (source.isWrapperFor(ICloudSpannerConnection.class))
{
    ICloudSpannerConnection con = source.unwrap(ICloudSpannerConnection.class);
    // Make sure no transaction is running
    if (!con.isBatchReadOnly())
    {
        if (con.getAutoCommit())
        {
            con.setAutoCommit(false);
        }
        else
        {
            con.commit();
        }
        con.setBatchReadOnly(true);
    }
}

当连接处于'batch read only mode'时,连接将使用Google Cloud Spanner 的BatchClient 而不是标准的数据库客户端。当调用 Statement#execute(String)PreparedStatement#execute() 方法之一时(因为这些方法允许返回多个结果集),jdbc 驱动程序将创建分区查询而不是普通查询。此分区查询的结果将是许多结果集(每个分区一个),可以通过 Statement#getResultSet() 和 Statement#getMoreResults(int) 方法获取。

Statement statement = source.createStatement();
boolean hasResults = statement.execute(select);
int workerNumber = 0;
while (hasResults)
{
    ResultSet rs = statement.getResultSet();
    PartitionWorker worker = new PartitionWorker("PartionWorker-" + workerNumber, config, rs, tableSpec, table, insertCols);
    workers.add(worker);
    hasResults = statement.getMoreResults(Statement.KEEP_CURRENT_RESULT);
    workerNumber++;
}

Statement#execute(String)返回的结果集不会直接执行,而是在第一次调用ResultSet#next()之后才执行。将这些结果集传递给单独的工作线程可确保并行下载和复制数据。


原回答:

This project 最初是为另一个方向(从本地数据库到 Cloud Spanner)的转换而创建的,但由于它对源和目标都使用 JDBC,因此它也可以用于另一个方向解决方法:将 Cloud Spanner 数据库转换为本地 PostgreSQL 数据库。大型 table 使用线程池并行转换。

该项目使用 this open source JDBC driver 而不是 Google 提供的 JDBC 驱动程序。源 Cloud Spanner JDBC 连接设置为只读模式且 autocommit=false。这确保连接在您第一次执行查询时使用当前时间作为时间戳自动创建只读事务。同一(只读)事务中的所有后续查询都将使用相同的时间戳,从而为您提供 Google Cloud Spanner 数据库的一致快照。

它的工作原理如下:

  1. 将源数据库设置为只读事务模式。
  2. convert(String catalog, String schema) 方法遍历所有 tables 在源数据库 (Cloud Spanner)
  3. 对于每个 table 记录的数量是确定的,并且根据 table 的大小,使用应用程序的主线程或通过一个工作池。
  4. class UploadWorker 负责并行复制。每个工作人员都分配了 table 范围内的记录(例如第 1 行到 2,400 行)。该范围由 select 语句以这种格式 select 编辑:'SELECT * FROM $TABLE ORDER BY $PK_COLUMNS LIMIT $BATCH_SIZE OFFSET $CURRENT_OFFSET'
  5. 在转换完所有 table 之后,在源数据库上提交只读事务。

下面是最重要部分的代码片段。

public void convert(String catalog, String schema) throws SQLException
{
    int batchSize = config.getBatchSize();
    destination.setAutoCommit(false);
    // Set the source connection to transaction mode (no autocommit) and read-only
    source.setAutoCommit(false);
    source.setReadOnly(true);
    try (ResultSet tables = destination.getMetaData().getTables(catalog, schema, null, new String[] { "TABLE" }))
    {
        while (tables.next())
        {
            String tableSchema = tables.getString("TABLE_SCHEM");
            if (!config.getDestinationDatabaseType().isSystemSchema(tableSchema))
            {
                String table = tables.getString("TABLE_NAME");
                // Check whether the destination table is empty.
                int destinationRecordCount = getDestinationRecordCount(table);
                if (destinationRecordCount == 0 || config.getDataConvertMode() == ConvertMode.DropAndRecreate)
                {
                    if (destinationRecordCount > 0)
                    {
                        deleteAll(table);
                    }
                    int sourceRecordCount = getSourceRecordCount(getTableSpec(catalog, tableSchema, table));
                    if (sourceRecordCount > batchSize)
                    {
                        convertTableWithWorkers(catalog, tableSchema, table);
                    }
                    else
                    {
                        convertTable(catalog, tableSchema, table);
                    }
                }
                else
                {
                    if (config.getDataConvertMode() == ConvertMode.ThrowExceptionIfExists)
                        throw new IllegalStateException("Table " + table + " is not empty");
                    else if (config.getDataConvertMode() == ConvertMode.SkipExisting)
                        log.info("Skipping data copy for table " + table);
                }
            }
        }
    }
    source.commit();
}

private void convertTableWithWorkers(String catalog, String schema, String table) throws SQLException
{
    String tableSpec = getTableSpec(catalog, schema, table);
    Columns insertCols = getColumns(catalog, schema, table, false);
    Columns selectCols = getColumns(catalog, schema, table, true);
    if (insertCols.primaryKeyCols.isEmpty())
    {
        log.warning("Table " + tableSpec + " does not have a primary key. No data will be copied.");
        return;
    }
    log.info("About to copy data from table " + tableSpec);

    int batchSize = config.getBatchSize();
    int totalRecordCount = getSourceRecordCount(tableSpec);
    int numberOfWorkers = calculateNumberOfWorkers(totalRecordCount);
    int numberOfRecordsPerWorker = totalRecordCount / numberOfWorkers;
    if (totalRecordCount % numberOfWorkers > 0)
        numberOfRecordsPerWorker++;
    int currentOffset = 0;
    ExecutorService service = Executors.newFixedThreadPool(numberOfWorkers);
    for (int workerNumber = 0; workerNumber < numberOfWorkers; workerNumber++)
    {
        int workerRecordCount = Math.min(numberOfRecordsPerWorker, totalRecordCount - currentOffset);
        UploadWorker worker = new UploadWorker("UploadWorker-" + workerNumber, selectFormat, tableSpec, table,
                insertCols, selectCols, currentOffset, workerRecordCount, batchSize, source,
                config.getUrlDestination(), config.isUseJdbcBatching());
        service.submit(worker);
        currentOffset = currentOffset + numberOfRecordsPerWorker;
    }
    service.shutdown();
    try
    {
        service.awaitTermination(config.getUploadWorkerMaxWaitInMinutes(), TimeUnit.MINUTES);
    }
    catch (InterruptedException e)
    {
        log.severe("Error while waiting for workers to finish: " + e.getMessage());
        throw new RuntimeException(e);
    }

}

public class UploadWorker implements Runnable
{
private static final Logger log = Logger.getLogger(UploadWorker.class.getName());

private final String name;

private String selectFormat;

private String sourceTable;

private String destinationTable;

private Columns insertCols;

private Columns selectCols;

private int beginOffset;

private int numberOfRecordsToCopy;

private int batchSize;

private Connection source;

private String urlDestination;

private boolean useJdbcBatching;

UploadWorker(String name, String selectFormat, String sourceTable, String destinationTable, Columns insertCols,
        Columns selectCols, int beginOffset, int numberOfRecordsToCopy, int batchSize, Connection source,
        String urlDestination, boolean useJdbcBatching)
{
    this.name = name;
    this.selectFormat = selectFormat;
    this.sourceTable = sourceTable;
    this.destinationTable = destinationTable;
    this.insertCols = insertCols;
    this.selectCols = selectCols;
    this.beginOffset = beginOffset;
    this.numberOfRecordsToCopy = numberOfRecordsToCopy;
    this.batchSize = batchSize;
    this.source = source;
    this.urlDestination = urlDestination;
    this.useJdbcBatching = useJdbcBatching;
}

@Override
public void run()
{
    // Connection source = DriverManager.getConnection(urlSource);
    try (Connection destination = DriverManager.getConnection(urlDestination))
    {
        log.info(name + ": " + sourceTable + ": Starting copying " + numberOfRecordsToCopy + " records");

        destination.setAutoCommit(false);
        String sql = "INSERT INTO " + destinationTable + " (" + insertCols.getColumnNames() + ") VALUES \n";
        sql = sql + "(" + insertCols.getColumnParameters() + ")";
        PreparedStatement statement = destination.prepareStatement(sql);

        int lastRecord = beginOffset + numberOfRecordsToCopy;
        int recordCount = 0;
        int currentOffset = beginOffset;
        while (true)
        {
            int limit = Math.min(batchSize, lastRecord - currentOffset);
            String select = selectFormat.replace("$COLUMNS", selectCols.getColumnNames());
            select = select.replace("$TABLE", sourceTable);
            select = select.replace("$PRIMARY_KEY", selectCols.getPrimaryKeyColumns());
            select = select.replace("$BATCH_SIZE", String.valueOf(limit));
            select = select.replace("$OFFSET", String.valueOf(currentOffset));
            try (ResultSet rs = source.createStatement().executeQuery(select))
            {
                while (rs.next())
                {
                    int index = 1;
                    for (Integer type : insertCols.columnTypes)
                    {
                        Object object = rs.getObject(index);
                        statement.setObject(index, object, type);
                        index++;
                    }
                    if (useJdbcBatching)
                        statement.addBatch();
                    else
                        statement.executeUpdate();
                    recordCount++;
                }
                if (useJdbcBatching)
                    statement.executeBatch();
            }
            destination.commit();
            log.info(name + ": " + sourceTable + ": Records copied so far: " + recordCount + " of "
                    + numberOfRecordsToCopy);
            currentOffset = currentOffset + batchSize;
            if (recordCount >= numberOfRecordsToCopy)
                break;
        }
    }
    catch (SQLException e)
    {
        log.severe("Error during data copy: " + e.getMessage());
        throw new RuntimeException(e);
    }
    log.info(name + ": Finished copying");
}

}

可以使用 BatchClient class 从 Cloud Spanner 并行读取数据。关注 read_data_in_parallel 获取更多信息。

如果您希望从 Cloud Spanner 导出数据,我建议您使用 Cloud Dataflow(请参阅集成详细信息 here),因为它提供更高级别的抽象并处理数据处理细节,例如缩放和故障处理。