自定义和线程池和池化 ByteBuffer
背景与问题
在使用 Java AIO(NIO.2)编程时,如果不做特殊配置,默认的 AsynchronousChannelGroup
会使用系统默认的线程池。默认情况下,线程池为会频繁的创建线程、策略单一,当面对高并发请求时,很容易成为性能瓶颈。此外,频繁为每个请求分配 ByteBuffer
对象也会在高并发场景中造成较大的 GC 压力,从而影响整体吞吐率和响应时间。
为了解决这些问题,我们可以采用以下两个优化手段:
自定义 AsynchronousChannelGroup:通过为
AsynchronousChannelGroup
提供自定义的ThreadPoolExecutor
,我们可以精确控制线程数量和队列策略,确保在高并发负载下具有足够的处理能力,避免默认线程池过小导致的拥塞问题。ByteBuffer 池化:为避免在每个请求处理时频繁创建和销毁
ByteBuffer
,我们可以使用对象池(Object Pool)的思想预先分配一定数量的ByteBuffer
对象。当有请求到来时,从池中获取ByteBuffer
,请求处理完毕后将其归还池中。这种方式能够显著减少 GC 压力,提升内存使用效率和整体吞吐量。
核心思想与实现步骤
1. 自定义 AsynchronousChannelGroup
int threadCount = Runtime.getRuntime().availableProcessors() * 2;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadCount,
threadCount,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "aio-worker-" + count.getAndIncrement());
t.setDaemon(true);
return t;
}
}
);
// 创建异步通道组,并使用自定义线程池
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executor);
说明:
threadCount
:根据实际需要进行调整。通常以 CPU 核心数的倍数作为参考,防止线程过少导致阻塞或过多导致切换开销过大。AsynchronousChannelGroup.withThreadPool(executor)
:使创建的异步 I/O 通道在指定的线程池中执行回调,提供更可控和可调的并发处理能力。
2. ByteBuffer 池化
private static class ByteBufferPool {
private final LinkedBlockingQueue<ByteBuffer> pool;
private final int bufferSize;
public ByteBufferPool(int poolSize, int bufferSize) {
this.pool = new LinkedBlockingQueue<>(poolSize);
this.bufferSize = bufferSize;
for (int i = 0; i < poolSize; i++) {
pool.offer(ByteBuffer.allocate(bufferSize));
}
}
public ByteBuffer acquire() {
ByteBuffer buffer = pool.poll();
if (buffer == null) {
// 池用完则临时创建一个,以确保请求不被阻塞
return ByteBuffer.allocate(bufferSize);
}
buffer.clear();
return buffer;
}
public void release(ByteBuffer buffer) {
if (buffer != null) {
buffer.clear();
pool.offer(buffer);
}
}
}
// 创建一个池子,比如1000个 4KB 的缓冲区
private static final ByteBufferPool BUFFER_POOL = new ByteBufferPool(1000, 4096);
说明:
- 利用
LinkedBlockingQueue
来管理缓冲区池。 - 当缓冲区池空时,会回退到临时分配策略,这保证了在极高并发时不会被池容量限制死锁。
- 在请求完成或异常时,需要及时将缓冲区释放回池。
3. 整合示例代码
下面的示例代码展示了一个简单的 AIO HTTP 服务器。服务端采用自定义的 AsynchronousChannelGroup
来支持高并发请求处理,并使用 ByteBufferPool
来减少缓冲区反复创建和销毁带来的 GC 压力。
package com.litongjava.ip;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.litongjava.ip.utils.Ip2RegionUtils;
public class AioHttpServerJava8 {
// 自定义的异步通道组
private static AsynchronousChannelGroup channelGroup;
static {
try {
int threadCount = Runtime.getRuntime().availableProcessors() * 2;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadCount,
threadCount,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "aio-worker-" + count.getAndIncrement());
t.setDaemon(true);
return t;
}
}
);
channelGroup = AsynchronousChannelGroup.withThreadPool(executor);
} catch (IOException e) {
e.printStackTrace();
}
}
// ByteBuffer对象池
private static class ByteBufferPool {
private final LinkedBlockingQueue<ByteBuffer> pool;
private final int bufferSize;
public ByteBufferPool(int poolSize, int bufferSize) {
this.pool = new LinkedBlockingQueue<>(poolSize);
this.bufferSize = bufferSize;
for (int i = 0; i < poolSize; i++) {
pool.offer(ByteBuffer.allocate(bufferSize));
}
}
public ByteBuffer acquire() {
ByteBuffer buffer = pool.poll();
if (buffer == null) {
// 当池中无可用缓冲区时,临时分配
return ByteBuffer.allocate(bufferSize);
}
buffer.clear();
return buffer;
}
public void release(ByteBuffer buffer) {
if (buffer != null) {
buffer.clear();
pool.offer(buffer);
}
}
}
private static final ByteBufferPool BUFFER_POOL = new ByteBufferPool(1000, 4096);
public static void main(String[] args) {
try {
AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(channelGroup);
serverChannel.bind(new InetSocketAddress(8000));
System.out.println("AIO HTTP Server started on port 8000...");
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
// 继续接受下一个连接
serverChannel.accept(null, this);
if (clientChannel != null && clientChannel.isOpen()) {
handleRequest(clientChannel);
}
}
@Override
public void failed(Throwable exc, Void attachment) {
serverChannel.accept(null, this);
exc.printStackTrace();
}
});
// 保持主线程存活
Thread.currentThread().join();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
// 处理请求
private static void handleRequest(AsynchronousSocketChannel clientChannel) {
final ByteBuffer buffer = BUFFER_POOL.acquire();
clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (result == -1) {
// 客户端关闭连接
closeClient(clientChannel);
BUFFER_POOL.release(attachment);
return;
}
attachment.flip();
String request = StandardCharsets.UTF_8.decode(attachment).toString();
String requestPath = getRequestPath(request);
if ("/ip".equals(requestPath)) {
ipHandler(clientChannel, request);
} else if ("/ok".equals(requestPath)) {
writeHttpResponse(clientChannel, 200, "text/plain", "OK");
} else if ("/echo".equals(requestPath)) {
writeHttpResponse(clientChannel, 200, "text/plain", request);
} else {
writeHttpResponse(clientChannel, 404, "text/plain", "404 Not Found");
}
BUFFER_POOL.release(attachment);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
BUFFER_POOL.release(attachment);
closeClient(clientChannel);
}
});
}
private static void closeClient(AsynchronousSocketChannel clientChannel) {
try {
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void ipHandler(AsynchronousSocketChannel clientChannel, String request) {
Map<String, String> requestMap = getRequestMap(request);
String ip = requestMap.get("ip");
if (ip == null) {
try {
InetSocketAddress remoteAddress = (InetSocketAddress) clientChannel.getRemoteAddress();
ip = remoteAddress.getAddress().getHostAddress();
} catch (IOException e) {
e.printStackTrace();
}
}
String body = null;
if (ip != null && !ip.isEmpty()) {
body = Ip2RegionUtils.searchIp(ip);
}
writeHttpResponse(clientChannel, 200, "text/plain;charset=utf-8", body);
}
private static String getRequestPath(String request) {
int lineBreak = request.indexOf("\r\n");
String firstLine = lineBreak > 0 ? request.substring(0, lineBreak) : request;
if (firstLine.startsWith("GET") || firstLine.startsWith("POST")) {
int firstSpace = firstLine.indexOf(' ');
if (firstSpace == -1) return "/";
int secondSpace = firstLine.indexOf(' ', firstSpace + 1);
if (secondSpace == -1) return "/";
String path = firstLine.substring(firstSpace + 1, secondSpace);
int qIndex = path.indexOf('?');
return (qIndex > 0) ? path.substring(0, qIndex) : path;
}
return "/";
}
private static Map<String, String> getRequestMap(String request) {
Map<String, String> paramMap = new HashMap<>();
String[] lines = request.split("\r\n");
for (String line : lines) {
if (line.startsWith("GET") || line.startsWith("POST")) {
String[] parts = line.split(" ");
if (parts.length > 1) {
String query = parts[1];
if (query.contains("?")) {
String[] queryParams = query.substring(query.indexOf("?") + 1).split("&");
for (String param : queryParams) {
String[] keyValue = param.split("=");
if (keyValue.length > 1) {
paramMap.put(keyValue[0], keyValue[1]);
} else {
paramMap.put(keyValue[0], "");
}
}
}
}
}
}
return paramMap;
}
private static void writeHttpResponse(AsynchronousSocketChannel clientChannel, int statusCode, String contentType, String body) {
String statusMessage;
switch (statusCode) {
case 200:
statusMessage = "OK";
break;
case 404:
statusMessage = "Not Found";
break;
case 500:
statusMessage = "Internal Server Error";
break;
default:
statusMessage = "Unknown";
}
String responseHeader = "HTTP/1.1 " + statusCode + " " + statusMessage + "\r\n" +
"Content-Type: " + contentType + "\r\n";
String response;
if (body != null) {
byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
response = responseHeader + "Content-Length: " + bytes.length + "\r\n\r\n" + body;
} else {
response = responseHeader + "Content-Length: 0\r\n\r\n";
}
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8));
clientChannel.write(responseBuffer, responseBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
closeClient(clientChannel);
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
exc.printStackTrace();
closeClient(clientChannel);
}
});
}
}
总结
通过上述优化措施,我们针对 Java AIO 在高并发环境下可能出现的性能问题进行了改进。在实际应用中,您可以根据目标场景进一步微调线程数、缓冲区大小、池容量,并对请求处理逻辑进行更精细的优化。采用自定义异步通道组与缓冲区池化的策略,有助于提升服务器的吞吐量、降低响应延迟,并在资源受限的环境中获得更佳的性能表现。