掘金 后端 ( ) • 2024-06-27 09:30

周一闲来无事,如期接到了面试电话,面试官小哥哥收了收困意,试探性地确定了下,是否是要面试的我。我言辞犀利,准确肯定地回答,确保了本次面试没有什么差错。

compressed_WechatIMG177.jpg

面试官: 看你说你对JAVA很熟悉,常规基础问题,我们先来问一下线程池的问题吧。

我: 好的,这个我最拿手了,你说这个,我可一点都不困... 😎

面试官: 比如线程池内存池,当JVM挂了,队列内的任务丢了没有?

我(思考): 卧槽,这些内存池,老大JVM都GG了,还玩个毛,毛都没有了啊。。。😱

我: 会丢失,这种情况,我们一般直接就使用MQ这种可以持久化消息的方案,不再使用线程池了。

面试官: 我们现在就讨论针对线程池的方案,如何做持久化方案。

我(思考): 我擦,搁以前,我说个MQ替代就完事了,还能这么墨迹。。。😅

我: 这个是优雅挂的,还是暴力挂的?

我(思考): 这个时候,我试图想通过一些JVM提供的钩子函数,做一些事情,所以想确定下挂的方式。。。

面试官: 就是挂了,啥也没有了,你怎么弄吗。

我(思考): 不讲武德啊,上来就挂挂挂,我这使用的啥服务器,总是挂。。。😩

我: 好的,如果JVM挂了,队列里的任务确实会丢失。为了避免这种情况,我们可以使用数据库来持久化任务。当任务被提交时,我们先将任务信息存储到数据库中。一旦任务执行完成,我们更新数据库中的任务状态。这样,即使JVM挂了,我们也可以从数据库中恢复未完成的任务。具体的实现步骤如下...


当JVM停止时,线程池中的任务会丢失,未执行的任务将不会被继续执行。具体行为取决于JVM的关闭过程以及线程池的状态管理。以下是一些详细说明:

1. 正常关闭与强制关闭

正常关闭

在正常关闭过程中,JVM会尝试完成正在执行的任务并处理一些清理工作。对于线程池,可以使用ExecutorServiceshutdown()方法来优雅地关闭线程池。这种方法会拒绝新的任务提交,但会继续执行已提交的任务。

executorService.shutdown();
try {
    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
        executorService.shutdownNow();
    }
} catch (InterruptedException e) {
    executorService.shutdownNow();
}

强制关闭

如果使用shutdownNow()方法,线程池将立即尝试停止所有活动任务并停止处理等待队列中的任务:

List<Runnable> pendingTasks = executorService.shutdownNow();

shutdownNow()方法会返回一个未处理任务的列表,这些任务被丢弃,不会被执行。

2. JVM 意外停止

如果JVM由于某些原因(如进程被杀死、系统崩溃等)意外停止,那么线程池中的所有任务将立即停止,未完成的任务将被丢失。这种情况无法通过编程方式控制或恢复。

3. 数据持久化

为了避免任务丢失,可以采用以下措施:

  • 任务持久化:将任务存储在数据库、消息队列或持久化存储中,以便在系统重启后可以重新获取和执行。
  • 使用分布式任务调度系统:如Quartz、Spring Batch等,这些框架提供了任务调度和持久化支持。

我来开始设计持久化方案了

要实现一个可靠的任务持久化方案,可以使用数据库和一个持久化任务队列。以下是一个详细的实现方案,示例使用MySQL数据库和Java。

1. 数据库设计

首先,需要设计一个数据库表来存储任务信息。假设我们有一个名为tasks的表,结构如下:

CREATE TABLE tasks (
    task_id INT AUTO_INCREMENT PRIMARY KEY,
    task_data VARCHAR(255) NOT NULL,
    status ENUM('PENDING', 'COMPLETED') DEFAULT 'PENDING',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

2. 任务持久化和执行逻辑

任务持久化

任务提交时,将任务信息存储到数据库中,状态为PENDING

任务执行

从数据库中加载PENDING状态的任务并提交到线程池执行。任务执行完成后,更新任务状态为COMPLETED

3. 示例代码

连接数据库的工具类

首先,创建一个工具类来管理数据库连接:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DatabaseUtil {
    private static final String URL = "jdbc:mysql://localhost:3306/mydb";
    private static final String USER = "user";
    private static final String PASSWORD = "password";

    public static Connection getConnection() throws SQLException {
        return DriverManager.getConnection(URL, USER, PASSWORD);
    }
}

任务管理类

任务管理类负责任务的持久化和加载:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class TaskManager {
    // 保存任务到数据库
    public void saveTask(String taskData) {
        String sql = "INSERT INTO tasks (task_data) VALUES (?)";
        try (Connection conn = DatabaseUtil.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, taskData);
            ps.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    // 从数据库中加载待处理任务
    public List<Task> loadPendingTasks() {
        String sql = "SELECT * FROM tasks WHERE status = 'PENDING'";
        List<Task> tasks = new ArrayList<>();
        try (Connection conn = DatabaseUtil.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql);
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                int taskId = rs.getInt("task_id");
                String taskData = rs.getString("task_data");
                tasks.add(new Task(taskId, taskData));
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return tasks;
    }

    // 更新任务状态
    public void updateTaskStatus(int taskId, String status) {
        String sql = "UPDATE tasks SET status = ? WHERE task_id = ?";
        try (Connection conn = DatabaseUtil.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, status);
            ps.setInt(2, taskId);
            ps.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

任务类

任务类表示一个任务对象:

public class Task {
    private int taskId;
    private String taskData;

    public Task(int taskId, String taskData) {
        this.taskId = taskId;
        this.taskData = taskData;
    }

    public int getTaskId() {
        return taskId;
    }

    public String getTaskData() {
        return taskData;
    }
}

任务执行类

任务执行类实现Runnable接口,并在任务完成后更新任务状态:

public class TaskExecutor implements Runnable {
    private Task task;
    private TaskManager taskManager;

    public TaskExecutor(Task task, TaskManager taskManager) {
        this.task = task;
        this.taskManager = taskManager;
    }

    @Override
    public void run() {
        System.out.println("Executing task: " + task.getTaskData());
        try {
            // 模拟任务执行
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 更新任务状态为 COMPLETED
        taskManager.updateTaskStatus(task.getTaskId(), "COMPLETED");
        System.out.println("Task " + task.getTaskData() + " completed.");
    }
}

主类

主类负责初始化线程池、加载任务并提交执行:

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskScheduler {
    public static void main(String[] args) {
        TaskManager taskManager = new TaskManager();
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // 加载未完成的任务
        List<Task> pendingTasks = taskManager.loadPendingTasks();
        for (Task task : pendingTasks) {
            executorService.submit(new TaskExecutor(task, taskManager));
        }

        // 示例:提交新任务
        taskManager.saveTask("New Task 1");
        taskManager.saveTask("New Task 2");

        // 加载并执行新任务
        List<Task> newTasks = taskManager.loadPendingTasks();
        for (Task task : newTasks) {
            executorService.submit(new TaskExecutor(task, taskManager));
        }

        executorService.shutdown();
    }
}

通过上述步骤,您可以实现一个可靠的任务持久化和执行系统,即使在JVM停止或崩溃后,也能重新加载并执行未完成的任务。