文件的切片上传、断点续传和并发控制

整体思路

客户端:

  1. 使用 Blob.prototype.slice 方法对文件切片
  2. 使用 spark-md5 库计算文件 hash,并创建 web worker 开启新的线程执行
  3. 将各个分片文件上传到服务器,并携带 hash
  4. 上传进度计算
  5. 当所有文件上传完成后,发起合并请求,携带文件 hash、后缀名、分片大小

服务端:

  1. 根据接收到的 hash 创建文件夹,将分片文件存储到文件夹中
  2. 收到合并请求后,读取各个分片文件。根据 hash 和后缀名,合并生成完整文件
  3. 删除存储分片文件的文件夹及其内容

服务端部分

技术选型:Koa2 + @koa/multer 文件上传中间键

uploadController 公共部分:

// uploadController.js
const path = require('path');
const fse = require('fs-extra');
const { domain, port } = require('../../config');

// 大文件存储目录路径
const UPLOAD_DIR = path.resolve(__dirname, '../../', 'upload');

// 获取切片目录路径函数
const getChunkDir = (fileHash) => path.resolve(UPLOAD_DIR, `${fileHash}-chunks`);

// 提取文件后缀名函数
const extractExt = (filename) => filename.slice(filename.lastIndexOf('.'), filename.length);

切片上传接口

// uploadController.js
exports.uploadChunk = async (ctx) => {
  // 文件哈希,切片哈希=${文件哈希}-${切片索引}
  const { fileHash, hash } = ctx.request.body;

  // 切片文件
  const chunk = ctx.request.file;

  // 切片目录路径
  const chunkDir = getChunkDir(fileHash);

  // 切片目录不存在,创建切片目录
  await fse.ensureDir(chunkDir);

  // 将切片文件移动到切片文件的目录中并重命名为切片哈希(即fileHash-index)
  await fse.move(chunk.path, `${chunkDir}/${hash}`);

  ctx.success("上传成功")
}

切片上传前的验证接口

在上传切片前,我们需要验证这个文件是否已经被上传过,上传过直接返回为文件引用地址,即文件秒传。若没有被上传过,则还需要进一步验证是否存在相应的切片文件(即之前可能没有全部上传完切片),如果存在的话,则后续上传需要跳过这些已经存在的切片,尽可能的减少不必要的请求,即断点续传。

思路:由于切片文件存储的目录,和合并后的文件都是根据整个文件的哈希进行命名的。所以后端可以通过前端传递的 fileHash 来判断这个文件是否已经被上传过。

  • 存在上传文件,则通知前端不需要再上传并且返回文件的引用地址。
  • 不存在上传的文件,但存在该文件的切片目录。则通知前端需要继续上传,并且返回已上传的切片文件列表,让前端可以跳过这些切片文件的上传
  • 不存在该文件的切片目录,通知前端需要继续上传,已上传的文件列表返回一个空数组就行。
// uploadController.js
exports.verifyUpload = async (ctx) => {
  // 请求体参数
  const { fileHash, fileName } = ctx.request.body;

  // 读取文件的后缀名 eg: .mp4
  const ext = extractExt(fileName);

  // 文件路径
  const targetPath = path.resolve(UPLOAD_DIR, fileHash + ext);

  // 判断文件是否存在
  if (fse.existsSync(targetPath)) {
    // 合成后的文件存在,不需要上传
    ctx.success({ shouldUpload: false, url: `${domain}:${port}/upload/${fileHash}${ext}` })
  } else {
    // 切片目录路径
    const chunkDir = getChunkDir(fileHash);
    // 合成后的文件不存在
    const uploadedList = 
      // 切片目录存在
      fse.existsSync(chunkDir)
        // 返回切片目录下已上传的切片文件列表
        ? await fse.readdir(chunkDir)
        // 切片目录不存在,返回空数组
        : [];
    
    // 返回已经上传的切片列表
    ctx.success({
      shouldUpload: true,
      uploadedList: uploadedList
    })
  }
}

切片合并接口

在切片上传完成后,前端需要调用合并接口,来合并上传的所有切片。合并成功后,返回相应的文件引用地址给前端。

