tio-boot 案例 - 定时任务 监控 PostgreSQL、Redis 和 Elasticsearch

背景介绍

在现代的应用程序中,服务的可靠性和稳定性至关重要。为了确保系统在出现问题时能够及时发现并采取措施,对关键服务的监控显得尤为重要。本项目使用了 Tio-boot 框架,结合 PostgreSQL、Redis 和 Elasticsearch 三个服务的监控功能,并通过 Telegram 推送报警消息,确保当服务出现异常时,能够及时通知相关人员进行处理。

实现方案概述

本项目的监控和报警系统主要由以下几个部分组成:

  1. 服务监控:通过对 PostgreSQL、Redis 和 Elasticsearch 服务进行周期性连接测试,判断服务的可用性。
  2. 报警推送:一旦发现服务不可用,系统将生成报警信息,并通过 Telegram 推送给指定的用户。
  3. 定时任务:使用 Quartz 定时任务,定期执行服务的监控操作,确保系统在任何时间段都能检测到异常。

监控代码

PostgreSQL 监控

PostgresWatchService 类负责监控 PostgreSQL 服务。它通过 JDBC 连接测试主数据库和从数据库的可用性。如果连接失败,会调用 NotificationService 发送报警信息。

import com.litongjava.jfinal.aop.Aop;
import com.litongjava.tio.boot.constatns.TioBootConfigKeys;
import com.litongjava.tio.utils.dsn.DbDSNParser;
import com.litongjava.tio.utils.dsn.JdbcInfo;
import com.litongjava.tio.utils.environment.EnvUtils;
public class PostgresWatchService {

  public void index() {
    String driverClass = "org.postgresql.Driver";

    String mainDsn = EnvUtils.get(TioBootConfigKeys.DATABASE_DSN);
    if (mainDsn != null) {
      JdbcInfo jdbc = new DbDSNParser().parse(mainDsn);
      String message = "Failed to connect main postgresql:" + mainDsn;
      Aop.get(PostgresConnectService.class).connect(jdbc.getUrl(), jdbc.getUser(), jdbc.getPswd(), driverClass, message);
    }

    String replicas = EnvUtils.get("DATABASE_DSN_REPLICAS");
    if (replicas != null) {
      String[] dsns = replicas.split(",");
      for (String dsn : dsns) {
        JdbcInfo jdbc = new DbDSNParser().parse(dsn);
        String message = "Failed to connect slave postgresql:" + dsn;
        Aop.get(PostgresConnectService.class).connect(jdbc.getUrl(), jdbc.getUser(), jdbc.getPswd(), driverClass, message);
      }
    }
  }
}
package com.red.book.watch;

import java.sql.Connection;
import java.sql.DriverManager;

import com.litongjava.jfinal.aop.Aop;
import com.red.book.service.NotificationService;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PostgresConnectService {

  public void connect(String jdbcUrl, String jdbcUser, String jdbcPswd, String driverClass, String message) {
    try (Connection connection = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPswd);) {
      log.info("success");
    } catch (Exception e) {
      e.printStackTrace();
      log.info("fail:" + message);
      Aop.get(NotificationService.class).index("WatchService", "I", "unknow", message);
    }
  }
}

在连接数据库时,PostgresConnectService 类使用 try-with-resources 语句确保在连接结束后,资源能够自动释放。

Redis 监控

RedisWatchService 类通过 Jedis 客户端来测试 Redis 服务的可用性。如果连接失败,或者无法进行基本的 Redis 操作(如设置和获取键值),则会发送报警信息。

import com.litongjava.jfinal.aop.Aop;
import com.litongjava.tio.boot.constatns.TioBootConfigKeys;
import com.litongjava.tio.utils.dsn.RedisInfo;
import com.litongjava.tio.utils.dsn.RedisUrlParser;
import com.litongjava.tio.utils.environment.EnvUtils;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;

public class RedisWatchService {

  public void index() {
    String redisUrl = EnvUtils.get(TioBootConfigKeys.REDIS_URL);
    String message = "Failed to connect redis:" + redisUrl;

    if (redisUrl != null) {
      RedisInfo redisInfo = new RedisUrlParser().parse(redisUrl);

      // 得到Jedis对象
      Jedis jedis = new Jedis(redisInfo.getHost(), redisInfo.getPort());

      try {
        // 如果密码不为空,则进行认证
        if (redisInfo.getPswd() != null && !redisInfo.getPswd().isEmpty()) {
          jedis.auth(redisInfo.getPswd());
        }
        jedis.connect();
        jedis.set("test_key", "test_value");
        String string = jedis.get("test_key");
        log.info("success:{}", string);
        log.info("connect redis successful");
      } catch (Exception e) {
        Aop.get(NotificationService.class).index("WatchService", "I", "unknown", message);
        e.printStackTrace();
      } finally {
        jedis.disconnect();
      }
    }
  }
}

