Skip to content

多线程

Java 多线程是实现并发编程的核心技术,允许程序同时执行多个任务。

线程基础

创建线程

方式一:继承 Thread 类

java
/**
 * Thread subclass whose body prints the executing thread's name followed by a
 * counter running from 0 to 4.
 */
public class MyThread extends Thread {
    /** Prints five lines of the form {@code <thread name>: <index>}. */
    @Override
    public void run() {
        int step = 0;
        while (step < 5) {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + ": " + step);
            step++;
        }
    }
}

// Usage
MyThread thread = new MyThread();
thread.start();  // Start the thread

方式二:实现 Runnable 接口

java
/**
 * Runnable task that prints the executing thread's name followed by a counter
 * running from 0 to 4.
 */
public class MyRunnable implements Runnable {
    /** Prints five lines of the form {@code <thread name>: <index>}. */
    @Override
    public void run() {
        int step = 0;
        while (step < 5) {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + ": " + step);
            step++;
        }
    }
}

// Usage: hand the Runnable to a Thread and start it
Runnable job = new MyRunnable();
Thread thread = new Thread(job);
thread.start();

// The same thing with a lambda expression acting as the Runnable
Thread lambdaThread = new Thread(() -> System.out.println("Lambda 线程运行中"));
lambdaThread.start();

方式三:实现 Callable 接口(有返回值)

java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/**
 * Callable task that adds up the integers from 1 to 100.
 */
public class MyCallable implements Callable<Integer> {
    /**
     * Computes the sum 1 + 2 + ... + 100.
     *
     * @return the accumulated sum
     */
    @Override
    public Integer call() throws Exception {
        int total = 0;
        int term = 100;
        while (term >= 1) {
            total += term;
            term--;
        }
        return total;
    }
}

// Usage: wrap the Callable in a FutureTask so a Thread can run it
FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
Thread thread = new Thread(futureTask);
thread.start();

// Retrieve the return value
try {
    Integer result = futureTask.get();  // Blocks until the result is available
    System.out.println("计算结果: " + result);
} catch (Exception e) {
    e.printStackTrace();
}

线程常用方法

java
Thread thread = new Thread(() -> {
    System.out.println("线程运行中...");
});

// Get the thread name
String name = thread.getName();

// Set the thread name
thread.setName("MyThread");

// Get the thread ID
long id = thread.getId();

// Get / set the thread priority (range 1-10, normally 5)
int priority = thread.getPriority();
thread.setPriority(Thread.MAX_PRIORITY);  // 10

// Check whether this is a daemon thread
boolean isDaemon = thread.isDaemon();
// Mark as daemon. This has to happen before start(): calling setDaemon on a
// thread that is already alive throws IllegalThreadStateException.
thread.setDaemon(true);

// Start the thread (after all configuration above)
thread.start();

// Get the thread state
Thread.State state = thread.getState();

// Check whether the thread is still alive
boolean isAlive = thread.isAlive();

// Put the current thread to sleep
Thread.sleep(1000);  // Sleep for 1 second

// Yield
Thread.yield();  // Hint that the current thread is willing to give up its CPU time slice

// Wait for the thread to finish
thread.join();         // Block until the thread terminates
thread.join(1000);     // Wait at most 1 second

// Get the current thread
Thread current = Thread.currentThread();

线程状态

java
// Thread states
public enum State {
    NEW,           // Newly created
    RUNNABLE,      // Runnable
    BLOCKED,       // Blocked
    WAITING,       // Waiting
    TIMED_WAITING, // Waiting with a timeout
    TERMINATED     // Terminated
}

// State transition example: the task simply sleeps for one second
Runnable napper = () -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};
Thread thread = new Thread(napper);

Thread.State beforeStart = thread.getState();
System.out.println(beforeStart);  // NEW
thread.start();
Thread.State afterStart = thread.getState();
System.out.println(afterStart);  // RUNNABLE

线程同步

synchronized 关键字

同步代码块

java
/**
 * Counter demonstrating synchronized blocks with two different lock objects.
 *
 * <p>Note that {@link #increment()} and {@link #increment2()} synchronize on
 * different monitors (the private {@code lock} object versus {@code this}), so
 * the two methods do not exclude each other.
 */
