Download and upload file in-memory to Google Drive

Goal

Use a Google Drive API resumable URL.

Download the file and upload it to Google Drive in-memory.

Challenge/Problem

I want to buffer the file as it's being downloaded to memory (not the filesystem) and subsequently upload it to Google Drive. The Google Drive API requires chunks to be a minimum length of 256 * 1024 (262,144) bytes (see the alignment sketch below).

The process should pass a chunk from the buffer to be uploaded. If the chunk errors, that buffer chunk is retried up to 3 times. If the chunk succeeds, that chunk from the buffer should be cleared, and the process should continue until complete.
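
For reference, since every chunk except the last must be a multiple of 256 * 1024 bytes, a small helper can round a desired chunk size down to a valid one. A minimal sketch (alignChunkSize is a hypothetical name, not part of any API):

const CHUNK_GRANULARITY = 256 * 1024; // 262144 bytes

// Hypothetical helper: round a desired chunk size down to the nearest
// multiple of 256 KiB, which the Drive API requires for every chunk
// except the final one, and never go below that minimum.
function alignChunkSize(desiredBytes) {
  const aligned = Math.floor(desiredBytes / CHUNK_GRANULARITY) * CHUNK_GRANULARITY;
  return Math.max(aligned, CHUNK_GRANULARITY);
}

console.log(alignChunkSize(10 * 1024 * 1024)); // 10485760 (already aligned)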

Background Work/Research (references below)

Most of the articles, examples, and packages I've researched and tested give some insight into streams, piping, and chunking, but use the filesystem as the starting point for the readable stream.

I've tried different streaming approaches, such as passthrough with highWaterMark, and third-party libraries such as request, gaxios, and got, which have built-in stream/piping support, but they were of no use on the upload side of the process.

Meaning, I'm not sure how to structure the piping and chunking mechanism, whether to use a buffer or pipeline, so that it flows correctly to the upload process until completion, while handling progress and finish events efficiently.

Questions

  1. Using the code below, how do I properly buffer the file and PUT it to the Google-provided URL with the correct Content-Length and Content-Range headers, while keeping enough buffer space to handle 3 retries?

  2. In terms of handling back-pressure or buffering, is utilizing .cork() and .uncork() an efficient way to manage the buffered stream? (A short .cork()/.uncork() illustration follows the example below.)

  3. Is there a way to efficiently manage the buffer using a Transform stream with highWaterMark and pipeline? For example...

pipeline(
  downloadStream,
  transformStream,
  uploadStream,
  (err) => {
    if (err) {
      reject(err)
    } else {
      resolve(true)
    }
  }
)
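
For reference on question 2, .cork() and .uncork() are Node.js Writable methods that batch writes in memory until uncorked. A minimal, self-contained sketch of the mechanism (not tied to the upload flow here):

const { Writable } = require("stream");

const writable = new Writable({
  write(chunk, _, callback) {
    console.log(`write: ${chunk.length} bytes`);
    callback();
  },
});

// cork() holds subsequent writes in memory; uncork() flushes them
// (Node's docs recommend deferring uncork() with process.nextTick()).
writable.cork();
writable.write(Buffer.alloc(1024));
writable.write(Buffer.alloc(1024));
process.nextTick(() => writable.uncork());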

Below is a visual model and the code for what I'm trying to accomplish:

Visual Example

[====================]
File Length (20 MB)

[==========          ]
Download (10 MB)
       
      [======      ]
      Buffer (e.g. 6 MB, size 12 MB)

      [===]
      Upload Chunk (3 MB) => Error? Retry from Buffer (max 3 times)
                          => Success? Empty Buffer => Continue =>
      [===]
      Upload next Chunk (3 MB)

Code

/* 
   Assume resumable_drive_url was already obtained from the Google API
   with the proper access token, and that the session already contains
   the Content-Type and Content-Length. 
*/

const axios = require("axios")
const stream = require("stream")

function transfer(download_url, resumable_drive_url, file_type, file_length) {

    return new Promise((resolve, reject) => {

        let timeout = setTimeout(() => {
            reject(new Error("Transfer timed out."))
        }, 80000)


       // Question #1: Should the passthrough stream 
       // and .on events be declared here?

       const passthrough = new stream.PassThrough({
            highWaterMark: 256 * 1024
       })

       passthrough.on("error", (error) => {
            console.error(`Upload failed: ${error.message}`)
            reject(error.message)
       })

       passthrough.on("end", () => {
            clearTimeout(timeout)
            resolve(true)
       })

        
        // Download file
        axios({
            method: 'get',
            url: download_url,
            responseType: 'stream',
            maxRedirects: 1
        }).then(result => {
            
            // QUESTION #2: How do we buffer the file from here 
            // via axios.put to the resumable_url with the correct 
            // header information Content-Range and Content-Length?

            // CURIOSITY #1: Do we pipe from here 
            // to a passthrough stream that maintains a minimum buffer size?

            result.data.pipe(passthrough)
        }
        ).catch(error => {
            reject(error)
        })


    })
}

References

  1. Chunked Upload Class - (decent chunking mechanism, but bloated; it seems there is a more efficient approach with stream piping)
  2. Google Drive API v3 - Upload via Resumable URL with Multiple Requests
  3. resumableUpload.js - (right concept, but uses the filesystem)
  4. Google-Drive-Uploader - (right concept, but uses the filesystem and a custom StreamFactory)
  5. Resumable upload in Drive Rest API V3 - (decent, but seems bloated and outdated)

I believe your goal and your current situation are as follows.

  • You want to download data and upload the downloaded data to Google Drive using Axios with Node.js.
  • For uploading the data, you want to use a resumable upload with multiple chunks, retrieving the data from the stream.
  • Your access token can be used for uploading the data to Google Drive.
  • You already know the data size and mimeType of the data you want to upload.

Modification points:

  • In this case, in order to achieve the resumable upload with multiple chunks, I would like to propose the following flow.

    1. Download the data from the URL.
    2. Create a session for the resumable upload.
    3. Retrieve the downloaded data from the stream and convert it to a buffer.
      • For this, I used stream.Transform.
      • In this case, I pause the stream while the data is uploaded to Google Drive. I couldn't think of a way to achieve this without pausing the stream. (A distilled sketch of this pause/upload/resume core follows this list.)
      • I think this part might be the answer to your questions 2 and 3.
    4. When the buffer size reaches the declared chunk size, upload the buffer to Google Drive.
      • I think this part might be the answer to your question 3.
    5. When a chunk upload fails, the same buffer is uploaded again. In this sample script, up to 3 retries are run; after 3 failed retries, an error is raised.
      • I think this part might be the answer to your question 1.
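
A distilled sketch of that pause/upload/resume core, where uploadChunk is a placeholder for the axios PUT with Content-Range shown in the full script below:

const { Transform } = require("stream");

// Accumulate incoming chunks; once chunkSize bytes are buffered, pause the
// stream, upload one chunk, and resume on success. uploadChunk(buf) is
// assumed to return a Promise (with its own retry logic, as in the script).
function makeChunker(chunkSize, uploadChunk) {
  let bufs = [];
  const streamTrans = new Transform({
    transform(chunk, _, callback) {
      callback(null, chunk);
    },
  });
  streamTrans.on("data", (chunk) => {
    bufs.push(chunk);
    const temp = Buffer.concat(bufs);
    if (temp.length >= chunkSize) {
      bufs = [temp.slice(chunkSize)];      // keep the remainder for next time
      streamTrans.pause();                 // stop the flow while uploading
      uploadChunk(temp.slice(0, chunkSize))
        .then(() => streamTrans.resume())  // continue once the chunk lands
        .catch((err) => streamTrans.destroy(err));
    }
  });
  return streamTrans;
}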

When the above flow is reflected in your script, it becomes as follows.

Modified script:

Please set the variables in the main() function.

const axios = require("axios");
const stream = require("stream");

function transfer(
  download_url,
  resumable_drive_url,
  file_type,
  file_length,
  accessToken,
  filename,
  chunkSize
) {
  return new Promise((resolve, reject) => {
    axios({
      method: "get",
      url: download_url,
      responseType: "stream",
      maxRedirects: 1,
    })
      .then((result) => {
        const streamTrans = new stream.Transform({
          transform: function (chunk, _, callback) {
            callback(null, chunk);
          },
        });

        // 1. Retrieve session for resumable upload.
        axios({
          method: "POST",
          url: resumable_drive_url,
          headers: {
            Authorization: `Bearer ${accessToken}`,
            "Content-Type": "application/json",
          },
          data: JSON.stringify({
            name: filename,
            mimeType: file_type,
          }),
        })
          .then(({ headers: { location } }) => {
            // 2. Upload the file.
            let startByte = 0;
            result.data.pipe(streamTrans);
            let bufs = [];
            streamTrans.on("data", async (chunk) => {
              bufs.push(chunk);
              const temp = Buffer.concat(bufs);
              if (temp.length >= chunkSize) {
                const dataChunk = temp.slice(0, chunkSize);
                const left = temp.slice(chunkSize);
                streamTrans.pause();
                let upcount = 0;
                const upload = function () {
                  console.log(
                    `Progress: from ${startByte} to ${
                      startByte + dataChunk.length - 1
                    } for ${file_length}`
                  );
                  axios({
                    method: "PUT",
                    url: location,
                    headers: {
                      "Content-Range": `bytes ${startByte}-${
                        startByte + dataChunk.length - 1
                      }/${file_length}`,
                    },
                    data: dataChunk,
                  })
                    .then(({ data }) => resolve(data))
                    .catch((err) => {
                      // 308 means the chunk was accepted; move on to the next one.
                      if (err.response && err.response.status == 308) {
                        startByte += dataChunk.length;
                        streamTrans.resume();
                        return;
                      }
                      if (upcount == 3) {
                        reject(err);
                        return;
                      }
                      upcount++;
                      console.log("Retry");
                      upload();
                      return;
                    });
                };
                upload();
                bufs = [left];
              }
            });
            streamTrans.on("end", () => {
              const dataChunk = Buffer.concat(bufs);
              if (dataChunk.length > 0) {
                // 3. Upload last chunk.
                let upcount = 0;
                const upload = function () {
                  console.log(
                    `Progress(last): from ${startByte} to ${
                      startByte + dataChunk.length - 1
                    } for ${file_length}`
                  );
                  axios({
                    method: "PUT",
                    url: location,
                    headers: {
                      "Content-Range": `bytes ${startByte}-${
                        startByte + dataChunk.length - 1
                      }/${file_length}`,
                    },
                    data: dataChunk,
                  })
                    .then(({ data }) => resolve(data))
                    .catch((err) => {
                      if (upcount == 3) {
                        reject(err);
                        return;
                      }
                      upcount++;
                      upload();
                      return;
                    });
                };
                upload();
              }
            });
            streamTrans.on("error", (err) => reject(err));
          })
          .catch((err) => reject(err));
      })
      .catch((error) => {
        reject(error);
      });
  });
}

function main() {
  const download_url = "###";
  const resumable_drive_url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=resumable";
  const file_type = "###"; // Please set the mimeType of the downloaded data.
  const file_length = 12345; // Please set the data size of the downloaded data.
  const accessToken = "###"; // Please set the access token.
  const filename = "sample filename"; // Please set the filename on Google Drive.
  const chunkSize = 10485760; // This is used as the chunk size for the resumable upload. This is 10 MB as a sample. In this case, please set the multiples of 256 KB (256 x 1024 bytes).

  transfer(
    download_url,
    resumable_drive_url,
    file_type,
    file_length,
    accessToken,
    filename,
    chunkSize
  )
    .then((res) => console.log(res))
    .catch((err) => console.log(err));
}

main();

Result:

When the above script is run for a file of size 23558108 bytes (sample data), the following result is shown in the console:

Progress: from 0 to 10485759 for 23558108
Progress: from 10485760 to 20971519 for 23558108
Progress(last): from 20971520 to 23558107 for 23558108
{
  kind: 'drive#file',
  id: '###',
  name: 'sample filename',
  mimeType: '###'
}

Note:

  • When you want to achieve the resumable upload using a single chunk, you can see a sample script here.
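
For completeness, a minimal sketch of that single-chunk variant, assuming fileBuffer already holds the entire file in memory and accessToken, filename, and file_type are set as in main() above:

const axios = require("axios");

// Sketch only: resumable upload in a single request. fileBuffer is an
// assumed in-memory Buffer of the whole file.
async function uploadSingleChunk(fileBuffer, accessToken, filename, file_type) {
  // 1. Create the resumable session (same as the multi-chunk flow).
  const session = await axios({
    method: "POST",
    url: "https://www.googleapis.com/upload/drive/v3/files?uploadType=resumable",
    headers: {
      Authorization: `Bearer ${accessToken}`,
      "Content-Type": "application/json",
    },
    data: JSON.stringify({ name: filename, mimeType: file_type }),
  });

  // 2. PUT the whole buffer in one request; Content-Range is not needed
  //    when the entire content is sent at once.
  const res = await axios({
    method: "PUT",
    url: session.headers.location,
    headers: { "Content-Length": fileBuffer.length },
    data: fileBuffer,
  });
  return res.data;
}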
