Java 多线程共享模型之管程(下)

共享模型之管程

wait、notify

wait、notify 原理

Java 多线程共享模型之管程(下)
  • Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
  • BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
  • BLOCKED 线程会在 Owner 线程释放锁时唤醒
  • WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入EntryList 重新竞争

API 介绍

  • obj.wait() 让进入 object 监视器的线程到 waitSet 等待
  • obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
  • obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法

package WaNo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo2")
public class demo2 {

    static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (lock){
                log.debug("执行");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其他代码");
            }
        },"t1").start();

        new Thread(() -> {
            synchronized (lock){
                log.debug("执行");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其他代码");
            }
        },"t2").start();

        Thread.sleep(2000);
        log.debug("唤醒 lock 上其他线程");
        synchronized (lock){
            lock.notify();  //唤醒 lock 上的一个线程(随机)
            //lock.notifyAll();   //唤醒 lock 上的所有线程
        }
    }
}
  • notify()
20:20:58 [t1] c.demo2 - 执行
20:20:58 [t2] c.demo2 - 执行
20:21:00 [main] c.demo2 - 唤醒 lock 上其他线程
20:21:00 [t1] c.demo2 - 其他代码
  • notifyAll()
20:22:04 [t1] c.demo2 - 执行
20:22:04 [t2] c.demo2 - 执行
20:22:06 [main] c.demo2 - 唤醒 lock 上其他线程
20:22:06 [t2] c.demo2 - 其他代码
20:22:06 [t1] c.demo2 - 其他代码

wait() 方法会释放对象的锁,进入 WaitSet 等待区,从而让其他线程就机会获取对象的锁。无限制等待,直到notify 为止

wait(long n) 有时限的等待, 到 n 毫秒后结束等待,或是被 notify

wait、notify 正确使用

sleep vs. wait

  • sleep 是 Thread 方法,而 wait 是 Object 的方法
  • sleep 不需要强制和 synchronized 配合使用,但 wait 需要和 synchronized 一起用
  • sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁
  • 它们状态 TIMED_WAITING
step 1

思考下面的解决方案好不好,为什么?

package WaNo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class demo4 {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeOut = false;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (room){
                log.debug("有烟没?[{}]",hasCigarette);
                if(!hasCigarette){
                    log.debug("没烟,睡会!");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有烟没?[{}]",hasCigarette);
                if(hasCigarette){
                    log.debug("开始干活!");
                }
            }
        },"小南").start();

        for(int i=0;i {
                synchronized (room){
                    log.debug("开始干活!");
                }
            },"其他人").start();
        }

        Thread.sleep(1000);
        new Thread(() -> {
            hasCigarette = true;
            log.debug("烟到了!");
        },"送烟的").start();
    }
}

输出:

20:41:09 [小南] c.demo4 - 有烟没?[false]
20:41:09 [小南] c.demo4 - 没烟,睡会!
20:41:10 [送烟的] c.demo4 - 烟到了!
20:41:11 [小南] c.demo4 - 有烟没?[true]
20:41:11 [小南] c.demo4 - 开始干活!
20:41:11 [其他人] c.demo4 - 开始干活!
20:41:11 [其他人] c.demo4 - 开始干活!
20:41:11 [其他人] c.demo4 - 开始干活!
20:41:11 [其他人] c.demo4 - 开始干活!
20:41:11 [其他人] c.demo4 - 开始干活!
  • 其它干活的线程,都要一直阻塞,效率太低
  • 小南线程必须睡足 2s 后才能醒来,就算烟提前送到,也无法立刻醒来
  • 加了 synchronized (room) 后,就好比小南在里面反锁了门睡觉,烟根本没法送进门,main 没加synchronized 就好像 main 线程是翻窗户进来的
  • 解决方法,使用 wait – notify 机制
step 2

思考下面的实现行吗,为什么?

package WaNo.step;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class step2 {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeOut = false;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (room){
                log.debug("有烟没?[{}]",hasCigarette);
                if(!hasCigarette){
                    log.debug("没烟,睡会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有烟没?[{}]",hasCigarette);
                if(hasCigarette){
                    log.debug("开始干活!");
                }
            }
        },"小南").start();

        for(int i=0;i {
                synchronized (room){
                    log.debug("开始干活!");
                }
            },"其他人").start();
        }

        Thread.sleep(1000);
        new Thread(() -> {
            synchronized (room){
                hasCigarette = true;
                log.debug("烟到了!");
                room.notify();
            }
        },"送烟的").start();
    }
}

