안드로이드 개발을 하다 보면 네트워크 요청을 적절히 제어해야 하는 상황을 자주 마주칩니다. 사용자가 버튼을 연타하거나, 스크롤하면서 수많은 이미지를 요청하거나, 백그라운드 동기화가 과도하게 발생하는 경우가 대표적입니다.
오늘은 이런 문제를 해결하는 핵심 알고리즘인 Token Bucket에 대해 깊이 있게 다뤄보겠습니다. 특히 Kotlin으로 구현하면서 안드로이드 환경에서 실제로 활용할 수 있는 방법을 중점적으로 살펴보겠습니다.
Token Bucket 알고리즘의 핵심 원리
Token Bucket은 네트워크 트래픽 쉐이핑(Traffic Shaping)에서 널리 사용되는 알고리즘입니다. 기본 원리는 다음과 같습니다.
- 토큰 버킷: 정해진 용량을 가진 버킷에 토큰을 저장
- 토큰 생성: 일정한 속도(rate)로 토큰이 생성되어 버킷에 추가
- 요청 처리: 요청이 들어오면 토큰을 소비하여 처리
- 버스트 허용: 버킷에 토큰이 충분하면 순간적인 대량 요청 처리 가능
이 알고리즘의 가장 큰 장점은 평균 처리율을 제한하면서도 버스트 트래픽을 유연하게 처리할 수 있다는 점입니다. 이는 모바일 앱의 사용 패턴과 잘 맞아떨어집니다.
Kotlin으로 구현하는 기본 Token Bucket
먼저 가장 기본적인 형태의 Token Bucket을 구현해보겠습니다. Thread-safe를 고려한 구현입니다.
import java.util.concurrent.atomic.AtomicLong
import kotlin.math.min
class TokenBucket(
private val capacity: Long, // 버킷의 최대 용량
private val refillRate: Double, // 초당 충전되는 토큰 수
private val refillPeriodMillis: Long = 1000 // 리필 주기 (기본 1초)
) {
private val tokens = AtomicLong(capacity)
private val lastRefillTimestamp = AtomicLong(System.nanoTime())
/**
* 토큰을 소비하고 요청 허용 여부를 반환
* @param numTokens 소비할 토큰 수
* @return 요청 허용 여부
*/
fun tryConsume(numTokens: Long = 1): Boolean {
refill()
return tokens.updateAndGet { current ->
if (current >= numTokens) current - numTokens else current
} >= numTokens
}
/**
* 경과 시간에 따라 토큰을 리필
*/
private fun refill() {
val now = System.nanoTime()
val lastRefill = lastRefillTimestamp.get()
val elapsedNanos = now - lastRefill
if (elapsedNanos > refillPeriodMillis * 1_000_000) {
val elapsedPeriods = elapsedNanos / (refillPeriodMillis * 1_000_000)
val tokensToAdd = (elapsedPeriods * refillRate).toLong()
if (tokensToAdd > 0) {
tokens.updateAndGet { current ->
min(capacity, current + tokensToAdd)
}
lastRefillTimestamp.set(now)
}
}
}
/**
* 현재 사용 가능한 토큰 수 조회
*/
fun availableTokens(): Long {
refill()
return tokens.get()
}
}
이 구현의 핵심은 AtomicLong
을 사용한 lock-free 동시성 제어입니다. 안드로이드 앱에서는 여러 스레드에서 동시에 네트워크 요청이 발생할 수 있기 때문에 thread-safety가 필수적입니다.
안드로이드에 최적화된 구현
실제 안드로이드 앱에서는 더 정교한 제어가 필요합니다. 다음은 코루틴과 함께 사용할 수 있는 방법입니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicReference
import kotlin.math.min
class AdvancedTokenBucket(
private val capacity: Int,
private val refillRate: Double,
private val scope: CoroutineScope = GlobalScope
) {
data class BucketState(
val tokens: Double,
val lastRefillTime: Long = System.nanoTime()
)
private val state = AtomicReference(BucketState(capacity.toDouble()))
private val waitingRequests = Channel<CompletableDeferred<Boolean>>(Channel.UNLIMITED)
init {
// 백그라운드에서 주기적으로 토큰 리필 및 대기 중인 요청 처리
scope.launch {
while (isActive) {
delay(100) // 100ms마다 체크
refillTokens()
processWaitingRequests()
}
}
}
/**
* 논블로킹 방식으로 토큰 소비 시도
*/
fun tryConsume(tokens: Int = 1): Boolean {
while (true) {
val current = state.get()
val now = System.nanoTime()
val newState = calculateNewState(current, now)
if (newState.tokens >= tokens) {
val consumedState = newState.copy(tokens = newState.tokens - tokens)
if (state.compareAndSet(current, consumedState)) {
return true
}
} else {
return false
}
}
}
/**
* 토큰이 사용 가능할 때까지 대기 (suspend function)
*/
suspend fun consume(tokens: Int = 1, timeoutMillis: Long = 5000): Boolean {
// 먼저 즉시 시도
if (tryConsume(tokens)) {
return true
}
// 대기 요청 등록
return withTimeoutOrNull(timeoutMillis) {
val deferred = CompletableDeferred<Boolean>()
waitingRequests.send(deferred)
deferred.await()
} ?: false
}
private fun calculateNewState(current: BucketState, now: Long): BucketState {
val elapsedSeconds = (now - current.lastRefillTime) / 1_000_000_000.0
val tokensToAdd = elapsedSeconds * refillRate
val newTokens = min(capacity.toDouble(), current.tokens + tokensToAdd)
return BucketState(
tokens = newTokens,
lastRefillTime = if (tokensToAdd > 0) now else current.lastRefillTime
)
}
private fun refillTokens() {
while (true) {
val current = state.get()
val now = System.nanoTime()
val newState = calculateNewState(current, now)
if (state.compareAndSet(current, newState)) {
break
}
}
}
private suspend fun processWaitingRequests() {
while (!waitingRequests.isEmpty) {
val request = waitingRequests.tryReceive().getOrNull() ?: break
if (tryConsume()) {
request.complete(true)
} else {
// 다시 대기열에 추가
waitingRequests.send(request)
break
}
}
}
}
이 구현은 코루틴을 활용해 비동기적으로 토큰을 관리하며, 대기 중인 요청을 효율적으로 처리합니다.
안드로이드 앱에서의 실제 활용 사례
1. Retrofit 인터셉터와 통합
class RateLimitInterceptor(
private val tokenBucket: TokenBucket
) : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
// API 엔드포인트별로 다른 가중치 적용
val request = chain.request()
val weight = when {
request.url.encodedPath.contains("/upload") -> 10 // 업로드는 10토큰
request.url.encodedPath.contains("/search") -> 3 // 검색은 3토큰
else -> 1 // 기본 1토큰
}
// 토큰이 부족하면 429 에러 반환
if (!tokenBucket.tryConsume(weight.toLong())) {
return Response.Builder()
.code(429)
.message("Too Many Requests")
.protocol(Protocol.HTTP_1_1)
.request(request)
.build()
}
return chain.proceed(request)
}
}
// Retrofit 설정
val retrofit = Retrofit.Builder()
.baseUrl("https://api.example.com/")
.client(
OkHttpClient.Builder()
.addInterceptor(RateLimitInterceptor(tokenBucket))
.build()
)
.build()
2. 이미지 로딩 라이브러리와 통합
class RateLimitedImageLoader(
private val context: Context,
private val tokenBucket: AdvancedTokenBucket
) {
private val imageLoader = ImageLoader.Builder(context)
.components {
add(RateLimitFetcher.Factory(tokenBucket))
}
.build()
suspend fun loadImage(url: String): Drawable? {
val request = ImageRequest.Builder(context)
.data(url)
.build()
return when (val result = imageLoader.execute(request)) {
is SuccessResult -> result.drawable
else -> null
}
}
}
class RateLimitFetcher(
private val tokenBucket: AdvancedTokenBucket,
private val wrapped: Fetcher
) : Fetcher {
override suspend fun fetch(): FetchResult? {
// 이미지 로딩 전 토큰 소비 (타임아웃 포함)
if (!tokenBucket.consume(tokens = 1, timeoutMillis = 3000)) {
throw HttpException(Response.Builder()
.code(429)
.message("Rate limit exceeded")
.build())
}
return wrapped.fetch()
}
}
분산 환경에서의 Token Bucket
모바일 앱이 여러 서버와 통신하거나, 서버 측에서도 rate limiting이 필요한 경우 Redis를 활용한 분산 Token Bucket을 구현할 수 있습니다.
class DistributedTokenBucket(
private val redisClient: RedisClient,
private val key: String,
private val capacity: Int,
private val refillRate: Double
) {
// Lua 스크립트로 원자적 연산 보장
private val consumeScript = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now
-- 토큰 리필 계산
local elapsed = math.max(0, now - last_refill)
local refilled = math.min(capacity, tokens + (elapsed * rate))
if refilled >= requested then
-- 토큰 소비
redis.call('HMSET', key,
'tokens', refilled - requested,
'last_refill', now)
redis.call('EXPIRE', key, 86400) -- 24시간 TTL
return 1
else
-- 토큰 부족
if elapsed > 0 then
redis.call('HMSET', key,
'tokens', refilled,
'last_refill', now)
end
return 0
end
""".trimIndent()
suspend fun tryConsume(tokens: Int = 1): Boolean {
val result = redisClient.eval(
script = consumeScript,
keys = listOf(key),
args = listOf(
capacity.toString(),
refillRate.toString(),
(System.currentTimeMillis() / 1000.0).toString(),
tokens.toString()
)
)
return result == 1L
}
}
적응형 Rate Limiting
네트워크 상태나 서버 응답 시간에 따라 동적으로 rate를 조정하는 적응형 Token Bucket도 구현할 수 있습니다.
class AdaptiveTokenBucket(
initialCapacity: Int,
private val minRate: Double = 1.0,
private val maxRate: Double = 100.0
) {
private var currentRate = minRate
private var tokenBucket = TokenBucket(
capacity = initialCapacity.toLong(),
refillRate = currentRate
)
// 네트워크 성능 메트릭
private val latencyWindow = CircularFifoQueue<Long>(100)
private val errorWindow = CircularFifoQueue<Boolean>(100)
fun recordLatency(latencyMs: Long) {
latencyWindow.add(latencyMs)
adjustRateBasedOnPerformance()
}
fun recordResult(success: Boolean) {
errorWindow.add(success)
adjustRateBasedOnPerformance()
}
private fun adjustRateBasedOnPerformance() {
val avgLatency = latencyWindow.average()
val successRate = errorWindow.count { it } / errorWindow.size.toDouble()
// 네트워크 상태에 따른 rate 조정
currentRate = when {
avgLatency < 100 && successRate > 0.95 -> {
// 네트워크 상태 좋음 - rate 증가
min(maxRate, currentRate * 1.2)
}
avgLatency > 500 || successRate < 0.8 -> {
// 네트워크 상태 나쁨 - rate 감소
max(minRate, currentRate * 0.7)
}
else -> currentRate
}
// 새로운 rate로 버킷 재생성
tokenBucket = TokenBucket(
capacity = (currentRate * 10).toLong(),
refillRate = currentRate
)
}
}
이는 TCP의 혼잡 제어 알고리즘과 유사한 원리로, AIMD(Additive Increase Multiplicative Decrease) 방식을 따릅니다.
Token Bucket vs 다른 Rate Limiting 알고리즘
알고리즘 비교
알고리즘 | 장점 | 단점 | 적합한 사용 사례 |
---|---|---|---|
Token Bucket | 버스트 허용, 유연성 | 구현 복잡도 | 모바일 앱, API 클라이언트 |
Leaky Bucket | 일정한 출력 속도 | 버스트 불가 | 스트리밍, 일정한 처리율 필요 |
Fixed Window | 구현 단순 | 경계 문제 | 간단한 제한 |
Sliding Window | 정확성 | 메모리 사용량 | 정밀한 제어 필요 |
Token Bucket이 모바일 앱에 적합한 이유
- 사용자 인터랙션은 본질적으로 버스트 특성을 가짐
- 네트워크 상태 변화에 유연하게 대응 가능
- 백그라운드 작업과 포그라운드 작업의 우선순위 조절 가능
모니터링과 디버깅
다음은 Token Bucket의 효과를 측정하고 최적화하기 위한 모니터링 구현 예시입니다.
class TokenBucketMonitor(
private val tokenBucket: TokenBucket
) {
private val metrics = mutableMapOf<String, AtomicLong>()
init {
metrics["totalRequests"] = AtomicLong(0)
metrics["allowedRequests"] = AtomicLong(0)
metrics["rejectedRequests"] = AtomicLong(0)
metrics["totalTokensConsumed"] = AtomicLong(0)
}
fun recordRequest(allowed: Boolean, tokensRequested: Long = 1) {
metrics["totalRequests"]?.incrementAndGet()
if (allowed) {
metrics["allowedRequests"]?.incrementAndGet()
metrics["totalTokensConsumed"]?.addAndGet(tokensRequested)
} else {
metrics["rejectedRequests"]?.incrementAndGet()
}
}
fun getMetrics(): Map<String, Long> {
return metrics.mapValues { it.value.get() } + mapOf(
"availableTokens" to tokenBucket.availableTokens(),
"rejectionRate" to calculateRejectionRate()
)
}
private fun calculateRejectionRate(): Long {
val total = metrics["totalRequests"]?.get() ?: 0
val rejected = metrics["rejectedRequests"]?.get() ?: 0
return if (total > 0) (rejected * 100 / total) else 0
}
// Firebase Analytics나 Crashlytics로 전송
fun reportToAnalytics() {
val bundle = Bundle().apply {
getMetrics().forEach { (key, value) ->
putLong("token_bucket_$key", value)
}
}
FirebaseAnalytics.getInstance(context).logEvent("rate_limit_metrics", bundle)
}
}
실제 구현 시 고려사항
1. 시스템 시간 변경 대응
안드로이드 기기의 시간이 변경되는 경우를 대비해 System.nanoTime()
을 사용합니다. 이는 시스템 시간과 독립적인 monotonic clock
입니다.
2. 앱 라이프사이클 고려
class TokenBucketManager(private val context: Context) : DefaultLifecycleObserver {
private var tokenBucket: TokenBucket? = null
override fun onStart(owner: LifecycleOwner) {
// 앱이 포그라운드로 올 때 rate 증가
tokenBucket = TokenBucket(
capacity = 100,
refillRate = 10.0
)
}
override fun onStop(owner: LifecycleOwner) {
// 백그라운드로 갈 때 rate 감소
tokenBucket = TokenBucket(
capacity = 10,
refillRate = 1.0
)
}
}
3. 메모리 효율성
Token Bucket 인스턴스는 싱글톤이나 의존성 주입(Hilt/Dagger)을 통해 관리하여 메모리 사용을 최적화합니다.
정리
Token Bucket 알고리즘은 모바일 앱의 네트워크 요청을 효과적으로 제어하는 강력한 도구입니다. 특히 Kotlin의 코루틴과 원자적 연산을 활용하면 효율적이고 안전한 구현이 가능합니다.
핵심 포인트
- 버스트 트래픽 허용: 사용자 경험을 해치지 않으면서 서버 부하 제어
- Thread-safe 구현: 멀티스레드 환경에서 안전한 동작 보장
- 유연한 통합: Retrofit, OkHttp, 이미지 로더 등과 쉽게 통합
- 적응형 제어: 네트워크 상태에 따른 동적 조절
Token Bucket은 단순한 rate limiting을 넘어, 안정적이고 효율적인 네트워크 통신을 위한 필수 컴포넌트입니다. 특히 모바일 환경의 불안정한 네트워크와 제한된 리소스를 고려할 때, 적절한 트래픽 제어는 앱의 품질을 크게 향상시킬 수 있습니다.