深入并发学习-锁


公平和非公平锁

  • 公平锁:是指多个线程按照申请锁的顺序来获取锁,类似排队打饭,先来后到。公平锁就是很公平,在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照FIFO的规则从队列中取到自己。
  • 非公平锁:是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后中请的线程比先中请的线程优先获取锁。在高并发的情况下,有可能会造成优先级反转或者饥饿现象。非公平锁比较粗鲁,上来就直接尝试占有锁,如果尝试失败,就再采用类似公平锁那种方式。

并发包中ReentrantLock的创建可以指定构造函数的boolean类型来得到公平锁或非公平锁,默认是非公平锁。

Java ReentrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。

非公平锁的优点在于吞吐量比公平锁大。

对于Synchronized而言,也是一种非公平锁

可重入锁理论知识

指的是同一线程外层函数获得锁之后,内层递归函数仍然能获取该锁的代码,在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。

也即是说,线程可以进入任何一个它已经拥有的锁所同步着的代码块。

ReentrantLock/synchronized就是一个典型的可重入锁。

可重入锁最大的作用是避免死锁。

可重入锁代码验证

synchronized

class Phone {

    public synchronized void sendSMS() throws Exception{
        System.out.println(Thread.currentThread().getName() + "\t invoked sendSMS()");

        // 在同步方法中,调用另外一个同步方法
        sendEmail();
    }


    public synchronized void sendEmail() throws Exception{
        System.out.println(Thread.currentThread().getId() + "\t invoked sendEmail()");
    }
}

public class SynchronizedReentrantLockDemo {

    public static void main(String[] args) {
        Phone phone = new Phone();

        // 两个线程操作资源列
        new Thread(() -> {
            try {
                phone.sendSMS();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            try {
                phone.sendSMS();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "t2").start();
    }

}

输出结果:

t1     invoked sendSMS()
11     invoked sendEmail()
t2     invoked sendSMS()
12     invoked sendEmail()

ReentranLock

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Phone2 implements Runnable{

    Lock lock = new ReentrantLock();

    /**
     * set进去的时候,就加锁,调用set方法的时候,能否访问另外一个加锁的set方法
     */
    public void getLock() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "\t get Lock");
            setLock();
        } finally {
            lock.unlock();
        }
    }

    public void setLock() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "\t set Lock");
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        getLock();
    }
}

public class ReentrantLockDemo {


    public static void main(String[] args) {
        Phone2 phone = new Phone2();

        /**
         * 因为Phone实现了Runnable接口
         */
        Thread t3 = new Thread(phone, "t3");
        Thread t4 = new Thread(phone, "t4");
        t3.start();
        t4.start();
    }
}

输出结果

t3     get Lock
t3     set Lock
t4     get Lock    
t4     set Lock

自旋锁理论知识

自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU

提到了互斥同步对性能最大的影响阻塞的实现,挂起线程和恢复线程的操作都需要转入内核态完成,这些操作给系统的并发性能带来了很大的压力。同时,虚拟机的开发团队也注意到在许多应用上,共享数据的锁定状态只会持续很短的一段时间,为了这段时间去挂起和恢复线程并不值得。如果物理机器有一个以上的处理器,能让两个或以上的线程同时并行执行,我们就可以让后面请求锁的那个线程 “稍等一下”,但不放弃处理器的执行时间,看看持有锁的线程是否很快就会释放锁。为了让线程等待,我们只需让线程执行一个忙循环(自旋),这项技术就是所谓的自旋锁。

自旋锁代码验证

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SpinLockDemo {
    // 现在的泛型装的是Thread,原子引用线程
    AtomicReference<Thread>  atomicReference = new AtomicReference<>();

    public void myLock() {
        // 获取当前进来的线程
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName() + "\t come in ");

        // 开始自旋,期望值是null,更新值是当前线程,如果是null,则更新为当前线程,否者自旋
        while(!atomicReference.compareAndSet(null, thread)) {
            //摸鱼
        }
    }

    public void myUnLock() {
        // 获取当前进来的线程
        Thread thread = Thread.currentThread();

        // 自己用完了后,把atomicReference变成null
        atomicReference.compareAndSet(thread, null);

        System.out.println(Thread.currentThread().getName() + "\t invoked myUnlock()");
    }

    public static void main(String[] args) {
        SpinLockDemo spinLockDemo = new SpinLockDemo();

        // 启动t1线程,开始操作
        new Thread(() -> {

            // 开始占有锁
            spinLockDemo.myLock();

            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 开始释放锁
            spinLockDemo.myUnLock();

        }, "t1").start();


        // 让main线程暂停1秒,使得t1线程,先执行
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 1秒后,启动t2线程,开始占用这个锁
        new Thread(() -> {

            // 开始占有锁
            spinLockDemo.myLock();
            // 开始释放锁
            spinLockDemo.myUnLock();

        }, "t2").start();
    }
}