// uploadController.js
exports.mergeChunk = async (ctx) => {
  // 请求体参数
  const { fileHash, fileName, splitSize, total } = ctx.request.body;

  // 获取该文件的切片的文件夹路径
  const chunkDir = getChunkDir(fileHash);

  // 获取切片的文件列表
  const chunkPaths = await fse.readdir(chunkDir);

  // 已存在的切片数量与切片的总数不符,不能直接合并
  if (chunkPaths.length !== total) {
    ctx.fail("切片文件数量不符合");
  }

  // 根据切片下标进行排序,直接读取目录获得的顺序可能会错乱
  chunkPaths.sort((a, b) => a.split("-")[1] - b.split("-")[1]);

  // 获取该文件的后缀名 eg: .mp4
  const ext = extractExt(fileName);

  // 计算需要最终的存储的文件路径
  const targetPath = path.resolve(UPLOAD_DIR, fileHash + ext);

  // 开始合并切片
  await Promise.all(
    chunkPaths.map((chunkPath, index) => {
      return pipeStream(
        path.resolve(chunkDir, chunkPath),
        // 指定位置创建可写流
        fse.createWriteStream(targetPath, {
          start: index * splitSize
        })
      );
    })
  );

  // 合并后删除保存切片的目录
  fse.rmdirSync(chunkDir); 

  // 返回文件地址给前端
  ctx.success({ url: `${domain}:${port}/upload/${fileHash}${ext}` })
}

// 读流 -> 写流
const pipeStream = (path, writeStream) =>
  new Promise((resolve) => {
    // 创建可读流
    const readStream = fse.createReadStream(path);

    // 监听读取完成
    readStream.on('end', () => {
      // 删除 chunk 文件
      fse.unlinkSync(path);
      resolve();
    });

    readStream.pipe(writeStream);
  });

客户端部分

技术选型:Vue3 + Axios + Vite

配置请求Api

// 导入配置的 axios 实例
import request from "/@/service/request";

// 上传切片接口
export const uploadChunk = (data, { onUploadProgress, cancelToken }) => request({
  url: "/upload/chunk",
  // 文件上传的请求头
  header: { "Content-Type": "multipart/form-data" },
  // 监控上传进度的回调
  onUploadProgress,
  // 配置取消请求令牌
  cancelToken,
  timeout: 1000 * 60 * 5,
  method: "post",
  data
});

// 合并切片接口
export const mergeChunk = (data) => request({
  url: "/upload/merge",
  method: "post",
  data
});

// 上传切片前的验证接口
export const uploadVerify = (data) => request({
  url: "/upload/verify",
  method: "post",
  data
});

前端状态

// 文件切片大小 2M
const defaultChunkSize = 1024 * 1024 * 2;
// 上传的文件
const uploadFile = ref(null);
// 切片数组
const chunks = ref([]);
// 文件哈希
const hash = ref("");
// 取消请求源 数组
const sources = ref([]);

// 暂停
const pasue = ref(false);
// 预览地址
const url = ref("");

文件切片

点击上传按钮时,先调用 handleFileSlice 方法将文件切片。具体实现过程是:设置默认单个切片文件的大小,通过while循环和slice方法将文件进行切割,并将生成的一个个 chunk 切片数据放到数组中:

// 开始切片
async function startSlice() {
  // 切片文件是否存在
  if (!isEmpty(chunks.value)) {
    return ElMessage.warning("切片已存在");
  }

  // 进行切片
  const _chunks = handleFileSlice(uploadFile.value, defaultChunkSize);

  // 计算哈希
  hash.value = await calculateHash(_chunks);

  // 切片属性增强
  chunks.value = enhanceChunks(_chunks, {
    file: uploadFile.value,
    hash: hash.value,
    chunkSize: defaultChunkSize
  });
}
// utils/index.js
export function handleFileSlice(file, chunkSize) {
  // 切片数组
  const chunks = [];
  // 切片开始位置
  let start = 0;
  // 循环切片
  while (start < file.size) {
    // 切片结束位置
    const end = Math.min(start + chunkSize, file.size);
    // 切片
    const chunk = file.slice(start, end);
    // 添加切片
    chunks.push({ chunk });
    // 前进
    start += chunkSize;
  }
  // 返回切片数组
  return chunks;
}

