본문 바로가기
Program Language/Kotlin

[Do it! 코틀린 프로그래밍] 11. 코루틴과 동시성 프로그래밍

by SungJe 2021. 5. 21.

11. 코루틴과 동시성 프로그래밍


1. 동시성 프로그래밍

프로그램은 작업 수행 방식에 따라 순서대로 작업을 수행하여 1개의 루틴을 완료한 후 다른 루틴을 실행하는 동기적(synchronous) 방식과 여러 개의 루틴이 선행 작업의 순서나 완료 여부와 상관없이 실행되는 비동기적(asynchronous) 방식으로 나뉜다.
코틀린은 비동기 프로그래밍을 위해 코루틴(Coroutine) 을 서드파티가 아닌 기본으로 제공한다. 코루틴을 사용하면 넌블로킹(Non-blocking)또는 비동기 코드를 쉽게 작성할 수 있다.
우선 동시성 프로그래밍에 사용되는 용어를 정리해보도록 한다.

NOTE✏️

  • 서드파티 라이브러리(Third-party Library): 기본으로 제공되는 표준 라이브러리가 아닌 다른 개발자(제 3자)가 만든 라이브러리를 말한다. 개발에 편의를 주는 플러그인, 프레임워크, 유틸리티 API 등을 제공하고 있다.
  • 코루틴(Coroutine): 여러 개의 루틴(routine)들이 협력(co)한다는 의미로 만들어진 합성어이다. Link

블로킹과 넌블로킹

블로킹 동작

Blocking

입출력 과정이 수행될 때 태스크 A의 코드가 더 이상 진행되지 않고 내부 메모리 영역에서 대기하게 된다. 이런 상황을 코드가 '블로킹'하고 있다고 말한다. 태스크 A가 블로킹하는 동안 우선순위가 낮은 태스크 B가 실행될 수 있다. 우선순위가 높은 태스크 A의 실행이 재개되면 태스크 B는 블로킹하고 태스크 A가 종료되면 다시 재개된다.

넌블로킹 동작

Non-blocking

입출력 요청을 하더라도 운영체제에 의해 EAGAIN과 같은 시그널을 태스크 A가 받아서 실행을 재개할 수 있다. 태스크 A는 다른 루틴을 수행하다가 입출력 완료 시그널을 받은 후 콜백 루틴(Callback Routine) 등을 호출해 이후의 일을 처리할 수 있다. 태스크 A를 수행하는 도중에 태스크 B가 생성 될 수 있는데 이때 태스크 A와 B는 운영체제의 스케줄링 기법에 의해 비 동기적으로 수행될 수 있다.

프로세스와 스레드

프로세스와 스레드의 개념

태스크는 큰 실행 단위인 프로세스와 좀 더 작은 생행 단위인 스레드를 말한다. 프로세스(Process) 는 실행되는 메모리, 스택, 열린 파일 등을 포함하기 때문에 프로세스 간 문맥 교환(Context-Switching)을 할 때 많은 비용이 필요하다. 반면 스레드(Thread) 는 자신의 스택만 독립적으로 갖고 나머지 자원은 스레드 간 공유하므로 문맥 교환 비용이 낮아 프로그래밍에서 많이 사용된다.

Process Thread

NOTE✏️

  • 문맥 교환(Context-Switching): 하나의 프로세스나 스레드가 CPU를 사용하고 있는 상태에서 다른 프로세스나 스레드가 CPU를 사용하기 위해, 이전의 프로세스의 상태(문맥)를 보관하고 새로운 프로세스의 상태를 적재하는 과정을 말한다.

스레드 생성하기

코루틴을 사용하기 앞서 기존 자바에서 사용하던 스레드를 생성하는 방법에 대해 알아본다.

// ① Thread 클래스를 상속받아 구현하기
class SimpleThread: Thread() {
    override fun run() {
        println("Class Threads: ${Thread.currentThread()}")
    }
}

// ② Runnable 인터페이스로부터 run() 메서드 구현하기
class SimpleRunnable: Runnable {
    override fun run() {
        println("Interface Threads, ${Thread.currentThread()}")
    }
}