输出结果

t1     come in 
t2     come in 
t1     invoked myUnlock()
t2     invoked myUnlock()

读写锁理论知识

  • 独占锁:指该锁一次只能被一个线程所持有。对ReentrantLock和Synchronized而言都是独占锁
  • 共享锁:指该锁可被多个线程所持有。

多个线程同时读一个资源类没有任何问题,所以为了满足并发量,读取共享资源应该可以同时进行。但是,如果有一个线程想去写共享资源来,就不应该再有其它线程可以对该资源进行读或写。

对ReentrantReadWriteLock其读锁是共享锁,其写锁是独占锁。

读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。

读写锁代码验证

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

class MyCache {

    private volatile Map<String, Object> map = new HashMap<>();

    public void put(String key, Object value) {
        System.out.println(Thread.currentThread().getName() + "\t 正在写入:" + key);
        try {
            // 模拟网络拥堵,延迟0.3秒
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        map.put(key, value);
        System.out.println(Thread.currentThread().getName() + "\t 写入完成");
    }

    public void get(String key) {
        System.out.println(Thread.currentThread().getName() + "\t 正在读取:");
        try {
            // 模拟网络拥堵,延迟0.3秒
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Object value = map.get(key);
        System.out.println(Thread.currentThread().getName() + "\t 读取完成:" + value);
    }
}

public class ReadWriteWithoutLockDemo {

    public static void main(String[] args) {
        MyCache myCache = new MyCache();
        // 线程操作资源类,5个线程写
        for (int i = 0; i < 5; i++) {
            final int tempInt = i;
            new Thread(() -> {
                myCache.put(tempInt + "", tempInt +  "");
            }, String.valueOf(i)).start();
        }

        // 线程操作资源类, 5个线程读
        for (int i = 0; i < 5; i++) {
            final int tempInt = i;
            new Thread(() -> {
                myCache.get(tempInt + "");
            }, String.valueOf(i)).start();
        }

    }

}

输出结果:

0     正在写入:0
1     正在写入:1
3     正在写入:3
2     正在写入:2
4     正在写入:4
0     正在读取:
1     正在读取:
2     正在读取:
4     正在读取:
3     正在读取:
1     写入完成
4     写入完成
0     写入完成
2     写入完成
3     写入完成
3     读取完成:3
0     读取完成:0
2     读取完成:2
1     读取完成:null
4     读取完成:null

看到有些线程读取到null,可用ReentrantReadWriteLock解决

package com.lun.concurrency;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class MyCache2 {

    private volatile Map<String, Object> map = new HashMap<>();

    private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

    public void put(String key, Object value) {

        // 创建一个写锁
        rwLock.writeLock().lock();

        try {

            System.out.println(Thread.currentThread().getName() + "\t 正在写入:" + key);

            try {
                // 模拟网络拥堵,延迟0.3秒
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            map.put(key, value);

            System.out.println(Thread.currentThread().getName() + "\t 写入完成");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 写锁 释放
            rwLock.writeLock().unlock();
        }
    }

    public void get(String key) {

        // 读锁
        rwLock.readLock().lock();
        try {

            System.out.println(Thread.currentThread().getName() + "\t 正在读取:");

            try {
                // 模拟网络拥堵,延迟0.3秒
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Object value = map.get(key);

            System.out.println(Thread.currentThread().getName() + "\t 读取完成:" + value);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 读锁释放
            rwLock.readLock().unlock();
        }
    }

    public void clean() {
        map.clear();
    }


}

public class ReadWriteWithLockDemo {
    public static void main(String[] args) {

        MyCache2 myCache = new MyCache2();

        // 线程操作资源类,5个线程写
        for (int i = 1; i <= 5; i++) {
            // lambda表达式内部必须是final
            final int tempInt = i;
            new Thread(() -> {
                myCache.put(tempInt + "", tempInt +  "");
            }, String.valueOf(i)).start();
        }

        // 线程操作资源类, 5个线程读
        for (int i = 1; i <= 5; i++) {
            // lambda表达式内部必须是final
            final int tempInt = i;
            new Thread(() -> {
                myCache.get(tempInt + "");
            }, String.valueOf(i)).start();
        }
    }
}

输出结果:

1     正在写入:1
1     写入完成
2     正在写入:2
2     写入完成
3     正在写入:3
3     写入完成
5     正在写入:5
5     写入完成
4     正在写入:4
4     写入完成
2     正在读取:
3     正在读取:
1     正在读取:
5     正在读取:
4     正在读取:
3     读取完成:3
2     读取完成:2
1     读取完成:1
5     读取完成:5
4     读取完成:4

对ReentrantReadWriteLock其读锁是共享锁,其写锁是独占锁。

读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。

CountDownLatch

让一线程阻塞直到另一些线程完成一系列操作才被唤醒。

CountDownLatch主要有两个方法(await(),countDown())。

当一个或多个线程调用await()时,调用线程会被阻塞。其它线程调用countDown()会将计数器减1(调用countDown方法的线程不会阻塞),当计数器的值变为零时,因调用await方法被阻塞的线程会被唤醒,继续执行。

public class CountDownLatchDemo {
    public static void main(String[] args) throws Exception{
        CountDownLatch countDownLatch = new CountDownLatch(5);

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                countDownLatch.countDown();
                System.out.println("先执行");
            }).start();
        }

        countDownLatch.await(); // 这里会阻塞,直到为0

        System.out.println("上面执行完,才最后执行");
    }
}

输出结果:

先执行
先执行
先执行
先执行
先执行
最后执行

枚举 + CountDownLatch

import java.util.Objects;

public enum CountryEnum {
    ONE(1, "齐"), TWO(2, "楚"), THREE(3, "燕"), FOUR(4, "赵"), FIVE(5, "魏"), SIX(6, "韩");