生成hash(spark-md5)

无论是客户端还是服务端,都要用到文件和切片的 hash,生成 hash 最简单的方法是 文件名 + 切片下标,但是如果文件名一旦修改,生成的 hash 就会失效。事实上只要文件内容不变, hash 就不应该变化,所以我们根据文件内容生成 hash。

这里我们选用 spark-md5 库,它可以根据文件内容计算出文件的hash值。

如果上传的文件过大时,读取文件内容计算hash非常耗时,并且会引起 UI 阻塞,导致页面假死,所以我们使用 web-worker 在worker 线程计算 hash,这样仍可以在主界面正常做交互。具体做法如下:

// utils/index.js
const worker = new Worker(new URL("./worker.js", import.meta.url), { type: "module" });

// 计算文件哈希
export function calculateHash(chunks) {
  return new Promise((resolve, reject) => {
    // 发送消息
    worker.postMessage({ chunks });
    // 接收消息
    worker.onmessage = (e) => {
      // 哈希
      const { hash } = e.data;
      if (hash) {
        // 哈希计算完成
        resolve(hash);
      }
    };
    // 错误处理
    worker.onerror = reject;
  });
}
// utils/worker.js
import SparkMD5 from "spark-md5";

// 接受主进程发送过来的数据
self.onmessage = function (e) {
  const { chunks } = e.data;
  // 计算整个文件哈希
  getFileHash(chunks)
    .then((hash) => {
      // 发送哈希值给主进程
      self.postMessage({ hash });
    })
    .catch((error) => {
      // 发送错误信息给主进程
      self.postMessage({ error });
    });
};

// 计算整个文件哈希
async function getFileHash(chunks) {
  // 创建 SparkMD5 对象
  const spark = new SparkMD5.ArrayBuffer();
  // 读取每个 chunk 内容
  const promises = chunks.map(({ chunk }) => {
    return getChunkContent(chunk);
  });

  try {
    // 等待所有 chunk 内容读取完成
    const contents = await Promise.all(promises);
    // 将所有 chunk 内容添加到 spark 对象中
    contents.forEach((content) => {
      spark.append(content);
    });
    // 计算哈希值
    const hash = spark.end();
    // 返回哈希值
    return hash;
  } catch (error) {
    console.log(error);
  }
}

// 读取 chunk 内容
function getChunkContent(chunk) {
  return new Promise((resolve, reject) => {
    // 创建 FileReader 对象
    const reader = new FileReader();

    // 读取文件内容
    reader.readAsArrayBuffer(chunk);

    // 读取成功后的回调
    reader.onload = function (e) {
      resolve(e.target.result);
    };

    // 读取失败后的回调
    reader.onerror = function () {
      reject(reader.error);
      reader.abort();
    };
  });
}

收集切片信息

生成文件切片后,将计算出来的hash、切片索引、文件个数、当前切片的hash、切片内容等信息都收集起来,发送给服务端。上传索引的目的是为了让后端可以知道当前切片是第几个切片,方便服务端合并切片。

// 增强 chunks 的信息
function enhanceChunks(chunks, meta) {
  // 元数据
  const { hash, file, chunkSize } = meta;

  // 给切片对象添加属性
  return chunks.map(({ chunk }, index) => {
    return {
      // 1. 文件哈希
      fileHash: hash,
      // 2. 切片索引
      index,
      // 3. 切片总数
      total: chunks.length,
      // 4. 文件总大小
      totalSize: file.size,
      // 5. 当前切片的哈希
      hash: `${hash}-${index}`,
      // 6. 切片对象
      chunk,
      // 7. 切片大小
      size: chunk.size,
      // 8. 分割大小
      splitSize: chunkSize,
      // 9. 文件名
      fileName: file.name,
      // 10. 上传进度
      percentage: 0
    };
  });
}

创建切片请求