fun main() {
    val thread = SimpleThread()
    thread.start()

    val runnable = SimpleRunnable()
    val thread1 = Thread(runnable)
    thread1.start()

    // ③ Thread 클래스의 익명 객체로 스레드 생성하기
    object : Thread() {
        override fun run() {
            println("Object Threads: ${Thread.currentThread()}")
        }
    }.start()

    // ④ 람다식으로 스레드 생성하기
    Thread {
        println("Lambda Threads: ${Thread.currentThread()}")
    }.start()
}

스레드 풀 사용하기

애플리케이션의 비즈니스 로직을 설계할 때, 몇 개의 스레드를 먼저 만들어 놓고 필요에 따라 재사용할 수 있다.

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

...

val myService: ExecutorService = Executors.newFixedThreadPool(8) // 8개의 스레드 생성
var i = 0

while (i < items.size) { // 아주 큰 데이터를 처리할 때
    val item = items[i]
    myService.submit {
        processItem(item) // 여기서 아주 긴 시간 동안 처리하는 경우
    }
    i += 1
}

2. 코루틴의 개념과 사용 방법

스레드와 같은 기법으로 구성하는 넌블로킹 코드는 성능이 뛰어나지만 복잡한 설계로 안전하지 못한 코드를 만들 가능성이 높아진다. 코틀린은 코루틴을 통해 복잡성을 줄이고도 손쉽게 루틴을 만들어 낼 수 있다.

코루틴의 기본 개념

코루틴은 비용이 많이 드는 문맥 교환 없이 해당 루틴을 일시 중단(suspended)하여 비용을 줄일 수 있다. 즉, 운영체제가 스케줄링에 개입하는 과정이 필요하지 않다는 것이다. 또한 일시 중단은 사용자가 제어할 수 있다.

코루틴 문맥(Coroutine Context)

코루틴 문맥은 코루틴을 실행하기 위한 다양한 설정값을 가진 관리 정보로 코루틴 이름, 디스패처, 작업 상세사항, 예외 핸들러 등의 정보를 갖는다.

코루틴 문맥 구성

  • CoroutineName: 코루틴에 이를을 주며 디버깅을 위해 사용된다.
  • Job: 작업 객체를 지정할 수 있으며 취소가능 여부에 따라 SupervisorJob()를 사용한다.
  • CoroutineDispatcher: 적절한 스레드의 작업을 전달한다. 필요에 따라 스레드 풀을 생성할 수 있다.
  • CoroutineExceptionHandler: 코루틴 문맥을 위한 예외처리를 담당한다. 예외가 발생한 코루틴은 상위 코루틴에 전달되어 처리될 수 있다. 예외가 다중으로 발생하면 최초 하나만 처리하고 나머지는 무시된다.

코루틴의 주요 패키지

  • kotlinx.coroutines의 common 패키지

    기능 설명
    launch / async 코루틴 빌더
    Job / Deferred cancellation 지원을 위한 기능
    Dispatchers Default는 백그라운드 코루틴을 위한 것이고
    Main은 Android나 Swing, JavaFx를 위해 사용
    delay / yield 상위 레벨 지연(suspending) 함수
    Channel / Mutex 통신과 동기화를 위한 기능
    coroutineScope /
    supervisorScope
    범위 빌더
    select 표현식 지원
  • kotlinx.coroutines의 core 패키지

    기능 설명
    CommonPool 코루틴 컨텍스트
    produce / actor 코루틴 빌더

코루틴의 패키지에 대한 더 자세한 내용은 다음 웹사이트에서 참조할 수 있다. Link

launch와 async 코루틴 빌더

launch와 async 비교

  • launch: 일단 실행하고 잊어버리는(fire-and-forget) 형태의 코루틴으로 메인 프로그램과 독립되어 실행할 수 있다. 넌블로킹 상태로 즉시 실행하며 블록 내의 실행 결과를 반환하지 않고 Job 객체를 즉시 반환한다. join을 통해 상위 코드가 종료되지 않고 완료를 기다리게 할 수 있다.

  • async: 비동기 호출을 위해 만든 코루틴으로 결과나 예오를 반환한다. 실행 결과는 Deffered<T>를 통해 반환하며 await을 통해 받을 수 있다. await은 작업이 완료될 때까지 기다리게 된다.