public class Counter {
    private int count = 0;
    private final Object lock = new Object();

    /** Adds one to the counter while holding the private lock object's monitor. */
    public void increment() {
        synchronized (lock) {  // any object can serve as the lock
            count += 1;
        }
    }

    /** Adds one to the counter while holding this instance's monitor. */
    public void increment2() {
        synchronized (this) {
            count += 1;
        }
    }
}

同步方法

java
/**
 * Counter demonstrating synchronized methods.
 */
public class Counter {
    private int count = 0;

    /** Synchronized instance method: the lock is {@code this}. */
    public synchronized void increment() {
        count += 1;
    }

    /** Synchronized static method: the lock is the {@code Class} object. */
    public static synchronized void staticMethod() {
        // ...
    }
}

同步示例

java
/**
 * Ticket seller holding a stock of 100 tickets that several threads may sell from.
 */
public class TicketSystem {
    private int tickets = 100;

    /**
     * Sells one ticket if any remain: prints the selling thread's name together
     * with the current ticket number, then decrements the stock. Does nothing
     * once the stock has reached zero.
     */
    public synchronized void sellTicket() {
        if (tickets <= 0) {
            return;
        }
        String seller = Thread.currentThread().getName();
        System.out.println(seller + " 卖出第 " + tickets + " 张票");
        tickets -= 1;
    }
}

// Usage: three ticket windows share a single TicketSystem
TicketSystem system = new TicketSystem();

// Each window tries to sell a ticket, then pauses 100 ms. The loop ends only
// when the thread is interrupted; it keeps running after the tickets are gone.
Runnable task = () -> {
    for (;;) {
        system.sellTicket();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            return;
        }
    }
};

Thread t1 = new Thread(task, "窗口1");
Thread t2 = new Thread(task, "窗口2");
Thread t3 = new Thread(task, "窗口3");

for (Thread window : new Thread[] {t1, t2, t3}) {
    window.start();
}

ReentrantLock

java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Counter guarded by a {@link ReentrantLock}, showing blocking, non-blocking
 * and timed lock acquisition.
 */
public class Counter {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();

    /** Blocks until the lock is held, then adds one to the counter. */
    public void increment() {
        lock.lock();
        try {
            count += 1;
        } finally {
            lock.unlock();  // always release in finally
        }
    }

    /**
     * Adds one to the counter only if the lock is free right now.
     *
     * @return {@code true} if the increment happened, {@code false} if the lock
     *         could not be taken immediately
     */
    public boolean tryIncrement() {
        if (!lock.tryLock()) {  // returns immediately
            return false;
        }
        try {
            count += 1;
        } finally {
            lock.unlock();
        }
        return true;
    }

    /**
     * Adds one to the counter if the lock can be taken within one second.
     *
     * @return {@code true} if the increment happened; {@code false} on timeout
     *         or if the wait was interrupted (the interrupt flag is restored)
     */
    public boolean tryIncrementWithTimeout() {
        boolean acquired;
        try {
            acquired = lock.tryLock(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (!acquired) {
            return false;
        }
        try {
            count += 1;
        } finally {
            lock.unlock();
        }
        return true;
    }
}

synchronized vs ReentrantLock

| 特性 | synchronized | ReentrantLock |
| --- | --- | --- |
| 锁获取方式 | JVM 实现 | API 实现 |
| 释放锁 | 自动释放 | 手动释放 |
| 可中断 | 不可 | 可中断 |
| 公平性 | 非公平 | 可选公平/非公平 |
| 条件变量 | 单一 | 多个 |

线程通信

wait/notify

java
/**
 * Bounded producer/consumer queue (capacity 10) coordinated with the intrinsic
 * monitor's {@code wait}/{@code notifyAll}.
 */
public class ProducerConsumer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int maxSize = 10;

    /**
     * Appends a value, waiting while the queue is full.
     *
     * @param value the value to enqueue
     * @throws InterruptedException if interrupted while waiting for space
     */
    public synchronized void produce(int value) throws InterruptedException {
        while (maxSize == queue.size()) {
            wait();  // queue is full
        }
        queue.offer(value);
        System.out.println("生产: " + value);
        notifyAll();  // wake up consumers
    }

    /**
     * Removes and returns the oldest value, waiting while the queue is empty.
     *
     * @return the dequeued value
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public synchronized int consume() throws InterruptedException {
        while (queue.size() == 0) {
            wait();  // queue is empty
        }
        Integer head = queue.poll();
        System.out.println("消费: " + head);
        notifyAll();  // wake up producers
        return head;
    }
}

Condition

java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Fixed-capacity FIFO buffer guarded by a {@link ReentrantLock} with two
 * conditions: producers wait on {@code notFull}, consumers on {@code notEmpty}.
 *
 * @param <T> element type
 */
public class BoundedBuffer<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int maxSize;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a buffer holding at most {@code maxSize} elements.
     *
     * @param maxSize capacity; must be positive
     * @throws IllegalArgumentException if {@code maxSize} is zero or negative
     */
    public BoundedBuffer(int maxSize) {
        // A capacity of 0 would make put() wait forever, and a negative one
        // would never match queue.size(), silently removing the bound.
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
        }
        this.maxSize = maxSize;
    }

