手写 AIO Http 服务器

本文将介绍一个手写的 Java AIO(Asynchronous I/O)HTTP 服务器的实现。该服务器采用 Java NIO 的异步特性,实现了一个简单的 HTTP 服务器,能够处理基本的 HTTP 请求和响应。以下将详细解析代码的各个部分,帮助您理解其工作原理。

一、概述

该 HTTP 服务器的核心功能包括:

  • 使用 Java NIO 的 AsynchronousServerSocketChannel 实现非阻塞的服务器套接字。
  • 通过 CompletionHandler 接口处理连接、读取和写入操作的回调。
  • 实现了一个简单的路由机制,能够根据请求路径调用相应的处理器。
  • 提供了工具类用于解析 HTTP 请求和构建 HTTP 响应。

二、主要组件介绍

1. 主程序:MainApp

package com.litongjava.aio.boot;

import java.util.Map;

import com.litongjava.aio.boot.config.ServerConfig;
import com.litongjava.aio.boot.context.AioHttpServerJava8;
import com.litongjava.aio.boot.http.HttpRequestHandler;
import com.litongjava.aio.boot.http.HttpRequestRouter;
import com.litongjava.aio.boot.utils.RequestUtils;
import com.litongjava.aio.boot.utils.ResponseUtils;

public class MainApp {

  public static void main(String[] args) {
    HttpRequestRouter r = ServerConfig.me().getHttpRequestRouter();
    r.add("/", new HttpRequestHandler() {

      @Override
      public String handle(String request) throws Exception {
        Map<String, String> requestMap = RequestUtils.getRequestMap(request);
        String body = requestMap.get("ip");
        if (body == null) {
          body = "ok";
        }
        String response = ResponseUtils.toResponse(200, "text/plain;charset=utf-8", body);
        return response;
      }
    });
    AioHttpServerJava8.run();
  }

}

解析:

  • 创建了一个 HttpRequestRouter,并添加了对路径/的处理器。
  • 处理器从请求中获取参数ip,如果不存在则返回"ok"
  • 使用 ResponseUtils 构建 HTTP 响应。
  • 启动 AIO HTTP 服务器。

2. 请求解析工具:RequestUtils

package com.litongjava.aio.boot.utils;

import java.util.HashMap;
import java.util.Map;

public class RequestUtils {

  // 从请求中提取请求路径
  public static String getRequestPath(String request) {
    String[] lines = request.split("\r\n");
    for (String line : lines) {
      if (line.startsWith("GET") || line.startsWith("POST")) {
        String[] parts = line.split(" ");
        if (parts.length > 1) {
          String query = parts[1];
          return query.split("\\?")[0]; // 提取请求路径
        }
      }
    }
    return "/";
  }

  // 从请求中提取参数并封装为Map的方法
  public static Map<String, String> getRequestMap(String request) {
    Map<String, String> paramMap = new HashMap<>();
    String[] lines = request.split("\r\n");
    for (String line : lines) {
      if (line.startsWith("GET") || line.startsWith("POST")) {
        String[] parts = line.split(" ");
        if (parts.length > 1) {
          String query = parts[1];
          if (query.contains("?")) {
            String[] queryParams = query.substring(query.indexOf("?") + 1).split("&");
            for (String param : queryParams) {
              String[] keyValue = param.split("=");
              if (keyValue.length > 1) {
                paramMap.put(keyValue[0], keyValue[1]);
              } else {
                paramMap.put(keyValue[0], ""); // 如果没有值,则设为空字符串
              }
            }
          }
        }
      }
    }
    return paramMap;
  }
}

解析:

  • getRequestPath:从 HTTP 请求中提取请求的路径。
  • getRequestMap:解析请求参数,返回一个包含参数键值对的 Map。

3. 响应构建工具:ResponseUtils

package com.litongjava.aio.boot.utils;

import java.nio.charset.StandardCharsets;

public class ResponseUtils {