시작 시점에 대한 속성

필요한 경우 lanch()나 async()에 인자를 지정해 코루틴에 필요한 속성을 줄 수 있다. start 매개변수는 CoroutineStart를 인자로 받아 시작 방법을 정의할 수 있다. Link

CoroutineStart

  • DEFAULT: 즉시 시작(해당 문맥에 따라 즉시 스케줄링됨)
  • LAZY: 코루틴을 느리게 시작(처음에는 중단된 상태이며 start()나 await() 등으로 시작됨)
  • ATOMIC: DEFAULT와 비슷하나 코루틴을 실행전에는 취소 불가능
  • UNDISPATCHED: 현재 스레드에서 즉시 시작되어 첫 지연함수가 발생되면 디스패치됨
/* 시작 시점 늦추기 */
val job = async(start = CoroutineStart.LAZY) { doWork1() }
...
job.start() // 실제 시작 시점(또는 job.await() 함수로 시작됨)

launch와 async로 코루틴 빌더 생성하기

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun doWork1(): String {
    delay(1000L) // 1초 지연
    return "Work1"
}

suspend fun doWork2(): String {
    delay(3000L) // 3초 지연
    return "Work2"
}

/* launch 코루틴 빌더 생성 */
private fun worksInSerial(): Job {
    // 비동기 코드지만 지연을 통해 순차적으로 실행
    val job = GlobalScope.launch {
        val one = doWork1()
        val two = doWork2()
        println("Kotlin One: $one")
        println("Kotlin Two: $two")
    }
    return job
}

/* async 코루틴 빌더 생성 */
private fun worksInParallel(): Job {
    // 완전히 병행 실행, Deferred<T>를 통해 결과값을 반환
    val one = GlobalScope.async { doWork1() }
    val two = GlobalScope.async { doWork2() }

    val job = GlobalScope.launch {
        val combined = one.await() + "_" + two.await()
        println("Kotlin Combined: $combined")
    }
    return job
}

fun main() = runBlocking {
    val time1 = measureTimeMillis {
        val job = worksInSerial()
        job.join()
    }
    println("Time: $time1") // 소요시간 약 4초

    val time2 = measureTimeMillis {
        val job = worksInParallel()
        job.join()
    }
    println("Time: $time2") // 소요시간 약 3초
}

일시 중단(suspended) 함수

코루틴에서 사용되는 함수는 suspend()로 선언된 지연 함수여야 코루틴 기능을 사용할 수 있다. suspend로 표기함으로써 실행이 일시 중단(suspended)될 수 있으며 필요한 경우 다시 재개(resume)할 수 있다.

/* kotlinx.coroutines-core의 DelayKt.class의 일부 */
public suspend fun delay(timeMillis: kotlin.Long): kotlin.Unit { /* compiled code */ }


/* Hello World! 코루틴 */
fun main() { // main 스레드의 문맥
    GlobalScope.launch { // 새로운 코루틴을 백그라운드에 실행
        delay(1000L) // 1초 넌블로킹 지연
        println("World!")
        doSomething() // 코루틴 또는 또 다른 지연함수에서만 사용가능
    }
    println("Hello, ") // 코루틴이 지연되는 동안 main 스레드는 계속 실행
    Thread.sleep(2000L) // main 스레드가 JVM에서 종료되지 않도록 2초 대기기}
}

suspend fun doSomething() { // 사용자 suspend 함수
    println("Do Something")
}

runBlocking()과 join() 함수

  • runBlocking(): 새로운 코루틴을 실행하고 완료되기 전까지 현재 스레드를 블로킹한다. 블로킹 모드로 동작시키면 함수 내부의 코루틴이 모두 작동할 때까지 메인 스레드는 종료되지 않고 대기한다.

  • join() 함수: launch의 반환 값인 Job 객체의 join() 함수를 사용하여 명시적으로 코루틴의 작업이 완료되는 것을 기다릴 수 있다.

/* runBlocking() 함수 사용하기 */
fun main() = runBlocking { // 블로킹 모드로 동작
    val job = launch { // 백그라운드로 코루틴 실행
        delay(1000L)
        println("World!")
    }
    println("Hello")
    // delay(2000L) // delay() 함수를 사용하지 않아도 코루틴을 기다림
    job.join() // 명시적으로 코루틴 완료를 기다림
}

