AIO HTTP 服务器

创建简单的 HTTP 服务器

概述

AIO(Asynchronous I/O,异步 I/O)是一种能够在处理网络请求时提升效率的方式,它能够非阻塞地处理多个请求。与传统的阻塞 I/O 不同,AIO 通过异步事件通知机制减少了线程的等待时间。在 Java 中,可以使用 AsynchronousServerSocketChannelAsynchronousSocketChannel 来构建基于 AIO 的 HTTP 服务器。

本文介绍了如何使用 AIO 创建一个简单的 HTTP 服务器,该服务器能够接收客户端发送的请求并响应相应的内容。

代码实现

package com.litongjava.ip;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class AioHttpServerJava8 {

  public static void main(String[] args) {
    try {
      // 创建异步服务器通道并绑定到端口8080
      AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
      serverChannel.bind(new InetSocketAddress(8080));

      System.out.println("AIO HTTP Server started on port 8080...");

      // 使用非阻塞模式接受连接
      serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
        @Override
        public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
          // 再次调用 accept(),以继续接受新连接
          serverChannel.accept(null, this);

          if (clientChannel != null && clientChannel.isOpen()) {
            handleRequest(clientChannel);
          }

        }

        @Override
        public void failed(Throwable exc, Void attachment) {
          serverChannel.accept(null, this);
          exc.printStackTrace();
        }
      });

      // 防止主线程退出
      Thread.currentThread().join();

    } catch (IOException | InterruptedException e) {
      e.printStackTrace();
    }
  }

  // 处理客户端连接
  private static void handleRequest(AsynchronousSocketChannel clientChannel) {
    ByteBuffer buffer = ByteBuffer.allocate(4096); // 增加 buffer 大小以处理较大请求

    clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
      @Override
      public void completed(Integer result, ByteBuffer attachment) {
        attachment.flip();
        String request = StandardCharsets.UTF_8.decode(attachment).toString();

        // 解析请求路径
        String requestPath = getRequestPath(request);
        if ("/ip".equals(requestPath)) {
          ipHandler(clientChannel, request);
          // 发送响应
          return;
        }

        // 其他路径,返回404
        writeHttpResponse(clientChannel, 404, "text/plain", "404 Not Found");

      }

      @Override
      public void failed(Throwable exc, ByteBuffer attachment) {
        exc.printStackTrace();
      }
    });
  }

  private static void ipHandler(AsynchronousSocketChannel clientChannel, String request) {
    // 解析请求,获取IP参数
    Map<String, String> requestMap = getRequestMap(request);
    String ip = requestMap.get("ip");

    String body = ip;

    // 构建HTTP响应
    writeHttpResponse(clientChannel, 200, "text/plain;charset=utf-8", body);
  }

  // 从请求中提取请求路径
  private static String getRequestPath(String request) {
    String[] lines = request.split("\r\n");
    for (String line : lines) {
      if (line.startsWith("GET") || line.startsWith("POST")) {
        String[] parts = line.split(" ");
        if (parts.length > 1) {
          String query = parts[1];
          return query.split("\\?")[0]; // 提取请求路径
        }
      }
    }
    return "/";
  }

  // 从请求中提取参数并封装为Map的方法
  private static Map<String, String> getRequestMap(String request) {
    Map<String, String> paramMap = new HashMap<>();
    String[] lines = request.split("\r\n");
    for (String line : lines) {
      if (line.startsWith("GET") || line.startsWith("POST")) {
        String[] parts = line.split(" ");
        if (parts.length > 1) {
          String query = parts[1];
          if (query.contains("?")) {
            String[] queryParams = query.substring(query.indexOf("?") + 1).split("&");
            for (String param : queryParams) {
              String[] keyValue = param.split("=");
              if (keyValue.length > 1) {
                paramMap.put(keyValue[0], keyValue[1]);
              } else {
                paramMap.put(keyValue[0], ""); // 如果没有值,则设为空字符串
              }
            }
          }
        }
      }
    }
    return paramMap;
  }

  private static void writeHttpResponse(AsynchronousSocketChannel clientChannel, int statusCode, String contentType, String body) {

    String statusMessage;
    switch (statusCode) {
    case 200:
      statusMessage = "OK";
      break;
    case 404:
      statusMessage = "Not Found";
      break;
    case 500:
      statusMessage = "Internal Server Error";
      break;
    default:
      statusMessage = "Unknown";
    }

    String response;
    String string = "HTTP/1.1 " + statusCode + " " + statusMessage + "\r\n" + "Content-Type: " + contentType + "\r\n";

    if (body != null) {
      byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
      response = string + "Content-Length: " + bytes.length + "\r\n" + "\r\n" + body;
    } else {
      response = string + "Content-Length: 0" + "\r\n" + "\r\n";
    }

    ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
    clientChannel.write(responseBuffer, responseBuffer, new CompletionHandler<Integer, ByteBuffer>() {
      @Override
      public void completed(Integer result, ByteBuffer buffer) {
        try {
          clientChannel.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }

      @Override
      public void failed(Throwable exc, ByteBuffer buffer) {
        exc.printStackTrace();
      }
    });
  }
}

