Pipeline & 배치 — 성능 최적화
네트워크 왕복 비용
일반 명령 실행 (N개):
Client → [SET key1] → Server (처리) → Client (응답)
Client → [SET key2] → Server (처리) → Client (응답)
Client → [SET key3] → Server (처리) → Client (응답)
...
총 N번의 네트워크 왕복 (RTT × N)
Pipeline:
Client → [SET key1, SET key2, SET key3, ...] → Server (처리) → Client (N개 응답)
총 1번의 네트워크 왕복
로컬: 0.1ms RTT × 1000개 = 100ms Pipeline: 1번 왕복 ≈ 0.1ms
Pipeline
CLI에서 Pipeline
# 파이프를 통한 배치 입력
echo -e "SET key1 val1\nSET key2 val2\nSET key3 val3" | redis-cli --pipe
# 파일로 배치
cat commands.txt | redis-cli --pipe
# 결과 확인
cat commands.txt | redis-cli --pipe --pipe-modeLettuce (Kotlin)
// Pipelining — 응답 없이 명령어만 전송
val connection = redisClient.connect()
val async = connection.async()
// 파이프라인 시작
async.setAutoFlushCommands(false)
val futures = mutableListOf<RedisFuture<*>>()
for (i in 1..1000) {
futures.add(async.set("key$i", "value$i"))
}
// 한 번에 전송
async.flushCommands()
// 결과 대기
LettuceFutures.awaitAll(5, TimeUnit.SECONDS, *futures.toTypedArray())
async.setAutoFlushCommands(true)
connection.close()Spring RedisTemplate
// executePipelined — Pipeline 실행
val results = redisTemplate.executePipelined { connection ->
for (i in 1..1000) {
connection.set("key$i".toByteArray(), "value$i".toByteArray())
}
null // 반환값 없음
}
// 결과는 List<Any?> 형태
results.forEach { result ->
println(result) // OK
}Jedis (비교용)
// Jedis Pipeline
Pipeline pipeline = jedis.pipelined();
for (int i = 0; i < 1000; i++) {
pipeline.set("key" + i, "value" + i);
}
List<Object> results = pipeline.syncAndReturnAll();다중 키 명령어 (Pipeline 대안)
String
# 여러 키 동시 SET/GET
MSET key1 val1 key2 val2 key3 val3
MGET key1 key2 key3 # [val1, val2, val3]
MSETNX key1 val1 key2 val2 # 모두 없을 때만
# DEL 여러 키
DEL key1 key2 key3Hash
# 여러 필드 동시 SET/GET
HSET hash field1 val1 field2 val2 field3 val3
HMGET hash field1 field2 field3
# 전체 Hash 읽기
HGETALL hashZSet
# 여러 멤버 동시 추가
ZADD leaderboard 9500 "alice" 8800 "bob" 9200 "charlie"
# 여러 score 동시 조회 (Redis 3.2+)
ZMSCORE leaderboard alice bob charlie트랜잭션 vs Pipeline
Pipeline:
명령어를 한 번에 전송 → 네트워크 왕복 감소
서버에서는 여전히 각각 순서대로 실행
원자성 없음 (중간에 다른 클라이언트 끼어들 수 있음)
MULTI/EXEC (트랜잭션):
원자성 있음 (다른 클라이언트 끼어들기 없음)
네트워크는 2번 왕복 (MULTI+명령어, EXEC)
Pipeline + MULTI/EXEC:
MULTI → ... → EXEC를 Pipeline으로 전송
→ 원자성 + 1번 왕복
// Pipeline + Transaction
val results = redisTemplate.executePipelined { connection ->
connection.multi()
connection.set("key1".toByteArray(), "val1".toByteArray())
connection.set("key2".toByteArray(), "val2".toByteArray())
connection.exec()
null
}배치 처리 패턴
대량 데이터 로드
// 1000개씩 나눠 Pipeline으로 로드
fun bulkLoad(data: List<Pair<String, String>>) {
data.chunked(1000).forEach { chunk ->
redisTemplate.executePipelined { connection ->
chunk.forEach { (key, value) ->
connection.set(key.toByteArray(), value.toByteArray())
}
null
}
}
}캐시 Warm-up
// 배치로 캐시 미리 채우기
fun warmupCache(userIds: List<Long>) {
val users = userRepository.findAllById(userIds)
redisTemplate.executePipelined { connection ->
users.forEach { user ->
val key = "cache:user:${user.id}".toByteArray()
val value = user.toJson().toByteArray()
connection.set(key, value)
connection.expire(key, 3600) // 1시간 TTL
}
null
}
}다중 키 조회 후 미싱 처리
fun getUsers(userIds: List<Long>): Map<Long, User> {
val keys = userIds.map { "cache:user:$it" }
val cached = redisTemplate.opsForValue().multiGet(keys)
val result = mutableMapOf<Long, User>()
val missingIds = mutableListOf<Long>()
userIds.forEachIndexed { idx, userId ->
val value = cached?.get(idx)
if (value != null) {
result[userId] = value.toUser()
} else {
missingIds.add(userId)
}
}
// DB에서 미싱 조회
if (missingIds.isNotEmpty()) {
val dbUsers = userRepository.findAllById(missingIds)
// 캐시에 저장
redisTemplate.executePipelined { connection ->
dbUsers.forEach { user ->
val key = "cache:user:${user.id}".toByteArray()
connection.set(key, user.toJson().toByteArray())
connection.expire(key, 3600)
}
null
}
dbUsers.forEach { result[it.id] = it }
}
return result
}성능 비교
테스트: 10,000개 SET 명령 (로컬 Redis)
순차 실행:
10,000 × 0.1ms = 1,000ms (1초)
MSET:
MSET key1 val1 ... key10000 val10000 = ~5ms (1번 명령)
Pipeline (1000개씩):
10번 × ~1ms = ~10ms
Pipeline (한 번에):
~5ms (1번 왕복)
정리
- Pipeline: 여러 명령을 한 번에 전송 → 네트워크 왕복 감소 (원자성 없음)
- MSET/MGET: String 다중 키 조작 (단일 명령)
- executePipelined: Spring RedisTemplate Pipeline API
- Pipeline + MULTI/EXEC: 원자성 + 최적화 동시에
- 청크 처리: 너무 큰 배치는 메모리 이슈 → 1000~5000개씩 분할 권장