Job 객체

Job은 코루틴의 생명주기를 관리하며 생성된 코루틴 작업들은 부모-자식과 같은 관계를 가질 수 있다. 보동 Job() 팩토리 함수나 launch에 의해 생성된다. 자세한 내용은 공식 문서를 참조한다. Link

  • 규칙

    1. 부모가 취소(cancel)되거나 실행 실패하면 하위 자식들은 모두 취소된다.
    2. 자식의 실패는 그 부모에 전달되며 부모 또한 실패한다. (다른 모든 자식도 취소됨)
  • Job의 상태

    Process Thread
    상태 isActive isCompleted isCancelled
    New false false false
    Active (기본값 상태) true false false
    Completing true false false
    Cancelling false false true
    Cancelled (최종 상태) false true true
    Completed (최종 상태) false true false
fun main() {
    val job = GlobalScope.launch { // Job 객체의 반환
        delay(1000L)
        println("World!")
    }
    println("Hello, ")
    println("job.isActive: ${job.isActive}, job.isCompleted: ${job.isCompleted}") // true, false
    Thread.sleep(2000L)
    println("job.isActive: ${job.isActive}, job.isCompleted: ${job.isCompleted}") // false, true
}

코루틴의 스코프

GlobalScope

  • 독립형(Standalone) 코루틴을 구성, 생명주기는 프로그램 전체(top-level)에 해당하는 범위를 가진다. 즉, main의 생명 주기가 끝나면 같이 종료된다.
  • 범용 목적으로는 사용되나 launch나 async와 같이 세부 설계에서의 사용은 권장하지 않는다.

CoroutineScope

  • 특정 목적의 디스패처를 지정한 범위를 블록으로 구성할 수 있다.
  • 모든 코루틴 빌더는 CoroutineScope의 인스턴스를 갖는다.
  • launch(Dispatchers.옵션인자) { ... }와 같이 디스패처의 스케줄러를 지정할 수 있으며, 인자가 없는 경우에는 CorutineScope에서 상위 문맥이 상속되어 결정된다.
fun main() = runBlocking {
    val request = launch {
        GlobalScope.launch { // 프로그램 전역으로 독립적인 수행 (부모-자식관계 없음)
            println("job1: before suspend function")
            delay(1000L)
            println("job1: after suspend function") // 작업 취소에 영향을 받지 않음
        }

        launch { // 부모의 문맥을 상속 (상위 launch의 자식)
//        launch(Dispatchers.Default) { // 부모의 문맥을 상속 (상위 launch의 자식), 분리된 작업업
//       CoroutineScope(Dispatchers.Default).launch { // 새로운 스코프가 구성되 request와 무관
            delay(100L)
            println("job2: before suspend function")
            delay(1000L)
            println("job2: after suspend function") // request(부모)가 취소되면 수행되지 않음
        }
    }
    delay(500L)
    request.cancel()
    delay(1000L)
}

3. 코루틴 동작 제어하기

코루틴을 사용하다 보면 특정 문맥에서 실행되면서 반복하거나 작업의 취소, 실행의 보장이나 시간 만료의 처리등의 다양한 동작을 제어해야 한다. 코루틴 문맥과 코루틴의 동작을 제어하는 방법에 대해서 정리해 본다.

코루틴 디스패처(Dispatcher)

코루틴은 항상 특정 문맥에서 실행된다. 디스패처는 코루틴을 어떤 문맥에서 실행할지 결정한다. Link

CoroutineDispatcher

  • Dispatchers.Default: 기본 문맥인 CommonPool에서 실행되기 때문에 새로운 스레드를 생성하지 않고 기존에 있는 것을 이용한다. 따라서 연산 중심의 코드에 적합하다.
  • Dispatchers.IO: 입출력에 적합한 공유 풀로써, 블로킹 동작이 많은 파일 or 소켓 I/O 처리에 사용하면 좋다.
  • Dispatchers.Uncofined: 첫번째 중단점을 만날때까지만 호출자 스레드에서 실행된다. 중단점 이후에 재개될 때 서스펜드 함수가 실행된 스레드에서 수행된다.
  • newSingleThreadContext: 사용자가 직접 새 스레드 풀을 만들 수 있다. 새 스레드를 만들기 때문에 비용이 많이 들고 더 이상 필요하지 않으면 해제하거나 종료시켜야 한다.