    private Integer retcode;
    private String retMessage;

    CountryEnum(Integer retcode, String retMessage) {
        this.retcode = retcode;
        this.retMessage = retMessage;
    }

    public static CountryEnum forEach_countryEnum(int index) {

        CountryEnum[] myArray = CountryEnum.values();

        for(CountryEnum ce : myArray) {
            if(Objects.equals(index, ce.getRetcode())) {
                return ce;
            }
        }

        return null;
    }

    public Integer getRetcode() {
        return retcode;
    }

    public void setRetcode(Integer retcode) {
        this.retcode = retcode;
    }

    public String getRetMessage() {
        return retMessage;
    }

    public void setRetMessage(String retMessage) {
        this.retMessage = retMessage;
    }

}
import java.util.concurrent.CountDownLatch;

public class UnifySixCountriesDemo {

    public static void main(String[] args) throws InterruptedException {
        // 计数器
        CountDownLatch countDownLatch = new CountDownLatch(6);

        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "国被灭了!");
                countDownLatch.countDown();
            }, CountryEnum.forEach_countryEnum(i).getRetMessage()).start();
        }

        countDownLatch.await();

        System.out.println(Thread.currentThread().getName() + " 秦国统一中原。");
    }
}

输出结果:

齐国被灭了!
燕国被灭了!
楚国被灭了!
魏国被灭了!
韩国被灭了!
赵国被灭了!
main 秦国统一中原。

CyclicBarrier

CyclicBarrier的字面意思就是可循环(Cyclic)使用的屏障(Barrier)。它要求做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await方法。

CyclicBarrier与CountDownLatch的区别:CyclicBarrier可重复多次,而CountDownLatch只能是一次。

public class CyclicBarrierDemo {
    public static void main(String[] args) throws Exception{
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
            System.out.println("全部执行后才能最后执行");
        });

        for (int i = 0; i < 7; i++) {
            new Thread(() -> {
                System.out.println("先执行");
                try {
                    // 先到的被阻塞,等全部线程完成后,才能执行方法
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

输出结果:

先执行
先执行
先执行
先执行
先执行
先执行
全部执行后才能最后执行

Semaphore

信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

正常的锁(Concurrent.locks或synchronized锁)在任何时刻都只允许一个任务访问一项资源,而 Semaphore允许n个任务同时访问这个资源。

模拟一个抢车位的场景,假设一共有6个车,3个停车位

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {

        /**
         * 初始化一个信号量为3,默认是false 非公平锁, 模拟3个停车位
         */
        Semaphore semaphore = new Semaphore(3, false);

        // 模拟6部车
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    // 代表一辆车,已经占用了该车位
                    semaphore.acquire(); // 抢占

                    System.out.println(Thread.currentThread().getName() + "\t 抢到车位");

                    // 每个车停3秒
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println(Thread.currentThread().getName() + "\t 离开车位");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 释放停车位
                    semaphore.release();
                }
            }, String.valueOf(i)).start();
        }
    }
}