此处在 Jedis 连接 Redis 后,使用 finally 块确保在任何情况下连接都能正常关闭。

Elasticsearch 监控

ElasticsearchWatchService 类通过 RestHighLevelClient 来测试 Elasticsearch 服务的可用性。如果连接失败,会发送报警信息。

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;

import com.litongjava.es.client.EsDSN;
import com.litongjava.es.client.EsDSNUtils;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.hutool.StrUtil;
public class ElasticsearchWatchService {

  public void index() {
    String dsn = EnvUtils.getStr("ELASTICSEARCH_DSN");

    if (StrUtil.notBlank(dsn)) {
      EsDSN esDSN = EsDSNUtils.parse(dsn);

      CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
      credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esDSN.getUsername(), esDSN.getPassword()));

      HttpHost httpHost = new HttpHost(esDSN.getHost(), esDSN.getPort(), esDSN.getSchema());

      HttpClientConfigCallback httpClientConfigCallback = httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

      RestClientBuilder builder = RestClient.builder(httpHost).setHttpClientConfigCallback(httpClientConfigCallback);

      try (RestHighLevelClient client = new RestHighLevelClient(builder)) {
        boolean isConnected = client.ping(RequestOptions.DEFAULT);
        log.info("ElasticSearch 连接状态: {}", isConnected);
      } catch (Exception e) {
        String message = "Failed to connect elastic-search:" + dsn;
        Aop.get(NotificationService.class).index("WatchService", "I", "unknown", message);
        e.printStackTrace();
      }
    }
  }
}

RestHighLevelClient 使用 try-with-resources 确保在连接结束后资源能够释放。

向 Telegram 推送消息

报警信息通过 NotificationService 类推送到 Telegram。消息内容包括报警时间、应用名称、报警级别、设备名称等信息,确保接收者能够快速了解并处理问题。

import java.text.SimpleDateFormat;
import java.util.Date;

import com.litongjava.tio.utils.http.ResponseVo;
import com.litongjava.tio.utils.notification.NotificationTemplate;
import com.litongjava.tio.utils.telegram.Telegram;

public class NotificationService {

  public boolean index(String warningName, String level, String deviceName, String content) {
    String appGroupName = "imaginix";
    String appName = "kimi-service-monitoring";
    Date date = new Date();
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String dateString = sdf.format(date);

    String text = String.format(NotificationTemplate.alarmTemplate, dateString, appGroupName, appName, warningName, level, deviceName, content);

    log.info("text:{}",text);
    try {
      ResponseVo responseVo = Telegram.use().sendMessage(TelegramConfig.CHAT_ID, text);
      int code = responseVo.getCode();
      if (code == 200) {
        return true;
      } else {
        log.error("Failed to send message to telegram");
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return false;
  }
}

定时任务的实现

为了定期执行监控任务,使用 Quartz 定时任务。WatchingTask 类会根据配置文件中的调度时间,定期调用监控服务的 index() 方法,确保系统的实时监控能力。

import org.quartz.JobExecutionContext;

import com.litongjava.jfinal.aop.Aop;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.quartz.AbstractJobWithLog;

public class WatchingTask extends AbstractJobWithLog {
  @Override
  public void run(JobExecutionContext context) {
    try {
      if (!EnvUtils.isDev()) {
        log.info("it is not dev environment add PostgresWatchService,RedisWatchService and ElasticsearchWatchService");
        Aop.get(PostgresWatchService.class).index();
        Aop.get(RedisWatchService.class).index();
        Aop.get(ElasticsearchWatchService.class).index();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

通过在 tio-quartz.properties 文件中配置任务执行的时间表达式,可以灵活调整监控任务的频率:

com.task.WatchingTask = 2 * * * * ?

结论

通过以上代码和配置,成功实现了 PostgreSQL、Redis 和 Elasticsearch 的服务监控,结合 Telegram 消息推送功能,当服务出现问题时能够及时报警。定时任务确保了系统的监控是持续且有效的,为运维提供了及时有效的工具。