输出:

20:46:32 [小南] c.demo4 - 有烟没?[false]
20:46:32 [小南] c.demo4 - 没烟,睡会!
20:46:32 [其他人] c.demo4 - 开始干活!
20:46:32 [其他人] c.demo4 - 开始干活!
20:46:32 [其他人] c.demo4 - 开始干活!
20:46:32 [其他人] c.demo4 - 开始干活!
20:46:32 [其他人] c.demo4 - 开始干活!
20:46:33 [送烟的] c.demo4 - 烟到了!
20:46:33 [小南] c.demo4 - 有烟没?[true]
20:46:33 [小南] c.demo4 - 开始干活!
  • 解决了其它干活的线程阻塞的问题
  • 但如果有其它线程也在等待条件呢?
step 3
package WaNo.step;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class step3 {
    static final Object room = new Object();
    static boolean hasCigarette = false;
    static boolean hasTakeOut = false;

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            synchronized (room){
                log.debug("有烟没?[{}]",hasCigarette);
                if(!hasCigarette){
                    log.debug("没烟,睡会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("有烟没?[{}]",hasCigarette);
                if(hasCigarette){
                    log.debug("开始干活!");
                } else {
                    log.debug("没干成活...");
                }
            }
        },"小南").start();

        new Thread(() -> {
            synchronized (room) {
                Thread thread = Thread.currentThread();
                log.debug("外卖送到没?[{}]", hasTakeOut);
                if (!hasTakeOut) {
                    log.debug("没外卖,先歇会!");
                    try {
                        room.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("外卖送到没?[{}]", hasTakeOut);
                if (hasTakeOut) {
                    log.debug("可以开始干活了");
                } else {
                    log.debug("没干成活...");
                }
            }
        }, "小女").start();

        for(int i=0;i {
                synchronized (room){
                    log.debug("开始干活!");
                }
            },"其他人").start();
        }

        Thread.sleep(1000);
        new Thread(() -> {
            synchronized (room){
                hasCigarette = true;
                log.debug("烟到了!");
                room.notify();
            }
        },"送烟的").start();
    }
}

输出:

20:53:12.173 [小南] c.TestCorrectPosture - 有烟没?[false]
20:53:12.176 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:53:12.176 [小女] c.TestCorrectPosture - 外卖送到没?[false]
20:53:12.176 [小女] c.TestCorrectPosture - 没外卖,先歇会!
20:53:13.174 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
20:53:13.174 [小南] c.TestCorrectPosture - 有烟没?[false]
20:53:13.174 [小南] c.TestCorrectPosture - 没干成活...

notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为【虚假唤醒】

解决方法,改为 notifyAll

step 4
new Thread(() -> {
    synchronized (room) {
        hasTakeout = true;
        log.debug("外卖到了噢!");
        room.notifyAll();
    }
}, "送外卖的").start();

输出:

20:55:23.978 [小南] c.TestCorrectPosture - 有烟没?[false]
20:55:23.982 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:55:23.982 [小女] c.TestCorrectPosture - 外卖送到没?[false]
20:55:23.982 [小女] c.TestCorrectPosture - 没外卖,先歇会!
20:55:24.979 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
20:55:24.979 [小女] c.TestCorrectPosture - 外卖送到没?[true]
20:55:24.980 [小女] c.TestCorrectPosture - 可以开始干活了
20:55:24.980 [小南] c.TestCorrectPosture - 有烟没?[false]
20:55:24.980 [小南] c.TestCorrectPosture - 没干成活...

用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了

解决方法,用 while + wait,当条件不成立,再次 wait

step 5

将 if 改为 while