  public static String toResponse(int statusCode, String contentType, String body) {
    String statusMessage;
    switch (statusCode) {
    case 200:
      statusMessage = "OK";
      break;
    case 404:
      statusMessage = "Not Found";
      break;
    case 500:
      statusMessage = "Internal Server Error";
      break;
    default:
      statusMessage = "Unknown";
    }

    String response;
    String string = "HTTP/1.1 " + statusCode + " " + statusMessage + "\r\n" + "Content-Type: " + contentType + "\r\n";

    if (body != null) {
      byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
      response = string + "Content-Length: " + bytes.length + "\r\n" + "\r\n" + body;
    } else {
      response = string + "Content-Length: 0" + "\r\n" + "\r\n";
    }
    return response;
  }
}

解析:

  • toResponse:根据状态码、内容类型和响应体构建 HTTP 响应字符串。

4. 服务器启动类:AioHttpServerJava8

package com.litongjava.aio.boot.context;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;

import com.litongjava.aio.boot.handler.AcceptCompletionHandler;

public class AioHttpServerJava8 {
  public static void run() {
    try {
      // 创建异步服务器通道并绑定到端口8080
      AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
      serverChannel.bind(new InetSocketAddress(8080));

      System.out.println("AIO HTTP Server started on port 8080...");

      AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(serverChannel);

      serverChannel.accept(null, acceptCompletionHandler);

      // 防止主线程退出
      Thread.currentThread().join();

    } catch (IOException | InterruptedException e) {
      e.printStackTrace();
    }
  }
}

解析:

  • 创建了 AsynchronousServerSocketChannel,绑定到 8080 端口。
  • 使用 AcceptCompletionHandler 处理新的客户端连接。
  • 主线程阻塞,防止程序退出。

5. 连接处理器:AcceptCompletionHandler

package com.litongjava.aio.boot.handler;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {

  private AsynchronousServerSocketChannel serverChannel;

  public AcceptCompletionHandler(AsynchronousServerSocketChannel serverChannel) {
    this.serverChannel = serverChannel;
  }

  @Override
  public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
    // 再次调用 accept(),以继续接受新连接
    serverChannel.accept(null, this);

    if (clientChannel != null && clientChannel.isOpen()) {
      ByteBuffer buffer = ByteBuffer.allocate(4096);
      ReadCompletionHandler readCompletionHandler = new ReadCompletionHandler(clientChannel);
      clientChannel.read(buffer, buffer, readCompletionHandler);
    }

  }

  @Override
  public void failed(Throwable exc, Void attachment) {
    serverChannel.accept(null, this);
    exc.printStackTrace();
  }
}

解析:

  • 当有新的客户端连接时,completed方法被调用。
  • 再次调用accept以接受新的连接,保持服务器持续监听。
  • 为每个新连接创建一个 ReadCompletionHandler 来处理读操作。

6. 读取处理器:ReadCompletionHandler

package com.litongjava.aio.boot.handler;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

import com.litongjava.aio.boot.config.ServerConfig;

