From 3f11026b10770d6c04cc4487b0aebbf79c6eb839 Mon Sep 17 00:00:00 2001 From: Priveetee Date: Sat, 6 Jun 2026 16:40:19 +0200 Subject: [PATCH 1/2] perf: cache public extraction endpoints --- .../server/services/CachedChannelService.kt | 26 +++----- .../server/services/CachedCommentService.kt | 23 +++----- .../server/services/CachedSearchService.kt | 22 +++---- .../services/CachedSuggestionService.kt | 29 +++------ .../server/services/CachedTrendingService.kt | 22 +++---- .../server/services/PublicCacheKey.kt | 22 +++++++ .../server/services/PublicCachePolicy.kt | 46 +++++++++++++++ .../server/services/PublicExtractionCache.kt | 59 +++++++++++++++++++ .../server/CachedSuggestionServiceTest.kt | 33 ++++++++--- .../typetype/server/PublicCachePolicyTest.kt | 50 ++++++++++++++++ 10 files changed, 240 insertions(+), 92 deletions(-) create mode 100644 src/main/kotlin/dev/typetype/server/services/PublicCacheKey.kt create mode 100644 src/main/kotlin/dev/typetype/server/services/PublicCachePolicy.kt create mode 100644 src/main/kotlin/dev/typetype/server/services/PublicExtractionCache.kt create mode 100644 src/test/kotlin/dev/typetype/server/PublicCachePolicyTest.kt diff --git a/src/main/kotlin/dev/typetype/server/services/CachedChannelService.kt b/src/main/kotlin/dev/typetype/server/services/CachedChannelService.kt index c31f87a..1d5e22a 100644 --- a/src/main/kotlin/dev/typetype/server/services/CachedChannelService.kt +++ b/src/main/kotlin/dev/typetype/server/services/CachedChannelService.kt @@ -1,6 +1,5 @@ package dev.typetype.server.services -import dev.typetype.server.cache.CacheJson import dev.typetype.server.cache.CacheService import dev.typetype.server.models.ChannelResponse import dev.typetype.server.models.ExtractionResult @@ -10,21 +9,12 @@ class CachedChannelService( private val cache: CacheService, ) : ChannelService { - companion object { - private const val CHANNEL_CACHE_TTL_SECONDS = 1800L - } - - override suspend fun getChannel(url: String, nextpage: String?, sort: String?): ExtractionResult { - val key = "channel:$url:${nextpage ?: "null"}:${sort ?: "default"}" - runCatching { cache.get(key) }.getOrNull()?.let { cached -> - return runCatching { ExtractionResult.Success(CacheJson.decodeFromString(cached)) }.getOrElse { - delegate.getChannel(url, nextpage, sort) - } - } - val result = delegate.getChannel(url, nextpage, sort) - if (result is ExtractionResult.Success) { - runCatching { cache.set(key, CacheJson.encodeToString(ChannelResponse.serializer(), result.data), CHANNEL_CACHE_TTL_SECONDS) } - } - return result - } + override suspend fun getChannel(url: String, nextpage: String?, sort: String?): ExtractionResult = + PublicExtractionCache.getOrLoad( + cache = cache, + area = "channel", + key = PublicCacheKey.of("channel", url, nextpage, sort), + serializer = ChannelResponse.serializer(), + ttlSeconds = { PublicCachePolicy.channelTtl(url, nextpage, sort) }, + ) { delegate.getChannel(url, nextpage, sort) } } diff --git a/src/main/kotlin/dev/typetype/server/services/CachedCommentService.kt b/src/main/kotlin/dev/typetype/server/services/CachedCommentService.kt index 45f7353..4b94047 100644 --- a/src/main/kotlin/dev/typetype/server/services/CachedCommentService.kt +++ b/src/main/kotlin/dev/typetype/server/services/CachedCommentService.kt @@ -1,6 +1,5 @@ package dev.typetype.server.services -import dev.typetype.server.cache.CacheJson import dev.typetype.server.cache.CacheService import dev.typetype.server.models.CommentsPageResponse import dev.typetype.server.models.ExtractionResult @@ -10,18 +9,12 @@ class CachedCommentService( private val cache: CacheService, ) : CommentService { - override suspend fun getComments(url: String, nextpage: String?): ExtractionResult { - val key = "comments:$url:${nextpage ?: "null"}" - runCatching { cache.get(key) }.getOrNull()?.let { cached -> - return runCatching { ExtractionResult.Success(CacheJson.decodeFromString(cached)) }.getOrElse { - delegate.getComments(url, nextpage) - } - } - val result = delegate.getComments(url, nextpage) - if (result is ExtractionResult.Success) { - runCatching { cache.set(key, CacheJson.encodeToString(CommentsPageResponse.serializer(), result.data), 300L) } - } - return result - } + override suspend fun getComments(url: String, nextpage: String?): ExtractionResult = + PublicExtractionCache.getOrLoad( + cache = cache, + area = "comments", + key = PublicCacheKey.of("comments", url, nextpage), + serializer = CommentsPageResponse.serializer(), + ttlSeconds = { PublicCachePolicy.commentsTtl(url, nextpage) }, + ) { delegate.getComments(url, nextpage) } } - diff --git a/src/main/kotlin/dev/typetype/server/services/CachedSearchService.kt b/src/main/kotlin/dev/typetype/server/services/CachedSearchService.kt index 94f7aae..0b25044 100644 --- a/src/main/kotlin/dev/typetype/server/services/CachedSearchService.kt +++ b/src/main/kotlin/dev/typetype/server/services/CachedSearchService.kt @@ -1,6 +1,5 @@ package dev.typetype.server.services -import dev.typetype.server.cache.CacheJson import dev.typetype.server.cache.CacheService import dev.typetype.server.models.ExtractionResult import dev.typetype.server.models.SearchPageResponse @@ -14,18 +13,11 @@ class CachedSearchService( query: String, serviceId: Int, nextpage: String?, - ): ExtractionResult { - val key = "search:$serviceId:$query:${nextpage ?: "null"}" - runCatching { cache.get(key) }.getOrNull()?.let { cached -> - return runCatching { ExtractionResult.Success(CacheJson.decodeFromString(cached)) }.getOrElse { - delegate.search(query, serviceId, nextpage) - } - } - val result = delegate.search(query, serviceId, nextpage) - if (result is ExtractionResult.Success) { - runCatching { cache.set(key, CacheJson.encodeToString(SearchPageResponse.serializer(), result.data), 300L) } - } - return result - } + ): ExtractionResult = PublicExtractionCache.getOrLoad( + cache = cache, + area = "search", + key = PublicCacheKey.of("search", serviceId.toString(), query, nextpage), + serializer = SearchPageResponse.serializer(), + ttlSeconds = { PublicCachePolicy.searchTtl(serviceId, nextpage) }, + ) { delegate.search(query, serviceId, nextpage) } } - diff --git a/src/main/kotlin/dev/typetype/server/services/CachedSuggestionService.kt b/src/main/kotlin/dev/typetype/server/services/CachedSuggestionService.kt index 2ac4b2c..069706f 100644 --- a/src/main/kotlin/dev/typetype/server/services/CachedSuggestionService.kt +++ b/src/main/kotlin/dev/typetype/server/services/CachedSuggestionService.kt @@ -1,6 +1,5 @@ package dev.typetype.server.services -import dev.typetype.server.cache.CacheJson import dev.typetype.server.cache.CacheService import dev.typetype.server.models.ExtractionResult import kotlinx.serialization.builtins.ListSerializer @@ -11,24 +10,14 @@ class CachedSuggestionService( private val cache: CacheService, ) : SuggestionService { - override suspend fun getSuggestions(query: String, serviceId: Int): ExtractionResult> { - val key = "suggestions:$serviceId:$query" - runCatching { cache.get(key) }.getOrNull()?.let { cached -> - return runCatching { - ExtractionResult.Success(CacheJson.decodeFromString(ListSerializer(String.serializer()), cached)) - }.getOrElse { delegate.getSuggestions(query, serviceId) } - } - val result = delegate.getSuggestions(query, serviceId) - if (result is ExtractionResult.Success) { - runCatching { - cache.set(key, CacheJson.encodeToString(ListSerializer(String.serializer()), result.data), SUGGESTIONS_TTL_SECONDS) - } - } - return result - } + private val listSerializer = ListSerializer(String.serializer()) - private companion object { - const val SUGGESTIONS_TTL_SECONDS = 300L - } + override suspend fun getSuggestions(query: String, serviceId: Int): ExtractionResult> = + PublicExtractionCache.getOrLoad( + cache = cache, + area = "suggestions", + key = PublicCacheKey.of("suggestions", serviceId.toString(), query), + serializer = listSerializer, + ttlSeconds = { PublicCachePolicy.suggestionTtl(serviceId) }, + ) { delegate.getSuggestions(query, serviceId) } } - diff --git a/src/main/kotlin/dev/typetype/server/services/CachedTrendingService.kt b/src/main/kotlin/dev/typetype/server/services/CachedTrendingService.kt index 27b4bdf..ff79dce 100644 --- a/src/main/kotlin/dev/typetype/server/services/CachedTrendingService.kt +++ b/src/main/kotlin/dev/typetype/server/services/CachedTrendingService.kt @@ -1,6 +1,5 @@ package dev.typetype.server.services -import dev.typetype.server.cache.CacheJson import dev.typetype.server.cache.CacheService import dev.typetype.server.models.ExtractionResult import dev.typetype.server.models.VideoItem @@ -13,18 +12,11 @@ class CachedTrendingService( private val listSerializer = ListSerializer(VideoItem.serializer()) - override suspend fun getTrending(serviceId: Int): ExtractionResult> { - val key = "trending:$serviceId" - runCatching { cache.get(key) }.getOrNull()?.let { cached -> - return runCatching { ExtractionResult.Success(CacheJson.decodeFromString(listSerializer, cached)) }.getOrElse { - delegate.getTrending(serviceId) - } - } - val result = delegate.getTrending(serviceId) - if (result is ExtractionResult.Success) { - runCatching { cache.set(key, CacheJson.encodeToString(listSerializer, result.data), 900L) } - } - return result - } + override suspend fun getTrending(serviceId: Int): ExtractionResult> = PublicExtractionCache.getOrLoad( + cache = cache, + area = "trending", + key = PublicCacheKey.of("trending", serviceId.toString()), + serializer = listSerializer, + ttlSeconds = { PublicCachePolicy.trendingTtl(serviceId) }, + ) { delegate.getTrending(serviceId) } } - diff --git a/src/main/kotlin/dev/typetype/server/services/PublicCacheKey.kt b/src/main/kotlin/dev/typetype/server/services/PublicCacheKey.kt new file mode 100644 index 0000000..9b4fe43 --- /dev/null +++ b/src/main/kotlin/dev/typetype/server/services/PublicCacheKey.kt @@ -0,0 +1,22 @@ +package dev.typetype.server.services + +import java.security.MessageDigest + +internal object PublicCacheKey { + private val hex = "0123456789abcdef".toCharArray() + + fun of(area: String, vararg parts: String?): String = "$area:v2:${digest(parts)}" + + private fun digest(parts: Array): String { + val raw = parts.joinToString(separator = "\u001f") { it.orEmpty() } + return MessageDigest.getInstance("SHA-256").digest(raw.toByteArray()).toHex().take(32) + } + + private fun ByteArray.toHex(): String = buildString(size * 2) { + this@toHex.forEach { byte -> + val value = byte.toInt() and 0xff + append(hex[value ushr 4]) + append(hex[value and 0x0f]) + } + } +} diff --git a/src/main/kotlin/dev/typetype/server/services/PublicCachePolicy.kt b/src/main/kotlin/dev/typetype/server/services/PublicCachePolicy.kt new file mode 100644 index 0000000..a30b9f7 --- /dev/null +++ b/src/main/kotlin/dev/typetype/server/services/PublicCachePolicy.kt @@ -0,0 +1,46 @@ +package dev.typetype.server.services + +internal object PublicCachePolicy { + fun trendingTtl(serviceId: Int): Long = when (serviceId) { + BILIBILI_SERVICE_ID, NICONICO_SERVICE_ID -> 600L + YOUTUBE_SERVICE_ID -> 1_800L + else -> 3_600L + } + + fun suggestionTtl(serviceId: Int): Long = when (serviceId) { + BILIBILI_SERVICE_ID, NICONICO_SERVICE_ID -> 900L + else -> 1_800L + } + + fun searchTtl(serviceId: Int, nextpage: String?): Long { + if (nextpage != null) return 300L + return when (serviceId) { + BILIBILI_SERVICE_ID, NICONICO_SERVICE_ID -> 300L + YOUTUBE_SERVICE_ID -> 600L + else -> 900L + } + } + + fun channelTtl(url: String, nextpage: String?, sort: String?): Long = when { + url.contains("/search", ignoreCase = true) -> 600L + nextpage != null -> 1_800L + url.contains("/shorts", ignoreCase = true) -> 900L + sort.equals("latest", ignoreCase = true) -> 900L + else -> 3_600L + } + + fun commentsTtl(url: String, nextpage: String?): Long { + if (nextpage != null) return 600L + return when (url.serviceHint()) { + BILIBILI_SERVICE_ID, NICONICO_SERVICE_ID -> 300L + else -> 180L + } + } +} + +private fun String.serviceHint(): Int? = when { + contains("bilibili.com", ignoreCase = true) -> BILIBILI_SERVICE_ID + contains("nicovideo.jp", ignoreCase = true) -> NICONICO_SERVICE_ID + contains("youtube.com", ignoreCase = true) || contains("youtu.be", ignoreCase = true) -> YOUTUBE_SERVICE_ID + else -> null +} diff --git a/src/main/kotlin/dev/typetype/server/services/PublicExtractionCache.kt b/src/main/kotlin/dev/typetype/server/services/PublicExtractionCache.kt new file mode 100644 index 0000000..8771fc2 --- /dev/null +++ b/src/main/kotlin/dev/typetype/server/services/PublicExtractionCache.kt @@ -0,0 +1,59 @@ +package dev.typetype.server.services + +import dev.typetype.server.cache.CacheJson +import dev.typetype.server.cache.CacheService +import dev.typetype.server.models.ExtractionResult +import kotlinx.serialization.KSerializer +import org.slf4j.LoggerFactory + +internal object PublicExtractionCache { + private val logger = LoggerFactory.getLogger(PublicExtractionCache::class.java) + + suspend fun getOrLoad( + cache: CacheService, + area: String, + key: String, + serializer: KSerializer, + ttlSeconds: (T) -> Long, + load: suspend () -> ExtractionResult, + ): ExtractionResult { + val cached = runCatching { cache.get(key) } + .onFailure { logger.warn("cache event=get_failed area={} key={} error={}", area, key, it.message) } + .getOrNull() + if (cached != null) { + val decoded = runCatching { CacheJson.decodeFromString(serializer, cached) } + decoded.getOrNull()?.let { + logger.info("cache event=hit area={} key={}", area, key) + return ExtractionResult.Success(it) + } + logger.warn( + "cache event=decode_failed area={} key={} error={}", + area, + key, + decoded.exceptionOrNull()?.message, + ) + } + logger.info("cache event=miss area={} key={}", area, key) + val result = load() + if (result is ExtractionResult.Success) { + store(cache, area, key, serializer, result.data, ttlSeconds(result.data)) + } + return result + } + + private suspend fun store( + cache: CacheService, + area: String, + key: String, + serializer: KSerializer, + data: T, + ttlSeconds: Long, + ) { + if (ttlSeconds <= 0L) return + runCatching { cache.set(key, CacheJson.encodeToString(serializer, data), ttlSeconds) } + .onSuccess { logger.info("cache event=store area={} key={} ttlSeconds={}", area, key, ttlSeconds) } + .onFailure { + logger.warn("cache event=set_failed area={} key={} error={}", area, key, it.message) + } + } +} diff --git a/src/test/kotlin/dev/typetype/server/CachedSuggestionServiceTest.kt b/src/test/kotlin/dev/typetype/server/CachedSuggestionServiceTest.kt index 2256007..0998b60 100644 --- a/src/test/kotlin/dev/typetype/server/CachedSuggestionServiceTest.kt +++ b/src/test/kotlin/dev/typetype/server/CachedSuggestionServiceTest.kt @@ -4,6 +4,7 @@ import dev.typetype.server.cache.CacheService import dev.typetype.server.models.ExtractionResult import dev.typetype.server.services.CachedSuggestionService import dev.typetype.server.services.SuggestionService +import dev.typetype.server.services.YOUTUBE_SERVICE_ID import io.mockk.coEvery import io.mockk.coVerify import io.mockk.mockk @@ -19,27 +20,41 @@ class CachedSuggestionServiceTest { @Test fun `cache hit returns cached suggestions without calling delegate`() = runBlocking { - coEvery { cache.get("suggestions:0:rick") } returns """["rick astley","rickroll"]""" - val result = service.getSuggestions("rick", 0) + coEvery { cache.get(any()) } returns """["rick astley","rickroll"]""" + val result = service.getSuggestions("rick", YOUTUBE_SERVICE_ID) assertEquals(ExtractionResult.Success(listOf("rick astley", "rickroll")), result) + coVerify(exactly = 1) { cache.get(match { it.startsWith("suggestions:v2:") && !it.contains("rick") }) } coVerify(exactly = 0) { delegate.getSuggestions(any(), any()) } } @Test fun `cache miss delegates and stores result`() = runBlocking { - coEvery { cache.get("suggestions:0:rick") } returns null - coEvery { delegate.getSuggestions("rick", 0) } returns ExtractionResult.Success(listOf("rick astley")) + coEvery { cache.get(any()) } returns null + coEvery { delegate.getSuggestions("rick", YOUTUBE_SERVICE_ID) } returns ExtractionResult.Success(listOf("rick astley")) coEvery { cache.set(any(), any(), any()) } returns Unit - val result = service.getSuggestions("rick", 0) + val result = service.getSuggestions("rick", YOUTUBE_SERVICE_ID) assertEquals(ExtractionResult.Success(listOf("rick astley")), result) - coVerify(exactly = 1) { cache.set("suggestions:0:rick", any(), 300L) } + coVerify(exactly = 1) { + cache.set(match { it.startsWith("suggestions:v2:") && !it.contains("rick") }, any(), 1800L) + } + } + + @Test + fun `corrupt cache delegates and stores refreshed result`() = runBlocking { + coEvery { cache.get(any()) } returns "not-json" + coEvery { delegate.getSuggestions("rick", YOUTUBE_SERVICE_ID) } returns ExtractionResult.Success(listOf("rick astley")) + coEvery { cache.set(any(), any(), any()) } returns Unit + val result = service.getSuggestions("rick", YOUTUBE_SERVICE_ID) + assertEquals(ExtractionResult.Success(listOf("rick astley")), result) + coVerify(exactly = 1) { delegate.getSuggestions("rick", YOUTUBE_SERVICE_ID) } + coVerify(exactly = 1) { cache.set(any(), any(), 1800L) } } @Test fun `delegate failure is not cached`() = runBlocking { - coEvery { cache.get("suggestions:0:bad") } returns null - coEvery { delegate.getSuggestions("bad", 0) } returns ExtractionResult.Failure("network error") - val result = service.getSuggestions("bad", 0) + coEvery { cache.get(any()) } returns null + coEvery { delegate.getSuggestions("bad", YOUTUBE_SERVICE_ID) } returns ExtractionResult.Failure("network error") + val result = service.getSuggestions("bad", YOUTUBE_SERVICE_ID) assertEquals(ExtractionResult.Failure("network error"), result) coVerify(exactly = 0) { cache.set(any(), any(), any()) } } diff --git a/src/test/kotlin/dev/typetype/server/PublicCachePolicyTest.kt b/src/test/kotlin/dev/typetype/server/PublicCachePolicyTest.kt new file mode 100644 index 0000000..ef549f5 --- /dev/null +++ b/src/test/kotlin/dev/typetype/server/PublicCachePolicyTest.kt @@ -0,0 +1,50 @@ +package dev.typetype.server + +import dev.typetype.server.services.BILIBILI_SERVICE_ID +import dev.typetype.server.services.PublicCacheKey +import dev.typetype.server.services.PublicCachePolicy +import dev.typetype.server.services.YOUTUBE_SERVICE_ID +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test + +class PublicCachePolicyTest { + + @Test + fun `public cache keys are versioned and hashed`() { + val key = PublicCacheKey.of("search", YOUTUBE_SERVICE_ID.toString(), "rick", "cursor-token") + assertTrue(key.startsWith("search:v2:")) + assertFalse(key.contains("rick")) + assertFalse(key.contains("cursor-token")) + } + + @Test + fun `trending ttl depends on service volatility`() { + assertEquals(1_800L, PublicCachePolicy.trendingTtl(YOUTUBE_SERVICE_ID)) + assertEquals(600L, PublicCachePolicy.trendingTtl(BILIBILI_SERVICE_ID)) + } + + @Test + fun `search ttl is shorter for cursored pages`() { + assertEquals(600L, PublicCachePolicy.searchTtl(YOUTUBE_SERVICE_ID, null)) + assertEquals(300L, PublicCachePolicy.searchTtl(YOUTUBE_SERVICE_ID, "cursor")) + } + + @Test + fun `channel ttl is shorter for channel search and volatile sorts`() { + assertEquals(3_600L, PublicCachePolicy.channelTtl("https://www.youtube.com/channel/id", null, null)) + assertEquals( + 600L, + PublicCachePolicy.channelTtl("https://www.youtube.com/channel/id/search?query=x", null, null), + ) + assertEquals(900L, PublicCachePolicy.channelTtl("https://www.youtube.com/channel/id", null, "latest")) + } + + @Test + fun `comments ttl is shortest on first youtube page`() { + assertEquals(180L, PublicCachePolicy.commentsTtl("https://youtube.com/watch?v=id", null)) + assertEquals(600L, PublicCachePolicy.commentsTtl("https://youtube.com/watch?v=id", "cursor")) + assertEquals(300L, PublicCachePolicy.commentsTtl("https://www.bilibili.com/video/id", null)) + } +} From 5cef8af875c70d48f76e1af32b45f0fdaf4776b7 Mon Sep 17 00:00:00 2001 From: Priveetee Date: Sat, 6 Jun 2026 17:16:19 +0200 Subject: [PATCH 2/2] perf: coalesce concurrent stream cache misses --- .../server/services/CachedStreamService.kt | 38 +++++++- .../server/CachedStreamServiceTest.kt | 91 +++++++++++++++++++ 2 files changed, 124 insertions(+), 5 deletions(-) create mode 100644 src/test/kotlin/dev/typetype/server/CachedStreamServiceTest.kt diff --git a/src/main/kotlin/dev/typetype/server/services/CachedStreamService.kt b/src/main/kotlin/dev/typetype/server/services/CachedStreamService.kt index 253d3f6..4b31176 100644 --- a/src/main/kotlin/dev/typetype/server/services/CachedStreamService.kt +++ b/src/main/kotlin/dev/typetype/server/services/CachedStreamService.kt @@ -4,29 +4,57 @@ import dev.typetype.server.cache.CacheJson import dev.typetype.server.cache.CacheService import dev.typetype.server.models.ExtractionResult import dev.typetype.server.models.StreamResponse +import kotlinx.coroutines.CompletableDeferred +import java.util.concurrent.ConcurrentHashMap class CachedStreamService( private val delegate: StreamService, private val cache: CacheService, ) : StreamService { + private val inFlight = ConcurrentHashMap>>() + companion object { fun cacheKey(url: String): String = "stream:$url" } override suspend fun getStreamInfo(url: String): ExtractionResult { val key = cacheKey(url) - runCatching { cache.get(key) }.getOrNull()?.let { cached -> - return runCatching { ExtractionResult.Success(CacheJson.decodeFromString(cached)) }.getOrElse { - delegate.getStreamInfo(url) - } + cachedStream(key)?.let { return it } + val pending = CompletableDeferred>() + val existing = inFlight.putIfAbsent(key, pending) + if (existing != null) return existing.await() + return try { + val result = getCachedOrLoad(url, key) + pending.complete(result) + result + } catch (error: Throwable) { + pending.completeExceptionally(error) + throw error + } finally { + inFlight.remove(key, pending) } + } + + private suspend fun getCachedOrLoad(url: String, key: String): ExtractionResult { + cachedStream(key)?.let { return it } val result = delegate.getStreamInfo(url) if (result is ExtractionResult.Success) { val ttl = result.data.streamCacheTtlSeconds() - if (ttl > 0) runCatching { cache.set(key, CacheJson.encodeToString(StreamResponse.serializer(), result.data), ttl) } + if (ttl > 0) { + runCatching { + cache.set(key, CacheJson.encodeToString(StreamResponse.serializer(), result.data), ttl) + } + } } return result } + private suspend fun cachedStream(key: String): ExtractionResult? = runCatching { cache.get(key) } + .getOrNull() + ?.let { cached -> + runCatching { + ExtractionResult.Success(CacheJson.decodeFromString(cached)) + }.getOrNull() + } } diff --git a/src/test/kotlin/dev/typetype/server/CachedStreamServiceTest.kt b/src/test/kotlin/dev/typetype/server/CachedStreamServiceTest.kt new file mode 100644 index 0000000..dedd68b --- /dev/null +++ b/src/test/kotlin/dev/typetype/server/CachedStreamServiceTest.kt @@ -0,0 +1,91 @@ +package dev.typetype.server + +import dev.typetype.server.cache.CacheService +import dev.typetype.server.models.ExtractionResult +import dev.typetype.server.models.StreamResponse +import dev.typetype.server.services.CachedStreamService +import dev.typetype.server.services.StreamService +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import java.util.concurrent.atomic.AtomicInteger + +class CachedStreamServiceTest { + + @Test + fun `concurrent cache misses share one delegate extraction`() = runBlocking { + val response = testStreamResponse() + val delegate = CountingStreamService(ExtractionResult.Success(response)) + val cache = RecordingCacheService() + val service = CachedStreamService(delegate, cache) + val results = concurrentRequests(service, REQUEST_URL) + assertEquals(List(REQUEST_COUNT) { ExtractionResult.Success(response) }, results) + assertEquals(1, delegate.calls.get()) + assertEquals(1, cache.setCalls.get()) + } + + @Test + fun `cached stream skips delegate after first fill`() = runBlocking { + val response = testStreamResponse() + val delegate = CountingStreamService(ExtractionResult.Success(response)) + val service = CachedStreamService(delegate, RecordingCacheService()) + assertEquals(ExtractionResult.Success(response), service.getStreamInfo(REQUEST_URL)) + assertEquals(ExtractionResult.Success(response), service.getStreamInfo(REQUEST_URL)) + assertEquals(1, delegate.calls.get()) + } + + @Test + fun `concurrent delegate failures are shared and not cached`() = runBlocking { + val delegate = CountingStreamService(ExtractionResult.Failure("blocked")) + val cache = RecordingCacheService() + val service = CachedStreamService(delegate, cache) + val results = concurrentRequests(service, REQUEST_URL) + assertEquals(List(REQUEST_COUNT) { ExtractionResult.Failure("blocked") }, results) + assertEquals(1, delegate.calls.get()) + assertEquals(0, cache.setCalls.get()) + } + + private suspend fun concurrentRequests( + service: CachedStreamService, + url: String, + ): List> = coroutineScope { + List(REQUEST_COUNT) { async { service.getStreamInfo(url) } }.awaitAll() + } + + private class CountingStreamService( + private val result: ExtractionResult, + ) : StreamService { + val calls = AtomicInteger() + + override suspend fun getStreamInfo(url: String): ExtractionResult { + calls.incrementAndGet() + delay(50) + return result + } + } + + private class RecordingCacheService : CacheService { + private val values = mutableMapOf() + val setCalls = AtomicInteger() + + override suspend fun get(key: String): String? = values[key] + + override suspend fun set(key: String, value: String, ttlSeconds: Long) { + setCalls.incrementAndGet() + values[key] = value + } + + override suspend fun delete(key: String) { + values.remove(key) + } + } + + private companion object { + const val REQUEST_COUNT = 8 + const val REQUEST_URL = "https://www.youtube.com/watch?v=test" + } +}