채널 & 동시성 도구
Channel — 코루틴 간 통신
Channel은 코루틴 간에 데이터를 주고받는 파이프입니다. send로 보내고 receive로 받습니다.
val channel = Channel<Int>()
// 생산자 코루틴
launch {
for (i in 1..5) {
println("전송: $i")
channel.send(i)
}
channel.close()
}
// 소비자 코루틴
launch {
for (value in channel) { // close() 전까지 수신
println("수신: $value")
}
}채널 타입
// RENDEZVOUS (기본) — 버퍼 없음, 송신자와 수신자가 동시에 만나야 함
val rendezvous = Channel<Int>()
// BUFFERED — 고정 버퍼 (기본 64)
val buffered = Channel<Int>(Channel.BUFFERED)
val sized = Channel<Int>(10) // 버퍼 10개
// UNLIMITED — 무제한 버퍼 (OOM 주의)
val unlimited = Channel<Int>(Channel.UNLIMITED)
// CONFLATED — 최신값 하나만 유지
val conflated = Channel<Int>(Channel.CONFLATED)버퍼 동작
val channel = Channel<Int>(2) // 버퍼 2개
launch {
channel.send(1) // 버퍼에 저장
channel.send(2) // 버퍼에 저장
channel.send(3) // 버퍼 가득 — 수신자 기다림
println("전송 완료")
}
delay(100)
println(channel.receive()) // 1
println(channel.receive()) // 2
println(channel.receive()) // 3trySend / tryReceive — 비블로킹
val channel = Channel<Int>(5)
// 블로킹 없이 시도
val result = channel.trySend(42)
if (result.isSuccess) println("전송 성공")
if (result.isFailure) println("버퍼 가득")
val received = channel.tryReceive()
if (received.isSuccess) println("수신: ${received.getOrNull()}")
if (received.isFailure) println("비어있음")produce / consumeEach
// produce — 채널을 생산하는 코루틴 빌더
val squares = produce {
for (i in 1..5) {
send(i * i)
}
}
squares.consumeEach { println(it) }
// 1, 4, 9, 16, 25// 파이프라인 패턴
fun CoroutineScope.numbers() = produce {
var n = 1
while (true) send(n++)
}
fun CoroutineScope.filter(input: ReceiveChannel<Int>, prime: Int) = produce {
for (n in input) {
if (n % prime != 0) send(n)
}
}
// 에라토스테네스의 체
runBlocking {
var current = numbers()
repeat(10) {
val prime = current.receive()
println("소수: $prime")
current = filter(current, prime)
}
}select — 여러 채널 동시 대기
먼저 준비된 채널에서 수신합니다.
val ch1 = Channel<String>()
val ch2 = Channel<String>()
launch { delay(100); ch1.send("ch1") }
launch { delay(50); ch2.send("ch2") }
val result = select {
ch1.onReceive { "ch1에서: $it" }
ch2.onReceive { "ch2에서: $it" }
}
println(result) // "ch2에서: ch2" — 먼저 도착한 ch2Mutex — 상호 배제
공유 자원을 동시에 하나의 코루틴만 접근하게 보호합니다.
val mutex = Mutex()
var counter = 0
// 잘못된 방법 — 경쟁 조건
repeat(1000) {
launch { counter++ } // 동시 접근 — 결과 불확실
}
// 올바른 방법 — Mutex
repeat(1000) {
launch {
mutex.withLock {
counter++ // 한 번에 하나만 실행
}
}
}
// counter == 1000 보장class SafeCache<K, V> {
private val mutex = Mutex()
private val map = mutableMapOf<K, V>()
suspend fun get(key: K): V? = mutex.withLock {
map[key]
}
suspend fun put(key: K, value: V) = mutex.withLock {
map[key] = value
}
suspend fun getOrPut(key: K, default: () -> V): V = mutex.withLock {
map.getOrPut(key, default)
}
}tryLock — 비블로킹 락 시도
val mutex = Mutex()
if (mutex.tryLock()) {
try {
// 락 획득 성공
} finally {
mutex.unlock()
}
} else {
println("락 획득 실패 — 다른 코루틴이 사용 중")
}Semaphore — 동시 접근 수 제한
지정한 수만큼 동시 실행을 허용합니다.
val semaphore = Semaphore(3) // 최대 3개 동시
repeat(10) { i ->
launch {
semaphore.withPermit {
println("작업 $i 시작")
delay(1000)
println("작업 $i 완료")
}
}
}
// 동시에 3개씩 실행// API 요청 속도 제한
class RateLimitedClient(private val maxConcurrent: Int = 5) {
private val semaphore = Semaphore(maxConcurrent)
suspend fun request(url: String): String =
semaphore.withPermit {
httpClient.get(url)
}
}Atomic — 원자적 연산
락 없이 thread-safe한 기본 타입 연산입니다.
import java.util.concurrent.atomic.*
val counter = AtomicInteger(0)
val flag = AtomicBoolean(false)
val longVal = AtomicLong(0L)
val ref = AtomicReference<String>("초기값")
// 원자적 연산
counter.incrementAndGet()
counter.getAndAdd(5)
counter.compareAndSet(5, 10) // 현재값이 5이면 10으로 변경
ref.updateAndGet { it.uppercase() }
ref.compareAndSet("초기값", "변경값")@Volatile
class Singleton {
companion object {
@Volatile
private var instance: Singleton? = null
fun getInstance(): Singleton =
instance ?: synchronized(this) {
instance ?: Singleton().also { instance = it }
}
}
}ThreadLocal — 코루틴 로컬
코루틴 간에 스레드 로컬 값을 안전하게 전달합니다.
val threadLocal = ThreadLocal.withInitial { "기본값" }
runBlocking(threadLocal.asContextElement("코루틴 값")) {
println(threadLocal.get()) // "코루틴 값"
withContext(Dispatchers.IO) {
println(threadLocal.get()) // "코루틴 값" — 다른 스레드에서도 유지
}
}동시성 도구 선택 가이드
단순 카운터/플래그 증감 → Atomic (가장 빠름)
공유 자원 단일 접근 보호 → Mutex
최대 N개 동시 접근 제한 → Semaphore
코루틴 간 데이터 파이프라인 → Channel
상태 공유 (단일 최신값) → StateFlow + Mutex
이벤트 브로드캐스트 → SharedFlow
정리
Channel
- 코루틴 간 통신 파이프
RENDEZVOUS/BUFFERED/UNLIMITED/CONFLATEDproduce/consumeEach— 생산자/소비자 패턴select— 여러 채널 동시 대기
Mutex
withLock { }— 코루틴 안전 상호 배제tryLock()— 비블로킹 시도
Semaphore
withPermit { }— 최대 N개 동시 실행 제한
Atomic
AtomicInteger,AtomicBoolean,AtomicReference— 락 없는 원자적 연산
@Volatile
- JVM volatile — 가시성 보장 (원자성은 보장 안 됨)