聊一聊Kotlin中的线程安全

Posted by 卢小胖 on 2021-06-11
Estimated Reading Time 14 Minutes
Words 3.1k In Total
Viewed Times

以上代码已上传Github,球球老哥们的Star

前言

在实际开发中有许多地方需要保证线程安全,保证互斥性。加锁是一个很好的实现方式,在kotlin中有哪些方式可以实现线程安全的效果?

为什么需要处理线程安全问题

线程安全问题由全局变量和静态变量引起,多个线程同时对统同一对象进行写操作,就可能发生每个线程读取到的值不一样。简而言之,如果在线程中不对变量进行写操作,值进行读操作,就可以避免线程安全。

加锁

锁可以分为实例锁和全局锁,通用的方法有Synchronized关键字和Lock方法进行加锁

  • 实例锁:对某一个对象加锁,如果该对象的类的单例类,将同时也和全局锁一样
  • 全局锁:对某一类加锁,该类的所有对象都会加锁,持有类锁的线程将会同时持有全部对象的锁

1. Synchronized

Synchronized在java中是一个关键字,在kotlin中是一个注解类


/**
* Marks the JVM method generated from the annotated function as `synchronized`, meaning that the method
* will be protected from concurrent execution by multiple threads by the monitor of the instance (or,
* for static methods, the class) on which the method is defined.
*/
@Target(FUNCTION, PROPERTY_GETTER, PROPERTY_SETTER)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Synchronized

我们复习一下java中的Synchronized关键字:

Synchronized关键字可以保证加锁对象在多线程环境下同时只有一个线程能够执行,在当前线程释放对象锁之后其他线程才能获取。synchronized同时能够保证共享变量的可见性。

基本规则:

  • 对于普通方法,锁的是当前对象
  • 对于静态方法,锁的是class类
  • 对于代码块加锁,锁的是代码块内对象

一个线程访问对象的synchronized区域时,其他线程访问该对象synchronized区域将会阻塞,但是其他线程可以访问该对象的非同步区域。

1.1 java中的synchronized使用

略略略

1.2 kotlin中的synchronized使用

对方法加锁


对方法加锁的时候只需要加上@Synchronized 注解就Ok了,比如我们定义了以下方法:

@Synchronized
private fun postResult(s: String){
println("${System.currentTimeMillis()}$s" )
sleep(5000)
}

在不同的线程中调用该方法:

fun testFunSync() {
Thread{
postResult("first thread")
}.start()

Thread{
postResult("second thread")
}.start()
}

第一个线程调用postResult方法,输出当前时间后阻塞5s。5s之后第二个线程才能获取postResult 的锁。

输出结果:

1622794297695 ��first thread
1622794302698 ��second thread

输出结果和我们预想的一样,相差5s

如果我们将第一个线程进行阻塞呢,此时锁的获取情况是什么样的?

fun testFunSync() {
Thread{
postResult("first thread")
//我们在此线程阻塞10s,看下结果是否一样
sleep(10000)
}.start()

Thread{
postResult("second thread")
}.start()
}
1622794450105 ��first thread
1622794455107 ��second thread

输出结果依旧是一样的,first线程在执行玩postResult之后就会释放锁,后续的操作不会影响second线程获取postResult的锁


对类加锁

首先我们创建一个Result类:

class Result {

var s : String = ""

public fun printResult(){
println("${System.currentTimeMillis()}$s" )
Thread.sleep(5000)
}

}

调用

fun testClassSync() {
val result= Result()


Thread{
synchronized(result){
result.s = currentThread().name
result.printResult()
}
}.apply {
name = "first thread"
start()
}

Thread{
synchronized(result){
result.s = currentThread().name
result.printResult()
}
}.apply {
name = "second thread"
start()
}

}

输出结果:

1622795509253 ��first thread
1622795514269 ��second thread
//时间相差5s,说明锁生效了

以上就是对Result类对象加锁,如果Result类有多个实例对象怎么办?我们稍微修改下调用方法:

fun testClassSync() {
val result= Result()

//Result类的多个实例对象
val result2= Result()

Thread{
synchronized(result){
result.s = currentThread().name
result.printResult()
}
}.apply {
name = "first thread"
start()
}

Thread{
synchronized(result2){
result2.s = currentThread().name
result2.printResult()
}
}.apply {
name = "second thread"
start()
}

}

创建了多个Result类对象,first线程对A对象加锁,并不会影响second线程获取B对象锁,输出结果:

1622795623480 ��first thread
1622795623480 ��second thread

如果我们对Result类加锁,那所有的对象将会共享同一锁:

fun testClassSync() {
val result= Result()
val result2= Result()

//对Result类加锁
Thread{
synchronized(Result::class.java){
result.s = currentThread().name
result.printResult()
}
}.apply {
name = "first thread"
start()
}

Thread{
synchronized(Result::class.java){
result2.s = currentThread().name
result2.printResult()
}
}.apply {
name = "second thread"
start()
}

}

输出结果:

1622796124320 ��first thread
1622796129336 ��second thread

