diff --git a/Cargo.lock b/Cargo.lock index 23dd251..21cfb7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1186,7 +1186,7 @@ dependencies = [ [[package]] name = "collector-utils" -version = "2.0.2" +version = "2.0.3" dependencies = [ "anyhow", "async-trait", @@ -3783,7 +3783,7 @@ checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" [[package]] name = "logs-collector" -version = "2.1.8" +version = "2.1.9" dependencies = [ "anyhow", "clap", diff --git a/crates/collector-utils/Cargo.toml b/crates/collector-utils/Cargo.toml index 32b64e7..af7101f 100644 --- a/crates/collector-utils/Cargo.toml +++ b/crates/collector-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "collector-utils" -version = "2.0.2" +version = "2.0.3" edition = "2021" [dependencies] diff --git a/crates/collector-utils/src/storage.rs b/crates/collector-utils/src/storage.rs index c6479ee..3df2f0d 100644 --- a/crates/collector-utils/src/storage.rs +++ b/crates/collector-utils/src/storage.rs @@ -172,6 +172,29 @@ pub struct QueryExecutedRow { } impl QueryExecutedRow { + pub fn worker_id(&self) -> &str { + &self.worker_id + } + + /// Rough estimate of the heap + stack memory occupied by this row. + /// Used to bound the size of buffers and INSERT batches. + pub fn estimated_size(&self) -> usize { + std::mem::size_of::() + + self.client_id.len() + + self.worker_id.len() + + self.query_id.len() + + self.dataset.len() + + self.dataset_id.len() + + self.request_id.len() + + self.chunk_id.len() + + self.query.len() + + self.query_hash.len() + + self.output_hash.len() + + self.error_msg.len() + + self.client_signature.len() + + self.worker_version.len() + } + pub fn try_from( query_executed: QueryExecuted, worker_id: PeerId, diff --git a/crates/logs-collector/Cargo.toml b/crates/logs-collector/Cargo.toml index 3de5e6d..675ae18 100644 --- a/crates/logs-collector/Cargo.toml +++ b/crates/logs-collector/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "logs-collector" -version = "2.1.8" +version = "2.1.9" edition = "2021" [dependencies] diff --git a/crates/logs-collector/src/collector.rs b/crates/logs-collector/src/collector.rs index 01535d6..96bee9c 100644 --- a/crates/logs-collector/src/collector.rs +++ b/crates/logs-collector/src/collector.rs @@ -6,16 +6,43 @@ use sqd_network_transport::PeerId; use collector_utils::{QueryExecutedRow, Storage}; +/// Default maximum estimated size (bytes) of a single INSERT batch sent to +/// ClickHouse. Buffers larger than this are split into several INSERTs so that +/// an oversized batch can't fail repeatedly and stall log collection. +const DEFAULT_MAX_BATCH_SIZE: usize = 32 << 20; // 32 MiB + +/// Default maximum estimated size (bytes) of logs kept in memory between dumps. +/// Once reached, further logs are dropped for the current round; since the +/// ClickHouse watermark hasn't advanced for them, they are re-collected on the +/// next round once the buffer drains. +const DEFAULT_MAX_BUFFER_SIZE: usize = 256 << 20; // 256 MiB + +struct Buffer { + logs: Vec, + /// Running estimate of `logs`' memory footprint, kept in sync on push/take. + size: usize, +} + pub struct LogsCollector { storage: T, - buffered_logs: Mutex>, + buffer: Mutex, + max_batch_size: usize, + max_buffer_size: usize, } impl LogsCollector { pub fn new(storage: T) -> Self { + let max_batch_size = env_size("MAX_INSERT_BATCH_BYTES", DEFAULT_MAX_BATCH_SIZE); + let max_buffer_size = + env_size("MAX_BUFFER_BYTES", DEFAULT_MAX_BUFFER_SIZE).max(max_batch_size); Self { storage, - buffered_logs: Default::default(), + buffer: Mutex::new(Buffer { + logs: Vec::new(), + size: 0, + }), + max_batch_size, + max_buffer_size, } } @@ -27,19 +54,111 @@ impl LogsCollector { .map_err(|e| log::warn!("Invalid log message from {worker_id}: {e}")) .ok() }); - self.buffered_logs.lock().extend(rows); - // TODO: limit memory usage + + let mut buffer = self.buffer.lock(); + let mut dropped = 0; + for row in rows { + if buffer.size >= self.max_buffer_size { + dropped += 1; + continue; + } + buffer.size += row.estimated_size(); + buffer.logs.push(row); + } + if dropped > 0 { + log::warn!( + "Buffer full ({} bytes), dropped {dropped} logs from {worker_id}; \ + they will be re-collected once the buffer drains", + self.max_buffer_size + ); + } } pub async fn dump_buffer(&mut self) -> anyhow::Result<()> { - let logs = self.buffered_logs.get_mut().drain(..); - log::info!("Dumping {} logs to storage", logs.len()); - self.storage.store_logs(logs).await?; + let logs = { + let buffer = self.buffer.get_mut(); + buffer.size = 0; + std::mem::take(&mut buffer.logs) + }; + let total = logs.len(); + if total == 0 { + return Ok(()); + } + + // Flush in memory-bounded chunks so no single INSERT can be too large. + // On failure the unstored chunks are dropped, but they are re-collected + // next round: the ClickHouse watermark is MAX(worker_timestamp) per worker + // (see get_last_stored), and a single worker's logs are appended to the + // buffer in ascending-timestamp order (collect_logs in server.rs requests + // pages sequentially per worker). Chunks split the buffer at contiguous + // positions, so any un-stored row has a timestamp >= the advanced + // watermark and will be requested again. This ordering invariant is what + // makes partial-failure safe; it breaks if collection is ever parallelized + // within a single worker. + let mut stored = 0; + let mut chunk: Vec = Vec::new(); + let mut chunk_size = 0; + for row in logs { + let row_size = row.estimated_size(); + if chunk_size + row_size > self.max_batch_size { + if chunk.is_empty() { + // A single row exceeds the batch limit. Sending it alone is the + // best we can do; warn so a persistently-failing oversized row + // (which would stall this worker) is visible. + log::warn!( + "Single log row from {} is {row_size} bytes, exceeding the \ + {} byte batch limit", + row.worker_id(), + self.max_batch_size + ); + } else { + let count = chunk.len(); + self.flush_chunk(std::mem::take(&mut chunk), stored, total) + .await?; + stored += count; + chunk_size = 0; + } + } + chunk_size += row_size; + chunk.push(row); + } + if !chunk.is_empty() { + let count = chunk.len(); + self.flush_chunk(chunk, stored, total).await?; + stored += count; + } + + log::info!("Dumped {stored} logs to storage"); Ok(()) } + async fn flush_chunk( + &self, + chunk: Vec, + stored: usize, + total: usize, + ) -> anyhow::Result<()> { + self.storage + .store_logs(chunk.into_iter()) + .await + .inspect_err(|e| log::warn!("Stored {stored}/{total} logs before failure: {e:?}")) + } + pub async fn last_timestamps(&mut self) -> anyhow::Result> { let timestamps = self.storage.get_last_stored().await?; Ok(timestamps) } } + +fn env_size(var: &str, default: usize) -> usize { + match std::env::var(var) { + Ok(value) => match value.parse() { + Ok(parsed) => parsed, + Err(e) => { + log::warn!("Invalid {var}={value:?}: {e}; using default {default}"); + default + } + }, + Err(_) => default, + } +}