fun main() = runBlocking {
    val jobs = arrayListOf<Job>()

    jobs += launch(Dispatchers.Unconfined) { // 메인 스레드에서 작업
        println("Unconfined: \t ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { // 부모의 문맥(현재는 runBlocking의 문맥)
        println("coroutineContext: \t ${Thread.currentThread().name}")
    }
    jobs += launch(Dispatchers.Default) { // 디스패처의 기본값
        println("Default: \t ${Thread.currentThread().name}")
    }
    jobs += launch(Dispatchers.IO) { // 입출력 중심의 문맥
        println("IO: \t ${Thread.currentThread().name}")
    }
    jobs += launch { // 아무런 인자가 없을 때
        println("main runBlocking: \t ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyThread")) { // 새 스레드를 생성
        println("MyThread: \t ${Thread.currentThread().name}")
    }

    jobs.forEach { it.join() }
}

기본 동작 제어하기

repeat() 함수를 사용한 반복 동작하기

fun main() = runBlocking {
    GlobalScope.launch { // 메인 스레드와 같은 생명주기
        repeat(1000) { i ->
            println("I'm sleeping $i...")// 1.3초 뒤 종료되므로 약 3번 출력
            delay(500L)
        }
    }
    delay(1300L)
}

코루틴 작업 취소하기

fun main() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("I'm sleeping $i...")// 1.3초 뒤 작업이 취소되므로 약 3번 출력
            delay(500L)
        }
    }
    delay(1300L)
    job.cancel()
}

finally의 실행 보장

일반적인 finally 블록에서 지연 함수를 사용하려고 하면 코루틴이 취소되므로 사용할 수 없다. 만일 finally 블록에서 시간이 걸리는 작업이나 지연 함수가 사용될 경우 실행을 보장하기 위해 NonCancellable 문맥에서 작동하도록 해야 한다.

fun main() = runBlocking {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) { // finally 블록의 완전한 실행을 보장
                println("I'm running finally")
                delay(1000L)
                println("Non-Cancellable")
            }
            println("Bye!")
        }
    }
    delay(1300L)
    job.cancelAndJoin()
    println("main: Quit!")
}

코루틴의 시간 만료

fun main() = runBlocking {
    try {
        withTimeout(1300L) { // timeout 발생시 예외 발생
            println("---* withTimeout *---")
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: TimeoutCancellationException) {
        println("timed out with $e")
    }

    val result = withTimeoutOrNull(1300L) { // timeout 발생시 null을 반환
        println("---* withTimeoutOrNull *---")
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
        "Done"
    }
    println("Result: $result")
}

채널의 동작

채널(Channel)은 자료를 서로 주고받기 위해 약속된 일종의 통로 역할을 한다. 코루틴의 채널은 넌블로킹 전송 개념으로 사용되고 있다. 채널의 구현은 SendChannelReceiveChannel 인터페이스를 이용하며, 값들의 스트림을 전송하는 방법을 제공한다. 실제 전송에는 send()와 receive() 지연 함수를 사용한다.

/* 구조 */
public abstract suspend fun send(element: E): kotlin.Unit // SendChannel 인터페이스
public abstract suspend fun receive(): E // ReceiveChannel 인터페이스

/* send()와 receive() 함수로 채널 사용하기 */
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        // 여기에 다량의 CPU 연산 작업이나 비동기 로직을 구현할 수 있음
        for (x in 1..5) channel.send(x * x)
        channel.close() // 모두 보내고 닫기 명시
    }
    for (element in channel) println(element)
    println("Done!")
}

