
A Deep Dive into the Java IO System and Its Use in Spring Boot

Sébastien Lorber
Docusaurus maintainer

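The Java IO system is a foundation of every Java application. From the original blocking IO to today's non-blocking and asynchronous IO, the Java IO APIs have kept evolving and have given developers increasingly powerful and flexible tools. This article walks through each part of Java IO and shows how it is applied in practice with Spring Boot and common third-party libraries.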

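The Evolution of Java IO

Traditional BIO (Blocking IO)

The earliest IO API in Java is blocking IO, found in the java.io package. Its defining trait is that every call is synchronous and blocks the calling thread.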

try (FileInputStream fis = new FileInputStream("input.txt");
FileOutputStream fos = new FileOutputStream("output.txt")) {
byte[] buffer = new byte[1024];
int len;
while ((len = fis.read(buffer)) != -1) {
fos.write(buffer, 0, len);
}
} catch (IOException e) {
e.printStackTrace();
}

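Blocking IO is simple and intuitive to use, but it scales poorly to large numbers of concurrent connections, because each connection ties up a dedicated thread.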

NIO (New IO)

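Java 1.4 introduced NIO (the java.nio package), which adds buffer- and channel-based IO and non-blocking operation for selectable channels such as sockets. The file copy below uses FileChannel, which always blocks, together with the Paths and StandardOpenOption helpers that were added in Java 7: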

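try (FileChannel sourceChannel = FileChannel.open(Paths.get("input.txt"), StandardOpenOption.READ);
FileChannel targetChannel = FileChannel.open(Paths.get("output.txt"),
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {

ByteBuffer buffer = ByteBuffer.allocate(1024);
while (sourceChannel.read(buffer) != -1) {
buffer.flip(); // switch the buffer from filling to draining
while (buffer.hasRemaining()) {
targetChannel.write(buffer); // a single write may not drain the whole buffer
}
buffer.clear(); // make the buffer ready for the next read
}
} catch (IOException e) {
e.printStackTrace();
}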

The core NIO components are:

  • Buffer: a container for data
  • Channel: a conduit for transferring data
  • Selector: a multiplexer that watches many channels at once
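To make the interplay of the three components concrete, here is a minimal sketch that pushes a short message through an in-process Pipe and reads it back via a Selector. The class name SelectorPipeDemo and the method roundTrip are made up for this illustration, and the sketch assumes the message is small enough to fit in the pipe's internal buffer, since the write happens before any read.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the original article.
final class SelectorPipeDemo {

    /** Sends a small message through an in-process Pipe and reads it back via a Selector. */
    static String roundTrip(String message) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {

                // A channel must be in non-blocking mode before it can be registered.
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // wrap() returns a buffer that is already positioned for draining.
                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    sink.write(out);
                }
                sink.close(); // closing the write end lets the reader see end-of-stream

                ByteBuffer in = ByteBuffer.allocate(64);
                ByteArrayOutputStream received = new ByteArrayOutputStream();
                boolean endOfStream = false;
                while (!endOfStream) {
                    selector.select(); // blocks until a registered channel is ready
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isReadable()) {
                            in.clear();                 // prepare the buffer for filling
                            int n = source.read(in);
                            if (n == -1) {
                                endOfStream = true;
                            } else {
                                in.flip();              // switch to draining
                                received.write(in.array(), 0, in.limit());
                            }
                        }
                    }
                    selector.selectedKeys().clear(); // selected keys are not removed automatically
                }
                return new String(received.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello nio"));
    }
}
```

In a real server the same loop would typically watch many socket channels with one thread, which is the main reason to reach for a Selector.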

AIO (Asynchronous IO)

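Java 7 introduced AIO (AsynchronousChannel and related types in the java.nio.channels package), which provides asynchronous IO whose result is delivered through a callback or a Future: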

AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(Paths.get("test.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);

fileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("Read complete: " + result);
attachment.flip();
// process the data
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
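Callback-style code composes more easily once it is adapted to a CompletableFuture. The following is a minimal sketch of that adaptation, assuming a single read from the start of the file is enough; AsyncFileRead, readHead, and demo are names invented for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper class, not part of the original article.
final class AsyncFileRead {

    /** Issues one asynchronous read of up to maxBytes from the start of the file. */
    static CompletableFuture<byte[]> readHead(Path path, int maxBytes) {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        try {
            AsynchronousFileChannel channel =
                    AsynchronousFileChannel.open(path, StandardOpenOption.READ);
            ByteBuffer buffer = ByteBuffer.allocate(maxBytes);
            channel.read(buffer, 0L, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer bytesRead, ByteBuffer attachment) {
                    attachment.flip();
                    byte[] data = new byte[attachment.remaining()];
                    attachment.get(data);
                    closeQuietly(channel);
                    future.complete(data);
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    closeQuietly(channel);
                    future.completeExceptionally(exc);
                }
            });
        } catch (IOException e) {
            // Opening the file failed before any read was started.
            future.completeExceptionally(e);
        }
        return future;
    }

    private static void closeQuietly(AsynchronousFileChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // nothing useful can be done here in a demo
        }
    }

    /** Writes a temporary file and returns its first five bytes as text. */
    static String demo() {
        try {
            Path tmp = Files.createTempFile("aio-demo", ".txt");
            try {
                Files.write(tmp, "hello aio".getBytes(StandardCharsets.UTF_8));
                byte[] head = readHead(tmp, 5).join();
                return new String(head, StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The caller can then chain further work with thenApply or thenCompose instead of nesting handlers.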

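IO in Spring Boot

Spring Boot offers several ways to simplify IO:

Loading and Handling Resources

Spring's ResourceLoader interface and Resource abstraction, both available in any Spring Boot application, greatly simplify resource access: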

@Autowired
private ResourceLoader resourceLoader;

public void loadResource() {
Resource resource = resourceLoader.getResource("classpath:config.json");
try (InputStream inputStream = resource.getInputStream()) {
// process the input stream
} catch (IOException e) {
e.printStackTrace();
}
}

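File Upload and Handling

Spring Boot simplifies file uploads:

@PostMapping("/upload")
public String handleFileUpload(@RequestParam("file") MultipartFile file) {
if (file.isEmpty()) {
return "File is empty";
}
// Keep only the last path segment so a crafted name such as "../../x" cannot escape uploads/.
// The fallback name "upload.bin" is an arbitrary choice for this example.
String original = file.getOriginalFilename();
String safeName = Paths.get(original == null ? "upload.bin" : original).getFileName().toString();
try (InputStream in = file.getInputStream()) {
Path uploadDir = Paths.get("uploads");
Files.createDirectories(uploadDir);
// Stream the upload to disk instead of loading it into memory with getBytes()
Files.copy(in, uploadDir.resolve(safeName), StandardCopyOption.REPLACE_EXISTING);
return "Upload succeeded";
} catch (IOException e) {
return "Upload failed: " + e.getMessage();
}
}
Configuring upload size limits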
# application.properties
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB

Reactive Programming with WebFlux

WebFlux, introduced in Spring Framework 5 and supported since Spring Boot 2.0, enables reactive, non-blocking IO:

@RestController
public class ReactiveController {

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamData() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Event " + sequence)
.take(10);
}

@GetMapping("/file")
public Mono<ResponseEntity<Resource>> serveFile(@RequestParam String filename) {
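// Use only the last path segment so a value such as "../x" cannot escape the directory
Resource file = new FileSystemResource(
Paths.get("path/to/files").resolve(Paths.get(filename).getFileName()).toFile());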
return Mono.just(ResponseEntity.ok()
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + file.getFilename() + "\"")
.body(file));
}
}

Common Third-Party IO Libraries

Apache Commons IO

Apache Commons IO provides many handy IO utility classes:

@Service
public class FileService {

public void copyFile(String source, String dest) throws IOException {
FileUtils.copyFile(new File(source), new File(dest));
}

public List<String> readLines(String filename) throws IOException {
return FileUtils.readLines(new File(filename), StandardCharsets.UTF_8);
}

public String readFileToString(String filename) throws IOException {
return FileUtils.readFileToString(new File(filename), StandardCharsets.UTF_8);
}
}

Guava IO

Google Guava's IO utilities are also useful. Note that Files in this example is com.google.common.io.Files, not java.nio.file.Files:

public void guavaIoExample() throws IOException {
// Read the entire file
String content = Files.asCharSource(new File("test.txt"), StandardCharsets.UTF_8).read();

// Read line by line
ImmutableList<String> lines = Files.asCharSource(new File("test.txt"), StandardCharsets.UTF_8).readLines();

// Write to a file
Files.asCharSink(new File("output.txt"), StandardCharsets.UTF_8).write("Hello, World!");
}

MINA and Netty

For network applications, MINA and Netty are two widely used NIO frameworks:

Netty server example
public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new StringDecoder(),
new StringEncoder(),
new EchoServerHandler());
}
});

ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

class EchoServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println("Received message: " + msg);
ctx.writeAndFlush("Echo: " + msg);
}
}

Integrating Netty with Spring Boot

Netty can run inside a Spring Boot application to build high-performance network services:

@Component
public class NettyServerBootstrap implements ApplicationListener<ContextRefreshedEvent> {

private final ServerBootstrap serverBootstrap;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;

public NettyServerBootstrap() {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketChannelInitializer());
}

@Override
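public void onApplicationEvent(ContextRefreshedEvent event) {
try {
// sync() waits until the port is bound; the server then runs on Netty's own threads
serverBootstrap.bind(8088).sync();
System.out.println("Netty server started on port 8088");
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
}
}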

@PreDestroy
public void destroy() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

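IO Performance Tips

Use Buffering

With both traditional IO and NIO, buffering reduces the number of system calls and can improve throughput significantly: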

public void bufferedExample() throws IOException {
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream("large.file"));
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream("copy.file"))) {

byte[] buffer = new byte[8192]; // 8 KB buffer
int bytesRead;
while ((bytesRead = bis.read(buffer)) != -1) {
bos.write(buffer, 0, bytesRead);
}
}
}

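Memory-Mapped Files

For large files, a memory-mapped file is often one of the most efficient options, especially for random access and repeated reads. A single MappedByteBuffer can cover at most Integer.MAX_VALUE bytes (about 2 GB), so the example below only works for files under that size: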

public void memoryMappedExample() throws IOException {
try (RandomAccessFile file = new RandomAccessFile("hugefile.data", "r");
FileChannel channel = file.getChannel()) {

long fileSize = channel.size();
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);

// Work on the mapped memory directly
while (buffer.hasRemaining()) {
// process the data
byte b = buffer.get();
}
}
}
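Because one mapping is limited to about 2 GB, larger files are usually processed through a sequence of smaller mappings. The sketch below shows that idea under simple assumptions: it only sums the bytes, and the names WindowedMapping, byteSum, and demo are invented for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper class, not part of the original article.
final class WindowedMapping {

    /** Sums all bytes as unsigned values, mapping at most windowSize bytes at a time. */
    static long byteSum(Path path, int windowSize) throws IOException {
        long sum = 0;
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            long size = channel.size();
            for (long offset = 0; offset < size; offset += windowSize) {
                // The last window may be shorter than windowSize.
                long length = Math.min(windowSize, size - offset);
                MappedByteBuffer window =
                        channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
                while (window.hasRemaining()) {
                    sum += window.get() & 0xFF;
                }
            }
        }
        return sum;
    }

    /** Writes six known bytes to a temporary file and sums them through 4-byte windows. */
    static long demo() {
        try {
            Path tmp = Files.createTempFile("mmap-demo", ".bin");
            // A mapped file may stay locked on some platforms, so defer deletion to JVM exit.
            tmp.toFile().deleteOnExit();
            Files.write(tmp, new byte[] {1, 2, 3, 4, 5, (byte) 250});
            return byteSum(tmp, 4);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In practice the window would be far larger than four bytes, for example a few hundred megabytes; the tiny value here only forces the loop to cross a window boundary.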

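Zero-Copy

Zero-copy avoids copying the same data repeatedly between kernel space and user space, and suits scenarios such as file transfer: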

public void zeroCopyExample() throws IOException {
try (FileChannel sourceChannel = new FileInputStream("source.file").getChannel();
FileChannel destinationChannel = new FileOutputStream("destination.file").getChannel()) {

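// Transfer straight from the source channel to the target channel.
// Where the operating system supports it, the data never enters user space.
long position = 0;
long size = sourceChannel.size();
while (position < size) {
// transferTo may move fewer bytes than requested, so loop until everything is copied
long transferred = sourceChannel.transferTo(position, size - position, destinationChannel);
if (transferred <= 0) {
break;
}
position += transferred;
}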
}
}

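Case Study: Building an Efficient File-Processing Service

Putting these pieces together, we can build an efficient file-processing REST service with Spring Boot: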

@RestController
@RequestMapping("/api/files")
public class FileProcessingController {

private final FileProcessingService fileService;

public FileProcessingController(FileProcessingService fileService) {
this.fileService = fileService;
}

@PostMapping("/upload")
public ResponseEntity<String> uploadFile(@RequestParam("file") MultipartFile file) {
try {
String id = fileService.storeFile(file);
return ResponseEntity.ok("File uploaded, ID: " + id);
} catch (IOException e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("Upload failed: " + e.getMessage());
}
}

@GetMapping("/download/{id}")
public ResponseEntity<Resource> downloadFile(@PathVariable String id) {
try {
Resource resource = fileService.loadFileAsResource(id);
return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_DISPOSITION,
"attachment; filename=\"" + resource.getFilename() + "\"")
.body(resource);
} catch (Exception e) {
return ResponseEntity.notFound().build();
}
}

@GetMapping("/process/{id}")
public Mono<ResponseEntity<String>> processFileAsync(@PathVariable String id) {
return fileService.processFileReactive(id)
.map(result -> ResponseEntity.ok("Processing finished: " + result))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
}
The corresponding service implementation
@Service
public class FileProcessingService {

private final Path fileStorageLocation;

@Autowired
public FileProcessingService(@Value("${file.upload-dir:uploads}") String uploadDir) {
this.fileStorageLocation = Paths.get(uploadDir).toAbsolutePath().normalize();

try {
Files.createDirectories(this.fileStorageLocation);
} catch (IOException ex) {
throw new RuntimeException("Could not create upload directory", ex);
}
}

public String storeFile(MultipartFile file) throws IOException {
// Store under a server-generated ID so the client-supplied file name never reaches the file system
String fileId = UUID.randomUUID().toString();

// Store the file with NIO
Path targetLocation = this.fileStorageLocation.resolve(fileId);
try (InputStream in = file.getInputStream()) {
Files.copy(in, targetLocation, StandardCopyOption.REPLACE_EXISTING);
}

return fileId;
}

public Resource loadFileAsResource(String fileId) throws IOException {
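Path filePath = this.fileStorageLocation.resolve(fileId).normalize();
// Reject IDs such as "../x" that would resolve outside the storage directory
if (!filePath.startsWith(this.fileStorageLocation)) {
throw new FileNotFoundException("File not found: " + fileId);
}
Resource resource = new UrlResource(filePath.toUri());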

if (resource.exists()) {
return resource;
} else {
throw new FileNotFoundException("File not found: " + fileId);
}
}

public Mono<String> processFileReactive(String fileId) {
Path filePath = this.fileStorageLocation.resolve(fileId).normalize();

return Mono.fromCallable(() -> {
// Process the file through a memory mapping (a single mapping is limited to about 2 GB)
try (RandomAccessFile file = new RandomAccessFile(filePath.toFile(), "r");
FileChannel channel = file.getChannel()) {

long fileSize = channel.size();
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);

// Example only: real processing logic depends on the business requirements
long checksum = 0;
while (buffer.hasRemaining()) {
checksum += buffer.get() & 0xFF;
}

return "File processed, checksum: " + checksum;
}
}).subscribeOn(Schedulers.boundedElastic());
}
}

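Summary

Over the years, Java IO has evolved from blocking IO to non-blocking and asynchronous models, giving developers a wide range of choices. In practice, pick the IO model that fits the scenario:

  1. Simple scenarios: traditional BIO, or a utility library such as Apache Commons IO
  2. Concurrent scenarios: NIO with a Selector
  3. High-throughput network applications: Netty or MINA
  4. Modern reactive applications: Spring Boot WebFlux

Whichever approach you choose, understanding the underlying IO principles and making sensible use of buffers, zero-copy, and similar techniques is what makes a well-performing application possible.

Advanced File Upload and Download Techniques

In real enterprise applications, file upload and download often involve more complex requirements such as large-file transfer, resumable transfer, and progress tracking. The following sections look at these techniques in depth.

Chunked Upload

For large files, a single HTTP request may fail because of a timeout or an unstable network. Chunked upload is an effective way around this: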

@RestController
@RequestMapping("/api/upload")
public class ChunkedUploadController {

private final Path tempDir = Paths.get(System.getProperty("java.io.tmpdir"), "uploads");
private final Path targetDir = Paths.get("uploads");

@PostMapping("/chunk")
public ResponseEntity<Map<String, Object>> uploadChunk(
@RequestParam("file") MultipartFile file,
@RequestParam("uuid") String uuid,
@RequestParam("chunkNumber") Integer chunkNumber,
@RequestParam("totalChunks") Integer totalChunks) {

try {
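// Make sure the chunk directory exists
Files.createDirectories(tempDir.resolve(uuid));

// Save the chunk
Path chunkFile = tempDir.resolve(uuid).resolve(String.valueOf(chunkNumber));
Files.copy(file.getInputStream(), chunkFile, StandardCopyOption.REPLACE_EXISTING);

Map<String, Object> response = new HashMap<>();
response.put("uploaded", true);
response.put("chunkNumber", chunkNumber);

// Check whether every chunk has arrived. This count-based check is not atomic,
// so clients should send chunks one at a time or the merge may be triggered twice.
if (isUploadComplete(uuid, totalChunks)) {
// Merge the chunks. Keep only the last path segment of the client-supplied name
// to prevent path traversal.
String originalName = Paths.get(String.valueOf(file.getOriginalFilename())).getFileName().toString();
Path targetFile = targetDir.resolve(uuid + "-" + originalName);
mergeChunks(uuid, totalChunks, targetFile);

response.put("complete", true);
response.put("filePath", targetFile.toString());
}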

return ResponseEntity.ok(response);
} catch (IOException e) {
Map<String, Object> response = new HashMap<>();
response.put("uploaded", false);
response.put("error", e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

private boolean isUploadComplete(String uuid, int totalChunks) throws IOException {
Path chunksDir = tempDir.resolve(uuid);
if (!Files.exists(chunksDir)) {
return false;
}

try (Stream<Path> pathStream = Files.list(chunksDir)) {
return pathStream.count() >= totalChunks;
}
}

private void mergeChunks(String uuid, int totalChunks, Path targetFile) throws IOException {
Files.createDirectories(targetFile.getParent());

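try (FileChannel targetChannel = FileChannel.open(targetFile,
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {

for (int i = 1; i <= totalChunks; i++) {
Path chunkFile = tempDir.resolve(uuid).resolve(String.valueOf(i));
try (FileChannel sourceChannel = FileChannel.open(chunkFile, StandardOpenOption.READ)) {
// transferTo may move fewer bytes than requested, so loop until the chunk is fully copied
long position = 0;
long size = sourceChannel.size();
while (position < size) {
position += sourceChannel.transferTo(position, size - position, targetChannel);
}
}
}
}

// Clean up the temporary chunk files (FileUtils comes from Apache Commons IO)
FileUtils.deleteDirectory(tempDir.resolve(uuid).toFile());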
}
}
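The controller above turns the client-supplied uuid and file name into file-system paths, so both should be validated before use. One possible approach, sketched here with invented names (SafePaths, resolveInside, isAllowed), is to resolve the value against the base directory and reject anything that ends up outside it.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper class, not part of the original article.
final class SafePaths {

    /** Resolves name under baseDir and rejects any result that escapes baseDir. */
    static Path resolveInside(Path baseDir, String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Empty name");
        }
        Path base = baseDir.toAbsolutePath().normalize();
        // normalize() collapses "." and ".." segments before the containment check.
        Path resolved = base.resolve(name).normalize();
        if (!resolved.startsWith(base) || resolved.equals(base)) {
            throw new IllegalArgumentException("Path escapes base directory: " + name);
        }
        return resolved;
    }

    /** Convenience wrapper that reports the outcome as a boolean. */
    static boolean isAllowed(Path baseDir, String name) {
        try {
            resolveInside(baseDir, name);
            return true;
        } catch (IllegalArgumentException e) {
            // Also covers InvalidPathException, which extends IllegalArgumentException.
            return false;
        }
    }

    public static void main(String[] args) {
        Path base = Paths.get("uploads");
        System.out.println(isAllowed(base, "report.pdf"));
        System.out.println(isAllowed(base, "../secret.txt"));
    }
}
```

A call such as SafePaths.resolveInside(tempDir, uuid) could then replace the bare tempDir.resolve(uuid) calls. This check does not follow symbolic links, so it is a first line of defense rather than a complete one.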
Front-end chunked upload example (JavaScript)
async function uploadLargeFile(file) {
const chunkSize = 2 * 1024 * 1024; // 2 MB
const totalChunks = Math.ceil(file.size / chunkSize);
const fileUuid = generateUUID();

// Upload chunks one at a time so that progress advances in order and
// the server-side completion check runs only after the last chunk
for (let chunkNumber = 1; chunkNumber <= totalChunks; chunkNumber++) {
const start = (chunkNumber - 1) * chunkSize;
const end = Math.min(start + chunkSize, file.size);
const chunk = file.slice(start, end);

await uploadChunk(chunk, fileUuid, chunkNumber, totalChunks, file.name);
}
}

function uploadChunk(chunk, uuid, chunkNumber, totalChunks, originalFilename) {
const formData = new FormData();
formData.append('file', chunk, originalFilename);
formData.append('uuid', uuid);
formData.append('chunkNumber', chunkNumber);
formData.append('totalChunks', totalChunks);

// Return the promise so the caller can wait for this chunk to finish
return fetch('/api/upload/chunk', {
method: 'POST',
body: formData
})
.then(response => response.json())
.then(data => {
updateProgressUI(chunkNumber, totalChunks);

if (data.complete) {
console.log('Upload complete:', data.filePath);
showUploadComplete(data.filePath);
}
})
.catch(error => console.error('Error uploading chunk:', error));
}

function generateUUID() {
// Build a random version-4 style UUID (Math.random is not cryptographically secure)
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
const r = Math.random() * 16 | 0;
const v = c === 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}

Resumable Upload

Resumable upload lets a user continue an interrupted upload without sending the whole file again:

@RestController
@RequestMapping("/api/resumable")
public class ResumableUploadController {

private final Path uploadDir = Paths.get("uploads");
private static final Map<String, ResumableInfo> resumableInfos = new ConcurrentHashMap<>();

static class ResumableInfo {
String identifier;
long totalSize;
Set<Integer> uploadedChunks = ConcurrentHashMap.newKeySet();
}

@GetMapping("/status")
public ResponseEntity<Map<String, Object>> getStatus(
@RequestParam("identifier") String identifier,
@RequestParam("chunkNumber") Integer chunkNumber) {

Map<String, Object> response = new HashMap<>();
ResumableInfo info = resumableInfos.get(identifier);

if (info != null && info.uploadedChunks.contains(chunkNumber)) {
// This chunk has already been uploaded
response.put("uploaded", true);
return ResponseEntity.ok(response);
} else {
// This chunk has not been uploaded yet
response.put("uploaded", false);
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(response);
}
}

@PostMapping("/upload")
public ResponseEntity<Map<String, Object>> uploadChunk(
@RequestParam("file") MultipartFile file,
@RequestParam("resumableIdentifier") String identifier,
@RequestParam("resumableChunkNumber") Integer chunkNumber,
@RequestParam("resumableTotalChunks") Integer totalChunks,
@RequestParam("resumableTotalSize") Long totalSize,
@RequestParam("resumableFilename") String filename) {

Map<String, Object> response = new HashMap<>();

try {
// Look up the tracking info for this upload, creating it on first use
ResumableInfo info = resumableInfos.computeIfAbsent(identifier, id -> {
ResumableInfo newInfo = new ResumableInfo();
newInfo.identifier = id;
newInfo.totalSize = totalSize;
return newInfo;
});

// Save the chunk
Path chunksDir = uploadDir.resolve(identifier);
Files.createDirectories(chunksDir);

Path chunkFile = chunksDir.resolve(String.valueOf(chunkNumber));
Files.copy(file.getInputStream(), chunkFile, StandardCopyOption.REPLACE_EXISTING);

// Record that this chunk has been uploaded
info.uploadedChunks.add(chunkNumber);

response.put("uploaded", true);
response.put("chunkNumber", chunkNumber);

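// Check whether every chunk has been uploaded
if (info.uploadedChunks.size() == totalChunks) {
// Use only the last path segment of the client-supplied name to prevent path traversal
Path targetFile = uploadDir.resolve(Paths.get(filename).getFileName());
mergeChunks(identifier, totalChunks, targetFile);

// Remove the in-memory tracking entry
resumableInfos.remove(identifier);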

response.put("complete", true);
response.put("filePath", targetFile.toString());
}

return ResponseEntity.ok(response);
} catch (IOException e) {
response.put("uploaded", false);
response.put("error", e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

private void mergeChunks(String identifier, int totalChunks, Path targetFile) throws IOException {
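// Same approach as the mergeChunks method in the chunked upload controller
Files.createDirectories(targetFile.getParent());

try (FileChannel targetChannel = FileChannel.open(targetFile,
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {

for (int i = 1; i <= totalChunks; i++) {
Path chunkFile = uploadDir.resolve(identifier).resolve(String.valueOf(i));
try (FileChannel sourceChannel = FileChannel.open(chunkFile, StandardOpenOption.READ)) {
// transferTo may move fewer bytes than requested, so loop until the chunk is fully copied
long position = 0;
long size = sourceChannel.size();
while (position < size) {
position += sourceChannel.transferTo(position, size - position, targetChannel);
}
}
}
}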
}
}
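When chunks arrive in parallel, adding a chunk and then comparing the set size with the total are two separate steps, so two requests can both conclude that they delivered the final chunk. A small tracker that performs both steps under one lock avoids the double merge. The sketch below is one way to do this; ChunkTracker and its methods are names invented for this example.

```java
import java.util.BitSet;

// Hypothetical helper class, not part of the original article.
final class ChunkTracker {

    private final int totalChunks;
    private final BitSet received;
    private boolean completed;

    ChunkTracker(int totalChunks) {
        if (totalChunks <= 0) {
            throw new IllegalArgumentException("totalChunks must be positive");
        }
        this.totalChunks = totalChunks;
        this.received = new BitSet(totalChunks);
    }

    /**
     * Records a 1-based chunk number. Returns true for exactly one call:
     * the one that makes the set of received chunks complete.
     */
    synchronized boolean markReceived(int chunkNumber) {
        if (chunkNumber < 1 || chunkNumber > totalChunks) {
            throw new IllegalArgumentException("chunkNumber out of range: " + chunkNumber);
        }
        received.set(chunkNumber - 1);
        if (!completed && received.cardinality() == totalChunks) {
            completed = true;
            return true;
        }
        return false;
    }

    /** Answers the status query used when a client resumes an upload. */
    synchronized boolean isReceived(int chunkNumber) {
        return chunkNumber >= 1 && chunkNumber <= totalChunks && received.get(chunkNumber - 1);
    }
}
```

The controller would call the merge only when markReceived returns true. Like the map in the controller above, this state lives in memory, so it is lost on restart and is not shared across instances.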

Streaming Download with Range Request Support

To support partial downloads and resumable downloads, we can implement HTTP range requests:

@RestController
@RequestMapping("/api/download")
public class RangeDownloadController {

private final Path storageLocation = Paths.get("uploads");

@GetMapping("/stream/{filename:.+}")
public ResponseEntity<Resource> downloadFile(
@PathVariable String filename,
HttpServletRequest request) throws IOException {

Path filePath = storageLocation.resolve(filename).normalize();
Resource resource = new UrlResource(filePath.toUri());

if (!resource.exists()) {
return ResponseEntity.notFound().build();
}

String contentType = request.getServletContext().getMimeType(resource.getFile().getAbsolutePath());
if (contentType == null) {
contentType = "application/octet-stream";
}

// Get the file size
long fileLength = resource.contentLength();

// Read the Range header
String rangeHeader = request.getHeader(HttpHeaders.RANGE);

if (rangeHeader != null && rangeHeader.startsWith("bytes=")) {
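// Handle a single range: "bytes=start-end", "bytes=start-" or "bytes=-suffixLength"
String[] ranges = rangeHeader.substring("bytes=".length()).split("-", 2);
long start;
long end;
try {
if (ranges[0].isEmpty()) {
// Suffix form: the last N bytes of the file
long suffixLength = ranges.length > 1 ? Long.parseLong(ranges[1].trim()) : 0;
start = Math.max(0, fileLength - suffixLength);
end = fileLength - 1;
} else {
start = Long.parseLong(ranges[0].trim());
// An end position past the last byte is clamped rather than rejected
end = (ranges.length > 1 && !ranges[1].isEmpty())
? Math.min(Long.parseLong(ranges[1].trim()), fileLength - 1)
: fileLength - 1;
}
} catch (NumberFormatException e) {
// Malformed and multi-range headers are not supported by this example
start = -1;
end = -1;
}

if (start < 0 || start > end) {
return ResponseEntity.status(HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
.header(HttpHeaders.CONTENT_RANGE, "bytes */" + fileLength)
.build();
}

long rangeLength = end - start + 1;

// Position the stream at the start offset once, then cap it at rangeLength bytes.
// ByteStreams is com.google.common.io.ByteStreams from Guava.
InputStream in = new FileInputStream(resource.getFile());
try {
ByteStreams.skipFully(in, start);
} catch (IOException e) {
in.close();
throw e;
}
InputStreamResource rangeResource = new InputStreamResource(ByteStreams.limit(in, rangeLength));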

return ResponseEntity.status(HttpStatus.PARTIAL_CONTENT)
.contentType(MediaType.parseMediaType(contentType))
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + resource.getFilename() + "\"")
.header(HttpHeaders.ACCEPT_RANGES, "bytes")
.header(HttpHeaders.CONTENT_RANGE, "bytes " + start + "-" + end + "/" + fileLength)
.contentLength(rangeLength)
.body(rangeResource);
} else {
// Regular full download
return ResponseEntity.ok()
.contentType(MediaType.parseMediaType(contentType))
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + resource.getFilename() + "\"")
.header(HttpHeaders.ACCEPT_RANGES, "bytes")
.contentLength(fileLength)
.body(resource);
}
}
}

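High-Performance Large-File Download with Asynchronous Processing

For large downloads, we can combine asynchronous servlet processing with NIO channels so that the request thread is released while the file is being sent: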

@RestController
@RequestMapping("/api/download")
public class AsyncDownloadController {

private final Path storageLocation = Paths.get("uploads");

@GetMapping("/async/{filename:.+}")
public void downloadFileAsync(
@PathVariable String filename,
HttpServletRequest request,
HttpServletResponse response) throws IOException {

Path filePath = storageLocation.resolve(filename).normalize();
if (!Files.exists(filePath)) {
response.sendError(HttpServletResponse.SC_NOT_FOUND);
return;
}

String mimeType = request.getServletContext().getMimeType(filePath.toString());
if (mimeType == null) {
mimeType = "application/octet-stream";
}

// Set the response headers
response.setContentType(mimeType);
response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"");
response.setHeader(HttpHeaders.ACCEPT_RANGES, "bytes");

// Spring MVC does not inject AsyncContext as a handler argument, so start async mode explicitly.
// This requires async support to be enabled for the servlet and its filters.
AsyncContext asyncContext = request.startAsync();

// Stream the file on another thread with CompletableFuture
CompletableFuture.runAsync(() -> {
try (FileChannel fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
WritableByteChannel outputChannel = Channels.newChannel(response.getOutputStream())) {

long fileSize = fileChannel.size();
response.setContentLengthLong(fileSize);

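// Copy in slices of up to 1 MB. Because the target channel wraps an OutputStream,
// the data still passes through a JVM buffer, so this is not a true zero-copy path.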
long position = 0;
long bytesTransferred;
while (position < fileSize) {
bytesTransferred = fileChannel.transferTo(position, 1024 * 1024, outputChannel);
if (bytesTransferred <= 0) {
break;
}
position += bytesTransferred;
}

response.getOutputStream().flush();
asyncContext.complete();
} catch (IOException e) {
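// Report the failure if nothing has been sent yet, then always finish the async request
try {
if (!response.isCommitted()) {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
} catch (IOException ex) {
ex.printStackTrace();
} finally {
asyncContext.complete();
}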
}
});
}
}

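Simplifying File Handling with Third-Party Libraries

The multipart parsing that underlies chunked and resumable uploads can also be delegated to a library such as commons-fileupload or to Spring's built-in support. Note that CommonsMultipartResolver is available only up to Spring Framework 5.x; it was removed in Spring Framework 6 (Spring Boot 3), where StandardServletMultipartResolver is used instead:

Using commons-fileupload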
@Bean
public MultipartResolver multipartResolver() {
CommonsMultipartResolver resolver = new CommonsMultipartResolver();
resolver.setMaxUploadSize(100 * 1024 * 1024); // 100MB
resolver.setMaxUploadSizePerFile(10 * 1024 * 1024); // 10MB per file
return resolver;
}

Monitoring File Transfers and Tracking Progress

Monitoring upload and download progress in a Spring Boot application:

@Component
public class ProgressListener {

// Package-private so that other components in the same package can iterate over the entries
final ConcurrentMap<String, UploadProgressInfo> progressMap = new ConcurrentHashMap<>();

@Getter
@Setter
public static class UploadProgressInfo {
private long contentLength;
private long bytesTransferred;
private long startTime;
private long lastUpdateTime;

public int getProgressPercent() {
return contentLength > 0 ? (int) ((bytesTransferred * 100) / contentLength) : 0;
}

public long getTransferRateBytesPerSecond() {
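long elapsedMillis = lastUpdateTime - startTime;
// Scale before dividing so that transfers shorter than one second do not report a rate of 0
return elapsedMillis > 0 ? bytesTransferred * 1000 / elapsedMillis : 0;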
}
}

public UploadProgressInfo getProgressInfo(String uploadId) {
return progressMap.get(uploadId);
}

public void startTracking(String uploadId, long contentLength) {
UploadProgressInfo info = new UploadProgressInfo();
info.setContentLength(contentLength);
info.setStartTime(System.currentTimeMillis());
info.setLastUpdateTime(System.currentTimeMillis());
progressMap.put(uploadId, info);
}

public void updateProgress(String uploadId, long bytesTransferred) {
UploadProgressInfo info = progressMap.get(uploadId);
if (info != null) {
info.setBytesTransferred(bytesTransferred);
info.setLastUpdateTime(System.currentTimeMillis());
}
}

public void removeTracking(String uploadId) {
progressMap.remove(uploadId);
}

@RestController
@RequestMapping("/api/progress")
public static class ProgressController {

private final ProgressListener progressListener;

public ProgressController(ProgressListener progressListener) {
this.progressListener = progressListener;
}

@GetMapping("/{uploadId}")
public ResponseEntity<UploadProgressInfo> getProgress(@PathVariable String uploadId) {
UploadProgressInfo info = progressListener.getProgressInfo(uploadId);
if (info != null) {
return ResponseEntity.ok(info);
} else {
return ResponseEntity.notFound().build();
}
}
}
}

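Combined with a custom servlet filter, progress can be tracked automatically. ProgressServletRequestWrapper in the code below is a custom wrapper class that is not shown here; it would count the bytes read from the request body and call progressListener.updateProgress: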

@Component
public class ProgressFilter implements Filter {

private final ProgressListener progressListener;

public ProgressFilter(ProgressListener progressListener) {
this.progressListener = progressListener;
}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {

if (request instanceof HttpServletRequest && request.getContentType() != null
&& request.getContentType().contains("multipart/form-data")) {

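// Pass uploadId in the query string: on some containers, reading a form-field parameter here
// forces the multipart body to be parsed before the wrapper below is installed
String uploadId = ((HttpServletRequest) request).getParameter("uploadId");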
if (uploadId != null) {
progressListener.startTracking(uploadId, request.getContentLengthLong());
request = new ProgressServletRequestWrapper(request, progressListener, uploadId);
}
}

chain.doFilter(request, response);
}
}
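The filter relies on a wrapper that counts bytes as the request body is read. The core of such a wrapper can be a small FilterInputStream that reports the running total to a callback. The sketch below shows only that counting stream, with invented names (ProgressInputStream, demo); wiring it into a ServletInputStream inside a request wrapper is left out and would be needed in a real application.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.function.LongConsumer;

// Hypothetical helper class, not part of the original article.
final class ProgressInputStream extends FilterInputStream {

    private final LongConsumer onProgress;
    private long totalRead;

    ProgressInputStream(InputStream in, LongConsumer onProgress) {
        super(in);
        this.onProgress = onProgress;
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b != -1) {
            report(1);
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        // read(byte[]) in FilterInputStream delegates to this method, so bytes are counted once.
        int n = super.read(buf, off, len);
        if (n > 0) {
            report(n);
        }
        return n;
    }

    private void report(long n) {
        totalRead += n;
        onProgress.accept(totalRead); // e.g. forward to a progress listener
    }

    /** Reads ten bytes through a 4-byte buffer and returns the last reported total. */
    static long demo() {
        long[] lastReported = {0};
        try (ProgressInputStream in = new ProgressInputStream(
                new ByteArrayInputStream(new byte[10]), total -> lastReported[0] = total)) {
            byte[] buf = new byte[4];
            while (in.read(buf) != -1) {
                // discard the data; only the count matters here
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lastReported[0];
    }
}
```

In the filter scenario, the callback would be something like total -> progressListener.updateProgress(uploadId, total), and the tracking entry should be removed once the request finishes.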

Real-Time Progress Feedback over WebSocket

WebSocket can push upload and download progress to the client in real time:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").withSockJS();
}
}

@Component
public class UploadProgressNotifier {

private final SimpMessagingTemplate messagingTemplate;
private final ProgressListener progressListener;
private final ScheduledExecutorService scheduler;

public UploadProgressNotifier(SimpMessagingTemplate messagingTemplate, ProgressListener progressListener) {
this.messagingTemplate = messagingTemplate;
this.progressListener = progressListener;
this.scheduler = Executors.newScheduledThreadPool(1);

// Push a progress update once per second
scheduler.scheduleAtFixedRate(() -> {
progressListener.progressMap.forEach((uploadId, info) -> {
messagingTemplate.convertAndSend("/topic/progress/" + uploadId, info);
});
}, 0, 1, TimeUnit.SECONDS);
}

@PreDestroy
public void shutdown() {
scheduler.shutdown();
}
}
WebSocket client example (JavaScript)
const socket = new SockJS('/ws');
const stompClient = Stomp.over(socket);

stompClient.connect({}, function(frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/progress/' + uploadId, function(message) {
const progressInfo = JSON.parse(message.body);
updateProgressUI(progressInfo);
});
});

function updateProgressUI(progressInfo) {
const progressBar = document.getElementById('progress-bar');
progressBar.style.width = progressInfo.progressPercent + '%';

const speedElement = document.getElementById('transfer-speed');
const speedMbps = (progressInfo.transferRateBytesPerSecond / (1024 * 1024)).toFixed(2);
speedElement.textContent = `${speedMbps} MB/s`;

if (progressInfo.progressPercent >= 100) {
document.getElementById('status').textContent = 'Upload complete!';
}
}