// 切片请求数组
function getChunkReqs(chunks) {
  return chunks.map((enhancedChunk) => {
    // chunk 属性
    const { fileHash, hash, chunk } = enhancedChunk;
    // 创建 FormData 对象
    const formData = new FormData();
    // 文件哈希
    formData.append("fileHash", fileHash);
    // 切片哈希
    formData.append("hash", hash);
    // 切片
    formData.append("chunk", chunk);

    // 取消令牌源
    const source = axios.CancelToken.source();
    // 收集取消令牌源
    sources.value.push(source);

    // 上传请求
    return () => service.comm.uploadChunk(formData, {
      // 取消令牌
      cancelToken: source.token,
      // 监听上传进度
      onUploadProgress: (progressEvent) => {
        if (progressEvent.lengthComputable) {
          // 上传进度
          const progress = floor((progressEvent.loaded / progressEvent.total) * 100 | 0);
          // 更新进度
          enhancedChunk.percentage = progress;
        }
      }
    }).then(() => {
      // 上传成功,查找请求成功的<取消令牌源>的索引
      const sourceIndex = sources.value.findIndex((item) => item === source);
      // 移除请求成功的<取消令牌源>
      sources.value.splice(sourceIndex, 1);
    });
  });
}

上传切片

使用上面创建切片请求的方法,创建好一个执行切片请求的函数数组,调用 Promise.all 并发上传所有的切片。

// 开始上传
async function startUpload() {
  const promises = getChunkReqs(chunks.value)

  // 全部上传
  await Promise.all(promises);

  // 合并切片
  const { url: fileURL } = await service.comm.mergeChunk({
    fileHash: hash.value,
    fileName: uploadFile.value.name,
    splitSize: defaultChunkSize,
    total: chunks.value.length
  });

  url.value = fileURL;

  ElMessage.success("上传成功");
}

文件秒传

需要在开始上传前,调文件验证接口,验证文件是否已经上传过。

// 验证文件是否已经上传过
function verify() {
  return service.comm
    .uploadVerify({
      fileHash: hash.value,
      fileName: uploadFile.value.name
    });
}
// 开始上传
async function startUpload() {
  // 验证文件是否已经上传过
  const { shouldUpload, url: fileURL } = await verify();

  // 文件秒传
  if (!shouldUpload) {
    // 直接修改每个 chunk 的上传进度为100
    changePercentage(chunks.value, get100());
    url.value = fileURL;
    ElMessage.success("上传成功");
    return;
  } 
  // ...
}

断点续传

文件验证接口会返回一个 uploadedList,根据这个数据判断需要过滤哪些切片,即续传剩下的切片。

// 开始上传
async function startUpload() {
  // 验证文件是否已经上传过
  const { shouldUpload, url: fileURL, uploadedList } = await verify();

  let promises = [];

  // 文件秒传
  if (!shouldUpload) {
    // ...
  } else { // 断点续传
    // 存在上传的切片
    if (!isEmpty(uploadedList)) {
      const unUploaded = [];
      // 过滤出未上传的切片
      chunks.value.forEach((chunk) => {
        if (uploadedList.includes(chunk.hash)) {
          // 这里需要将已上传的切片进度重置为100%,保证上传进度的准确性(有些切片上传上去了,但是请求取消了,导致上传进度监听没有被及时触发,从而导致切片上传进度没有更新,例如最终进度可能停留在73%,但实际切片已上传完成了)
          chunk.percentage = get100();
        } else {
          // 未上传的切片
          unUploaded.push(chunk);
        }
      });

      // 未上传切片请求
      promises = getChunkReqs(unUploaded);
    } else { // 从头开始上传
      // ...
    }

    // 全部上传
    await Promise.all(promises);
    // ...
  }
}

并发控制

我们知道 http1.1 一个域名最多可以建立 6 个 TCP 连接,如果同时上传这么多切片的话,会导致网络阻塞,以至于其他的请求无法被处理。我们需要保证切片上传的请求数量不能同时占满 6 个 TCP 连接。

