Java线程安全

在Java内存模型中,分为主内存和线程工作内存,线程使用共享数据时,都是先从主内存中拷贝到工作内存,使用完成之后再写入主内存,可以理解为线程之间通讯是通过共享内存的方式实现的。在多线程环境下,不同线程对同一份数据操作,就可能会产生不同线程中数据状态不一致的情况,这就是线程安全问题的定义或者说原因。要实现线程安全,需要保证数据操作的两个特性:

Java中的阻塞队列实现原理都是通过上篇文章中提到的ReentrantLock来实现的,所有操作方法都必须先获取内部的ReentrantLock才能继续,否则返回false/阻塞/抛出异常,常用的阻塞队列有以下几个:ArrayBlockingQueue:由数组实现的有界阻塞队列LinkedBlockingQueue:由链表实现的有界阻塞队列PriorityBlockingQueue:由数组实现的支持优先级的无界阻塞队列SynchronousBlockingQueue:不储存元素的阻塞队列,所有的入队列操作都将阻塞,直到被出队列唤醒,反之亦然,newCachedThreadPool中的阻塞队列就是这个DelayBlockingQueue:基于PriorityQueue实现的延时阻塞队列下面就以ArrayBlockingQueue为例摸一遍源码吧,其他也都差不多的套路:

图片 1JMM.png

 /** Number of elements in the queue */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put throws InterruptedException { checkNotNull; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue; } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private void enqueue { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } } /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }

图片 2Mark
Word

核心方法是enqueue、dequeue,因为是private并不包含任何同步和长度判断,只是简单的在数组中插入和删除元素罢了,真正的同步实在对外暴露的put、take等方法,首先获取Lock,同时会判断长度问题决定是否需要通过Condition等待队列非空/非满。关于BlockingQueue可以思考以下细节问题:队列长度问题:ArrayBlockingQueue通过count来记录长度,为什么不需要加volatile呢?上篇文章有讲到AQS中已经通过volatile来避免state的可见性问题,BlockingQueue中获取count之前已经获取锁,肯定不会有可见性问题的了。LinkedBlockingQueue中直接用了AtomicInteger来记录长度,简单粗暴,在获取size等方法时都不需要加锁。无界队列如PriorityBlockingQueue用数组实现最小堆结构,这个需要注意数据扩容导致的性能问题。Deque:采用双端队列的结构,其实主要原理还是一样,只不过加了尾部进出队列的方法。

偏向锁

锁不会主动释放,只有当出现竞争时,才会去检查是否需要释放或者重新偏向锁,主要应用场景适用于基本只有一个线程访问同步代码块,可以无限重复获取偏向锁,大家可以看到其实在竞争偏向锁时需要在暂停当前持有锁的线程的(当然是要在安全点暂停),所以在高并发高竞争的情况下可以考虑关闭偏向锁。

要学好Java并发编程,最重要的还是要理解JMM中并发问题的原理、Volatile+CAS的实现、Synchronized对象锁,几乎里面所有的东西都是围绕这几个东西来实现的。

volatile有两点特性,第一是保证变量对所有线程的可见性,对于普通变量,一个线程对其修改后并不保证会立即写入主存,只有当写入主存之后才会对其他线程可见,而volatile关键字能够保证线程修改完变量立即写回主存,而且每个线程在使用变量前都必须先从主存刷新数据,这样就保证了修改的可见性。第二点时禁止指令的重排序,Java编译器会对无结果依赖的代码指令进行重排序,下面这段伪代码可以说明指令重排序导致的并发问题:

核心原理都是通过volatile+CAS保证操作的线程安全,主要还是得理解上篇文章中线程安全问题的原因,volatile保证了代码里面的value(线程工作内存中的值)与主内存值的一致性,通过CAS的原子性操作比较并修改主存中的值。这句话可能有点绕,举个例子:主存中value值为1线程A、B同时读取到各自的工作内存value值为1线程A、B同时通过CAS指令想要设为2,由于CAS指令的原子性,假设A线程成功,则线程A和主存的值均变为2,这时才开始执行B的CAS指令发现值已经变为2,线程2失败然后线程B想要再次进行CAS操作时,由于volatile的可见性,必定会从主存重新读取value值为2,再次通过CAS指令去修改就能够成功了

轻量级锁&重量级锁

轻量级锁在竞争锁失败时,会通过自旋的方式尝试获取锁,而重量级锁竞争失败后会直接阻塞线程。所以,轻量级锁适用于那些同步代码块执行速度很快的场景,而重量级锁则对应同步代码块执行时间长的场景。

Java类库中有专门的concurrent包实现线程安全,核心类就是AQS(AbstractQueuedSynchronizer),一个抽象队列同步器,相当于一个模板,类库中ReentantLock等都是通过它来实现的。AQS的核心原理:维护一个volatile
state作为锁标志计数,同时维护一个Node链表作为等待节点队列,锁的请求释放流程如下:

图片 3AQS

主要的流程就是这么简单,当然要看代码实现细节包括很多细节:公平锁/非公平锁的区别、超时机制的实现、Node链表的哨兵技巧、Node的状态迁移、共享锁的实现、读写锁高低位的技巧等,其实就是某几行代码的区别,魔鬼在细节里面。Condition的实现和锁本身队列的维护类似,就不多做赘述了。

Java中的线程安全从JMM,到同步机制的实现,整体就有个大致的印象了,下次再分析分析线程安全容器和线程池的实现吧,喜欢的点个赞哟~

public class AtomicInteger: // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField; } catch (Exception ex) { throw new Error; } } private volatile int value; /** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }

图片 4对象锁

接续上篇Java线程安全,这次来撸一撸Java中并发容器的源码。

相关文章

Comment ()
评论是一种美德,说点什么吧,否则我会恨你的。。。