输出结果:

1     抢到车位
2     抢到车位
0     抢到车位
0     离开车位
2     离开车位
1     离开车位
5     抢到车位
4     抢到车位
3     抢到车位
5     离开车位
4     离开车位
3     离开车位

Synchronized和Lock有什么区别

  • synchronized属于JVM层面,属于java的关键字。monitorenter(底层是通过monitor对象来完成,其实wait/notify等方法也依赖于monitor对象 只能在同步块或者方法中才能调用 wait/ notify等方法)。Lock是具体类(java.util.concurrent.locks.Lock)是api层面的锁
  • synchronized不需要用户去手动释放锁,当synchronized代码执行后,系统会自动让线程释放对锁的占用。ReentrantLock:则需要用户去手动释放锁,若没有主动释放锁,就有可能出现死锁的现象,需要lock() 和 unlock() 配置try catch语句来完成
  • synchronized不可中断,除非抛出异常或者正常运行完成。ReentrantLock可中断,可以设置超时方法,trylock(long timeout, TimeUnit unit)
  • synchronized是非公平锁。ReentrantLock默认非公平锁,构造函数可以传递boolean值,true为公平锁,false为非公平锁
  • synchronized没有绑定多个条件Condition ,要么随机,要么全部唤醒。ReentrantLock用来实现分组唤醒需要唤醒的线程,可以精确唤醒,而不是像synchronized那样,要么随机,要么全部唤醒。

死锁编码及定位分析

死锁是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那它们都将无法推进下去,如果系统资源充足,进程的资源请求都能够碍到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁。

发生死锁的四个条件:

  • 互斥条件,线程使用的资源至少有一个不能共享的。
  • 至少有一个线程必须持有一个资源且正在等待获取一个当前被别的线程持有的资源。
  • 资源不能被抢占。
  • 循环等待。

产生死锁的代码例子

package com.lun.concurrency;

import java.util.concurrent.TimeUnit;

class MyTask implements Runnable{

    private Object resourceA, resourceB;

    public MyTask(Object resourceA, Object resourceB) {
        this.resourceA = resourceA;
        this.resourceB = resourceB;
    }

    @Override
    public void run() {
        synchronized (resourceA) {
            System.out.println(String.format("%s 自己持有%s,尝试持有%s",// 
                    Thread.currentThread().getName(), resourceA, resourceB));

            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            synchronized (resourceB) {
                System.out.println(String.format("%s 同时持有%s,%s",// 
                        Thread.currentThread().getName(), resourceA, resourceB));
            }
        }
    }
}

public class DeadLockDemo {
    public static void main(String[] args) {
        Object resourceA = new Object();
        Object resourceB = new Object();

        new Thread(new MyTask(resourceA, resourceB),"Thread A").start();
        new Thread(new MyTask(resourceB, resourceA),"Thread B").start();
    }
}

输出结果:

Thread A 自己持有java.lang.Object@59d8d77,尝试持有java.lang.Object@7a15e6e6
Thread B 自己持有java.lang.Object@7a15e6e6,尝试持有java.lang.Object@59d8d77

查看是否死锁工具:

  1. jps命令定位进程号

  2. jstack找到死锁查看

C:\Users\abc>jps -l
11968 com.lun.concurrency.DeadLockDemo
6100 jdk.jcmd/sun.tools.jps.Jps
6204 Eclipse

C:\Users\abc>jstack 11968
2021-03-09 02:42:46
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.251-b08 mixed mode):

"DestroyJavaVM" #13 prio=5 os_prio=0 tid=0x00000000004de800 nid=0x2524 waiting on condition [0
x0000000000000000]
   java.lang.Thread.State: RUNNABLE
    .....

文章作者: Adbo
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Adbo !
评论
  目录