// utils/index.js
// 并发控制器
export function taskController(list, max = 4, cb, isAbort) {
  let runIdx = 0;
  let finished = 0;

  const run = () => {
    while(max && runIdx < list.length) {
      // 中断(引用值可以保证取到变化后的布尔值)
      if (isAbort.value) return;

      const task = list[runIdx];
      runIdx++;

      if (task) {
        max--;
        task().finally(() => {
          max++;
          finished++;

          if (finished === list.length) {
            cb();
          } else {
            run();
          }
        });
      }
    }
  };
  run();
}

加入并发控制器后的上传函数。

// 开始上传
async function startUpload() {
  // ...

  // 并发控制上传
  taskController(promises, 4, async () => {
    // 合并文件
    const { url: fileURL } = await service.comm.mergeChunk({
      fileHash: hash.value,
      fileName: uploadFile.value.name,
      splitSize: defaultChunkSize,
      total: chunks.value.length
    });

    url.value = fileURL;

    ElMessage.success("上传成功");
  }, pasue);
}

暂停上传

我们在创建切片请求时,创建了取消令牌源 const source = axios.CancelToken.source(),并且收集了取消令牌源 sources.value.push(source),针对每一个切片请求都配置了 cancelToken: source.token

// 切片请求数组
function getChunkReqs(chunks) {
  return chunks.map((enhancedChunk) => {
    // ...

    // 取消令牌源
    const source = axios.CancelToken.source();
    // 收集取消令牌源
    sources.value.push(source);

    // 上传请求
    return () => service.comm.uploadChunk(formData, {
      // 取消令牌
      cancelToken: source.token,
      // ...
    }).then(() => {
      // 上传成功,查找请求成功的<取消令牌源>的索引
      const sourceIndex = sources.value.findIndex((item) => item === source);
      // 移除请求成功的<取消令牌源>
      sources.value.splice(sourceIndex, 1);
    });
  });
}

所以在修改为暂停状态时,我们直接取消正在进行的请求即可。

// 暂停上传
function handlePause() {
  // 切换暂停状态
  pasue.value = !pasue.value;

  if (pasue.value) {
    // 暂停
    sources.value.forEach((source) => source.cancel());
    sources.value = [];
  } else {
    // 继续
    startUpload();
  }
}

上传进度

前面我们在创建切片请求时,配置了上传进度监听的回调函数,我们根据监听触发更新了每个 chunk 的进度。

function getChunkReqs(chunks) {
  return chunks.map((enhancedChunk) => {
    // ...

    // 上传请求
    return () => service.comm.uploadChunk(formData, {
      // ...
      // 监听上传进度
      onUploadProgress: (progressEvent) => {
        if (progressEvent.lengthComputable) {
          // 上传进度
          const progress = floor((progressEvent.loaded / progressEvent.total) * 100 | 0);
          // 更新进度
          enhancedChunk.percentage = progress;
        }
      }
    }).then(() => {
      // ...
    });
  });
}

我们已经知道了每个切片上传的进度,所以可以得到每个切片已上传的字节数,即:切片大小 * 上传进度。将这些字节数累计加起来,我们可以得到总的已上传的字节数,所以总的上传进度为:已上传的字节数 / 文件的大小。

在这里 chunks 切片列表是一个 ref 数组,所以我们可以使用 Vue 中的计算属性来实现总进度的计算。

// 判断一下 chunks 是否被增强(即计算了 fileHash 以后),增强过后的 chunk 才会被额外添加的属性,例如下面计算需要的 percentage 进度属性
const isEnhanced = computed(() => {
  return !isEmpty(chunks.value) && !isEmpty(chunks.value[0].fileHash);
});

// 总进度
const totalProgress = computed(() => {
  if (isEnhanced.value) {
    const loaded = chunks.value
      .map(({ size, percentage }) => size * percentage)
      .reduce((prev, curr) => prev + curr, 0);

    // 总进度
    return floor(loaded / uploadFile.value.size);
  }

  return 0;
});

参考

实现一个大文件上传和断点续传
原生JS + Node 实现大文件分片上传、断点续传、秒传
如何设计一个控制并发数的任务队列

> cd ..