Skip to content

多线程

多线程是指程序中包含多个执行流,可以同时执行多个任务。Java 内置了对多线程的支持,是并发编程的重要基础。

线程概述

进程与线程

text
┌─────────────────────────────────────────────────────────────────┐
│                     进程 vs 线程                                 │
├─────────────────────────────────────────────────────────────────┤
│  进程:                                                          │
│    - 操作系统分配资源的基本单位                                   │
│    - 拥有独立的内存空间                                           │
│    - 进程间通信需要特殊机制                                       │
│    - 创建和切换开销大                                             │
│                                                                 │
│  线程:                                                          │
│    - CPU 调度的基本单位                                           │
│    - 共享进程的内存空间                                           │
│    - 线程间通信方便                                               │
│    - 创建和切换开销小                                             │
│                                                                 │
│  关系:一个进程可以包含多个线程                                    │
└─────────────────────────────────────────────────────────────────┘

线程状态

text
┌─────────────────────────────────────────────────────────────────┐
│                      线程生命周期                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│    new Thread()        start()                                  │
│        │                  │                                     │
│        ▼                  ▼                                     │
│   ┌─────────┐         ┌─────────┐                               │
│   │  新建   │────────>│  就绪   │                               │
│   │ (NEW)   │         │(RUNNABLE)│<────────┐                    │
│   └─────────┘         └────┬────┘         │                    │
│                            │               │                    │
│                     获取CPU │               │ 时间片用完         │
│                            ▼               │                    │
│                       ┌─────────┐          │                    │
│                       │  运行   │──────────┘                    │
│                       │(RUNNING)│                               │
│                       └────┬────┘                               │
│                            │                                    │
│            ┌───────────────┼───────────────┐                    │
│            │               │               │                    │
│            ▼               ▼               ▼                    │
│      ┌──────────┐   ┌──────────┐   ┌──────────┐                │
│      │  阻塞    │   │  等待    │   │ 计时等待 │                │
│      │(BLOCKED) │   │(WAITING) │   │(TIMED_   │                │
│      └──────────┘   └──────────┘   │ WAITING) │                │
│                                     └──────────┘                │
│            │               │               │                    │
│            └───────────────┴───────────────┘                    │
│                            │                                    │
│                            ▼                                    │
│                       ┌─────────┐                               │
│                       │  终止   │                               │
│                       │(TERMINA-│                               │
│                       │  TED)   │                               │
│                       └─────────┘                               │
└─────────────────────────────────────────────────────────────────┘

创建线程

继承 Thread 类

java
// 继承 Thread 类
class MyThread extends Thread {
    @Override
    public void run() {
        // 线程执行的代码
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName() + " - " + i);
            try {
                Thread.sleep(100);  // 休眠 100 毫秒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ExtendThread {
    public static void main(String[] args) {
        // 创建线程对象
        MyThread thread1 = new MyThread();
        MyThread thread2 = new MyThread();
        
        // 设置线程名称
        thread1.setName("线程A");
        thread2.setName("线程B");
        
        // 启动线程
        thread1.start();
        thread2.start();
        
        // 主线程
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName() + " - " + i);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

实现 Runnable 接口

java
// 实现 Runnable 接口
class MyRunnable implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName() + " - " + i);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ImplementRunnable {
    public static void main(String[] args) {
        // 创建 Runnable 对象
        MyRunnable runnable = new MyRunnable();
        
        // 创建 Thread 对象,传入 Runnable
        Thread thread1 = new Thread(runnable, "线程A");
        Thread thread2 = new Thread(runnable, "线程B");
        
        // 启动线程
        thread1.start();
        thread2.start();
        
        // 使用 Lambda 表达式
        Thread thread3 = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Lambda线程 - " + i);
            }
        }, "Lambda线程");
        thread3.start();
    }
}

实现 Callable 接口

java
import java.util.concurrent.*;

// 实现 Callable 接口(有返回值)
class MyCallable implements Callable<Integer> {
    private int n;
    
    public MyCallable(int n) {
        this.n = n;
    }
    
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 1; i <= n; i++) {
            sum += i;
        }
        return sum;
    }
}

public class ImplementCallable {
    public static void main(String[] args) throws Exception {
        // 创建 Callable 对象
        MyCallable callable = new MyCallable(100);
        
        // 创建 FutureTask 对象
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        
        // 创建 Thread 并启动
        Thread thread = new Thread(futureTask);
        thread.start();
        
        // 获取返回值(会阻塞直到结果可用)
        Integer result = futureTask.get();
        System.out.println("1到100的和:" + result);
        
        // 使用线程池
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        // 提交任务
        Future<Integer> future1 = executor.submit(new MyCallable(50));
        Future<Integer> future2 = executor.submit(new MyCallable(100));
        
        // 获取结果
        System.out.println("1到50的和:" + future1.get());
        System.out.println("1到100的和:" + future2.get());
        
        // 关闭线程池
        executor.shutdown();
    }
}

创建方式对比