while (!hasCigarette) {
    log.debug("没烟,先歇会!");
    try {
        room.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
}

输出:

20:58:34.322 [小南] c.TestCorrectPosture - 有烟没?[false]
20:58:34.326 [小南] c.TestCorrectPosture - 没烟,先歇会!
20:58:34.326 [小女] c.TestCorrectPosture - 外卖送到没?[false]
20:58:34.326 [小女] c.TestCorrectPosture - 没外卖,先歇会!
20:58:35.323 [送外卖的] c.TestCorrectPosture - 外卖到了噢!
20:58:35.324 [小女] c.TestCorrectPosture - 外卖送到没?[true]
20:58:35.324 [小女] c.TestCorrectPosture - 可以开始干活了
20:58:35.324 [小南] c.TestCorrectPosture - 没烟,先歇会!
套路总结
synchronized(lock) {
    while(条件不成立) {
        lock.wait();
    }
    // 干活
}

//另一个线程
synchronized(lock) {
    lock.notifyAll();
}

同步模式之保护性暂停

定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式

Java 多线程共享模型之管程(下)
实现
package WaNo.step;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;

@Slf4j(topic = "c.demo4")
public class demo4 {
    public static void main(String[] args) {
        //线程1 等待线程2 的下载结果
        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            List list = (List) guardedObject.get();
            log.debug("结果的大小是:{}",list.size());
        },"t1").start();

        new Thread(() -> {
            log.debug("执行下载");
            try {
                Thread.sleep(5000);
                List list = new ArrayList<>();
                list.add("1");
                guardedObject.complete(list);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2").start();
    }
}

class GuardedObject {
    //结果
    private Object response;

    //获取结果
    public Object get() {
        synchronized (this){
            //还没有结果
            while (response == null){
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }

    //产生结果
    public void complete(Object response){
        synchronized (this){
            //给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}

输出:

16:47:15 [t2] c.demo4 - &#x6267;&#x884C;&#x4E0B;&#x8F7D;
16:47:20 [t1] c.demo4 - &#x7ED3;&#x679C;&#x7684;&#x5927;&#x5C0F;&#x662F;&#xFF1A;1

异步模式之生产者/消费者

要点

  • 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

Java 多线程共享模型之管程(下)
package WaNo;

import lombok.AllArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.util.LinkedList;

@Slf4j(topic = "c.demo5")
public class demo5 {
    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue(2);

        for (int i = 0; i < 3; i++) {
            int id = i;
            new Thread(() -> {
                queue.put(new Message(id,"值"+id));
            },"生产者" + i).start();
        }

        new Thread(() -> {
            while (true){
                try {
                    Thread.sleep(1000);
                    Message message = queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"消费者").start();
    }
}

//消息队列类(线程间通信)
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
    //消息队列集合
    private LinkedList list = new LinkedList<>();
    //队列容量
    private int capcity;

    public MessageQueue(int capcity){
        this.capcity = capcity;
    }

    //获取消息
    public Message take(){
        //检查队列是否为空
        synchronized (list){
            while (list.isEmpty()){
                try {
                    log.debug("队列为空,消费者线程等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //从队列头部获取消息返回
            Message message = list.removeFirst();
            log.debug("已消费消息 {}",message);
            list.notifyAll();
            return message;
        }
    }

    //存入消息
    public void put(Message message){
        synchronized (list){
            //检查队列是否已满
            while (list.size() == capcity){
                try {
                    log.debug("队列已满,生产者线程等待");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //将消息加入队列的尾部
            list.addLast(message);
            log.debug("已生产消息 {}",message);
            list.notifyAll();
        }
    }
}

@Setter
@AllArgsConstructor
@ToString
@Slf4j(topic = "c.Message")
final class Message {
    private int id;
    private Object value;
}

输出:

17:18:49 [&#x751F;&#x4EA7;&#x8005;0] c.MessageQueue - &#x5DF2;&#x751F;&#x4EA7;&#x6D88;&#x606F; Message(id=0, value=&#x503C;0)
17:18:49 [&#x751F;&#x4EA7;&#x8005;2] c.MessageQueue - &#x5DF2;&#x751F;&#x4EA7;&#x6D88;&#x606F; Message(id=2, value=&#x503C;2)
17:18:49 [&#x751F;&#x4EA7;&#x8005;1] c.MessageQueue - &#x961F;&#x5217;&#x5DF2;&#x6EE1;&#xFF0C;&#x751F;&#x4EA7;&#x8005;&#x7EBF;&#x7A0B;&#x7B49;&#x5F85;
17:18:50 [&#x6D88;&#x8D39;&#x8005;] c.MessageQueue - &#x5DF2;&#x6D88;&#x8D39;&#x6D88;&#x606F; Message(id=0, value=&#x503C;0)
17:18:50 [&#x751F;&#x4EA7;&#x8005;1] c.MessageQueue - &#x5DF2;&#x751F;&#x4EA7;&#x6D88;&#x606F; Message(id=1, value=&#x503C;1)
17:18:51 [&#x6D88;&#x8D39;&#x8005;] c.MessageQueue - &#x5DF2;&#x6D88;&#x8D39;&#x6D88;&#x606F; Message(id=2, value=&#x503C;2)
17:18:52 [&#x6D88;&#x8D39;&#x8005;] c.MessageQueue - &#x5DF2;&#x6D88;&#x8D39;&#x6D88;&#x606F; Message(id=1, value=&#x503C;1)
17:18:53 [&#x6D88;&#x8D39;&#x8005;] c.MessageQueue - &#x961F;&#x5217;&#x4E3A;&#x7A7A;&#xFF0C;&#x6D88;&#x8D39;&#x8005;&#x7EBF;&#x7A0B;&#x7B49;&#x5F85;

park、unpark

基本使用

它们是 LockSupport 类中的方法

// 暂停当前线程
LockSupport.park();

// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

先 park 再 unpark

Thread t1 = new Thread(() -> {
    log.debug("start...");
    sleep(1);
    log.debug("park...");
    LockSupport.park();
    log.debug("resume...");
},"t1");
t1.start();

Thread.sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);

输出:

18:42:52.585 c.TestParkUnpark [t1] - start...

18:42:53.589 c.TestParkUnpark [t1] - park...

18:42:54.583 c.TestParkUnpark [main] - unpark...

18:42:54.583 c.TestParkUnpark [t1] - resume...

先 unpark 再 park

Thread t1 = new Thread(() -> {
    log.debug("start...");
     sleep(2);
     log.debug("park...");
     LockSupport.park();
     log.debug("resume...");
}, "t1");
t1.start();

sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);

输出:

18:43:50.765 c.TestParkUnpark [t1] - start...

18:43:51.764 c.TestParkUnpark [main] - unpark...

18:43:52.769 c.TestParkUnpark [t1] - park...

18:43:52.769 c.TestParkUnpark [t1] - resume...

特点

与 Object 的 wait & notify 相比

  • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

原理

每个线程都有自己的一个 Parker 对象,由三部分组成 _counter , _cond 和 _mutex

Java 多线程共享模型之管程(下)
  1. 当前线程调用 Unsafe.park() 方法
  2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁
  3. 线程进入 _cond 条件变量阻塞
  4. 设置 _counter = 0

Java 多线程共享模型之管程(下)
  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 唤醒 _cond 条件变量中的 Thread_0
  3. Thread_0 恢复运行
  4. 设置 _counter 为 0

Java 多线程共享模型之管程(下)
  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 当前线程调用 Unsafe.park() 方法
  3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
  4. 设置 _counter 为 0

重新理解六种状态

Java 多线程共享模型之管程(下)

假设有线程 Thread t

情况一

NEW –> RUNNABLE

当调用 t.start() 方法时,由 NEW –> RUNNABLE

情况二

​ RUNNABLE

t 线程用 synchronized(obj) 获取了对象锁后

  • 调用 obj.wait() 方法时, t 线程从 RUNNABLE –> WAITING
  • 调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
  • 竞争锁成功, t 线程从 WAITING –> RUNNABLE
  • 竞争锁失败, t 线程从 WAITING –> BLOCKED

情况三

RUNNABLE

  • 当前线程调用 t.join() 方法时, 当前线程从 RUNNABLE –> WAITING
  • 注意是 当前线程t 线程对象的监视器上等待
  • t 线程运行结束,或调用了 当前线程的 interrupt() 时, 当前线程从 WAITING –> RUNNABLE

情况四

RUNNABLE

  • 当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE –> WAITING
  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING –> RUNNABLE

情况五

​ RUNNABLE

t 线程用 synchronized(obj) 获取了对象锁后

  • 调用 obj.wait(long n) 方法时, t 线程从 RUNNABLE –> TIMED_WAITING
  • t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
  • 竞争锁成功, t 线程从 TIMED_WAITING –> RUNNABLE
  • 竞争锁失败, t 线程从 TIMED_WAITING –> BLOCKED

情况六

RUNNABLE

  • 当前线程调用 t.join(long n) 方法时, 当前线程从 RUNNABLE –> TIMED_WAITING
  • 注意是 当前线程t 线程对象的监视器上等待
  • 当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了 当前线程的 interrupt() 时, 当前线程从TIMED_WAITING –> RUNNABLE

情况七

RUNNABLE

  • 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE –> TIMED_WAITING
  • 当前线程等待时间超过了 n 毫秒, 当前线程从 TIMED_WAITING –> RUNNABLE

情况八

RUNNABLE

  • 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时, 当前线程从 RUNNABLE –> TIMED_WAITING
  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从TIMED_WAITING–> RUNNABLE

情况九

RUNNABLE

  • t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE –> BLOCKED
  • 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争成功,从 BLOCKED –> RUNNABLE ,其它失败的线程仍然 BLOCKED

情况十

RUNNABLE

当前线程所有代码运行完毕,进入 TERMINATED

多把锁

package WaNo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo6")
public class demo6 {
    public static void main(String[] args) {
        BigRoom bigRoom = new BigRoom();
        new Thread(() -> {
            bigRoom.study();
        },"r1").start();

        new Thread(() -> {
            bigRoom.sleep();
        },"r2").start();
    }
}

@Slf4j(topic = "c.BigRoom")
class BigRoom {
    private final Object studyRoom = new Object();
    private final Object bedRoom = new Object();

    public void sleep(){
        synchronized (bedRoom){
            log.debug("sleep two hours");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void study(){
        synchronized (studyRoom){
            log.debug("study one hour");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

输出:

20:01:42 [r2] c.BigRoom - sleep two hours
20:01:42 [r1] c.BigRoom - study one hour

将锁的粒度细分

  • 好处,是可以增强并发度
  • 坏处,如果一个线程需要同时获得多把锁,就容易发生死锁

活跃性

死锁

有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁

t1 线程 获得 A对象 锁,接下来想获取 B对象 的锁 t2 线程 获得 B对象 锁,接下来想获取 A对象 的锁

Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
    synchronized (A) {
        log.debug("lock A");
        sleep(1);
        synchronized (B) {
            log.debug("lock B");
            log.debug("操作...");
        }
    }
}, "t1");

Thread t2 = new Thread(() -> {
    synchronized (B) {
        log.debug("lock B");
        sleep(0.5);
        synchronized (A) {
            log.debug("lock A");
            log.debug("操作...");
        }
    }
}, "t2");
t1.start();
t2.start();

输出:

12:22:06.962 [t2] c.TestDeadLock - lock B
12:22:06.962 [t1] c.TestDeadLock - lock A

哲学家进餐问题

Java 多线程共享模型之管程(下)

有五位哲学家,围坐在圆桌旁。

  • 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。
  • 吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。
  • 如果筷子被身边的人拿着,自己就得等待

这种线程没有按预期结束,执行不下去的情况,归类为【活跃性】问题,除了死锁以外,还有活锁和饥饿者两种情况

活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束

public class TestLiveLock {
    static volatile int count = 10;
    static final Object lock = new Object();
    public static void main(String[] args) {
    new Thread(() -> {
        // 期望减到 0 退出循环
        while (count > 0) {
            sleep(0.2);
            count--;
            log.debug("count: {}", count);
        }
    }, "t1").start();

    new Thread(() -> {
        // 期望超过 20 退出循环
        while (count < 20) {
            sleep(0.2);
            count++;
            log.debug("count: {}", count);
        }
    }, "t2").start();
 }

饥饿

一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束

ReentrantLock

相对于 synchronized 它具备如下特点

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量

与 synchronized 一样,都支持可重入

基本语法

// 获取锁
reentrantLock.lock();
try {
    // 临界区
} finally {
    // 释放锁
    reentrantLock.unlock();
}

可重入

可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo1")
public class demo1 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {

        lock.lock();
        try {
            log.debug("enter main");
            m1();
        }finally {
            lock.unlock();
        }
    }

    public static void m1(){
        lock.lock();
        try {
            log.debug("enter m1");
            m2();
        }finally {
            lock.unlock();
        }
    }

    public static void m2(){
        lock.lock();
        try {
            log.debug("enter m2");
        }finally {
            lock.unlock();
        }
    }
}

输出:

20:19:19 [main] c.demo1 - enter main
20:19:19 [main] c.demo1 - enter m1
20:19:19 [main] c.demo1 - enter m2

可打断

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo2")
public class demo2 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            try {
                //如果没有竞争,此方法会获取对象的锁
                //如果有竞争,就进入阻塞队列,可以被其他线程用 interrupt 打断
                log.debug("尝试获得锁");
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
                log.debug("未获得锁,返回");
                return;
            }
            try {
                log.debug("获取到锁");
            }finally {
                lock.unlock();
            }
        }, "t1");

        lock.lock();
        t1.start();
        Thread.sleep(1000);
        log.debug("打断t1");
        t1.interrupt();
    }
}

输出:

20:26:05 [t1] c.demo2 - &#x5C1D;&#x8BD5;&#x83B7;&#x5F97;&#x9501;
20:26:06 [main] c.demo2 - &#x6253;&#x65AD;t1
20:26:06 [t1] c.demo2 - &#x672A;&#x83B7;&#x5F97;&#x9501;&#xFF0C;&#x8FD4;&#x56DE;
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
    at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
    at ReentrantLockDemo.demo2.lambda$main$0(demo2.java:16)
    at java.lang.Thread.run(Thread.java:748)

Process finished with exit code 0

注意如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断

锁超时

立刻失败

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo3")
public class demo3 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            log.debug("尝试获得锁");
            if(!lock.tryLock()){
                log.debug("获取不到锁");
                return;
            }
            try {
                log.debug("获得到锁");
            }finally {
                lock.unlock();
            }
        },"t1");

        lock.lock();
        log.debug("获得到锁");
        t1.start();
    }
}

输出:

20:31:15 [main] c.demo3 - &#x83B7;&#x5F97;&#x5230;&#x9501;
20:31:15 [t1] c.demo3 - &#x5C1D;&#x8BD5;&#x83B7;&#x5F97;&#x9501;
20:31:15 [t1] c.demo3 - &#x83B7;&#x53D6;&#x4E0D;&#x5230;&#x9501;

超时失败

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;
import sun.reflect.generics.tree.Tree;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo3")
public class demo3 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            log.debug("尝试获得锁");
            try {
                if(!lock.tryLock(1, TimeUnit.SECONDS)){
                    log.debug("获取不到锁");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                log.debug("获取不到锁");
                return;
            }
            try {
                log.debug("获得到锁");
            }finally {
                lock.unlock();
            }
        },"t1");

        lock.lock();
        log.debug("获得到锁");
        Thread.sleep(1000);
        lock.unlock();
        t1.start();
    }
}

输出:

20:34:03 [main] c.demo3 - &#x83B7;&#x5F97;&#x5230;&#x9501;
20:34:04 [t1] c.demo3 - &#x5C1D;&#x8BD5;&#x83B7;&#x5F97;&#x9501;
20:34:04 [t1] c.demo3 - &#x83B7;&#x5F97;&#x5230;&#x9501;

公平锁

ReentrantLock 默认是不公平的

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class demo4 {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(false);
        lock.lock();
        for (int i = 0; i < 500; i++) {
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " running...");
                } finally {
                    lock.unlock();
                }
            }, "t" + i).start();
        }
// 1s 之后去争抢锁
        Thread.sleep(1000);
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " start...");
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " running...");
            } finally {
                lock.unlock();
            }
        }, "强行插入").start();
        lock.unlock();
    }
}

强行插入,有机会在中间输出

注意该实验不一定总能复现

t39 running...

t40 running...

t41 running...

t42 running...

t43 running...

&#x5F3A;&#x884C;&#x63D2;&#x5165; start...

&#x5F3A;&#x884C;&#x63D2;&#x5165; running...

t44 running...

t45 running...

t46 running...

t47 running...

t49 running...

改为公平锁后

ReentrantLock lock = new ReentrantLock(true);

强行插入,总是在最后输出

t465 running...

t464 running...

t477 running...

t442 running...

t468 running...

t493 running...

t482 running...

t485 running...

t481 running...

&#x5F3A;&#x884C;&#x63D2;&#x5165; running...

公平锁一般没有必要,会降低并发度

条件变量

ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized 是那些不满足条件的线程都在一间休息室等消息
  • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒

使用要点:

  • await 前需要获得锁
  • await 执行后,会释放锁,进入 conditionObject 等待
  • await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
  • 竞争 lock 锁成功后,从 await 后继续执行
package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo4")
public class demo4 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        //创建一个新的条件变量(休息室)
        Condition condition1 = lock.newCondition();
        Condition condition2 = lock.newCondition();

        lock.lock();
        //进入休息室等待
        condition1.await();

        condition1.signal();
        //condition1.signalAll();
    }
}

