文件的切片上传、断点续传和并发控制

bt365开户 📅 2026-01-06 18:10:21 ✍️ admin 👁️ 8634 ❤️ 973
文件的切片上传、断点续传和并发控制

文件的切片上传、断点续传和并发控制 On This Page # 整体思路 # 服务端部分 > 切片上传接口 > 切片上传前的验证接口 > 切片合并接口 # 客户端部分 > 配置请求Api > 前端状态 > 文件切片 > 生成hash(spark-md5) > 收集切片信息 > 创建切片请求 > 上传切片 > 文件秒传 > 断点续传 > 并发控制 > 暂停上传 > 上传进度 # 参考 整体思路

客户端:

使用 Blob.prototype.slice 方法对文件切片

使用 spark-md5 库计算文件 hash,并创建 web worker 开启新的线程执行

将各个分片文件上传到服务器,并携带 hash

上传进度计算

当所有文件上传完成后,发起合并请求,携带文件 hash、后缀名、分片大小

服务端:

根据接收到的 hash 创建文件夹,将分片文件存储到文件夹中

收到合并请求后,读取各个分片文件。根据 hash 和后缀名,合并生成完整文件

删除存储分片文件的文件夹及其内容

服务端部分

技术选型:Koa2 + @koa/multer 文件上传中间键

uploadController 公共部分:

// uploadController.js

const path = require('path');

const fse = require('fs-extra');

const { domain, port } = require('../../config');

// 大文件存储目录路径

const UPLOAD_DIR = path.resolve(__dirname, '../../', 'upload');

// 获取切片目录路径函数

const getChunkDir = (fileHash) => path.resolve(UPLOAD_DIR, `${fileHash}-chunks`);

// 提取文件后缀名函数

const extractExt = (filename) => filename.slice(filename.lastIndexOf('.'), filename.length);

切片上传接口

// uploadController.js

exports.uploadChunk = async (ctx) => {

// 文件哈希,切片哈希=${文件哈希}-${切片索引}

const { fileHash, hash } = ctx.request.body;

// 切片文件

const chunk = ctx.request.file;

// 切片目录路径

const chunkDir = getChunkDir(fileHash);

// 切片目录不存在,创建切片目录

await fse.ensureDir(chunkDir);

// 将切片文件移动到切片文件的目录中并重命名为切片哈希(即fileHash-index)

await fse.move(chunk.path, `${chunkDir}/${hash}`);

ctx.success("上传成功")

}

切片上传前的验证接口

在上传切片前,我们需要验证这个文件是否已经被上传过,上传过直接返回为文件引用地址,即文件秒传。若没有被上传过,则还需要进一步验证是否存在相应的切片文件(即之前可能没有全部上传完切片),如果存在的话,则后续上传需要跳过这些已经存在的切片,尽可能的减少不必要的请求,即断点续传。

思路:由于切片文件存储的目录,和合并后的文件都是根据整个文件的哈希进行命名的。所以后端可以通过前端传递的 fileHash 来判断这个文件是否已经被上传过。

存在上传文件,则通知前端不需要再上传并且返回文件的引用地址。

不存在上传的文件,但存在该文件的切片目录。则通知前端需要继续上传,并且返回已上传的切片文件列表,让前端可以跳过这些切片文件的上传

不存在该文件的切片目录,通知前端需要继续上传,已上传的文件列表返回一个空数组就行。

// uploadController.js

exports.verifyUpload = async (ctx) => {

// 请求体参数

const { fileHash, fileName } = ctx.request.body;

// 读取文件的后缀名 eg: .mp4

const ext = extractExt(fileName);

// 文件路径

const targetPath = path.resolve(UPLOAD_DIR, fileHash + ext);

// 判断文件是否存在

if (fse.existsSync(targetPath)) {

// 合成后的文件存在,不需要上传

ctx.success({ shouldUpload: false, url: `${domain}:${port}/upload/${fileHash}${ext}` })

} else {

// 切片目录路径

const chunkDir = getChunkDir(fileHash);

// 合成后的文件不存在

const uploadedList =

// 切片目录存在

fse.existsSync(chunkDir)

// 返回切片目录下已上传的切片文件列表

? await fse.readdir(chunkDir)

// 切片目录不存在,返回空数组

: [];

// 返回已经上传的切片列表

ctx.success({

shouldUpload: true,

uploadedList: uploadedList

})

}

}