text
┌─────────────────────────────────────────────────────────────────┐
│                    线程创建方式对比                              │
├─────────────────────────────────────────────────────────────────┤
│  继承 Thread 类:                                                │
│    - 简单直接                                                    │
│    - 不能继承其他类                                              │
│    - 不推荐使用                                                  │
│                                                                 │
│  实现 Runnable 接口:                                            │
│    - 可以继承其他类                                              │
│    - 适合资源共享                                                │
│    - 推荐使用                                                    │
│                                                                 │
│  实现 Callable 接口:                                            │
│    - 可以有返回值                                                │
│    - 可以抛出异常                                                │
│    - 配合线程池使用                                              │
└─────────────────────────────────────────────────────────────────┘

线程控制

常用方法

java
public class ThreadMethods {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            System.out.println("子线程开始");
            try {
                Thread.sleep(2000);  // 休眠 2 秒
            } catch (InterruptedException e) {
                System.out.println("线程被中断");
            }
            System.out.println("子线程结束");
        });
        
        // 获取线程信息
        System.out.println("线程名称:" + thread.getName());
        System.out.println("线程ID:" + thread.getId());
        System.out.println("线程优先级:" + thread.getPriority());
        System.out.println("是否存活:" + thread.isAlive());
        System.out.println("线程状态:" + thread.getState());
        
        // 设置线程属性
        thread.setName("MyThread");
        thread.setPriority(Thread.MAX_PRIORITY);  // 1-10,默认 5
        thread.setDaemon(true);  // 设置为守护线程
        
        // 启动线程
        thread.start();
        
        // 等待线程结束
        thread.join(1000);  // 最多等待 1 秒
        
        // 让出 CPU
        Thread.yield();
        
        // 中断线程
        // thread.interrupt();
    }
}

线程休眠

java
public class ThreadSleep {
    public static void main(String[] args) {
        // 模拟倒计时
        new Thread(() -> {
            for (int i = 10; i >= 0; i--) {
                System.out.println("倒计时:" + i);
                try {
                    Thread.sleep(1000);  // 休眠 1 秒
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("时间到!");
        }).start();
    }
}

线程等待和唤醒

java
public class WaitNotifyDemo {
    private static final Object lock = new Object();
    private static boolean flag = false;
    
    public static void main(String[] args) {
        // 等待线程
        Thread waitThread = new Thread(() -> {
            synchronized (lock) {
                while (!flag) {
                    try {
                        System.out.println("等待线程:等待中...");
                        lock.wait();  // 等待,释放锁
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("等待线程:被唤醒了!");
            }
        });
        
        // 唤醒线程
        Thread notifyThread = new Thread(() -> {
            synchronized (lock) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                flag = true;
                System.out.println("唤醒线程:发送通知");
                lock.notify();  // 唤醒一个等待的线程
                // lock.notifyAll();  // 唤醒所有等待的线程
            }
        });
        
        waitThread.start();
        notifyThread.start();
    }
}

线程同步

线程安全问题

java
// 线程不安全的示例
class UnsafeCounter {
    private int count = 0;
    
    public void increment() {
        count++;  // 非原子操作
    }
    
    public int getCount() {
        return count;
    }
}

public class ThreadUnsafe {
    public static void main(String[] args) throws InterruptedException {
        UnsafeCounter counter = new UnsafeCounter();
        
        // 创建 100 个线程,每个线程增加 1000 次
        Thread[] threads = new Thread[100];
        for (int i = 0; i < 100; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            });
            threads[i].start();
        }
        
        // 等待所有线程结束
        for (Thread thread : threads) {
            thread.join();
        }
        
        // 预期结果:100000,实际结果:小于 100000
        System.out.println("最终结果:" + counter.getCount());
    }
}

同步代码块

java
class SafeCounter {
    private int count = 0;
    private final Object lock = new Object();  // 锁对象
    
    public void increment() {
        synchronized (lock) {  // 同步代码块
            count++;
        }
    }
    
    public int getCount() {
        return count;
    }
}

同步方法

java
class SafeCounter2 {
    private int count = 0;
    
    // 同步方法:锁是 this 对象
    public synchronized void increment() {
        count++;
    }
    
    // 同步静态方法:锁是类对象
    public static synchronized void staticMethod() {
        // ...
    }
    
    public int getCount() {
        return count;
    }
}

ReentrantLock

java
import java.util.concurrent.locks.ReentrantLock;

class SafeCounter3 {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();
    
    public void increment() {
        lock.lock();  // 获取锁
        try {
            count++;
        } finally {
            lock.unlock();  // 释放锁(必须在 finally 中)
        }
    }
    
    public int getCount() {
        return count;
    }
}

public class ReentrantLockDemo {
    public static void main(String[] args) throws InterruptedException {
        SafeCounter3 counter = new SafeCounter3();
        
        Thread[] threads = new Thread[100];
        for (int i = 0; i < 100; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            });
            threads[i].start();
        }
        
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("最终结果:" + counter.getCount());  // 100000
    }
}

同步方式对比