同步模式之顺序控制

固定运行顺序

比如,必须先 2 后 1 打印

wait notify版

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "c.demo4")
public class demo4 {
    static final Object lock = new Object();
    //表示 t2 是否被运行过
    static boolean t2runned = false;
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (lock){
                while (!t2runned){
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("1");
            }
        },"t1");

        Thread t2 = new Thread(() -> {
            synchronized (lock){
                log.debug("2");
                t2runned = true;
                lock.notify();
            }
        },"t2");

        t1.start();
        t2.start();
    }
}

输出:

20:49:28 [t2] c.demo4 - 2
20:49:28 [t1] c.demo4 - 1

park unpark版

可以看到,实现上很麻烦:

  • 首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait
  • 第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决此问题
  • 最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个

可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:

package ReentrantLockDemo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;

@Slf4j(topic = "demo5")
public class demo5 {
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            LockSupport.park();
            log.debug("1");
        }, "t1");

        t1.start();

        new Thread(() -> {
            log.debug("2");
            LockSupport.unpark(t1);
        },"t2").start();
    }
}

交替输出

线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现

wait notify版

package ReentrantLockDemo;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.LockSupport;

@Slf4j(topic = "demo5")
public class demo5 {
    public static void main(String[] args) {
        WaitNotify wn = new WaitNotify(1,5);
        new Thread(() -> {
            wn.print("a",1,2);
        }).start();
        new Thread(() -> {
            wn.print("b",2,3);
        }).start();
        new Thread(() -> {
            wn.print("c",3,1);
        }).start();
    }
}
/*
    输出内容    等待标记    下一个标记
    a           1           2
    b           2           3
    c           3           1
 */
