深入理解Java IO体系与SpringBoot应用实践
Java IO系统是所有Java应用程序的基础组件之一,从最初的阻塞IO到现代的非阻塞IO和异步IO,Java的IO框架不断发展,为开发人员提供了越来越强大和灵活的工具。本文将深入探讨Java IO的各个方面,并结合SpringBoot和常用第三方框架介绍其实际应用。
Java IO发展历程
传统BIO (Blocking IO)
Java最早提供的是阻塞式IO,位于java.io
包下,主要特点是同步阻塞。
try (FileInputStream fis = new FileInputStream("input.txt");
FileOutputStream fos = new FileOutputStream("output.txt")) {
byte[] buffer = new byte[1024];
int len;
while ((len = fis.read(buffer)) != -1) {
fos.write(buffer, 0, len);
}
} catch (IOException e) {
e.printStackTrace();
}
阻塞IO的优点是使用简单直观,但在处理大量并发连接时性能较差,因为每个连接需要一个专用线程。
NIO (New IO)
Java 1.4引入了NIO (java.nio
包),提供了非阻塞IO操作能力:
try (FileChannel sourceChannel = FileChannel.open(Paths.get("input.txt"), StandardOpenOption.READ);
FileChannel targetChannel = FileChannel.open(Paths.get("output.txt"), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (sourceChannel.read(buffer) > 0) {
buffer.flip();
targetChannel.write(buffer);
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
NIO的核心组件包括:
- Buffer: 数据容器
- Channel: 数据传输通道
- Selector: 多路复用选择器
AIO (Asynchronous IO)
Java 7引入了AIO (java.nio.channels
包的AsynchronousChannel
等),提供了真正的异步非阻塞IO操作:
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(Paths.get("test.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
fileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("读取完成: " + result);
attachment.flip();
// 处理数据
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
SpringBoot中的IO应用
SpringBoot提供了多种方式简化IO操作:
资源加载与处理
SpringBoot的ResourceLoader
接口和Resource
抽象极大简化了资源访问:
@Autowired
private ResourceLoader resourceLoader;
public void loadResource() {
Resource resource = resourceLoader.getResource("classpath:config.json");
try (InputStream inputStream = resource.getInputStream()) {
// 处理输入流
} catch (IOException e) {
e.printStackTrace();
}
}
文件上传与处理
SpringBoot简化了文件上传操作:
@PostMapping("/upload")
public String handleFileUpload(@RequestParam("file") MultipartFile file) {
if (!file.isEmpty()) {
try {
byte[] bytes = file.getBytes();
Path path = Paths.get("uploads/" + file.getOriginalFilename());
Files.write(path, bytes);
return "上传成功";
} catch (IOException e) {
return "上传失败:" + e.getMessage();
}
}
return "文件为空";
}
配置文件上传大小限制
# application.properties
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
WebFlux响应式编程
SpringBoot 2.0引入的WebFlux框架支持响应式非阻塞IO:
@RestController
public class ReactiveController {
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamData() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Event " + sequence)
.take(10);
}
@GetMapping("/file")
public Mono<ResponseEntity<Resource>> serveFile(@RequestParam String filename) {
Resource file = new FileSystemResource("path/to/files/" + filename);
return Mono.just(ResponseEntity.ok()
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + file.getFilename() + "\"")
.body(file));
}
}
常用第三方IO框架
Apache Commons IO
Apache Commons IO提供了许多实用的IO工具类:
@Service
public class FileService {
public void copyFile(String source, String dest) throws IOException {
FileUtils.copyFile(new File(source), new File(dest));
}
public List<String> readLines(String filename) throws IOException {
return FileUtils.readLines(new File(filename), StandardCharsets.UTF_8);
}
public String readFileToString(String filename) throws IOException {
return FileUtils.readFileToString(new File(filename), StandardCharsets.UTF_8);
}
}
Guava IO
Google Guava提供的IO工具也很实用:
public void guavaIoExample() throws IOException {
// 读取文件所有内容
String content = Files.asCharSource(new File("test.txt"), StandardCharsets.UTF_8).read();
// 按行读取
ImmutableList<String> lines = Files.asCharSource(new File("test.txt"), StandardCharsets.UTF_8).readLines();
// 写入文件
Files.asCharSink(new File("output.txt"), StandardCharsets.UTF_8).write("Hello, World!");
}
MINA和Netty
对于网络应用,MINA和Netty是两个主流的NIO框架:
Netty服务器示例
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new StringDecoder(),
new StringEncoder(),
new EchoServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
class EchoServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println("收到消息: " + msg);
ctx.writeAndFlush("Echo: " + msg);
}
}
SpringBoot与Netty整合
SpringBoot可以与Netty无缝整合,构建高性能的网络应用:
@Component
public class NettyServerBootstrap implements ApplicationListener<ContextRefreshedEvent> {
private final ServerBootstrap serverBootstrap;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
public NettyServerBootstrap() {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketChannelInitializer());
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
try {
ChannelFuture future = serverBootstrap.bind(8088).sync();
System.out.println("Netty服务器启动,端口: 8088");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@PreDestroy
public void destroy() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
IO性能优化技巧
使用缓冲
无论是传统IO还是NIO,使用缓冲都能显著提升性能:
public void bufferedExample() throws IOException {
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream("large.file"));
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream("copy.file"))) {
byte[] buffer = new byte[8192]; // 8KB缓冲区
int bytesRead;
while ((bytesRead = bis.read(buffer)) != -1) {
bos.write(buffer, 0, bytesRead);
}
}
}
内存映射文件
对于大文件处理,内存映射文件(Memory-Mapped File)提供了最高效的方式:
public void memoryMappedExample() throws IOException {
try (RandomAccessFile file = new RandomAccessFile("hugefile.data", "r");
FileChannel channel = file.getChannel()) {
long fileSize = channel.size();
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
// 直接操作内存缓冲区
while (buffer.hasRemaining()) {
// 处理数据
byte b = buffer.get();
}
}
}
零拷贝技术
零拷贝是一种避免CPU多次复制数据的技术,适用于文件传输等场景:
public void zeroCopyExample() throws IOException {
try (FileChannel sourceChannel = new FileInputStream("source.file").getChannel();
FileChannel destinationChannel = new FileOutputStream("destination.file").getChannel()) {
// 直接从源通道传输到目标通道,不经过用户空间
sourceChannel.transferTo(0, sourceChannel.size(), destinationChannel);
}
}
案例:构建高效的文件处理服务
将所有知识点整合,我们可以用SpringBoot构建一个高效的文件处理REST服务:
@RestController
@RequestMapping("/api/files")
public class FileProcessingController {
private final FileProcessingService fileService;
public FileProcessingController(FileProcessingService fileService) {
this.fileService = fileService;
}
@PostMapping("/upload")
public ResponseEntity<String> uploadFile(@RequestParam("file") MultipartFile file) {
try {
String id = fileService.storeFile(file);
return ResponseEntity.ok("文件上传成功,ID: " + id);
} catch (IOException e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("上传失败: " + e.getMessage());
}
}
@GetMapping("/download/{id}")
public ResponseEntity<Resource> downloadFile(@PathVariable String id) {
try {
Resource resource = fileService.loadFileAsResource(id);
return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_DISPOSITION,
"attachment; filename=\"" + resource.getFilename() + "\"")
.body(resource);
} catch (Exception e) {
return ResponseEntity.notFound().build();
}
}
@GetMapping("/process/{id}")
public Mono<ResponseEntity<String>> processFileAsync(@PathVariable String id) {
return fileService.processFileReactive(id)
.map(result -> ResponseEntity.ok("处理完成: " + result))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
}
对应的服务实现
@Service
public class FileProcessingService {
private final Path fileStorageLocation;
@Autowired
public FileProcessingService(@Value("${file.upload-dir:uploads}") String uploadDir) {
this.fileStorageLocation = Paths.get(uploadDir).toAbsolutePath().normalize();
try {
Files.createDirectories(this.fileStorageLocation);
} catch (IOException ex) {
throw new RuntimeException("Could not create upload directory", ex);
}
}
public String storeFile(MultipartFile file) throws IOException {
String fileName = StringUtils.cleanPath(file.getOriginalFilename());
String fileId = UUID.randomUUID().toString();
// 使用NIO存储文件
Path targetLocation = this.fileStorageLocation.resolve(fileId);
Files.copy(file.getInputStream(), targetLocation, StandardCopyOption.REPLACE_EXISTING);
return fileId;
}
public Resource loadFileAsResource(String fileId) throws IOException {
Path filePath = this.fileStorageLocation.resolve(fileId).normalize();
Resource resource = new UrlResource(filePath.toUri());
if (resource.exists()) {
return resource;
} else {
throw new FileNotFoundException("File not found: " + fileId);
}
}
public Mono<String> processFileReactive(String fileId) {
Path filePath = this.fileStorageLocation.resolve(fileId).normalize();
return Mono.fromCallable(() -> {
// 使用内存映射文件处理大文件
try (RandomAccessFile file = new RandomAccessFile(filePath.toFile(), "r");
FileChannel channel = file.getChannel()) {
long fileSize = channel.size();
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
// 这里仅为示例,实际处理逻辑应根据业务需求实现
long checksum = 0;
while (buffer.hasRemaining()) {
checksum += buffer.get() & 0xFF;
}
return "文件处理完成,校验和: " + checksum;
}
}).subscribeOn(Schedulers.boundedElastic());
}
}
总结
Java IO系统经过多年发展,从最初的阻塞式IO发展到现在的非阻塞式、异步IO模型,为开发人员提供了丰富的选择。在实际应用中,应根据不同场景选择合适的IO模型:
- 简单场景:使用传统BIO或工具库如Apache Commons IO
- 并发场景:使用NIO配合Selector
- 高吞吐量网络应用:使用Netty或MINA框架
- 现代响应式应用:使用SpringBoot WebFlux
无论选择哪种方式,理解底层IO原理,合理使用缓冲区和零拷贝等技术,才能构建出性能优异的应用程序。
高级文件上传下载技术
在实际的企业级应用中,文件上传下载往往需要处理更复杂的场景,比如大文件传输、断点续传、进度跟踪等。下面我们深入探讨这些高级技术。
分片上传技术
对于大文件上传,单次HTTP请求可能因为超时或网络波动而失败,分片上传是一种有效的解决方案:
@RestController
@RequestMapping("/api/upload")
public class ChunkedUploadController {
private final Path tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "uploads");
private final Path targetDir = Paths.get("uploads");
@PostMapping("/chunk")
public ResponseEntity<Map<String, Object>> uploadChunk(
@RequestParam("file") MultipartFile file,
@RequestParam("uuid") String uuid,
@RequestParam("chunkNumber") Integer chunkNumber,
@RequestParam("totalChunks") Integer totalChunks) {
try {
// 确保目录存在
Files.createDirectories(tempDir.resolve(uuid));
// 保存分片
Path chunkFile = tempDir.resolve(uuid).resolve(String.valueOf(chunkNumber));
Files.copy(file.getInputStream(), chunkFile, StandardCopyOption.REPLACE_EXISTING);
Map<String, Object> response = new HashMap<>();
response.put("uploaded", true);
response.put("chunkNumber", chunkNumber);
// 检查是否所有分片都已上传
if (isUploadComplete(uuid, totalChunks)) {
// 合并文件
Path targetFile = targetDir.resolve(uuid + "-" + file.getOriginalFilename());
mergeChunks(uuid, totalChunks, targetFile);
response.put("complete", true);
response.put("filePath", targetFile.toString());
}
return ResponseEntity.ok(response);
} catch (IOException e) {
Map<String, Object> response = new HashMap<>();
response.put("uploaded", false);
response.put("error", e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}
private boolean isUploadComplete(String uuid, int totalChunks) throws IOException {
Path chunksDir = tempDir.resolve(uuid);
if (!Files.exists(chunksDir)) {
return false;
}
try (Stream<Path> pathStream = Files.list(chunksDir)) {
return pathStream.count() >= totalChunks;
}
}
private void mergeChunks(String uuid, int totalChunks, Path targetFile) throws IOException {
Files.createDirectories(targetFile.getParent());
try (FileChannel targetChannel = FileChannel.open(targetFile,
StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
for (int i = 1; i <= totalChunks; i++) {
Path chunkFile = tempDir.resolve(uuid).resolve(String.valueOf(i));
try (FileChannel sourceChannel = FileChannel.open(chunkFile, StandardOpenOption.READ)) {
sourceChannel.transferTo(0, sourceChannel.size(), targetChannel);
}
}
}
// 清理临时文件
FileUtils.deleteDirectory(tempDir.resolve(uuid).toFile());
}
}
前端分片上传实现示例 (JavaScript)
function uploadLargeFile(file) {
const chunkSize = 2 * 1024 * 1024; // 2MB
const totalChunks = Math.ceil(file.size / chunkSize);
const fileUuid = generateUUID();
for (let chunkNumber = 1; chunkNumber <= totalChunks; chunkNumber++) {
const start = (chunkNumber - 1) * chunkSize;
const end = Math.min(start + chunkSize, file.size);
const chunk = file.slice(start, end);
uploadChunk(chunk, fileUuid, chunkNumber, totalChunks, file.name);
}
}
function uploadChunk(chunk, uuid, chunkNumber, totalChunks, originalFilename) {
const formData = new FormData();
formData.append('file', chunk, originalFilename);
formData.append('uuid', uuid);
formData.append('chunkNumber', chunkNumber);
formData.append('totalChunks', totalChunks);
fetch('/api/upload/chunk', {
method: 'POST',
body: formData
})
.then(response => response.json())
.then(data => {
updateProgressUI(chunkNumber, totalChunks);
if (data.complete) {
console.log('Upload complete:', data.filePath);
showUploadComplete(data.filePath);
}
})
.catch(error => console.error('Error uploading chunk:', error));
}
function generateUUID() {
// UUID生成逻辑
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
const r = Math.random() * 16 | 0;
const v = c === 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
断点续传实现
断点续传允许用户在上传中断后恢复上传,而不必重新上传整个文件:
@RestController
@RequestMapping("/api/resumable")
public class ResumableUploadController {
private final Path uploadDir = Paths.get("uploads");
private static final Map<String, ResumableInfo> resumableInfos = new ConcurrentHashMap<>();
static class ResumableInfo {
String identifier;
long totalSize;
Set<Integer> uploadedChunks = ConcurrentHashMap.newKeySet();
}
@GetMapping("/status")
public ResponseEntity<Map<String, Object>> getStatus(
@RequestParam("identifier") String identifier,
@RequestParam("chunkNumber") Integer chunkNumber) {
Map<String, Object> response = new HashMap<>();
ResumableInfo info = resumableInfos.get(identifier);
if (info != null && info.uploadedChunks.contains(chunkNumber)) {
// 该分片已上传
response.put("uploaded", true);
return ResponseEntity.ok(response);
} else {
// 该分片未上传
response.put("uploaded", false);
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(response);
}
}
@PostMapping("/upload")
public ResponseEntity<Map<String, Object>> uploadChunk(
@RequestParam("file") MultipartFile file,
@RequestParam("resumableIdentifier") String identifier,
@RequestParam("resumableChunkNumber") Integer chunkNumber,
@RequestParam("resumableTotalChunks") Integer totalChunks,
@RequestParam("resumableTotalSize") Long totalSize,
@RequestParam("resumableFilename") String filename) {
Map<String, Object> response = new HashMap<>();
try {
// 获取或创建断点信息
ResumableInfo info = resumableInfos.computeIfAbsent(identifier, id -> {
ResumableInfo newInfo = new ResumableInfo();
newInfo.identifier = id;
newInfo.totalSize = totalSize;
return newInfo;
});
// 保存分片
Path chunksDir = uploadDir.resolve(identifier);
Files.createDirectories(chunksDir);
Path chunkFile = chunksDir.resolve(String.valueOf(chunkNumber));
Files.copy(file.getInputStream(), chunkFile, StandardCopyOption.REPLACE_EXISTING);
// 更新已上传分片信息
info.uploadedChunks.add(chunkNumber);
response.put("uploaded", true);
response.put("chunkNumber", chunkNumber);
// 检查是否所有分片都已上传
if (info.uploadedChunks.size() == totalChunks) {
Path targetFile = uploadDir.resolve(filename);
mergeChunks(identifier, totalChunks, targetFile);
// 清理断点信息
resumableInfos.remove(identifier);
response.put("complete", true);
response.put("filePath", targetFile.toString());
}
return ResponseEntity.ok(response);
} catch (IOException e) {
response.put("uploaded", false);
response.put("error", e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}
private void mergeChunks(String identifier, int totalChunks, Path targetFile) throws IOException {
// 与前面的mergeChunks方法类似
Files.createDirectories(targetFile.getParent());
try (FileChannel targetChannel = FileChannel.open(targetFile,
StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
for (int i = 1; i <= totalChunks; i++) {
Path chunkFile = uploadDir.resolve(identifier).resolve(String.valueOf(i));
try (FileChannel sourceChannel = FileChannel.open(chunkFile, StandardOpenOption.READ)) {
sourceChannel.transferTo(0, sourceChannel.size(), targetChannel);
}
}
}
}
}
流式下载与范围请求支持
为了支持文件的部分下载和断点续传下载,我们可以实现HTTP范围请求:
@RestController
@RequestMapping("/api/download")
public class RangeDownloadController {
private final Path storageLocation = Paths.get("uploads");
@GetMapping("/stream/{filename:.+}")
public ResponseEntity<Resource> downloadFile(
@PathVariable String filename,
HttpServletRequest request) throws IOException {
Path filePath = storageLocation.resolve(filename).normalize();
Resource resource = new UrlResource(filePath.toUri());
if (!resource.exists()) {
return ResponseEntity.notFound().build();
}
String contentType = request.getServletContext().getMimeType(resource.getFile().getAbsolutePath());
if (contentType == null) {
contentType = "application/octet-stream";
}
// 获取文件大小
long fileLength = resource.contentLength();
// 解析Range头
String rangeHeader = request.getHeader(HttpHeaders.RANGE);
if (rangeHeader != null && rangeHeader.startsWith("bytes=")) {
// 处理范围请求
String[] ranges = rangeHeader.substring("bytes=".length()).split("-");
long start = Long.parseLong(ranges[0]);
long end;
if (ranges.length > 1 && !ranges[1].isEmpty()) {
end = Long.parseLong(ranges[1]);
} else {
end = fileLength - 1;
}
if (start > end || start < 0 || end >= fileLength) {
return ResponseEntity.status(HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
.header(HttpHeaders.CONTENT_RANGE, "bytes */" + fileLength)
.build();
}
long rangeLength = end - start + 1;
// 创建自定义资源以支持范围请求
InputStreamResource rangeResource = new InputStreamResource(new FileInputStream(resource.getFile()) {
@Override
public int read() throws IOException {
// 跳过起始位置之前的字节
long skipped = 0;
while (skipped < start) {
long skip = super.skip(start - skipped);
if (skip <= 0) break;
skipped += skip;
}
return super.read();
}
});
return ResponseEntity.status(HttpStatus.PARTIAL_CONTENT)
.contentType(MediaType.parseMediaType(contentType))
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + resource.getFilename() + "\"")
.header(HttpHeaders.ACCEPT_RANGES, "bytes")
.header(HttpHeaders.CONTENT_RANGE, "bytes " + start + "-" + end + "/" + fileLength)
.contentLength(rangeLength)
.body(rangeResource);
} else {
// 正常下载
return ResponseEntity.ok()
.contentType(MediaType.parseMediaType(contentType))
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + resource.getFilename() + "\"")
.header(HttpHeaders.ACCEPT_RANGES, "bytes")
.contentLength(fileLength)
.body(resource);
}
}
}
高性能大文件下载与异步处理
对于大文件下载,我们可以使用异步servlet和NIO技术提高性能:
@RestController
@RequestMapping("/api/download")
public class AsyncDownloadController {
private final Path storageLocation = Paths.get("uploads");
@GetMapping("/async/{filename:.+}")
public void downloadFileAsync(
@PathVariable String filename,
HttpServletRequest request,
HttpServletResponse response,
AsyncContext asyncContext) throws IOException {
Path filePath = storageLocation.resolve(filename).normalize();
if (!Files.exists(filePath)) {
response.sendError(HttpServletResponse.SC_NOT_FOUND);
return;
}
String mimeType = request.getServletContext().getMimeType(filePath.toString());
if (mimeType == null) {
mimeType = "application/octet-stream";
}
// 设置响应头
response.setContentType(mimeType);
response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"");
response.setHeader(HttpHeaders.ACCEPT_RANGES, "bytes");
// 使用CompletableFuture异步处理
CompletableFuture.runAsync(() -> {
try (FileChannel fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
WritableByteChannel outputChannel = Channels.newChannel(response.getOutputStream())) {
long fileSize = fileChannel.size();
response.setContentLengthLong(fileSize);
// 使用零拷贝技术
long position = 0;
long bytesTransferred;
while (position < fileSize) {
bytesTransferred = fileChannel.transferTo(position, 1024 * 1024, outputChannel);
if (bytesTransferred <= 0) {
break;
}
position += bytesTransferred;
}
response.getOutputStream().flush();
asyncContext.complete();
} catch (IOException e) {
// 处理异常
try {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
asyncContext.complete();
} catch (IOException ex) {
ex.printStackTrace();
}
}
});
}
}
使用第三方库简化文件操作
对于分片上传和断点续传,还可以使用第三方库如commons-fileupload或Spring的集成解决方案:
使用commons-fileupload
@Bean
public MultipartResolver multipartResolver() {
CommonsMultipartResolver resolver = new CommonsMultipartResolver();
resolver.setMaxUploadSize(100 * 1024 * 1024); // 100MB
resolver.setMaxUploadSizePerFile(10 * 1024 * 1024); // 10MB per file
return resolver;
}
文件处理监控与进度跟踪
在Spring Boot应用中监控文件上传和下载进度:
@Component
public class ProgressListener {
private final ConcurrentMap<String, UploadProgressInfo> progressMap = new ConcurrentHashMap<>();
@Getter
@Setter
public static class UploadProgressInfo {
private long contentLength;
private long bytesTransferred;
private long startTime;
private long lastUpdateTime;
public int getProgressPercent() {
return contentLength > 0 ? (int) ((bytesTransferred * 100) / contentLength) : 0;
}
public long getTransferRateBytesPerSecond() {
long duration = (lastUpdateTime - startTime) / 1000;
return duration > 0 ? bytesTransferred / duration : 0;
}
}
public UploadProgressInfo getProgressInfo(String uploadId) {
return progressMap.get(uploadId);
}
public void startTracking(String uploadId, long contentLength) {
UploadProgressInfo info = new UploadProgressInfo();
info.setContentLength(contentLength);
info.setStartTime(System.currentTimeMillis());
info.setLastUpdateTime(System.currentTimeMillis());
progressMap.put(uploadId, info);
}
public void updateProgress(String uploadId, long bytesTransferred) {
UploadProgressInfo info = progressMap.get(uploadId);
if (info != null) {
info.setBytesTransferred(bytesTransferred);
info.setLastUpdateTime(System.currentTimeMillis());
}
}
public void removeTracking(String uploadId) {
progressMap.remove(uploadId);
}
@RestController
@RequestMapping("/api/progress")
public static class ProgressController {
private final ProgressListener progressListener;
public ProgressController(ProgressListener progressListener) {
this.progressListener = progressListener;
}
@GetMapping("/{uploadId}")
public ResponseEntity<UploadProgressInfo> getProgress(@PathVariable String uploadId) {
UploadProgressInfo info = progressListener.getProgressInfo(uploadId);
if (info != null) {
return ResponseEntity.ok(info);
} else {
return ResponseEntity.notFound().build();
}
}
}
}
配合自定义的MultipartResolver
和ServletFilter
,我们可以实现自动进度跟踪:
@Component
public class ProgressFilter implements Filter {
private final ProgressListener progressListener;
public ProgressFilter(ProgressListener progressListener) {
this.progressListener = progressListener;
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
if (request instanceof HttpServletRequest && request.getContentType() != null
&& request.getContentType().contains("multipart/form-data")) {
String uploadId = ((HttpServletRequest) request).getParameter("uploadId");
if (uploadId != null) {
progressListener.startTracking(uploadId, request.getContentLengthLong());
request = new ProgressServletRequestWrapper(request, progressListener, uploadId);
}
}
chain.doFilter(request, response);
}
}
WebSocket实时传输进度反馈
使用WebSocket可以为客户端提供实时的上传/下载进度:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").withSockJS();
}
}
@Component
public class UploadProgressNotifier {
private final SimpMessagingTemplate messagingTemplate;
private final ProgressListener progressListener;
private final ScheduledExecutorService scheduler;
public UploadProgressNotifier(SimpMessagingTemplate messagingTemplate, ProgressListener progressListener) {
this.messagingTemplate = messagingTemplate;
this.progressListener = progressListener;
this.scheduler = Executors.newScheduledThreadPool(1);
// 每秒发送一次进度更新
scheduler.scheduleAtFixedRate(() -> {
progressListener.progressMap.forEach((uploadId, info) -> {
messagingTemplate.convertAndSend("/topic/progress/" + uploadId, info);
});
}, 0, 1, TimeUnit.SECONDS);
}
@PreDestroy
public void shutdown() {
scheduler.shutdown();
}
}
WebSocket客户端示例 (JavaScript)
const socket = new SockJS('/ws');
const stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/progress/' + uploadId, function(message) {
const progressInfo = JSON.parse(message.body);
updateProgressUI(progressInfo);
});
});
function updateProgressUI(progressInfo) {
const progressBar = document.getElementById('progress-bar');
progressBar.style.width = progressInfo.progressPercent + '%';
const speedElement = document.getElementById('transfer-speed');
const speedMbps = (progressInfo.transferRateBytesPerSecond / (1024 * 1024)).toFixed(2);
speedElement.textContent = `${speedMbps} MB/s`;
if (progressInfo.progressPercent >= 100) {
document.getElementById('status').textContent = '上传完成!';
}
}