text
┌─────────────────────────────────────────────────────────────────┐
│                     同步方式对比                                 │
├─────────────────────────────────────────────────────────────────┤
│  synchronized:                                                  │
│    - 关键字,自动获取和释放锁                                    │
│    - 不可中断                                                    │
│    - 非公平锁                                                    │
│    - 简单易用                                                    │
│                                                                 │
│  ReentrantLock:                                                 │
│    - 类,手动获取和释放锁                                        │
│    - 可中断                                                      │
│    - 可选公平/非公平                                             │
│    - 功能更强大                                                  │
└─────────────────────────────────────────────────────────────────┘

线程通信

生产者消费者模式

java
import java.util.LinkedList;
import java.util.Queue;

// 共享缓冲区
class Buffer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity;
    
    public Buffer(int capacity) {
        this.capacity = capacity;
    }
    
    // 生产
    public synchronized void produce(int item) throws InterruptedException {
        while (queue.size() == capacity) {
            System.out.println("缓冲区已满,生产者等待");
            wait();  // 缓冲区满,等待
        }
        
        queue.add(item);
        System.out.println("生产:" + item + ",缓冲区大小:" + queue.size());
        notifyAll();  // 唤醒消费者
    }
    
    // 消费
    public synchronized int consume() throws InterruptedException {
        while (queue.isEmpty()) {
            System.out.println("缓冲区为空,消费者等待");
            wait();  // 缓冲区空,等待
        }
        
        int item = queue.poll();
        System.out.println("消费:" + item + ",缓冲区大小:" + queue.size());
        notifyAll();  // 唤醒生产者
        return item;
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
        Buffer buffer = new Buffer(5);
        
        // 生产者线程
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    buffer.produce(i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        // 消费者线程
        Thread consumer = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    buffer.consume();
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        producer.start();
        consumer.start();
    }
}

线程池

线程池概述

java
import java.util.concurrent.*;

public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 1. 固定大小线程池
        ExecutorService fixedPool = Executors.newFixedThreadPool(3);
        
        // 2. 单线程线程池
        ExecutorService singlePool = Executors.newSingleThreadExecutor();
        
        // 3. 缓存线程池
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        
        // 4. 定时任务线程池
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
        
        // 5. 自定义线程池(推荐)
        ThreadPoolExecutor customPool = new ThreadPoolExecutor(
            2,                      // 核心线程数
            4,                      // 最大线程数
            60,                     // 空闲线程存活时间
            TimeUnit.SECONDS,       // 时间单位
            new LinkedBlockingQueue<>(100),  // 任务队列
            new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
        );
        
        // 提交任务
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            fixedPool.execute(() -> {
                System.out.println("任务 " + taskId + " 由 " + 
                    Thread.currentThread().getName() + " 执行");
            });
        }
        
        // 关闭线程池
        fixedPool.shutdown();
        try {
            if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) {
                fixedPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            fixedPool.shutdownNow();
        }
    }
}

定时任务

java
import java.util.concurrent.*;

public class ScheduledTaskDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        
        // 延迟执行
        scheduler.schedule(() -> {
            System.out.println("延迟 3 秒执行");
        }, 3, TimeUnit.SECONDS);
        
        // 固定延迟执行
        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("每隔 2 秒执行一次");
        }, 0, 2, TimeUnit.SECONDS);
        
        // 固定频率执行
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("固定频率执行");
        }, 0, 3, TimeUnit.SECONDS);
        
        // 注意:实际使用中需要关闭 scheduler
    }
}

CompletableFuture

java
import java.util.concurrent.*;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        // 异步执行,无返回值
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
            System.out.println("异步任务执行");
        });
        
        // 异步执行,有返回值
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 100;
        });
        
        // 获取结果
        future2.thenAccept(result -> {
            System.out.println("结果:" + result);
        });
        
        // 链式调用
        CompletableFuture<Integer> future3 = CompletableFuture
            .supplyAsync(() -> 10)
            .thenApply(n -> n * 2)
            .thenApply(n -> n + 5);
        
        System.out.println("链式调用结果:" + future3.join());
        
        // 组合多个 Future
        CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> 10);
        CompletableFuture<Integer> future5 = CompletableFuture.supplyAsync(() -> 20);
        
        // 两个都完成后合并
        CompletableFuture<Integer> combined = future4.thenCombine(future5, (a, b) -> a + b);
        System.out.println("合并结果:" + combined.join());
        
        // 任一完成
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future4, future5);
        System.out.println("任一完成:" + anyOf.join());
        
        // 全部完成
        CompletableFuture<Void> allOf = CompletableFuture.allOf(future4, future5);
        allOf.thenRun(() -> System.out.println("全部完成"));
    }
}

小结

本章我们学习了:

  • 线程创建:继承 Thread、实现 Runnable、实现 Callable
  • 线程控制:start、sleep、join、yield、interrupt
  • 线程同步:synchronized、ReentrantLock
  • 线程通信:wait、notify、notifyAll
  • 线程池:ThreadPoolExecutor、ScheduledExecutorService
  • 异步编程:CompletableFuture

下一章,我们将学习 泛型,了解 Java 泛型编程。