尽管first线程和second线程调用的是不同对象,但是first线程对Resutl类加上了类锁,second线程只能乖乖等着。
输出结果表示:对xx::class.java加上类锁的时候,该类所有的对象将会共享同一锁。


2.Lock

lock是一个接口,我们常用的实现类有ReentrantLock,意思是可重入锁。

我们定义一个全局变量,在两个线程中同时进行写操作,同时start两个线程

fun testLockSync() {

var count = 0

val thread1 = Thread{
for (i in 0..1000){
count += i
}
println("${Thread.currentThread().name} : count:${count}")
}

val thread2 = Thread{
for (i in 0..1000){
count += i
}
println("${Thread.currentThread().name} : count:${count}")
}

//同时开启两个线程
thread1.start()
thread2.start()
}

输出结果:会发现每次输出的结果都不一样

Thread-1 : count:505253
Thread-2 : count:1001000
---
Thread-2 : count:1001000
Thread-1 : count:1001000
---
Thread-2 : count:822155
Thread-1 : count:822155

使用ReentrantLock保证线程安全:


fun testLockSync() {

val lock = ReentrantLock()
var count = 0

val thread1 = Thread{
lock.lock()

for (i in 0..1000){
count += i
}
println("${Thread.currentThread().name} : count:${count}")

lock.unlock()
}

val thread2 = Thread{
lock.lock()

for (i in 0..1000){
count += i
}
println("${Thread.currentThread().name} : count:${count}")

lock.unlock()
}

thread1.start()
thread2.start()
}

输出结果和执行顺序每次都是一样:

Thread-1 : count:500500
Thread-2 : count:1001000

Lock常用方法

  • lock() :获取锁,获取成功设置当前线程count++,如果其他线程占有锁,则当前线程不可用,等待。
  • tryLock() : 获取锁,如果锁不可用,则此方法将立即返回,线程不阻塞,可设置等待时间。
  • unLock() : 尝试释放锁,当前thread count–,如果count为0,则释放锁。

2.Object

在kotlin中object有很多作用,可以实现对象表达式、对象声明等

对象表达式就是创建一个Object类,实现单例模式:


object Singleton {

}

在idea中查看其反编译java代码:

public final class Singleton {
public static final Singleton INSTANCE;

private Singleton() {
}

static {
Singleton var0 = new Singleton();
INSTANCE = var0;
}
}

可以看到其属于饿汉模式:
优点是在类加载的时候创建,不会存在线程安全问题,效率高。缺点就是浪费资源。具体可以参考此链接

但是在Object类中定义的方法并不是线程安全的

我们定义如下方法:

object Singleton {

fun printS(){
println("${System.currentTimeMillis()} : ${Thread.currentThread().name}")
sleep(1000)
println("${System.currentTimeMillis()} : ${Thread.currentThread().name}")
}
}

调用:

fun testObjectSync() {

val thread1 = thread(start = false,name = "first thread"){
Singleton.printS()
}

val thread2 = thread(start = false,name = "second thread"){
Singleton.printS()
}

thread1.start()
thread2.start()
}

输出:

1623036691322 : first thread
1623036691322 : second thread
1623036692329 : second thread
1623036692329 : first thread

我们发现两个线程能够同时获取到对象,输出结果几乎是同一时间,并没有起到sleep效果。

还是需要加锁@Synchronized才能实现线程安全,如下:

object Singleton {

@Synchronized
fun printS(){
println("${System.currentTimeMillis()} : ${Thread.currentThread().name}")
sleep(1000)
println("${System.currentTimeMillis()} : ${Thread.currentThread().name}")
}

}


输出:
1623036783474 : first thread
1623036784474 : first thread
1623036784474 : second thread
1623036785476 : second thread

3.by lazy实现

Object虽然非常简单是实现单例模式,但是Objcet不能初始化参数,我们可以利用by lazy 的延迟加载属性创建带参数的单例类:

class SomeSingleton(s:String) {

companion object{

private var s = ""

private val instance : SomeSingleton by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
SomeSingleton(s)
}

fun getInstances(string: String):SomeSingleton{
this.s = string
return instance
}
}

fun printS(){
println("${System.currentTimeMillis()} : ${Thread.currentThread().name} ------- s is:${s}")
Thread.sleep(5000)
}

}

注意这里的by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED)

延迟属性的在初始化的过程分多钟情况,首先是默认的SYNCHRONIZED,上锁为了保证只有一条线程可去初始化lazy属性。也就是说同时多线程进行访问该延迟属性时,如果没有初始化好,其他线程将无法访问。

延迟加载上锁实现源码:

private class SynchronizedLazyImpl<out T>(initializer: () -> T, lock: Any? = null) : Lazy<T>, Serializable {
private var initializer: (() -> T)? = initializer
@Volatile private var _value: Any? = UNINITIALIZED_VALUE
// final field is required to enable safe publication of constructed instance
private val lock = lock ?: this

override val value: T
get() {
val _v1 = _value
if (_v1 !== UNINITIALIZED_VALUE) {
@Suppress("UNCHECKED_CAST")
return _v1 as T
}

return synchronized(lock) {
val _v2 = _value
if (_v2 !== UNINITIALIZED_VALUE) {
@Suppress("UNCHECKED_CAST") (_v2 as T)
} else {
val typedValue = initializer!!()
_value = typedValue
initializer = null
typedValue
}
}
}

override fun isInitialized(): Boolean = _value !== UNINITIALIZED_VALUE

override fun toString(): String = if (isInitialized()) value.toString() else "Lazy value not initialized yet."

private fun writeReplace(): Any = InitializedLazyImpl(value)
}

同样SomeSingleton中的方法并不是线程安全的,同样需要加上@Synchorized,这里就不过多叙述了。

4.协程中的线程安全

协程中提供了Mutex来保证互斥,可以看做是Synchorinzed和Lock的替代品,还有withLock 扩展函数,可以⽅便替代常⽤的:

mutex.lock()
try {
//do something
}finally {
mutex.unlock()
}

替换为:

mutex.withLock {
//do something
}

具体源码:

@OptIn(ExperimentalContracts::class)
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
contract {
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
}

lock(owner)
try {
return action()
} finally {
unlock(owner)
}
}

我们看一下具体的使用:

suspend fun testMutex() {
var count = 0

val job1 = CoroutineScope(Dispatchers.IO).launch{
repeat(100){
count ++
//delay 1ms是为了比避免执行太快
delay(1)
}
println("count1:${count}")
}


val job2 = CoroutineScope(Dispatchers.IO).launch{
repeat(100){
count ++
delay(1)
}
println("count2:${count}")
}

job1.join()
job2.join()
}

我们多次运行看下结果,发现每次输出都不一样:

count2:196
count1:196

我们加上Mutex试一下:注意,对于多个协程来说用的是同一个Mutex


suspend fun testMutex() {
var count = 0

//注意:对于多个协程来说用的是同一个Mutex
val mutex = Mutex()

val job1 = CoroutineScope(Dispatchers.IO).launch{

mutex.withLock(count) {
repeat(100) {
count++
delay(1)
}
}

println("count1:${count}")
}


val job2 = CoroutineScope(Dispatchers.IO).launch{

mutex.withLock(count) {
repeat(100) {
count++
delay(1)
}
}

println("count2:${count}")
}

job1.join()
job2.join()
}

输出结果:几乎同时开启两个协程,去竞争count的锁,job1和job2谁先拿到count的锁几率是相同的

count2:100
count1:200

如果我们只在一个协程中执行mutex,不会影响到另一个协程对count的读取。

谨慎在协程中使用@Synchronized

总所周知协程只是线程的封装框架,那么Synchronized配合协程使用有效吗?

/**
* 测试协程中使用Synchronzied
*/
fun testCoroutineWithSync() = runBlocking{
repeat(3){
launch(Dispatchers.IO){
doSomething()
}
}
}

@Synchronized
suspend fun doSomething(){

println("currentThread:${Thread.currentThread().name}, time:${System.currentTimeMillis()}, start")
delay(1000)
println("currentThread:${Thread.currentThread().name}, time:${System.currentTimeMillis()}, end")

}

我们利用@Synchronized标记suspend函数,理想情况应该是3次协程依次执行,但是实际情况是:

currentThread:DefaultDispatcher-worker-1, time:1623380113511, start
currentThread:DefaultDispatcher-worker-2, time:1623380113516, start
currentThread:DefaultDispatcher-worker-3, time:1623380113516, start
currentThread:DefaultDispatcher-worker-3, time:1623380114521, end
currentThread:DefaultDispatcher-worker-2, time:1623380114521, end
currentThread:DefaultDispatcher-worker-1, time:1623380114521, end

输出结果并没有按照我们预想的一样,@Synchronized并没有起到加锁的效果。

即使我们不适用@Synchronized标记,我们修改一下:

suspend fun doSomething2(){

synchronized(Any()){
println("currentThread:${Thread.currentThread().name}, time:${System.currentTimeMillis()}, start")

//模拟某些阻塞操作
val time = measureTimeMillis {
var count = 0
repeat(1000000){
count *= count
}
}

println("currentThread:${Thread.currentThread().name}, time:${System.currentTimeMillis()}, end")
}
}

输出结果依旧是一样的,并没有起到加锁效果:

currentThread:DefaultDispatcher-worker-1, time:1623381184155, start
currentThread:DefaultDispatcher-worker-3, time:1623381184157, start
currentThread:DefaultDispatcher-worker-2, time:1623381184157, start
currentThread:DefaultDispatcher-worker-3, time:1623381184167, end
currentThread:DefaultDispatcher-worker-2, time:1623381184167, end
currentThread:DefaultDispatcher-worker-1, time:1623381184167, end

5.总结

Kotlin协程中Mutex还是相对好用的,需要注意的是suspend函数中不要使用@synchronized。陆陆续续写了好几天,n脑子一片空白,以上应该还有不严谨和不完善的地方,欢迎大家补充。以上代码已上传Github,球球老哥们的Star和点赞