切片合并接口

在切片上传完成后,前端需要调用合并接口,来合并上传的所有切片。合并成功后,返回相应的文件引用地址给前端。

// uploadController.js

exports.mergeChunk = async (ctx) => {

// 请求体参数

const { fileHash, fileName, splitSize, total } = ctx.request.body;

// 获取该文件的切片的文件夹路径

const chunkDir = getChunkDir(fileHash);

// 获取切片的文件列表

const chunkPaths = await fse.readdir(chunkDir);

// 已存在的切片数量与切片的总数不符,不能直接合并

if (chunkPaths.length !== total) {

ctx.fail("切片文件数量不符合");

}

// 根据切片下标进行排序,直接读取目录获得的顺序可能会错乱

chunkPaths.sort((a, b) => a.split("-")[1] - b.split("-")[1]);

// 获取该文件的后缀名 eg: .mp4

const ext = extractExt(fileName);

// 计算需要最终的存储的文件路径

const targetPath = path.resolve(UPLOAD_DIR, fileHash + ext);

// 开始合并切片

await Promise.all(

chunkPaths.map((chunkPath, index) => {

return pipeStream(

path.resolve(chunkDir, chunkPath),

// 指定位置创建可写流

fse.createWriteStream(targetPath, {

start: index * splitSize

})

);

})

);

// 合并后删除保存切片的目录

fse.rmdirSync(chunkDir);

// 返回文件地址给前端

ctx.success({ url: `${domain}:${port}/upload/${fileHash}${ext}` })

}

// 读流 -> 写流

const pipeStream = (path, writeStream) =>

new Promise((resolve) => {

// 创建可读流

const readStream = fse.createReadStream(path);

// 监听读取完成

readStream.on('end', () => {

// 删除 chunk 文件

fse.unlinkSync(path);

resolve();

});

readStream.pipe(writeStream);

});

客户端部分

技术选型:Vue3 + Axios + Vite

配置请求Api

// 导入配置的 axios 实例

import request from "/@/service/request";

// 上传切片接口

export const uploadChunk = (data, { onUploadProgress, cancelToken }) => request({

url: "/upload/chunk",

// 文件上传的请求头

header: { "Content-Type": "multipart/form-data" },

// 监控上传进度的回调

onUploadProgress,

// 配置取消请求令牌

cancelToken,

timeout: 1000 * 60 * 5,

method: "post",

data

});

// 合并切片接口

export const mergeChunk = (data) => request({

url: "/upload/merge",

method: "post",

data

});

// 上传切片前的验证接口

export const uploadVerify = (data) => request({

url: "/upload/verify",

method: "post",

data

});

前端状态

// 文件切片大小 2M

const defaultChunkSize = 1024 * 1024 * 2;

// 上传的文件

const uploadFile = ref(null);

// 切片数组

const chunks = ref([]);

// 文件哈希

const hash = ref("");

// 取消请求源 数组

const sources = ref([]);

// 暂停

const pasue = ref(false);

// 预览地址

const url = ref("");

文件切片

点击上传按钮时,先调用 handleFileSlice 方法将文件切片。具体实现过程是:设置默认单个切片文件的大小,通过while循环和slice方法将文件进行切割,并将生成的一个个 chunk 切片数据放到数组中:

// 开始切片

async function startSlice() {

// 切片文件是否存在

if (!isEmpty(chunks.value)) {

return ElMessage.warning("切片已存在");

}

// 进行切片

const _chunks = handleFileSlice(uploadFile.value, defaultChunkSize);

// 计算哈希

hash.value = await calculateHash(_chunks);

// 切片属性增强

chunks.value = enhanceChunks(_chunks, {

file: uploadFile.value,

hash: hash.value,

chunkSize: defaultChunkSize

});

}