public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

  private AsynchronousSocketChannel clientChannel;

  public ReadCompletionHandler(AsynchronousSocketChannel clientChannel) {
    this.clientChannel = clientChannel;
  }

  @Override
  public void completed(Integer result, ByteBuffer attachment) {
    ServerConfig.me().getByteBufferHandler().handle(clientChannel, result, attachment);
  }

  @Override
  public void failed(Throwable exc, ByteBuffer attachment) {
    exc.printStackTrace();
    try {
      clientChannel.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

解析:

  • 在读取操作完成后,completed方法被调用。
  • 调用 ServerConfig 中的 ByteBufferHandler 来处理读取的数据。

7. 写入处理器:WriteCompletionHandler

package com.litongjava.aio.boot.handler;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class WriteCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

  private AsynchronousSocketChannel clientChannel;
  public WriteCompletionHandler(AsynchronousSocketChannel clientChannel) {
    this.clientChannel=clientChannel;
  }

  @Override
  public void completed(Integer result, ByteBuffer buffer) {
    try {
      clientChannel.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void failed(Throwable exc, ByteBuffer buffer) {
    try {
      clientChannel.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
    exc.printStackTrace();
  }
}

解析:

  • 在写入操作完成后,关闭客户端连接。

8. 配置类:ServerConfig

package com.litongjava.aio.boot.config;

import com.litongjava.aio.boot.handler.ByteBufferHandler;
import com.litongjava.aio.boot.handler.DefaultByteBufferHandler;
import com.litongjava.aio.boot.http.DefaultHttpReqeustRouter;
import com.litongjava.aio.boot.http.HttpRequestRouter;

public class ServerConfig {
  private static ServerConfig me = new ServerConfig();

  public static ServerConfig me() {
    return me;
  }

  private ByteBufferHandler byteBufferHandler = new DefaultByteBufferHandler();;
  private HttpRequestRouter httpRequestRouter = new DefaultHttpReqeustRouter();

  public HttpRequestRouter getHttpRequestRouter() {
    return httpRequestRouter;
  }

  public void setHttpRequestRouter(HttpRequestRouter httpRequestRouter) {
    this.httpRequestRouter = httpRequestRouter;
  }

  public ByteBufferHandler getByteBufferHandler() {
    return byteBufferHandler;
  }

  public void setByteBufferHandler(ByteBufferHandler byteBufferHandler) {
    this.byteBufferHandler = byteBufferHandler;
  }
}

解析:

  • 使用单例模式存储服务器的配置。
  • 包含 ByteBufferHandler 和 HttpRequestRouter 的实例。

9. ByteBuffer 处理器接口:ByteBufferHandler

package com.litongjava.aio.boot.handler;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;

public interface ByteBufferHandler {

  public ByteBuffer handle(AsynchronousSocketChannel clientChannel, Integer result, ByteBuffer attachment);
}

解析:

  • 定义了处理读取到的 ByteBuffer 的接口方法。

10. 默认的 ByteBuffer 处理器:DefaultByteBufferHandler

package com.litongjava.aio.boot.handler;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;

import com.litongjava.aio.boot.config.ServerConfig;
import com.litongjava.aio.boot.http.HttpRequestHandler;
import com.litongjava.aio.boot.http.HttpRequestRouter;
import com.litongjava.aio.boot.utils.RequestUtils;
import com.litongjava.aio.boot.utils.ResponseUtils;

public class DefaultByteBufferHandler implements ByteBufferHandler {

  @Override
  public ByteBuffer handle(AsynchronousSocketChannel clientChannel, Integer result, ByteBuffer attachment) {
    attachment.flip();
    String request = StandardCharsets.UTF_8.decode(attachment).toString();

    // 解析请求路径
    String requestPath = RequestUtils.getRequestPath(request);

    HttpRequestRouter httpRequestRouter = ServerConfig.me().getHttpRequestRouter();
    HttpRequestHandler handler = httpRequestRouter.find(requestPath);
    if (handler != null) {
      String response = null;
      try {
        response = handler.handle(request);
      } catch (Exception e) {
        response = ResponseUtils.toResponse(500, "text/plain", e.getMessage());
        e.printStackTrace();
      }
      if (request == null) {
        response = ResponseUtils.toResponse(404, "text/plain", "Null");
        writeHttpResponse(clientChannel, response);
      } else {
        writeHttpResponse(clientChannel, response);
      }

    }else {
      // 其他路径,返回404
      String response = ResponseUtils.toResponse(404, "text/plain", "404 Not Found");
      writeHttpResponse(clientChannel, response);
    }


    return null;
  }

  private static void writeHttpResponse(AsynchronousSocketChannel clientChannel, String response) {
    ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
    WriteCompletionHandler writeCompletionHandler = new WriteCompletionHandler(clientChannel);
    clientChannel.write(responseBuffer, responseBuffer, writeCompletionHandler);
  }

}

解析:

  • 实现了 ByteBufferHandler 接口。
  • 解析 HTTP 请求,找到对应的处理器,并生成 HTTP 响应。
  • 使用 WriteCompletionHandler 来发送响应并关闭连接。

11. HTTP 请求路由器:DefaultHttpReqeustRouter

package com.litongjava.aio.boot.http;

public interface HttpRequestRouter {

  /**
   * 添加路由
   * @param path
   * @param handler
   */
  public void add(String path, HttpRequestHandler handler);

  /**
   * 查找路由
   * @param path
   * @return
   */
  public HttpRequestHandler find(String path);
}

package com.litongjava.aio.boot.http;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DefaultHttpReqeustRouter implements HttpRequestRouter {
  Map<String, HttpRequestHandler> requestMapping = new ConcurrentHashMap<>();

  public void add(String path, HttpRequestHandler handler) {
    requestMapping.put(path, handler);
  }

  /**
   * find route /* 表示匹配任何以特定路径开始的路径,/** 表示匹配该路径及其下的任何子路径
   */
  public HttpRequestHandler find(String path) {
    HttpRequestHandler httpRequestRouteHandler = requestMapping.get(path);
    if (httpRequestRouteHandler != null) {
      return httpRequestRouteHandler;
    }

    // Check for wildcard matches
    Set<Map.Entry<String, HttpRequestHandler>> entrySet = requestMapping.entrySet();

    for (Map.Entry<String, HttpRequestHandler> entry : entrySet) {
      String key = entry.getKey();

      if (key.endsWith("/*")) {
        String baseRoute = key.substring(0, key.length() - 1);
        if (path.startsWith(baseRoute)) {
          return entry.getValue();
        }
      } else if (key.endsWith("/**")) {
        String baseRoute = key.substring(0, key.length() - 2);
        if (path.startsWith(baseRoute)) {
          return entry.getValue();
        }
      }
    }

    return null;
  }
}

解析:

  • 实现了 HttpRequestRouter 接口。
  • 维护了一个路径到处理器的映射。
  • 支持精确匹配和通配符匹配(/*/**)。

12. HTTP 请求处理器接口:HttpRequestHandler

package com.litongjava.aio.boot.http;

@FunctionalInterface
public interface HttpRequestHandler {
  String handle(String rquest) throws Exception;
}

解析:

  • 定义了处理 HTTP 请求的接口方法。

三、工作流程

  1. 服务器启动

    • AioHttpServerJava8.run()启动服务器,绑定端口并开始监听。
    • 使用AcceptCompletionHandler处理新的连接。
  2. 接受连接

    • 当有新连接时,AcceptCompletionHandler.completed方法被调用。
    • 为新连接创建ReadCompletionHandler来处理读操作。
  3. 读取请求

    • ReadCompletionHandler.completed方法被调用,读取客户端发送的数据。
    • 调用DefaultByteBufferHandler.handle处理读取的数据。
  4. 处理请求

    • DefaultByteBufferHandler.handle中:
      • 解析请求路径和参数。
      • 使用HttpRequestRouter找到对应的HttpRequestHandler
      • 调用处理器的handle方法获取响应内容。
      • 使用ResponseUtils构建 HTTP 响应。
  5. 发送响应

    • 使用WriteCompletionHandler将响应发送给客户端。
    • 发送完成后,关闭客户端连接。

四、总结

该 Java AIO HTTP 服务器通过异步非阻塞的方式,实现了一个简单的 HTTP 服务器。它使用了 Java NIO 中的异步通道和回调机制,充分利用了 Java 8 的特性。通过自定义的路由器和处理器,能够方便地扩展处理不同的请求路径。

潜在的改进方向:

  • 支持更多的 HTTP 方法:目前只处理了 GET 和 POST 方法,可以扩展对其他 HTTP 方法的支持。
  • 完善 HTTP 协议解析:当前的请求解析较为简单,可能无法处理复杂的 HTTP 请求。
  • 错误处理:增加对异常和错误状态的处理,提供更加友好的错误响应。
  • 性能优化:针对高并发场景,进行性能调优。

参考:

  • Java 官方文档关于 NIO 的介绍。
  • Java NIO 的异步通道和 CompletionHandler 的使用示例。

通过阅读以上内容,相信您已经对手写 Java AIO HTTP 服务器的实现有了深入的了解。

压力测试

测试数据

root@ping-Inspiron-3458:~# ab -c1000 -n10000000 http://localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1843412 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 1000000 requests
Completed 2000000 requests
Completed 3000000 requests
Completed 4000000 requests
Completed 5000000 requests
Completed 6000000 requests
Completed 7000000 requests
Completed 8000000 requests
Completed 9000000 requests
Completed 10000000 requests
Finished 10000000 requests


Server Software:
Server Hostname:        localhost
Server Port:            8080

Document Path:          /
Document Length:        2 bytes

Concurrency Level:      1000
Time taken for tests:   908.218 seconds
Complete requests:      10000000
Failed requests:        0
Total transferred:      800000000 bytes
HTML transferred:       20000000 bytes
Requests per second:    11010.57 [#/sec] (mean)
Time per request:       90.822 [ms] (mean)
Time per request:       0.091 [ms] (mean, across all concurrent requests)
Transfer rate:          860.20 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0   45  60.0     42    3095
Processing:     1   46  16.9     46    6842
Waiting:        0   32  16.7     31    6832
Total:          4   91  63.1     88    7882

Percentage of the requests served within a certain time (ms)
  50%     88
  66%     90
  75%     91
  80%     92
  90%     94
  95%     97
  98%    103
  99%    108
 100%   7882 (longest request)

压力测试报告

一、测试概述

本次压力测试旨在评估本地服务器(localhost:8080)在高并发条件下的性能和稳定性。通过使用 ApacheBench(ab)工具,模拟大量并发请求,以测量服务器在高负载下的响应能力和处理效率。

二、测试环境

  • 服务器主机名: localhost
  • 服务器端口: 8080
  • 操作系统: 未明确提供(假设为 Linux 系统)
  • 服务器软件: 未明确提供(可能为 Apache、Nginx 或其他)
  • 硬件配置: 未明确提供(建议补充具体配置,如 CPU、内存、存储等)
  • 网络环境: 本地网络

三、测试工具

四、测试参数

  • 目标 URL: http://localhost:8080/
  • 文档路径: /
  • 文档长度: 2 字节
  • 并发级别(Concurrency Level): 1000
  • 请求总数(Total Requests): 10,000,000
  • 连接类型: 默认(短连接)

五、测试结果

5.1 总体性能指标
指标数值
测试总耗时908.218 秒 (~15 分钟)
完成请求数10,000,000
失败请求数0
每秒请求数11,010.57 [#/sec] (平均)
每个请求的平均时间90.822 毫秒 (单个请求)
每个请求的平均时间0.091 毫秒 (所有并发请求)
传输速率860.20 KB/s 接收
5.2 连接时间分析
阶段最小值 (ms)平均值 (ms)标准差 (ms)中位数 (ms)最大值 (ms)
连接04560.0423095
处理14616.9466842
等待03216.7316832
总时间49163.1887882
5.3 响应时间分布
百分比响应时间 (ms)
50%88
66%90
75%91
80%92
90%94
95%97
98%103
99%108
100%7882 (最长请求)

六、分析与讨论

6.1 高吞吐量表现

服务器在高并发(1000)情况下,成功处理了每秒约 11,000 个请求,且所有请求均未失败。这表明服务器在高负载下具有较高的处理能力和稳定性。

6.2 响应时间表现
  • 整体响应速度: 大部分请求(99%)的响应时间均在 108 毫秒以内,显示出良好的响应速度。
  • 极端响应时间: 有 1%的请求响应时间显著增高,最长达到 7882 毫秒。这可能由以下因素导致:
    • 服务器资源瞬时紧张(CPU、内存等)
    • 应用程序内部的长时间运行操作
    • 垃圾回收(若使用 JVM 等)
    • 锁竞争或其他后台任务
6.3 连接时间波动

连接时间的标准差较大(60 ms),最大值达到 3095 ms,表明在高并发情况下,部分连接建立时间较长。可能原因包括:

  • 网络堆栈的性能瓶颈
  • 服务器的连接管理策略不优化
  • 资源竞争导致的延迟
6.4 资源利用率
  • 传输量: 每个请求的文档长度仅为 2 字节,传输量极小,有助于提高每秒处理请求数。然而,在实际应用中,文档大小可能更大,需要根据实际情况调整测试参数。
  • 系统资源: 未提供具体的 CPU、内存、I/O 等资源利用率数据,建议在未来测试中监控这些指标,以更全面地评估系统性能。
6.5 系统瓶颈

尽管整体性能表现良好,但极少数请求的高响应时间提示可能存在潜在的性能瓶颈。需要进一步调查具体原因,建议:

  • 检查服务器日志,寻找异常请求或错误信息
  • 监控系统资源使用情况(CPU、内存、I/O 等)
  • 优化应用程序,减少长时间运行的操作

八、结论

本次压力测试表明,目标服务器在高并发条件下具有良好的处理能力和稳定性,能够高效地处理大量请求。然而,极少数请求的高响应时间提示可能存在潜在的性能瓶颈。建议根据上述改进建议,进一步优化系统,以确保在各种负载情况下都能保持优异的性能表现。