请求示例

http://localhost:8080/ip?ip=192.168.1.100

响应

192.168.1.100

日志示例

AIO HTTP Server started on port 8080...
Received request:
GET /ip?ip=192.168.1.100 HTTP/1.1
Host: localhost:8080
Connection: keep-alive
Pragma: no-cache
Cache-Control: no-cache
sec-ch-ua: "Chromium";v="128", "Not;A=Brand";v="24", "Google Chrome";v="128"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "Windows"
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Sec-Fetch-Site: none
Sec-Fetch-Mode: navigate
Sec-Fetch-User: ?1
Sec-Fetch-Dest: document
Accept-Encoding: gzip, deflate, br, zstd
Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7
Cookie: SECKEY_ABVK=ycY75NLzlyo0bY0Kn7ZG5bVPp/l7na1YeWo9DWQ68ds%3D; BMAP_SECKEY=ycY75NLzlyo0bY0Kn7ZG5Z81bbL49sME2ZtV4ChhP7dmYvS5bFstYwymM9_vhT07nWUjJzQ0XX7FwomWPJJKjGukIC9JPxJKEKt9wn8aMUKBAY1tMttDz5xTxCST_QIROkSLyIiDMcL0dMLVxbcA4I1jdJfL_axpsM5vS_A-DhULyyNfAe6L08l_OBQ1rpK3; PHPSE
Extracted IP parameter: 192.168.1.100

代码讲解

AIO 的运行原理

在这段代码中,AIO 通过异步通道 AsynchronousServerSocketChannelAsynchronousSocketChannel 实现非阻塞的网络通信。

  1. 创建服务器通道:服务器通过 AsynchronousServerSocketChannel 打开一个异步通道,并绑定到指定端口(8080)。

    AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
    serverChannel.bind(new InetSocketAddress(8080));
    
  2. 接受连接:服务器在 while(true) 循环中调用 accept() 方法,该方法返回一个 Future 对象,表示一个即将建立的连接。调用 get() 方法会等待并获取实际的客户端连接。

    Future<AsynchronousSocketChannel> acceptFuture = serverChannel.accept();
    AsynchronousSocketChannel clientChannel = acceptFuture.get();
    
  3. 处理请求:连接建立后,服务器通过 read() 方法读取客户端发送的数据。read() 方法是异步的,它接受一个缓冲区和 CompletionHandler,读取完成后会调用 completed() 方法。服务器通过解析请求中的数据路径来判断响应的内容。

    clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
      @Override
      public void completed(Integer result, ByteBuffer attachment) {
        // 处理请求
      }
    });
    
  4. 发送响应:通过 write() 方法将响应写入客户端,并使用 CompletionHandler 异步地完成响应的发送。当写入完成时调用 completed() 方法关闭客户端连接。

    clientChannel.write(responseBuffer, responseBuffer, new CompletionHandler<Integer, ByteBuffer>() {
      @Override
      public void completed(Integer result, ByteBuffer buffer) {
        clientChannel.close();
      }
    });
    