확장된 채널 자료형 Link

  • RendezvousChannel: 내부에 버퍼를 두지 않는 채널이다. send 동작은 receive가 즉각 가져가기 전까지 일시 중단되고 receive 동작도 send하기 전까지 일시 중단된다.
  • ArrayChannel: 특정한 크기로 고정된 버퍼를 가진 채널이다. 버퍼가 꽉 차기 전까지 send 동작이 지연되지 않고 receive 동작도 비어 있기 전까지 지연되지 않는다.
  • LinkedListChannel: 링크드 리스트 형태로 구성되어 버퍼의 크기에 제한이 없어 send 동작이 일시 중단 상태를 갖지 않는다. 다만 지속적으로 send할 경우 메모리 부족 오류가 발생할 수 있다.
  • ConflatedChannel: 버퍼는 하나의 요소만 허용하기 때문에 모든 send 동작은 일시 지연되지 않는다. 다만 기존의 값을 덮어 씌운다.

채널의 활용

produce 생산자 소비자 패턴

produce는 채널이 붙어 있는 코루틴으로 생산자 측면의 코드를 쉽게 구성할 수 있다. 채널에 값을 보내면 생산자이고 소비자는 consumeEach 함수를 확장해 for문을 대신해서 저장된 요소를 소비한다.

// 생산자를 위한 함수 생성(확장 함수 형태)
fun CoroutineScope.producer(): ReceiveChannel<Int> = produce {
    var total: Int = 0
    for (x in 1..5) {
        total += x
        send(total)
    }
}

// 생산자를 위한 함수 생성(일반 함수 형태)
fun producer2(coroutineScope: CoroutineScope): ReceiveChannel<Int> =
    coroutineScope.produce(Dispatchers.Default) {
        var total: Int = 0
        for (x in 1..5) {
            total += x
            send(total)
        }
    }

fun main() = runBlocking {
    val result = producer() // 값의 생산
    result.consumeEach { print("$it ") } // 소비자 루틴 구성
    println()

    val result2 = producer2(this)
    result2.consumeEach { print("$it ") }
}

버퍼를 가진 채널

채널에는 기본 버퍼가 없으므로 경우에 따라 지연이 발생한다. 하지만 채널에 버퍼 크기를 주면 지연 없이 여러 개의 요소를 보낼 수 있게 된다. Channel() 생성자에 capacity 매개변수로 버퍼의 크기를 지정할 수 있다.

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 3) // 버퍼의 크기를 3으로 지정
    val sender = launch { // 송신자 측
        repeat(10) {
            println("Sending $it")
            channel.send(it) // 지속으로 송신하다가 버퍼가 꽉 차면 일시 지연
        }
    }

    delay(1000L) // 1초 대기
    sender.cancel() // 송신자의 작업을 취소
}

select 표현식

select를 사용하면 표현식을 통해 다양한 채널에서 무언가 응답해야 할 때 각 채널의 실행 시간에 따라 다른 결과를 받을 할 수 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import java.util.Random

fun main() = runBlocking {
    val routine1 = GlobalScope.produce {
        delay(Random().nextInt(1000).toLong())
        send("A")
    }
    val routine2 = GlobalScope.produce {
        delay(Random().nextInt(1000).toLong())
        send("B")
    }
    val result = select<String> { // 먼저 수행된 코루틴의 결과를 받는다.
        routine1.onReceive { it }
        routine2.onReceive { it }
    }
    println("Result was $result")
}

4. 공유 데이터 문제 알아보기

병행 프로그래밍에서는 전역 변수 같은 변경 가능한 공유 자원에 접근할 때 값의 무결성을 보장할 수 있는 방법이 필요하다. 코틀린에서 공유 자원의 무결성을 보장하는 기법에 대해서 알아본다.

동기화 기법

synchronized 메서드와 블록

코틀린에서는 @Synchronized 애노테이션 표기법을 사용하여 특정 스레드가 이미 자원을 사용하는 중이면 나머지 스레드의 접근을 막아 데이터의 안정성을 보장한다. @Synchronized 애노테이션 표기법은 내부적으로 자바의 synchronized를 사용하는 것과 같다.

@Synchronized
fun sysnchronizedMethod() {
    println("inside: ${Thread.currentThread()"}
}

자바의 volatile Link

캐시에 적재된 변수는 여러 스레드로부터 값을 읽거나 쓰면 데이터가 일치하지 않고 깨진다. 이것을 방지하기 위해 데이터를 캐시에 적재하지 않도록 volatile 키워드와 함께 변수를 선언할 수 있다. 또 코드가 최적화되면서 순서가 바뀌는 경우도 방지할 수 있어 프로그래머가 의도한 순서대로 읽기 및 쓰기를 수행한다. 단, 여러 스레드에서 공유 변수에 대한 읽기와 쓰기 연산이 있으면 원자성(Atomicity)을 보장하지 않는다.

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

