掘金 后端 ( ) • 2024-04-11 17:08

第一部分:Java多线程基础

1.1 创建线程的方式

Java提供了两种主要的创建线程的方式:继承Thread类和实现Runnable接口。此外,还可以通过实现Callable接口实现线程,并在主线程获取返回值。

继承Thread类:

class MyThread extends Thread {
    public void run() {
        // 线程执行的任务
    }
}

public class Main {
    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start();
    }
}

实现Runnable接口:

class MyRunnable implements Runnable {
    public void run() {
        // 线程执行的任务
    }
}

public class Main {
    public static void main(String[] args) {
        Thread myThread = new Thread(new MyRunnable());
        myThread.start();
    }
}

实现Callable接口:

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class MyCallable implements Callable<String> {
    public String call() throws Exception {
        // 线程执行的任务,并返回结果
        return "Task completed!";
    }
}

public class Main {
    public static void main(String[] args) {
        // 创建Callable任务
        Callable<String> myCallable = new MyCallable();
        // 将Callable任务封装为FutureTask
        FutureTask<String> futureTask = new FutureTask<>(myCallable);

        // 创建线程并启动
        Thread thread = new Thread(futureTask);
        thread.start();

        try {
            // 获取线程执行结果
            String result = futureTask.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

为什么要使用实现Runnable接口和实现Callable接口的方式?因为Java是单继承的,如果已经继承了其他类,就无法再继承Thread类,但可以实现Runnable接口或实现Callable接口,从而更灵活地组织代码结构。

1.2 线程的同步与互斥

在多线程环境中,为了确保共享资源的安全访问,需要使用synchronized关键字实现线程的同步与互斥。

同步方法:

class Counter {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }
}

同步代码块:

class Counter {
    private int count = 0;

    public void increment() {
        synchronized (this) {
            count++;
        }
    }
}

通过synchronized关键字,可以确保同一时刻只有一个线程访问共享资源,避免数据不一致的问题。

1.3 线程间通信

在多线程编程中,线程之间可能需要进行通信。Java提供了waitnotifynotifyAll等方法来实现线程间的协作。

生产者-消费者模型:

class SharedResource {
    private boolean flag = false;

    public synchronized void produce() throws InterruptedException {
        while (flag) {
            wait();
        }
        // 生产操作
        flag = true;
        notify();
    }

    public synchronized void consume() throws InterruptedException {
        while (!flag) {
            wait();
        }
        // 消费操作
        flag = false;
        notify();
    }
}

在这个例子中,生产者线程通过produce方法向共享资源添加数据,而消费者线程通过consume方法从共享资源消费数据。通过使用waitnotify,实现了线程之间的协同工作。

第二部分:Java线程池

2.1 Java内置线程池

Java提供了Executors工厂类,用于创建各种类型的线程池。以下是一个使用FixedThreadPool的示例:

import java.util.concurrent.*;

class MyTask implements Runnable {
    public void run() {
        // 线程执行的任务
    }
}

public class Main {
    public static void main(String[] args) {
        // 创建固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);
        // 提交任务
        executor.submit(new MyTask());
        // 关闭线程池
        executor.shutdown();
    }
}

2.2 使用alibaba推荐的线程池方式

Alibaba推荐使用ThreadPoolExecutor的构造方法创建线程池,以提供更精细的参数配置。

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5, // corePoolSize
            10, // maximumPoolSize
            60, // keepAliveTime
            TimeUnit.SECONDS, // unit
            new LinkedBlockingQueue<>(100), // workQueue
            new ThreadPoolExecutor.CallerRunsPolicy() // handler
        );

        // 提交任务
        executor.submit(new MyTask());

        // 关闭线程池
        executor.shutdown();
    }
}

这里使用了ThreadPoolExecutor.CallerRunsPolicy作为拒绝策略,表示由调用线程(主线程)执行被拒绝的任务。

2.3 线程池的核心参数与配置