部分代码解释

为什么要再次调用 AsynchronousServerSocketChannel 的 acccept 方法

在异步 I/O 编程中,尤其是使用 AsynchronousServerSocketChannel 时,accept() 方法只会接受一次连接。这意味着每次有新的客户端连接时,都需要再次调用 accept() 方法来等待新的连接请求。如果不再次调用 accept(),服务器将不会继续接收新的客户端连接。

让我们详细解释一下:

1. accept() 方法的工作原理

AsynchronousServerSocketChannel.accept() 是一个异步方法,用于等待客户端连接。当有客户端连接时,它会触发指定的回调(在这里是 CompletionHandler)。但该方法只会处理一个连接请求,处理完当前连接后它就“停止工作”了。

如果你希望服务器能够继续接受后续的客户端连接,你必须再次调用 accept() 方法。

2. 为什么要在回调中再次调用 accept()

当一个连接请求完成后,比如有客户端连接到服务器,accept() 的回调方法(即 CompletionHandlercompleted() 方法)会被调用。在处理完这个客户端连接后,你需要让服务器准备好接受下一个连接,这就是为什么你需要在 completed() 回调中再次调用 accept() 方法的原因:

serverChannel.accept(null, this);

这个调用是为了:

  • 重新开始等待新的客户端连接:在处理完一个连接后,服务器需要继续监听端口,等待下一个连接请求。如果不这样做,服务器将无法接受更多的客户端连接。
  • 保持服务器能够处理多个客户端:每次一个新的客户端连接时,accept() 被调用以准备下一个连接请求,从而使服务器能不断处理多个客户端连接。
3. 异步模式的特性

在异步 I/O 模式下,操作是非阻塞的,服务器不会停留在 accept() 调用处等待,而是会立即返回。只有当连接请求到达时,回调函数才会被调用。这种非阻塞行为使得我们可以并行处理多个客户端。

通过在 accept() 的回调中再次调用 accept(),你确保了服务器能够继续接受新连接,并且不会阻塞在某个连接上。

总结:

accept() 方法只接受一个连接,处理完这个连接后,必须再次调用 accept() 来接受下一个连接。这样可以保证服务器能够持续监听并处理后续的客户端连接,而不会因为只处理一个连接后就停止工作。

为什么在 completed 的第一行调用 accept 更好?

completed 方法的第一行就调用 serverChannel.accept(null, this);,这是一个常见的做法,确保在处理当前客户端连接时,服务器已经开始接受下一个客户端连接请求。

  1. 不阻塞新连接:在异步 I/O 模式下,你处理一个连接的同时,希望服务器能立即开始接受新的连接请求。如果在 completed 的后面调用 accept,服务器在处理当前连接期间不会接受新的连接,可能导致其他客户端在等待。将 accept() 放在第一行意味着你立即开始准备接受下一个连接,确保并发处理多个客户端。

  2. 保持高并发性能:在高并发场景下,服务器能够并行处理多个连接请求非常重要。如果 accept() 在连接处理结束后才被调用,那么服务器会浪费时间等待,无法在处理一个连接的同时接受新的连接。

serverChannel.accept(null, this); 放在 completed() 方法的第一行是一个最佳实践,确保服务器始终处于可接受新连接的状态,保持异步服务器的高并发性能和响应能力。

serverChannel 瞬时只能接受一个连接吗?

是的,serverChannel.accept() 一次只能处理一个连接请求。这并不意味着服务器只能同时处理一个连接,而是说:

  • 每次调用 accept():只能接受一个新的客户端连接。
  • 处理当前连接后:你需要再次调用 accept() 来接受下一个客户端连接。

在异步 I/O 模式下,这个过程是非阻塞的。也就是说,你可以同时接受并处理多个连接,但每个新的连接请求都需要通过 accept() 方法来启动。

异步 I/O 的并行处理机制

