Appearance
多线程
Java 多线程是实现并发编程的核心技术,允许程序同时执行多个任务。
线程基础
创建线程
方式一:继承 Thread 类
java
public class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
}
}
}
// 使用
MyThread thread = new MyThread();
thread.start(); // 启动线程方式二:实现 Runnable 接口
java
public class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
}
}
}
// 使用
Thread thread = new Thread(new MyRunnable());
thread.start();
// 使用 Lambda 表达式
Thread lambdaThread = new Thread(() -> {
System.out.println("Lambda 线程运行中");
});
lambdaThread.start();方式三:实现 Callable 接口(有返回值)
java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
return sum;
}
}
// 使用
FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
Thread thread = new Thread(futureTask);
thread.start();
// 获取返回值
try {
Integer result = futureTask.get(); // 阻塞直到获取结果
System.out.println("计算结果: " + result);
} catch (Exception e) {
e.printStackTrace();
}线程常用方法
java
Thread thread = new Thread(() -> {
System.out.println("线程运行中...");
});
// 启动线程
thread.start();
// 获取线程名称
String name = thread.getName();
// 设置线程名称
thread.setName("MyThread");
// 获取线程 ID
long id = thread.getId();
// 获取线程状态
Thread.State state = thread.getState();
// 获取线程优先级(1-10,默认5)
int priority = thread.getPriority();
thread.setPriority(Thread.MAX_PRIORITY); // 10
// 判断线程是否存活
boolean isAlive = thread.isAlive();
// 判断是否为守护线程
boolean isDaemon = thread.isDaemon();
thread.setDaemon(true); // 设置为守护线程
// 线程休眠
Thread.sleep(1000); // 休眠1秒
// 线程让步
Thread.yield(); // 让出 CPU 时间片
// 等待线程结束
thread.join(); // 阻塞直到线程结束
thread.join(1000); // 最多等待1秒
// 获取当前线程
Thread current = Thread.currentThread();线程状态
java
// 线程状态
public enum State {
NEW, // 新建
RUNNABLE, // 可运行
BLOCKED, // 阻塞
WAITING, // 等待
TIMED_WAITING, // 计时等待
TERMINATED // 终止
}
// 状态转换示例
Thread thread = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(thread.getState()); // NEW
thread.start();
System.out.println(thread.getState()); // RUNNABLE线程同步
synchronized 关键字
同步代码块
java
public class Counter {
private int count = 0;
private final Object lock = new Object();
public void increment() {
synchronized (lock) { // 使用任意对象作为锁
count++;
}
}
// 或使用 this 作为锁
public void increment2() {
synchronized (this) {
count++;
}
}
}同步方法
java
public class Counter {
private int count = 0;
// 同步实例方法,锁为 this
public synchronized void increment() {
count++;
}
// 同步静态方法,锁为 Class 对象
public static synchronized void staticMethod() {
// ...
}
}同步示例
java
public class TicketSystem {
private int tickets = 100;
public synchronized void sellTicket() {
if (tickets > 0) {
System.out.println(Thread.currentThread().getName() +
" 卖出第 " + tickets + " 张票");
tickets--;
}
}
}
// 使用
TicketSystem system = new TicketSystem();
Runnable task = () -> {
while (true) {
system.sellTicket();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
};
Thread t1 = new Thread(task, "窗口1");
Thread t2 = new Thread(task, "窗口2");
Thread t3 = new Thread(task, "窗口3");
t1.start();
t2.start();
t3.start();ReentrantLock
java
import java.util.concurrent.locks.ReentrantLock;
public class Counter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock(); // 获取锁
try {
count++;
} finally {
lock.unlock(); // 释放锁(必须在 finally 中)
}
}
// 尝试获取锁
public boolean tryIncrement() {
if (lock.tryLock()) { // 尝试获取锁,立即返回
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
// 带超时的尝试
public boolean tryIncrementWithTimeout() {
try {
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
}synchronized vs ReentrantLock
| 特性 | synchronized | ReentrantLock |
|---|---|---|
| 锁获取方式 | JVM 实现 | API 实现 |
| 释放锁 | 自动释放 | 手动释放 |
| 可中断 | 不可 | 可中断 |
| 公平性 | 非公平 | 可选公平/非公平 |
| 条件变量 | 单一 | 多个 |
线程通信
wait/notify
java
public class ProducerConsumer {
private final Queue<Integer> queue = new LinkedList<>();
private final int maxSize = 10;
public synchronized void produce(int value) throws InterruptedException {
while (queue.size() == maxSize) {
wait(); // 队列满,等待
}
queue.offer(value);
System.out.println("生产: " + value);
notifyAll(); // 唤醒消费者
}
public synchronized int consume() throws InterruptedException {
while (queue.isEmpty()) {
wait(); // 队列空,等待
}
int value = queue.poll();
System.out.println("消费: " + value);
notifyAll(); // 唤醒生产者
return value;
}
}Condition
java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer<T> {
private final Queue<T> queue = new LinkedList<>();
private final int maxSize;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedBuffer(int maxSize) {
this.maxSize = maxSize;
}
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == maxSize) {
notFull.await(); // 等待非满
}
queue.offer(item);
notEmpty.signal(); // 通知非空
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待非空
}
T item = queue.poll();
notFull.signal(); // 通知非满
return item;
} finally {
lock.unlock();
}
}
}线程池
ExecutorService
java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// 创建固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// 创建单线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 创建可缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 创建定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);
// 提交任务
fixedPool.execute(() -> {
System.out.println("任务执行");
});
// 提交有返回值的任务
Future<Integer> future = fixedPool.submit(() -> {
return 1 + 1;
});
try {
Integer result = future.get();
System.out.println("结果: " + result);
} catch (Exception e) {
e.printStackTrace();
}
// 关闭线程池
fixedPool.shutdown(); // 平滑关闭
fixedPool.shutdownNow(); // 立即关闭
// 等待所有任务完成
fixedPool.awaitTermination(60, TimeUnit.SECONDS);ThreadPoolExecutor
java
import java.util.concurrent.*;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 任务队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 拒绝策略
// AbortPolicy: 抛出异常(默认)
// CallerRunsPolicy: 由调用线程执行
// DiscardPolicy: 直接丢弃
// DiscardOldestPolicy: 丢弃最老的任务
executor.execute(() -> {
System.out.println("任务执行");
});
executor.shutdown();定时任务
java
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// 延迟执行
scheduler.schedule(() -> {
System.out.println("延迟3秒执行");
}, 3, TimeUnit.SECONDS);
// 固定延迟
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("固定延迟执行");
}, 0, 2, TimeUnit.SECONDS);
// 固定频率
scheduler.scheduleAtFixedRate(() -> {
System.out.println("固定频率执行");
}, 0, 2, TimeUnit.SECONDS);并发工具类
CountDownLatch
java
import java.util.concurrent.CountDownLatch;
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 完成");
latch.countDown(); // 计数减1
}).start();
}
latch.await(); // 等待计数归零
System.out.println("所有线程完成");CyclicBarrier
java
import java.util.concurrent.CyclicBarrier;
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程到达屏障,继续执行");
});
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 到达屏障");
try {
barrier.await(); // 等待其他线程
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 继续执行");
}).start();
}Semaphore
java
import java.util.concurrent.Semaphore;
Semaphore semaphore = new Semaphore(3); // 3个许可
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " 获取资源");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
System.out.println(Thread.currentThread().getName() + " 释放资源");
}
}).start();
}Exchanger
java
import java.util.concurrent.Exchanger;
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
String data = "来自线程A的数据";
String received = exchanger.exchange(data);
System.out.println("线程A收到: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
String data = "来自线程B的数据";
String received = exchanger.exchange(data);
System.out.println("线程B收到: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();并发集合
ConcurrentHashMap
java
import java.util.concurrent.ConcurrentHashMap;
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.putIfAbsent("key", 2); // 不存在时才放入
map.compute("key", (k, v) -> v == null ? 1 : v + 1); // 原子计算
map.merge("key", 1, Integer::sum); // 合并值
Integer value = map.get("key");CopyOnWriteArrayList
java
import java.util.concurrent.CopyOnWriteArrayList;
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("元素");
list.addIfAbsent("元素"); // 不存在时添加
// 迭代时可以安全修改(迭代的是快照)
for (String s : list) {
list.add("新元素"); // 不会抛出 ConcurrentModificationException
}BlockingQueue
java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 生产者
new Thread(() -> {
try {
queue.put("数据"); // 队列满时阻塞
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 消费者
new Thread(() -> {
try {
String data = queue.take(); // 队列空时阻塞
System.out.println("消费: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 非阻塞操作
boolean added = queue.offer("数据"); // 立即返回
String polled = queue.poll(); // 立即返回原子类
基本原子类
java
import java.util.concurrent.atomic.*;
AtomicInteger atomicInt = new AtomicInteger(0);
atomicInt.incrementAndGet(); // 原子自增
atomicInt.decrementAndGet(); // 原子自减
atomicInt.getAndIncrement(); // 先获取后自增
atomicInt.addAndGet(10); // 原子加
int value = atomicInt.get(); // 获取值
AtomicBoolean atomicBool = new AtomicBoolean(false);
boolean oldValue = atomicBool.compareAndSet(false, true); // CAS 操作
AtomicLong atomicLong = new AtomicLong(0);
atomicLong.incrementAndGet();原子引用
java
import java.util.concurrent.atomic.AtomicReference;
AtomicReference<String> ref = new AtomicReference<>("初始值");
ref.set("新值");
String current = ref.get();
boolean success = ref.compareAndSet("旧值", "新值");原子数组
java
import java.util.concurrent.atomic.AtomicIntegerArray;
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.set(0, 100);
int value = array.get(0);
array.incrementAndGet(0);原子更新器
java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
class Person {
volatile int age;
}
AtomicIntegerFieldUpdater<Person> updater =
AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
Person person = new Person();
updater.incrementAndGet(person);实践示例
多线程下载器
java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class MultiThreadDownloader {
private final ExecutorService executor;
private final BlockingQueue<String> taskQueue;
private final AtomicInteger completedTasks = new AtomicInteger(0);
private final CountDownLatch latch;
public MultiThreadDownloader(int threadCount, int taskCount) {
this.executor = Executors.newFixedThreadPool(threadCount);
this.taskQueue = new LinkedBlockingQueue<>();
this.latch = new CountDownLatch(taskCount);
}
public void addTask(String url) {
taskQueue.offer(url);
}
public void start() {
int threadCount = ((ThreadPoolExecutor) executor).getCorePoolSize();
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
String url = taskQueue.poll(1, TimeUnit.SECONDS);
if (url != null) {
download(url);
completedTasks.incrementAndGet();
latch.countDown();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
}
private void download(String url) {
System.out.println(Thread.currentThread().getName() + " 下载: " + url);
try {
Thread.sleep(1000); // 模拟下载
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void awaitCompletion() throws InterruptedException {
latch.await();
executor.shutdown();
System.out.println("所有任务完成,共完成: " + completedTasks.get());
}
public static void main(String[] args) throws InterruptedException {
MultiThreadDownloader downloader = new MultiThreadDownloader(3, 10);
for (int i = 1; i <= 10; i++) {
downloader.addTask("http://example.com/file" + i);
}
downloader.start();
downloader.awaitCompletion();
}
}生产者消费者模式
java
import java.util.concurrent.*;
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// 生产者
Runnable producer = () -> {
for (int i = 0; i < 10; i++) {
try {
queue.put(i);
System.out.println("生产: " + i + ", 队列大小: " + queue.size());
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
};
// 消费者
Runnable consumer = () -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Integer item = queue.take();
System.out.println("消费: " + item + ", 队列大小: " + queue.size());
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
};
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(producer);
executor.submit(consumer);
executor.submit(consumer);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdownNow();
}
}