@Volatile
var counter = 0

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 1000 // 실행할 코루틴의 수
    val k = 1000 // 각 코루틴을 반복할 수
    val time = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

원자 변수(Atomic Variable) Link

원자 변수란 특정 변수의 증가나 감소, 더하기나 빼기가 단일 기계어 명령으로 수행되는 것을 말한다. 해당 연산이 수행되는 도중에는 누구도 방해하지 못하기 때문에 값의 무결성을 보장할 수 있다.

import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis

//var counter = 0 // 병행처리중 문제가 발생할 수 있는 변수
val counter = AtomicInteger(0) // 원자 변수로 초기화

suspend fun massiveRun(action: suspend () -> Unit) {...}

fun main() = runBlocking {
    massiveRun {
//        counter++ // 증가 연산 시 무결성에 문제가 발생할 수 있음
        counter.incrementAndGet() // 원자 변수의 멤버 메서드를 사용해 증가
    }
//    println("Counter = $counter")
    println("Counter = ${counter.get()}") // 값 읽기
}

스레드 가두기 Link

특정 문맥에서 작동하도록 단일 스레드에 가두는(Thread Confinement) 방법이 있다.

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// 단일 스레드 문맥을 선언
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

suspend fun massiveRun(action: suspend () -> Unit) {...}

fun main() = runBlocking {
    withContext(counterContext) { // 단일 스레드에 가둠
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

스레드는 문맥상 counter를 독립적으로 가지며 처리하기 때문에 공유 변수 counter의 연산의 무결성을 보장할 수 있다. 다만 공간이 필요하므로 조금 느려진다는 점이 있다.

상호 배제(Mutual Exclusion) Link

상호 배제는 코드가 임계 구역(Critical Section)에 있는 경우 절대로 동시성이 일어나지 않게 하고 하나의 루틴만 접근하는 것을 보장한다. 코틀린의 코루틴에서는 Mutex의 lock과 unlock을 사용해 임계구역을 만들 수 있다.

Mutual Exclusion
/* 기본 구조 */
val mutex = Muted()
...
mutex.lock()
... // 보호하고자 하는 임계 구역 코드
mutex.unlock()
...

/* 상호 배제 사용하기 */
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.measureTimeMillis

val mutex = Mutex()
var counter = 0

suspend fun massiveRun(action: suspend () -> Unti) {...}

fun main() = runBlocking {
    massiveRun {
        mutex.withLock { // 임계 구역 코드
            counter++
        }
    }
    println("Counter = $counter")
}

actor 코루틴 빌더 Link

코루틴의 결합으로 만든 actor는 코루틴과 채널에서 통신하거나 상태를 관리한다. 코틀린에서는 들어오는 메일 박스 기능만 한다고 볼 수 있다. actor는 한 번에 1개의 메시지만 처리하는 것을 보장하고 순차적 실행으로 각 상태는 특정 actor 코루틴에 한정되므로 공유된 변경 가능한 상태에도 문제에 대한 해결책으로 작동한다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlin.system.measureTimeMillis

suspend fun massiveRun(action: suspend () -> Unit) {...}

// counter actor의 메시지 타입
sealed class CounterMsg {
    object IncCounter : CounterMsg() // counter를 증가하기 위한 단방향 메시지
    class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 응답 채널의 요청
}

// 새로운 counter actor를 위한 함수
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor의 상태로 공유되지 않음
    for (msg in channel) { // 들어오는 메시지 처리
        when(msg) {
            is CounterMsg.IncCounter -> counter++
            is CounterMsg.GetCounter -> msg.response.complete(counter)
        }
    }
}

fun main() = runBlocking<Unit> {
    val counter = counterActor() // actor의 생성
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(CounterMsg.IncCounter)
        }
    }
    // actor의 counter 값을 얻기 위해 요청
    val response = CompletableDeferred<Int>()
    counter.send(CounterMsg.GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // actor의 중단
}