尽管 accept() 一次只能接受一个连接,但在异步 I/O 模式下,服务器在处理一个客户端连接时,仍然可以处理其他的连接。这是通过以下机制实现的:

  1. accept() 只负责接受连接:一旦连接成功建立,服务器会将连接交给 AsynchronousSocketChannel 来处理数据读写。

  2. 并行处理多个连接:每个客户端连接都有自己独立的 AsynchronousSocketChannel,服务器可以并行处理多个客户端的请求和响应。通过 read()write() 等方法,服务器能够同时处理多个连接的数据流。

  3. 重复调用 accept():每次成功接受连接后,你需要再次调用 accept() 来准备接受下一个客户端的连接请求。正因为服务器是异步的,这样做不会阻塞正在进行的其他 I/O 操作。

如果出现异常,调用 failed() 方法,服务器会停止接受新的连接?

当在异步 I/O 操作中出现异常时,CompletionHandlerfailed() 方法会被调用。如果你不在 failed() 方法中重新调用 accept(),那么服务器确实会停止接受新的连接。

为了确保服务器在发生异常时能够继续接受新的连接,你需要在 failed() 方法中也调用 accept()。这样即使发生异常,服务器仍然能够处理新的连接请求。

buffer 变量的作用

ByteBuffer buffer = ByteBuffer.allocate(4096); // 增加 buffer 大小以处理较大请求

在这段代码中,buffer 是用于存储从客户端读取的数据的缓存。当客户端发送请求时,数据会被读取到这个 ByteBuffer 中。你提到的 "两个 buffer 变量" 指的是 buffer 既作为读操作的参数传递,也作为 CompletionHandler 的第二个泛型参数传递。下面解释一下原因:

buffer 的用途:
  1. 数据读取缓存buffer 是实际用于存储客户端请求数据的缓存。当异步通道读取数据时,它需要一个 ByteBuffer 来存储这些数据。因此,在 clientChannel.read(buffer, buffer, handler) 中,第一个 buffer 是用于存储从客户端读取的数据。

  2. 异步操作的上下文:异步 I/O 操作中的 CompletionHandler 有两个泛型参数,第一个是操作结果(这里是读取的字节数 Integer),第二个是上下文对象(这里是 ByteBuffer)。在这段代码中,buffer 被作为上下文传入到 CompletionHandler,这样你就可以在回调方法(completed)中继续使用同一个 ByteBuffer 来处理数据。因为这是异步操作,所以你需要将 buffer 传入,以便在读取完成后可以继续处理它。

clientChannel.read 为什么要传入两个 ByteBuffer 对象

buffer 变量确实只创建了一个 ByteBuffer 对象,并且在异步读取数据时两次传入同一个 buffer 作为参数。这里的 buffer 实际上起着不同的作用,因此两次传入的 buffer 是有必要的。让我详细解释一下:

1. 传入 bufferread() 方法

在代码中,这行是负责读取客户端数据的关键操作:

clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {...});
  • 第一个 buffer 是传递给 read() 方法的。AsynchronousSocketChannel.read() 方法的第一个参数是一个 ByteBuffer,用于存储从客户端读取的数据。也就是说,网络请求的数据会被写入这个 buffer 中。
2. 传入 buffer 作为 CompletionHandler 的上下文

第二个 buffer 作为上下文传递给 CompletionHandlerattachment 参数。在异步操作完成时,CompletionHandler 会被调用。你需要知道,在回调函数中如何访问到原始的 ByteBuffer 对象,这样你可以处理客户端发送的数据。所以传入第二个 buffer 作为上下文,确保当 completed() 回调被触发时,你能够继续访问到这个 buffer,从而读取和处理数据。

为什么需要两次传入 buffer

虽然你传入的是同一个 ByteBuffer 对象,但它在 read() 方法和 CompletionHandler 中分别起着不同的作用:

  • 第一用途:作为 read() 方法的目标缓冲区,用于存储从客户端接收到的数据。
  • 第二用途:作为 CompletionHandler 的上下文,这样你可以在异步读取完成后继续访问同一个 buffer

