Kafka Streams — KStream, KTable, Windowing

Kafka Streams란?

Kafka 위에서 동작하는 경량 스트림 처리 라이브러리.

Kafka Topic (입력)
  ↓
Kafka Streams 처리 (변환, 집계, 조인)
  ↓
Kafka Topic (출력)

특징:

  • 별도 클러스터 불필요 — 일반 Java/Kotlin 애플리케이션
  • Kafka에 데이터가 있는 한 자체적으로 상태 관리
  • 정확히 한 번(EOS) 처리 지원
  • 수평 확장 — 인스턴스 추가만으로 병렬 처리

KStream vs KTable

KStream — 이벤트 스트림

이벤트 로그: 모든 이벤트를 독립적으로 처리

시간 → [클릭] [클릭] [클릭] [클릭] ...

같은 키라도 각각 별개의 이벤트
예: 클릭 이벤트, 주문 생성, 로그

KTable — 변경 로그 (상태 테이블)

같은 키의 최신 값만 유지 (upsert 의미론)

key=user-1, value={"name": "Alice"}
key=user-1, value={"name": "Alice Kim"}  ← 이전 값 대체

DB 테이블처럼 동작
예: 사용자 프로필, 재고 현황, 설정
KStream:
  key=A, v1
  key=A, v2
  key=A, v3  → 세 이벤트 모두 처리

KTable:
  key=A → v3  (최신값만 유지)

기본 스트림 처리

val props = Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor")
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()::class.java)
    put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String()::class.java)
}
 
val builder = StreamsBuilder()
 
// KStream 생성
val orders: KStream<String, String> = builder.stream("orders")
 
// 변환 체인
orders
    .filter { key, value -> value.contains("\"status\":\"PAID\"") }
    .mapValues { value -> parseOrder(value).toShippingEvent().toJson() }
    .to("shipping-events")
 
val topology = builder.build()
val streams = KafkaStreams(topology, props)
streams.start()
 
// 종료 시
Runtime.getRuntime().addShutdownHook(Thread { streams.close() })

스트림 연산자

Stateless 연산

val stream: KStream<String, Order> = builder.stream("orders")
 
// filter — 조건에 맞는 것만
val paid = stream.filter { _, order -> order.status == "PAID" }
 
// map — 키+값 변환
val events = stream.map { key, order ->
    KeyValue("${order.userId}", order.toEvent())
}
 
// mapValues — 값만 변환
val summaries = stream.mapValues { order -> order.toSummary() }
 
// flatMap — 하나의 레코드 → 여러 레코드
val items = stream.flatMap { _, order ->
    order.items.map { item -> KeyValue(item.productId, item) }
}
 
// selectKey — 키 변경
val rekeyed = stream.selectKey { _, order -> order.userId }
 
// peek — 디버깅용 (사이드 이펙트)
stream.peek { key, value -> logger.debug("Processing: $key") }

Stateful 연산

// groupByKey + count
val orderCount: KTable<String, Long> = stream
    .groupByKey()
    .count(Materialized.as("order-count-store"))
 
// groupBy + aggregate
val totalByUser: KTable<String, Long> = stream
    .groupBy { _, order -> order.userId }
    .aggregate(
        { 0L },  // 초기값
        { _, order, total -> total + order.amount },  // 집계
        Materialized.`as`<String, Long>("user-totals-store")
            .withValueSerde(Serdes.Long())
    )
 
// reduce
val maxByUser: KTable<String, Order> = stream
    .groupByKey()
    .reduce { order1, order2 ->
        if (order1.amount > order2.amount) order1 else order2
    }

Windowing

시간 범위를 기준으로 이벤트를 그룹화.

Tumbling Window (고정 시간 슬라이딩 없음)

|---1분---|---1분---|---1분---|
  [A][B]     [C]       [D][E]

겹치지 않는 고정 크기 윈도우
stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .count()
    .toStream()
    .map { windowedKey, count ->
        KeyValue(
            "${windowedKey.key()}_${windowedKey.window().start()}",
            count
        )
    }
    .to("order-count-per-minute")

Hopping Window (겹치는 슬라이딩)

크기 5분, 간격 1분:
|-----5분-----|
  |-----5분-----|
    |-----5분-----|
      ...
stream
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30))
            .advanceBy(Duration.ofMinutes(1))
    )
    .count()

Session Window (비활성 기간 기준)

같은 키의 이벤트들 사이에 비활성 시간이 임계값 초과 시 세션 분리

[A] [A] [A]    gap    [A] [A]
|--session1--|  |--session2--|
stream
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
    .count()

Grace Period

늦게 도착한 이벤트 처리:
  grace(Duration.ofSeconds(30))
  → 윈도우 종료 후 30초간 늦은 이벤트 허용

KStream-KTable 조인

val orders: KStream<String, Order> = builder.stream("orders")
val users: KTable<String, User> = builder.table("users")
 
// Stream-Table 조인 (orders의 key = userId)
val enriched: KStream<String, EnrichedOrder> = orders
    .join(users) { order, user ->
        EnrichedOrder(
            orderId = order.id,
            userId = order.userId,
            userName = user.name,
            amount = order.amount
        )
    }
 
enriched.to("enriched-orders")

조인 종류

조인설명
stream.join(table)테이블에 키가 있을 때만 조인 (inner join)
stream.leftJoin(table)테이블에 없으면 null (left join)
stream.join(stream)두 스트림을 시간 윈도우 기준으로 조인
table.join(table)두 테이블 조인

State Store

집계, 조인에 필요한 상태를 로컬 RocksDB에 저장.

// 인터랙티브 쿼리 — 외부에서 상태 직접 조회 가능
val store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "order-count-store",
        QueryableStoreTypes.keyValueStore<String, Long>()
    )
)
 
val count = store.get("user-123")
println("user-123의 주문 수: $count")
 
// 윈도우 스토어 조회
val windowStore = streams.store(
    StoreQueryParameters.fromNameAndType(
        "windowed-counts",
        QueryableStoreTypes.windowStore<String, Long>()
    )
)

State Store는 Kafka 토픽에 백업 (__streams-changelog 토픽)되어 장애 복구 가능.


Processor API (저수준)

class OrderProcessor : Processor<String, Order, String, String> {
    private lateinit var context: ProcessorContext<String, String>
    private lateinit var store: KeyValueStore<String, Long>
 
    override fun init(context: ProcessorContext<String, String>) {
        this.context = context
        this.store = context.getStateStore("order-counts")
 
        // 30초마다 스케줄링
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME) { timestamp ->
            store.all().forEach { entry ->
                if (entry.value > 100) {
                    context.forward(Record(entry.key, "HIGH_VOLUME", timestamp))
                }
            }
        }
    }
 
    override fun process(record: Record<String, Order>) {
        val current = store.get(record.key()) ?: 0L
        store.put(record.key(), current + 1)
    }
}

정리

  • KStream: 이벤트 스트림 — 각 이벤트 독립 처리
  • KTable: 상태 테이블 — 같은 키의 최신 값 유지
  • Windowing: 시간 범위로 집계 — Tumbling(겹침 없음), Hopping(겹침), Session(비활성 기준)
  • State Store: RocksDB 기반 로컬 상태, Changelog 토픽으로 백업
  • 조인: KStream-KTable, KStream-KStream(윈도우), KTable-KTable
  • 장점: 별도 클러스터 없이 Kafka 위에서 실시간 처리, EOS 지원