    /**
     * Appends an item, waiting while the buffer is full.
     *
     * @param item the element to add
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            // Loop rather than "if": the condition is re-checked after every wake-up.
            while (queue.size() == maxSize) {
                notFull.await();
            }
            queue.offer(item);
            notEmpty.signal();  // one waiting consumer may proceed
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest item, waiting while the buffer is empty.
     *
     * @return the dequeued element
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            T item = queue.poll();
            notFull.signal();  // one waiting producer may proceed
            return item;
        } finally {
            lock.unlock();
        }
    }
}

线程池

ExecutorService

java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Create a fixed-size thread pool
ExecutorService fixedPool = Executors.newFixedThreadPool(5);

// Create a single-thread pool
ExecutorService singlePool = Executors.newSingleThreadExecutor();

// Create a cached thread pool
ExecutorService cachedPool = Executors.newCachedThreadPool();

// Create a thread pool for scheduled tasks
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);

// Submit a task
fixedPool.execute(() -> {
    System.out.println("任务执行");
});

// Submit a task that returns a value
Future<Integer> future = fixedPool.submit(() -> {
    return 1 + 1;
});

try {
    Integer result = future.get();
    System.out.println("结果: " + result);
} catch (Exception e) {
    e.printStackTrace();
}

// Shut down the thread pool
fixedPool.shutdown();  // Graceful shutdown
fixedPool.shutdownNow();  // Immediate shutdown

// Wait (up to 60 seconds) for all tasks to finish
fixedPool.awaitTermination(60, TimeUnit.SECONDS);

ThreadPoolExecutor

java
import java.util.concurrent.*;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5,                      // Core pool size
    10,                     // Maximum pool size
    60L,                    // Keep-alive time for idle threads
    TimeUnit.SECONDS,       // Unit of the keep-alive time
    new LinkedBlockingQueue<>(100),  // Task queue
    Executors.defaultThreadFactory(), // Thread factory
    new ThreadPoolExecutor.CallerRunsPolicy() // Rejection policy
);

// Rejection policies
// AbortPolicy: throw an exception (the default)
// CallerRunsPolicy: run the task on the calling thread
// DiscardPolicy: silently drop the task
// DiscardOldestPolicy: drop the oldest queued task

executor.execute(() -> {
    System.out.println("任务执行");
});

executor.shutdown();

定时任务

java
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

// Run once after a delay
scheduler.schedule(() -> {
    System.out.println("延迟3秒执行");
}, 3, TimeUnit.SECONDS);

// Fixed delay: initial delay 0, then 2 seconds between runs
scheduler.scheduleWithFixedDelay(() -> {
    System.out.println("固定延迟执行");
}, 0, 2, TimeUnit.SECONDS);

// Fixed rate: initial delay 0, period 2 seconds
scheduler.scheduleAtFixedRate(() -> {
    System.out.println("固定频率执行");
}, 0, 2, TimeUnit.SECONDS);

并发工具类

CountDownLatch

java
import java.util.concurrent.CountDownLatch;

CountDownLatch latch = new CountDownLatch(3);

// Launch three workers; each reports completion and counts the latch down by one
int launched = 0;
while (launched < 3) {
    Thread worker = new Thread(() -> {
        String self = Thread.currentThread().getName();
        System.out.println(self + " 完成");
        latch.countDown();
    });
    worker.start();
    launched++;
}

// Block until the count reaches zero
latch.await();
System.out.println("所有线程完成");

CyclicBarrier

java
import java.util.concurrent.CyclicBarrier;

// Barrier for three parties; the second argument is an action associated with the barrier
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("所有线程到达屏障,继续执行");
});

for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + " 到达屏障");
        try {
            barrier.await();  // Wait for the other threads
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 继续执行");
    }).start();
}

Semaphore

java
import java.util.concurrent.Semaphore;

Semaphore semaphore = new Semaphore(3);  // 3 permits

for (int i = 0; i < 10; i++) {
    new Thread(() -> {
        // Acquire outside the try/finally that releases: if acquire() is
        // interrupted no permit is held, and releasing anyway would raise the
        // number of available permits above 3.
        try {
            semaphore.acquire();  // Obtain a permit
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        try {
            System.out.println(Thread.currentThread().getName() + " 获取资源");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();  // Give the permit back
            System.out.println(Thread.currentThread().getName() + " 释放资源");
        }
    }).start();
}

Exchanger

java
import java.util.concurrent.Exchanger;

Exchanger<String> exchanger = new Exchanger<>();

// Party A: offers its message and prints whatever it gets back
Runnable partyA = () -> {
    String outgoing = "来自线程A的数据";
    try {
        String incoming = exchanger.exchange(outgoing);
        System.out.println("线程A收到: " + incoming);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

// Party B: the mirror image of party A
Runnable partyB = () -> {
    String outgoing = "来自线程B的数据";
    try {
        String incoming = exchanger.exchange(outgoing);
        System.out.println("线程B收到: " + incoming);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

new Thread(partyA).start();
new Thread(partyB).start();

并发集合

ConcurrentHashMap

java
import java.util.concurrent.ConcurrentHashMap;

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

map.put("key", 1);
map.putIfAbsent("key", 2);  // Stores the value only if the key is absent
map.compute("key", (k, v) -> v == null ? 1 : v + 1);  // Atomic computation
map.merge("key", 1, Integer::sum);  // Merge values

Integer value = map.get("key");

CopyOnWriteArrayList

java
import java.util.concurrent.CopyOnWriteArrayList;

CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

list.add("元素");
list.addIfAbsent("元素");  // Adds only if not already present

// Modifying during iteration is safe (the iterator walks a snapshot)
for (String s : list) {
    list.add("新元素");  // Does not throw ConcurrentModificationException
}

BlockingQueue

java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// Producer
new Thread(() -> {
    try {
        queue.put("数据");  // Blocks while the queue is full
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

// Consumer
new Thread(() -> {
    try {
        String data = queue.take();  // Blocks while the queue is empty
        System.out.println("消费: " + data);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

// Non-blocking operations
boolean added = queue.offer("数据");  // Returns immediately
String polled = queue.poll();  // Returns immediately

原子类

基本原子类

java
import java.util.concurrent.atomic.*;

AtomicInteger atomicInt = new AtomicInteger(0);
atomicInt.incrementAndGet();  // Atomic increment
atomicInt.decrementAndGet();  // Atomic decrement
atomicInt.getAndIncrement();  // Read, then increment
atomicInt.addAndGet(10);      // Atomic add
int value = atomicInt.get();  // Read the current value

AtomicBoolean atomicBool = new AtomicBoolean(false);
// compareAndSet reports whether the swap succeeded; it does not return the
// previous value, so the result is named accordingly.
boolean updated = atomicBool.compareAndSet(false, true);  // CAS operation

AtomicLong atomicLong = new AtomicLong(0);
atomicLong.incrementAndGet();

原子引用

java
import java.util.concurrent.atomic.AtomicReference;

// Atomic reference starting with an initial value
AtomicReference<String> ref = new AtomicReference<>("初始值");
ref.set("新值");  // Unconditional write
String current = ref.get();  // Read the current reference
// CAS: replaces the value only if the current one matches the first argument
boolean success = ref.compareAndSet("旧值", "新值");

原子数组

java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Array of 10 ints whose elements are updated atomically
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.set(0, 100);  // Write element 0
int value = array.get(0);  // Read element 0
array.incrementAndGet(0);  // Atomically increment element 0

原子更新器

java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class Person {
    // Declared volatile, as AtomicIntegerFieldUpdater requires of the target field
    volatile int age;
}

// Updater bound to the "age" field of Person, looked up by name
AtomicIntegerFieldUpdater<Person> updater =
    AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");

Person person = new Person();
updater.incrementAndGet(person);  // Atomically increment person.age

实践示例

多线程下载器

java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simulated downloader: a fixed pool of worker threads drains a queue of URLs,
 * and a latch sized to the expected task count signals overall completion.
 */
public class MultiThreadDownloader {
    private final ExecutorService executor;
    private final int threadCount;
    private final BlockingQueue<String> taskQueue;
    private final AtomicInteger completedTasks = new AtomicInteger(0);
    private final CountDownLatch latch;