如果只传递给 read(),而不作为 CompletionHandler 的上下文传入,异步回调完成时你就无法轻松访问到这个缓冲区里的数据。而 CompletionHandler 的设计就是让你可以携带上下文信息,比如 buffer,从而在回调时处理数据。

在异步编程中,将同一个 buffer 传递给 read()CompletionHandler 是为了保持在异步操作完成后的上下文数据。即便是两次传入 buffer,这仍然是对同一个 ByteBuffer 对象的引用,确保数据的读取和处理能顺利完成。

请求处理过程

  1. 一个请求发送到 AIO 时,AIO 为该请求分配的对象是 AsynchronousSocketChannel

    当客户端发送请求时,AIO 服务器通过 AsynchronousServerSocketChannel.accept() 方法接受客户端连接,并为该连接分配一个 AsynchronousSocketChannel 对象。这个对象代表客户端和服务器之间的异步通信通道。AsynchronousSocketChannel 是用于处理单个请求的通道,每个客户端请求都会有一个独立的 AsynchronousSocketChannel 实例。

    Future<AsynchronousSocketChannel> acceptFuture = serverChannel.accept();
    AsynchronousSocketChannel clientChannel = acceptFuture.get();
    
  2. 后续处理是针对 AsynchronousSocketChannel 对象进行的镜像处理

    这个对象被用于读取客户端请求数据和发送响应数据。具体步骤如下:

    • 读取请求数据:当客户端发送请求时,AsynchronousSocketChannel.read() 方法会启动异步读取操作。这个方法不会阻塞,它会将数据写入 ByteBuffer,并在读取完成后通过 CompletionHandler 回调函数处理数据。

      clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
          // 读取完成,处理请求
          String request = StandardCharsets.UTF_8.decode(attachment).toString();
        }
      
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
          // 读取失败的处理
        }
      });
      
    • 发送响应数据:处理完成后,服务器会根据请求内容构建响应并通过 AsynchronousSocketChannel.write() 方法将响应异步地发送给客户端。写入完成后,服务器会关闭该通道。

      ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8));
      clientChannel.write(responseBuffer, responseBuffer, new CompletionHandler<Integer, ByteBuffer>() {
        @Override
        public void completed(Integer result, ByteBuffer buffer) {
          try {
            clientChannel.close(); // 关闭通道
          } catch (IOException e) {
            e.printStackTrace();
          }
        }
      
        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
          // 写入失败的处理
        }
      });
      

默认线程池

使用 AsynchronousServerSocketChannel.open() 而不指定任何参数时,Java NIO 使用了操作系统的本地异步 I/O 实现。在 Linux 上,这通常是通过 epoll 来实现的,非常高效,能够处理大量的并发连接。

  • 默认实现:使用 epoll,事件驱动,不需要为每个连接分配线程,能够高效地处理大量并发连接。

  • 自定义线程池的影响:一旦您为 AsynchronousChannelGroup 提供了自定义的线程池,即使是通过 withCachedThreadPool 方法,Java NIO 可能会切换到一种混合模式,部分使用线程池,部分使用事件驱动。这可能导致性能下降,因为线程池的配置和行为会影响到底层的异步 I/O 模型。

  • JVM 内部优化:Java NIO 的默认线程池(通过 AsynchronousChannelProvider.provider() 提供)由 JVM 自动管理,并且根据系统资源(如 CPU 核心数)进行了性能优化。你不需要担心如何调优线程池的大小、队列长度等问题,这些细节由 JVM 内部处理。

-自动负载管理:默认线程池会根据系统的负载情况,自动调整线程的使用,确保异步 I/O 操作的高效执行,而不会轻易出现过载或资源耗尽的情况。

总结

  • 请求到达时,AIO 会为每个请求分配一个 AsynchronousSocketChannel 对象,代表服务器与该客户端的通信通道。
  • 所有的读写操作都是通过这个通道对象进行的,读操作将数据写入缓冲区,写操作将响应发送回客户端。
  • 通道使用 CompletionHandler 进行异步操作,读写数据的处理都是通过回调完成的。