// utils/index.js

export function handleFileSlice(file, chunkSize) {

// 切片数组

const chunks = [];

// 切片开始位置

let start = 0;

// 循环切片

while (start < file.size) {

// 切片结束位置

const end = Math.min(start + chunkSize, file.size);

// 切片

const chunk = file.slice(start, end);

// 添加切片

chunks.push({ chunk });

// 前进

start += chunkSize;

}

// 返回切片数组

return chunks;

}

生成hash(spark-md5)

无论是客户端还是服务端,都要用到文件和切片的 hash,生成 hash 最简单的方法是 文件名 + 切片下标,但是如果文件名一旦修改,生成的 hash 就会失效。事实上只要文件内容不变, hash 就不应该变化,所以我们根据文件内容生成 hash。

这里我们选用 spark-md5 库,它可以根据文件内容计算出文件的hash值。

如果上传的文件过大时,读取文件内容计算hash非常耗时,并且会引起 UI 阻塞,导致页面假死,所以我们使用 web-worker 在worker 线程计算 hash,这样仍可以在主界面正常做交互。具体做法如下:

// utils/index.js

const worker = new Worker(new URL("./worker.js", import.meta.url), { type: "module" });

// 计算文件哈希

export function calculateHash(chunks) {

return new Promise((resolve, reject) => {

// 发送消息

worker.postMessage({ chunks });

// 接收消息

worker.onmessage = (e) => {

// 哈希

const { hash } = e.data;

if (hash) {

// 哈希计算完成

resolve(hash);

}

};

// 错误处理

worker.onerror = reject;

});

}

// utils/worker.js

import SparkMD5 from "spark-md5";

// 接受主进程发送过来的数据

self.onmessage = function (e) {

const { chunks } = e.data;

// 计算整个文件哈希

getFileHash(chunks)

.then((hash) => {

// 发送哈希值给主进程

self.postMessage({ hash });

})

.catch((error) => {

// 发送错误信息给主进程

self.postMessage({ error });

});

};

// 计算整个文件哈希

async function getFileHash(chunks) {

// 创建 SparkMD5 对象

const spark = new SparkMD5.ArrayBuffer();

// 读取每个 chunk 内容

const promises = chunks.map(({ chunk }) => {

return getChunkContent(chunk);

});

try {

// 等待所有 chunk 内容读取完成

const contents = await Promise.all(promises);

// 将所有 chunk 内容添加到 spark 对象中

contents.forEach((content) => {

spark.append(content);

});

// 计算哈希值

const hash = spark.end();

// 返回哈希值

return hash;

} catch (error) {

console.log(error);

}

}

// 读取 chunk 内容

function getChunkContent(chunk) {

return new Promise((resolve, reject) => {

// 创建 FileReader 对象

const reader = new FileReader();

// 读取文件内容

reader.readAsArrayBuffer(chunk);

// 读取成功后的回调

reader.onload = function (e) {

resolve(e.target.result);

};

// 读取失败后的回调

reader.onerror = function () {

reject(reader.error);

reader.abort();

};

});

}

收集切片信息

生成文件切片后,将计算出来的hash、切片索引、文件个数、当前切片的hash、切片内容等信息都收集起来,发送给服务端。上传索引的目的是为了让后端可以知道当前切片是第几个切片,方便服务端合并切片。

// 增强 chunks 的信息

function enhanceChunks(chunks, meta) {

// 元数据

const { hash, file, chunkSize } = meta;

// 给切片对象添加属性

return chunks.map(({ chunk }, index) => {

return {

// 1. 文件哈希

fileHash: hash,

// 2. 切片索引

index,

// 3. 切片总数

total: chunks.length,

// 4. 文件总大小

totalSize: file.size,

// 5. 当前切片的哈希

hash: `${hash}-${index}`,

// 6. 切片对象

chunk,

// 7. 切片大小

size: chunk.size,

// 8. 分割大小

splitSize: chunkSize,

// 9. 文件名

fileName: file.name,

// 10. 上传进度

percentage: 0

};

});

}

创建切片请求

// 切片请求数组