@AllArgsConstructor
class WaitNotify{
    //等待标记
    private int flag;
    //循环次数
    private int loopNumber;

    //打印
    public void print(String str,int waitFlag,int nextFlag){
        for (int i = 0; i < loopNumber; i++) {
            synchronized (this){
                while (flag != waitFlag){
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.print(str);
                flag = nextFlag;
                this.notifyAll();
            }
        }
    }
}

输出:

abcabcabcabcabc

ReentrantLock版

package ReentrantLockDemo;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j(topic = "c.demo6")
public class demo6 {
    public static void main(String[] args) throws InterruptedException {
        AwaitSignal awaitSignal = new AwaitSignal(5);
        Condition a = awaitSignal.newCondition();
        Condition b = awaitSignal.newCondition();
        Condition c = awaitSignal.newCondition();
        new Thread(() -> {
            awaitSignal.print("a", a, b);
        }).start();
        new Thread(() -> {
            awaitSignal.print("b", b, c);
        }).start();
        new Thread(() -> {
            awaitSignal.print("c", c, a);
        }).start();

        Thread.sleep(1000);
        awaitSignal.lock();
        try {
            System.out.println("开始。。。");
            a.signal();
        }finally {
            awaitSignal.unlock();
        }
    }
}

@AllArgsConstructor
class AwaitSignal extends ReentrantLock {
    private int loopNumber;
    /**
     * @param str 打印内容
     * @param current   进入哪一间休息室
     * @param next  下一间休息室
     */
    public void print(String str,Condition current,Condition next){
        for (int i = 0; i < loopNumber; i++) {
            lock();
            try {
                try {
                    current.await();
                    System.out.print(str);
                    next.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }finally {
                unlock();
            }
        }
    }
}

输出:

&#x5F00;&#x59CB;&#x3002;&#x3002;&#x3002;
abcabcabcabcabc

park unpark版

package ReentrantLockDemo;

import lombok.AllArgsConstructor;

import java.util.concurrent.locks.LockSupport;

public class demo7 {

