Appearance
多线程
多线程是指程序中包含多个执行流,可以同时执行多个任务。Java 内置了对多线程的支持,是并发编程的重要基础。
线程概述
进程与线程
text
┌─────────────────────────────────────────────────────────────────┐
│ 进程 vs 线程 │
├─────────────────────────────────────────────────────────────────┤
│ 进程: │
│ - 操作系统分配资源的基本单位 │
│ - 拥有独立的内存空间 │
│ - 进程间通信需要特殊机制 │
│ - 创建和切换开销大 │
│ │
│ 线程: │
│ - CPU 调度的基本单位 │
│ - 共享进程的内存空间 │
│ - 线程间通信方便 │
│ - 创建和切换开销小 │
│ │
│ 关系:一个进程可以包含多个线程 │
└─────────────────────────────────────────────────────────────────┘线程状态
text
┌─────────────────────────────────────────────────────────────────┐
│ 线程生命周期 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ new Thread() start() │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ │
│ │ 新建 │────────>│ 就绪 │ │
│ │ (NEW) │ │(RUNNABLE)│<────────┐ │
│ └─────────┘ └────┬────┘ │ │
│ │ │ │
│ 获取CPU │ │ 时间片用完 │
│ ▼ │ │
│ ┌─────────┐ │ │
│ │ 运行 │──────────┘ │
│ │(RUNNING)│ │
│ └────┬────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 阻塞 │ │ 等待 │ │ 计时等待 │ │
│ │(BLOCKED) │ │(WAITING) │ │(TIMED_ │ │
│ └──────────┘ └──────────┘ │ WAITING) │ │
│ └──────────┘ │
│ │ │ │ │
│ └───────────────┴───────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │ 终止 │ │
│ │(TERMINA-│ │
│ │ TED) │ │
│ └─────────┘ │
└─────────────────────────────────────────────────────────────────┘创建线程
继承 Thread 类
java
// 继承 Thread 类
class MyThread extends Thread {
@Override
public void run() {
// 线程执行的代码
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + " - " + i);
try {
Thread.sleep(100); // 休眠 100 毫秒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ExtendThread {
public static void main(String[] args) {
// 创建线程对象
MyThread thread1 = new MyThread();
MyThread thread2 = new MyThread();
// 设置线程名称
thread1.setName("线程A");
thread2.setName("线程B");
// 启动线程
thread1.start();
thread2.start();
// 主线程
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + " - " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}实现 Runnable 接口
java
// 实现 Runnable 接口
class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + " - " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ImplementRunnable {
public static void main(String[] args) {
// 创建 Runnable 对象
MyRunnable runnable = new MyRunnable();
// 创建 Thread 对象,传入 Runnable
Thread thread1 = new Thread(runnable, "线程A");
Thread thread2 = new Thread(runnable, "线程B");
// 启动线程
thread1.start();
thread2.start();
// 使用 Lambda 表达式
Thread thread3 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Lambda线程 - " + i);
}
}, "Lambda线程");
thread3.start();
}
}实现 Callable 接口
java
import java.util.concurrent.*;
// 实现 Callable 接口(有返回值)
class MyCallable implements Callable<Integer> {
private int n;
public MyCallable(int n) {
this.n = n;
}
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= n; i++) {
sum += i;
}
return sum;
}
}
public class ImplementCallable {
public static void main(String[] args) throws Exception {
// 创建 Callable 对象
MyCallable callable = new MyCallable(100);
// 创建 FutureTask 对象
FutureTask<Integer> futureTask = new FutureTask<>(callable);
// 创建 Thread 并启动
Thread thread = new Thread(futureTask);
thread.start();
// 获取返回值(会阻塞直到结果可用)
Integer result = futureTask.get();
System.out.println("1到100的和:" + result);
// 使用线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任务
Future<Integer> future1 = executor.submit(new MyCallable(50));
Future<Integer> future2 = executor.submit(new MyCallable(100));
// 获取结果
System.out.println("1到50的和:" + future1.get());
System.out.println("1到100的和:" + future2.get());
// 关闭线程池
executor.shutdown();
}
}创建方式对比
text
┌─────────────────────────────────────────────────────────────────┐
│ 线程创建方式对比 │
├─────────────────────────────────────────────────────────────────┤
│ 继承 Thread 类: │
│ - 简单直接 │
│ - 不能继承其他类 │
│ - 不推荐使用 │
│ │
│ 实现 Runnable 接口: │
│ - 可以继承其他类 │
│ - 适合资源共享 │
│ - 推荐使用 │
│ │
│ 实现 Callable 接口: │
│ - 可以有返回值 │
│ - 可以抛出异常 │
│ - 配合线程池使用 │
└─────────────────────────────────────────────────────────────────┘线程控制
常用方法
java
public class ThreadMethods {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
System.out.println("子线程开始");
try {
Thread.sleep(2000); // 休眠 2 秒
} catch (InterruptedException e) {
System.out.println("线程被中断");
}
System.out.println("子线程结束");
});
// 获取线程信息
System.out.println("线程名称:" + thread.getName());
System.out.println("线程ID:" + thread.getId());
System.out.println("线程优先级:" + thread.getPriority());
System.out.println("是否存活:" + thread.isAlive());
System.out.println("线程状态:" + thread.getState());
// 设置线程属性
thread.setName("MyThread");
thread.setPriority(Thread.MAX_PRIORITY); // 1-10,默认 5
thread.setDaemon(true); // 设置为守护线程
// 启动线程
thread.start();
// 等待线程结束
thread.join(1000); // 最多等待 1 秒
// 让出 CPU
Thread.yield();
// 中断线程
// thread.interrupt();
}
}线程休眠
java
public class ThreadSleep {
public static void main(String[] args) {
// 模拟倒计时
new Thread(() -> {
for (int i = 10; i >= 0; i--) {
System.out.println("倒计时:" + i);
try {
Thread.sleep(1000); // 休眠 1 秒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("时间到!");
}).start();
}
}线程等待和唤醒
java
public class WaitNotifyDemo {
private static final Object lock = new Object();
private static boolean flag = false;
public static void main(String[] args) {
// 等待线程
Thread waitThread = new Thread(() -> {
synchronized (lock) {
while (!flag) {
try {
System.out.println("等待线程:等待中...");
lock.wait(); // 等待,释放锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("等待线程:被唤醒了!");
}
});
// 唤醒线程
Thread notifyThread = new Thread(() -> {
synchronized (lock) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("唤醒线程:发送通知");
lock.notify(); // 唤醒一个等待的线程
// lock.notifyAll(); // 唤醒所有等待的线程
}
});
waitThread.start();
notifyThread.start();
}
}线程同步
线程安全问题
java
// 线程不安全的示例
class UnsafeCounter {
private int count = 0;
public void increment() {
count++; // 非原子操作
}
public int getCount() {
return count;
}
}
public class ThreadUnsafe {
public static void main(String[] args) throws InterruptedException {
UnsafeCounter counter = new UnsafeCounter();
// 创建 100 个线程,每个线程增加 1000 次
Thread[] threads = new Thread[100];
for (int i = 0; i < 100; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
});
threads[i].start();
}
// 等待所有线程结束
for (Thread thread : threads) {
thread.join();
}
// 预期结果:100000,实际结果:小于 100000
System.out.println("最终结果:" + counter.getCount());
}
}同步代码块
java
class SafeCounter {
private int count = 0;
private final Object lock = new Object(); // 锁对象
public void increment() {
synchronized (lock) { // 同步代码块
count++;
}
}
public int getCount() {
return count;
}
}同步方法
java
class SafeCounter2 {
private int count = 0;
// 同步方法:锁是 this 对象
public synchronized void increment() {
count++;
}
// 同步静态方法:锁是类对象
public static synchronized void staticMethod() {
// ...
}
public int getCount() {
return count;
}
}ReentrantLock
java
import java.util.concurrent.locks.ReentrantLock;
class SafeCounter3 {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock(); // 获取锁
try {
count++;
} finally {
lock.unlock(); // 释放锁(必须在 finally 中)
}
}
public int getCount() {
return count;
}
}
public class ReentrantLockDemo {
public static void main(String[] args) throws InterruptedException {
SafeCounter3 counter = new SafeCounter3();
Thread[] threads = new Thread[100];
for (int i = 0; i < 100; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终结果:" + counter.getCount()); // 100000
}
}同步方式对比
text
┌─────────────────────────────────────────────────────────────────┐
│ 同步方式对比 │
├─────────────────────────────────────────────────────────────────┤
│ synchronized: │
│ - 关键字,自动获取和释放锁 │
│ - 不可中断 │
│ - 非公平锁 │
│ - 简单易用 │
│ │
│ ReentrantLock: │
│ - 类,手动获取和释放锁 │
│ - 可中断 │
│ - 可选公平/非公平 │
│ - 功能更强大 │
└─────────────────────────────────────────────────────────────────┘线程通信
生产者消费者模式
java
import java.util.LinkedList;
import java.util.Queue;
// 共享缓冲区
class Buffer {
private final Queue<Integer> queue = new LinkedList<>();
private final int capacity;
public Buffer(int capacity) {
this.capacity = capacity;
}
// 生产
public synchronized void produce(int item) throws InterruptedException {
while (queue.size() == capacity) {
System.out.println("缓冲区已满,生产者等待");
wait(); // 缓冲区满,等待
}
queue.add(item);
System.out.println("生产:" + item + ",缓冲区大小:" + queue.size());
notifyAll(); // 唤醒消费者
}
// 消费
public synchronized int consume() throws InterruptedException {
while (queue.isEmpty()) {
System.out.println("缓冲区为空,消费者等待");
wait(); // 缓冲区空,等待
}
int item = queue.poll();
System.out.println("消费:" + item + ",缓冲区大小:" + queue.size());
notifyAll(); // 唤醒生产者
return item;
}
}
public class ProducerConsumer {
public static void main(String[] args) {
Buffer buffer = new Buffer(5);
// 生产者线程
Thread producer = new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
buffer.produce(i);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
buffer.consume();
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
consumer.start();
}
}线程池
线程池概述
java
import java.util.concurrent.*;
public class ThreadPoolDemo {
public static void main(String[] args) {
// 1. 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(3);
// 2. 单线程线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 3. 缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 4. 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
// 5. 自定义线程池(推荐)
ThreadPoolExecutor customPool = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 任务队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 提交任务
for (int i = 0; i < 5; i++) {
final int taskId = i;
fixedPool.execute(() -> {
System.out.println("任务 " + taskId + " 由 " +
Thread.currentThread().getName() + " 执行");
});
}
// 关闭线程池
fixedPool.shutdown();
try {
if (!fixedPool.awaitTermination(60, TimeUnit.SECONDS)) {
fixedPool.shutdownNow();
}
} catch (InterruptedException e) {
fixedPool.shutdownNow();
}
}
}定时任务
java
import java.util.concurrent.*;
public class ScheduledTaskDemo {
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// 延迟执行
scheduler.schedule(() -> {
System.out.println("延迟 3 秒执行");
}, 3, TimeUnit.SECONDS);
// 固定延迟执行
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("每隔 2 秒执行一次");
}, 0, 2, TimeUnit.SECONDS);
// 固定频率执行
scheduler.scheduleAtFixedRate(() -> {
System.out.println("固定频率执行");
}, 0, 3, TimeUnit.SECONDS);
// 注意:实际使用中需要关闭 scheduler
}
}CompletableFuture
java
import java.util.concurrent.*;
public class CompletableFutureDemo {
public static void main(String[] args) {
// 异步执行,无返回值
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("异步任务执行");
});
// 异步执行,有返回值
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
// 获取结果
future2.thenAccept(result -> {
System.out.println("结果:" + result);
});
// 链式调用
CompletableFuture<Integer> future3 = CompletableFuture
.supplyAsync(() -> 10)
.thenApply(n -> n * 2)
.thenApply(n -> n + 5);
System.out.println("链式调用结果:" + future3.join());
// 组合多个 Future
CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future5 = CompletableFuture.supplyAsync(() -> 20);
// 两个都完成后合并
CompletableFuture<Integer> combined = future4.thenCombine(future5, (a, b) -> a + b);
System.out.println("合并结果:" + combined.join());
// 任一完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future4, future5);
System.out.println("任一完成:" + anyOf.join());
// 全部完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(future4, future5);
allOf.thenRun(() -> System.out.println("全部完成"));
}
}小结
本章我们学习了:
- 线程创建:继承 Thread、实现 Runnable、实现 Callable
- 线程控制:start、sleep、join、yield、interrupt
- 线程同步:synchronized、ReentrantLock
- 线程通信:wait、notify、notifyAll
- 线程池:ThreadPoolExecutor、ScheduledExecutorService
- 异步编程:CompletableFuture
下一章,我们将学习 泛型,了解 Java 泛型编程。