function getChunkReqs(chunks) {

return chunks.map((enhancedChunk) => {

// chunk 属性

const { fileHash, hash, chunk } = enhancedChunk;

// 创建 FormData 对象

const formData = new FormData();

// 文件哈希

formData.append("fileHash", fileHash);

// 切片哈希

formData.append("hash", hash);

// 切片

formData.append("chunk", chunk);

// 取消令牌源

const source = axios.CancelToken.source();

// 收集取消令牌源

sources.value.push(source);

// 上传请求

return () => service.comm.uploadChunk(formData, {

// 取消令牌

cancelToken: source.token,

// 监听上传进度

onUploadProgress: (progressEvent) => {

if (progressEvent.lengthComputable) {

// 上传进度

const progress = floor((progressEvent.loaded / progressEvent.total) * 100 | 0);

// 更新进度

enhancedChunk.percentage = progress;

}

}

}).then(() => {

// 上传成功,查找请求成功的<取消令牌源>的索引

const sourceIndex = sources.value.findIndex((item) => item === source);

// 移除请求成功的<取消令牌源>

sources.value.splice(sourceIndex, 1);

});

});

}

上传切片

使用上面创建切片请求的方法,创建好一个执行切片请求的函数数组,调用 Promise.all 并发上传所有的切片。

// 开始上传

async function startUpload() {

const promises = getChunkReqs(chunks.value)

// 全部上传

await Promise.all(promises);

// 合并切片

const { url: fileURL } = await service.comm.mergeChunk({

fileHash: hash.value,

fileName: uploadFile.value.name,

splitSize: defaultChunkSize,

total: chunks.value.length

});

url.value = fileURL;

ElMessage.success("上传成功");

}

文件秒传

需要在开始上传前,调文件验证接口,验证文件是否已经上传过。

// 验证文件是否已经上传过

function verify() {

return service.comm

.uploadVerify({

fileHash: hash.value,

fileName: uploadFile.value.name

});

}

// 开始上传

async function startUpload() {

// 验证文件是否已经上传过

const { shouldUpload, url: fileURL } = await verify();

// 文件秒传

if (!shouldUpload) {

// 直接修改每个 chunk 的上传进度为100

changePercentage(chunks.value, get100());

url.value = fileURL;

ElMessage.success("上传成功");

return;

}

// ...

}

断点续传

文件验证接口会返回一个 uploadedList,根据这个数据判断需要过滤哪些切片,即续传剩下的切片。

// 开始上传

async function startUpload() {

// 验证文件是否已经上传过

const { shouldUpload, url: fileURL, uploadedList } = await verify();

let promises = [];

// 文件秒传

if (!shouldUpload) {

// ...

} else { // 断点续传

// 存在上传的切片

if (!isEmpty(uploadedList)) {

const unUploaded = [];

// 过滤出未上传的切片

chunks.value.forEach((chunk) => {

if (uploadedList.includes(chunk.hash)) {

// 这里需要将已上传的切片进度重置为100%,保证上传进度的准确性(有些切片上传上去了,但是请求取消了,导致上传进度监听没有被及时触发,从而导致切片上传进度没有更新,例如最终进度可能停留在73%,但实际切片已上传完成了)

chunk.percentage = get100();

} else {

// 未上传的切片

unUploaded.push(chunk);

}

});

// 未上传切片请求

promises = getChunkReqs(unUploaded);

} else { // 从头开始上传

// ...

}

// 全部上传

await Promise.all(promises);

// ...

}

}

并发控制

我们知道 http1.1 一个域名最多可以建立 6 个 TCP 连接,如果同时上传这么多切片的话,会导致网络阻塞,以至于其他的请求无法被处理。我们需要保证切片上传的请求数量不能同时占满 6 个 TCP 连接。

// utils/index.js

// 并发控制器

export function taskController(list, max = 4, cb, isAbort) {

let runIdx = 0;

let finished = 0;

const run = () => {

while(max && runIdx < list.length) {

// 中断(引用值可以保证取到变化后的布尔值)

if (isAbort.value) return;

const task = list[runIdx];

runIdx++;

if (task) {

max--;

task().finally(() => {

max++;

finished++;

if (finished === list.length) {

cb();

} else {

run();

}

});

}

}

};

run();

}