    static  Thread t1;
    static  Thread t2;
    static  Thread t3;

    public static void main(String[] args) {
        ParkUnpark pu = new ParkUnpark(5);
        t1 = new Thread(() -> {
            pu.print("a", t2);
        },"t1");
        t2 = new Thread(() -> {
            pu.print("b", t3);
        },"t2");
        t3 = new Thread(() -> {
            pu.print("c", t1);
        },"t3");

        t1.start();
        t2.start();
        t3.start();

        LockSupport.unpark(t1);
    }
}

@AllArgsConstructor
class ParkUnpark{
    private int loopNumber;

    public void print(String str,Thread next){
        for (int i = 0; i < loopNumber; i++) {
            LockSupport.park();
            System.out.print(str);
            LockSupport.unpark(next);
        }
    }
}

输出:

abcabcabcabcabc

Original: https://www.cnblogs.com/lcha-coding/p/16365735.html
Author: 染沁
Title: Java 多线程共享模型之管程(下)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/620300/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 4、Idea设置显示多行文件

    使用IDEA时,可能会没有注意到,一旦打开过多的Java文件时,默认会堆积在一行显示,就像浏览器打开了多个标签一样,此时需要通过右侧箭头筛选的方式来选择其他文件。为了解决这一问题,…

    数据库 2023年6月6日
    0122
  • 4. 事务和锁