    /**
     * @param threadCount number of worker threads
     * @param taskCount   number of tasks that must finish before
     *                    {@link #awaitCompletion()} returns
     */
    public MultiThreadDownloader(int threadCount, int taskCount) {
        // Remember the worker count directly instead of casting the executor
        // to ThreadPoolExecutor later to read it back.
        this.threadCount = threadCount;
        this.executor = Executors.newFixedThreadPool(threadCount);
        this.taskQueue = new LinkedBlockingQueue<>();
        this.latch = new CountDownLatch(taskCount);
    }

    /**
     * Queues a URL for download.
     *
     * @param url the address to download
     */
    public void addTask(String url) {
        taskQueue.offer(url);
    }

    /** Starts one polling worker per pool thread. */
    public void start() {
        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                // Each worker keeps polling until its thread is interrupted.
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        String url = taskQueue.poll(1, TimeUnit.SECONDS);
                        if (url != null) {
                            download(url);
                            completedTasks.incrementAndGet();
                            latch.countDown();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }
    }

    /** Simulates downloading {@code url} by sleeping for one second. */
    private void download(String url) {
        System.out.println(Thread.currentThread().getName() + " 下载: " + url);
        try {
            Thread.sleep(1000);  // simulated download
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks until the expected number of tasks has completed, then stops the
     * workers and prints the completed count.
     *
     * @throws InterruptedException if interrupted while waiting on the latch
     */
    public void awaitCompletion() throws InterruptedException {
        latch.await();
        // The workers only leave their loop when interrupted. shutdown() does
        // not interrupt running tasks, so it would leave them polling forever
        // and keep the JVM alive; shutdownNow() interrupts them.
        executor.shutdownNow();
        System.out.println("所有任务完成,共完成: " + completedTasks.get());
    }

    public static void main(String[] args) throws InterruptedException {
        MultiThreadDownloader downloader = new MultiThreadDownloader(3, 10);

        for (int i = 1; i <= 10; i++) {
            downloader.addTask("http://example.com/file" + i);
        }

        downloader.start();
        downloader.awaitCompletion();
    }
}

生产者消费者模式

java
import java.util.concurrent.*;

/**
 * Producer/consumer demo on an {@code ArrayBlockingQueue} of capacity 5: one
 * producer emits the numbers 0..9, two consumers drain them, and everything is
 * stopped after ten seconds.
 */
public class ProducerConsumerDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // Producer: puts 0..9 with a 500 ms pause after each; stops early if interrupted
        Runnable producer = () -> {
            int next = 0;
            while (next < 10) {
                try {
                    queue.put(next);
                    System.out.println("生产: " + next + ", 队列大小: " + queue.size());
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                next++;
            }
        };

        // Consumer: takes one item per second until its thread is interrupted
        Runnable consumer = () -> {
            Thread self = Thread.currentThread();
            while (!self.isInterrupted()) {
                try {
                    Integer item = queue.take();
                    System.out.println("消费: " + item + ", 队列大小: " + queue.size());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    self.interrupt();
                    return;
                }
            }
        };

        ExecutorService executor = Executors.newFixedThreadPool(3);
        executor.submit(producer);
        for (int c = 0; c < 2; c++) {
            executor.submit(consumer);
        }

        // Let the demo run for ten seconds
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Interrupt the workers so the consumers leave their loops
        executor.shutdownNow();
    }
}