加入并发控制器后的上传函数。

// 开始上传

async function startUpload() {

// ...

// 并发控制上传

taskController(promises, 4, async () => {

// 合并文件

const { url: fileURL } = await service.comm.mergeChunk({

fileHash: hash.value,

fileName: uploadFile.value.name,

splitSize: defaultChunkSize,

total: chunks.value.length

});

url.value = fileURL;

ElMessage.success("上传成功");

}, pasue);

}

暂停上传

我们在创建切片请求时,创建了取消令牌源 const source = axios.CancelToken.source(),并且收集了取消令牌源 sources.value.push(source),针对每一个切片请求都配置了 cancelToken: source.token。

// 切片请求数组

function getChunkReqs(chunks) {

return chunks.map((enhancedChunk) => {

// ...

// 取消令牌源

const source = axios.CancelToken.source();

// 收集取消令牌源

sources.value.push(source);

// 上传请求

return () => service.comm.uploadChunk(formData, {

// 取消令牌

cancelToken: source.token,

// ...

}).then(() => {

// 上传成功,查找请求成功的<取消令牌源>的索引

const sourceIndex = sources.value.findIndex((item) => item === source);

// 移除请求成功的<取消令牌源>

sources.value.splice(sourceIndex, 1);

});

});

}

所以在修改为暂停状态时,我们直接取消正在进行的请求即可。

// 暂停上传

function handlePause() {

// 切换暂停状态

pasue.value = !pasue.value;

if (pasue.value) {

// 暂停

sources.value.forEach((source) => source.cancel());

sources.value = [];

} else {

// 继续

startUpload();

}

}

上传进度

前面我们在创建切片请求时,配置了上传进度监听的回调函数,我们根据监听触发更新了每个 chunk 的进度。

function getChunkReqs(chunks) {

return chunks.map((enhancedChunk) => {

// ...

// 上传请求

return () => service.comm.uploadChunk(formData, {

// ...

// 监听上传进度

onUploadProgress: (progressEvent) => {

if (progressEvent.lengthComputable) {

// 上传进度

const progress = floor((progressEvent.loaded / progressEvent.total) * 100 | 0);

// 更新进度

enhancedChunk.percentage = progress;

}

}

}).then(() => {

// ...

});

});

}

我们已经知道了每个切片上传的进度,所以可以得到每个切片已上传的字节数,即:切片大小 * 上传进度。将这些字节数累计加起来,我们可以得到总的已上传的字节数,所以总的上传进度为:已上传的字节数 / 文件的大小。

在这里 chunks 切片列表是一个 ref 数组,所以我们可以使用 Vue 中的计算属性来实现总进度的计算。

// 判断一下 chunks 是否被增强(即计算了 fileHash 以后),增强过后的 chunk 才会被额外添加的属性,例如下面计算需要的 percentage 进度属性

const isEnhanced = computed(() => {

return !isEmpty(chunks.value) && !isEmpty(chunks.value[0].fileHash);

});

// 总进度

const totalProgress = computed(() => {

if (isEnhanced.value) {

const loaded = chunks.value

.map(({ size, percentage }) => size * percentage)

.reduce((prev, curr) => prev + curr, 0);

// 总进度

return floor(loaded / uploadFile.value.size);

}

return 0;

});

参考

实现一个大文件上传和断点续传

原生JS + Node 实现大文件分片上传、断点续传、秒传

如何设计一个控制并发数的任务队列

相关创意

三国赵云有后代吗?大儿子官至虎贲中郎(小儿子不幸战死)
流量囧途文章
如何在安卓设备上发送长视频:6 种可行的解决方案
怎么调整洗衣机的水位 调整洗衣机的水位技巧【详解】
同框是什麼意思
硒鼓内部结构示意图和功能说明
什么花代表友谊
怎么在CAD里把当前图块替换成其他图块
1000日元等于多少人民币?