线程池的核心参数包括corePoolSizemaximumPoolSizekeepAliveTimeunitworkQueue。这些参数决定了线程池的基本特性和行为。

  • corePoolSize:核心线程数,即线程池维护的最小线程数。

  • maximumPoolSize:最大线程数,即线程池允许创建的最大线程数。

  • keepAliveTime:线程空闲时间,即当线程池中线程数量超过corePoolSize时,多余的空闲线程在被终止之前等待新任务的最长时间。

  • unit:时间单位,用于指定keepAliveTime的时间单位。

  • workQueue:工作队列,用于保存等待执行的任务。

  • 2.4线程池执行流程​​

  • 具体来说,线程池的行为如下:

  • 当任务数量超过核心线程数 (corePoolSize) 时,新任务会被放入工作队列 (workQueue)。

  • 如果工作队列已满,且当前线程数量小于最大线程数 (maximumPoolSize),则会创建新的非核心线程来处理任务。

  • 如果工作队列已满,且当前线程数量达到最大线程数,此时新任务无法处理,会触发线程池的拒绝策略。

  • 对于非核心线程,在它们处于闲置状态时,会根据 keepAliveTime 的设置,在超过这个时间后被回收。

以下是一个线程池的配置示例:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, // corePoolSize
    10, // maximumPoolSize
    60, // keepAliveTime
    TimeUnit.SECONDS, // unit
    new LinkedBlockingQueue<>(100) // workQueue
);

参数选择的注意事项:

  • corePoolSize 的选择应该考虑系统资源和并发任务的性质。
  • maximumPoolSize 的选择应该足够大,以处理可能的突发任务。
  • keepAliveTime 的选择应该根据任务执行时间的长短和系统资源的可用性来确定。
  • workQueue 的选择应该根据任务的性质来确定,例如,对于耗时短、频繁的任务,可以选择一个无界队列,对于耗时长的任务,可以选择一个有界队列。

2.5 线程池的使用

使用线程池可以通过submit方法提交任务,并通过Future对象获取执行结果。在任务执行完毕后,记得调用shutdown方法关闭线程池。

import java.util.concurrent.*;

class MyTask implements Callable<String> {
    public String call() throws Exception {
        // 线程执行的任务,并返回结果
        return "Task completed!";
    }
}

public class Main {
    public static void main(String[] args) {
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            // 提交任务
            Future<String> future = executor.submit(new MyTask());
            // 获取执行结果
            String result = future.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            executor.shutdown();
        }
    }
}

2.6 线程池的异常处理和关闭

线程池的异常处理可以通过setRejectedExecutionHandler方法进行设置,用于处理任务被拒绝执行的情况。在任务执行完毕后,要记得调用shutdown方法关闭线程池。

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    // ...
);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.shutdown();

第三部分:实战与最佳实践

3.1 抢红包实例

在实际应用中,线程池和多线程编程可以用于模拟一些有趣的场景,比如多人同时抢红包。

import java.util.concurrent.*;

class RedPacketTask implements Callable<String> {
    private static int totalAmount = 1000;

    public String call() throws Exception {
        synchronized (RedPacketTask.class) {
            if (totalAmount > 0) {
                // 抢红包逻辑
                System.out.println(Thread.currentThread().getName() + " 抢到红包");
                totalAmount -= 10;
                return "抢到红包";
            } else {
                return "红包已抢完";
            }
        }
    }
}

public class Main {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            for (int i = 0; i < 10; i++) {
                Future<String> future = executor.submit(new RedPacketTask());
                System.out.println(future.get()); // 获取执行结果
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

synchronized (RedPacketTask.class) {
    // ...
}

这里使用的锁定对象是 RedPacketTask.class,而不是具体的 RedPacketTask 实例。这是因为synchronized关键字可以用于不同对象的同步块,只要这些块使用相同的锁。

对于这个具体的例子,RedPacketTask.class 是一个类的 Class 对象,是在类加载时由 Java 虚拟机自动创建的,且在整个应用程序的生命周期中只有一个。因此,通过使用类的 Class 对象作为锁,你确保了对于所有 RedPacketTask 类的实例都使用相同的锁。

这样做的好处是,不同的 RedPacketTask 实例共享同一个锁,确保在同一时刻只有一个线程能够执行同步块中的代码。这是一种广义的锁,适用于类级别的同步。

3.2 工厂模拟实例

另一个例子是模拟工厂中多个工人同时工作的场景。

import java.util.concurrent.*;

class Worker implements Callable<String> {
    private String name;

    public Worker(String name) {
        this.name = name;
    }

    public String call() throws Exception {
        System.out.println(name + " 开始工作");
        // 工作逻辑
        System.out.println(name + " 完成工作");
        return name + " 工作完成";
    }
}

public class Main {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 1; i <= 5; i++) {
                futures.add(executor.submit(new Worker("工人" + i)));
            }

            for (Future<String> future : futures) {
                System.out.println(future.get()); // 获取执行结果
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}