    404. 抱歉,您访问的资源不存在。 可能是URL不正确,或者对应的内容已经被删除,或者处于隐私状态。 [En] It may be that the URL is incorre…

    数据库 2023年5月24日
    0118
  • 线程本地存储 ThreadLocal

    线程本地存储提供了线程内存储变量的能力,这些变量是线程私有的。 线程本地存储一般用在跨类、跨方法的传递一些值。 线程本地存储也是解决特定场景下线程安全问题的思路之一(每个线程都访问…

    数据库 2023年6月11日
    0116
  • mysql视图,索引

    一、视图 View 视图是一个 虚拟表,是sql语句的查询结果,其内容由查询定义。同真实的表一样,视图包含一系列带有名称的列和行数据,在使用视图时动态生成。视图的数据变化会影响到基…

    数据库 2023年6月9日
    086
  • python: can’t open file ‘upload.py’: [Errno 2] No such file or directory

    为了发博客方便,参考别人的文章(见参考文章:[1][2]),使用 Metaweblog 和 pycnblog([3])插件实现相关功能,将本地markdown文件同步至博客园。使用…

    数据库 2023年6月14日
    0107
  • MyRocks DDL原理

    最近一个日常实例在做DDL过程中,直接把数据库给干趴下了,问题还是比较严重的,于是赶紧排查问题,撸了下crash堆栈和alert日志,发现是在去除唯一约束的场景下,MyRocks存…

    数据库 2023年6月9日
    0142
  • 数据类型

    布尔类型:true和false;占一个位 public class Demo01 {    public static void main(String[] args) { Ori…

    数据库 2023年6月11日
    0125
  • Rabbitmq从安装到简单入门

    1:Rabbitmq是什么? RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。 它由以高性能、健壮以及可伸缩性出名的 Erlang …

    数据库 2023年6月6日
    098
  • 测试执行和软件缺陷

    测试执行 1.基本概念 测试执行就是执行测试用例、提交Bug 单、测试结论的评估和总结等一系列测试活动,测试执行不仅包含测试用例的执行,还包括其它测试活动. 2.注意事项 (1) …

    数据库 2023年6月16日
    0108
  • 就这么一个简单的校验,80%的程序员却做不到,更不理解!

    在学生管理系统里,其中会有学生信息采集的功能。程序结构不外乎下面的分层实现方式。 开发出来这个功能,我觉得大家都易如反掌了。 当然易如反掌。 OK,我要说的是数据校验,以最简单的非…

    数据库 2023年6月9日
    099
  • volatility3-windows插件

    volatility3和volatility有很大的区别 查看镜像信息,volatility会进行分析 <span class=”ne-text”>python vol…

    数据库 2023年6月11日
    0113
  • 在ESXI6.7中安装OpenWrt

    在ESXI6.7中安装OpenWrt 21.02.2 一、前置准备 安装好的esxi6.7 下载openwrt镜像,如:openwrt-21.02.2-x86-64-generic…

    数据库 2023年6月9日
    0152
  • go的调度

    操作系统根据资源访问权限的不同,体系架构可以分为用户空间和内核空间;内核空间主要操作访问CPU资源,IO资源,内存资源等硬件资源,为应用程序提供最基本的基础资源;用户空间是上层应用…

    数据库 2023年6月9日
    0120
  • Redis缓存雪崩、缓存穿透、缓存击穿

    缓存雪崩 Redis中的缓存数据是有过期时间的,当在同一时间大量的缓存同时失效时就会造成缓存雪崩。解决方案1、设置Redis中的key永不过期,缺点是会占用很多内存2、使用Redi…

    数据库 2023年6月14日
    0117
  • 0. 数据库设计规范化

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    数据库 2023年6月16日
    0136
  • 用户后台管理

    User Management 这是通过SpringBoot完成的用户后台管理系统 一些解释说明也在代码里面, 源码及资源 会放在文末哦!!! – 这是效果图 大概就这…

    数据库 2023年6月16日
    0149
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球