Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/collector-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "collector-utils"
version = "2.0.2"
version = "2.0.3"
edition = "2021"

[dependencies]
Expand Down
23 changes: 23 additions & 0 deletions crates/collector-utils/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,29 @@ pub struct QueryExecutedRow {
}

impl QueryExecutedRow {
pub fn worker_id(&self) -> &str {
&self.worker_id
}

/// Rough estimate of the heap + stack memory occupied by this row.
/// Used to bound the size of buffers and INSERT batches.
pub fn estimated_size(&self) -> usize {
std::mem::size_of::<Self>()
+ self.client_id.len()
+ self.worker_id.len()
+ self.query_id.len()
+ self.dataset.len()
+ self.dataset_id.len()
+ self.request_id.len()
+ self.chunk_id.len()
+ self.query.len()
+ self.query_hash.len()
+ self.output_hash.len()
+ self.error_msg.len()
+ self.client_signature.len()
+ self.worker_version.len()
}

pub fn try_from(
query_executed: QueryExecuted,
worker_id: PeerId,
Expand Down
2 changes: 1 addition & 1 deletion crates/logs-collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "logs-collector"
version = "2.1.8"
version = "2.1.9"
edition = "2021"

[dependencies]
Expand Down
133 changes: 126 additions & 7 deletions crates/logs-collector/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,43 @@ use sqd_network_transport::PeerId;

use collector_utils::{QueryExecutedRow, Storage};

/// Default maximum estimated size (bytes) of a single INSERT batch sent to
/// ClickHouse. Buffers larger than this are split into several INSERTs so that
/// an oversized batch can't fail repeatedly and stall log collection.
const DEFAULT_MAX_BATCH_SIZE: usize = 32 << 20; // 32 MiB

/// Default maximum estimated size (bytes) of logs kept in memory between dumps.
/// Once reached, further logs are dropped for the current round; since the
/// ClickHouse watermark hasn't advanced for them, they are re-collected on the
/// next round once the buffer drains.
const DEFAULT_MAX_BUFFER_SIZE: usize = 256 << 20; // 256 MiB

struct Buffer {
logs: Vec<QueryExecutedRow>,
/// Running estimate of `logs`' memory footprint, kept in sync on push/take.
size: usize,
}

pub struct LogsCollector<T: Storage + Sync> {
storage: T,
buffered_logs: Mutex<Vec<QueryExecutedRow>>,
buffer: Mutex<Buffer>,
max_batch_size: usize,
max_buffer_size: usize,
}

impl<T: Storage + Sync> LogsCollector<T> {
pub fn new(storage: T) -> Self {
let max_batch_size = env_size("MAX_INSERT_BATCH_BYTES", DEFAULT_MAX_BATCH_SIZE);
let max_buffer_size =
env_size("MAX_BUFFER_BYTES", DEFAULT_MAX_BUFFER_SIZE).max(max_batch_size);
Self {
storage,
buffered_logs: Default::default(),
buffer: Mutex::new(Buffer {
logs: Vec::new(),
size: 0,
}),
max_batch_size,
max_buffer_size,
}
}

Expand All @@ -27,19 +54,111 @@ impl<T: Storage + Sync> LogsCollector<T> {
.map_err(|e| log::warn!("Invalid log message from {worker_id}: {e}"))
.ok()
});
self.buffered_logs.lock().extend(rows);
// TODO: limit memory usage

let mut buffer = self.buffer.lock();
let mut dropped = 0;
for row in rows {
if buffer.size >= self.max_buffer_size {
dropped += 1;
continue;
}
buffer.size += row.estimated_size();
buffer.logs.push(row);
}
if dropped > 0 {
log::warn!(
"Buffer full ({} bytes), dropped {dropped} logs from {worker_id}; \
they will be re-collected once the buffer drains",
self.max_buffer_size
);
}
}

pub async fn dump_buffer(&mut self) -> anyhow::Result<()> {
let logs = self.buffered_logs.get_mut().drain(..);
log::info!("Dumping {} logs to storage", logs.len());
self.storage.store_logs(logs).await?;
let logs = {
let buffer = self.buffer.get_mut();
buffer.size = 0;
std::mem::take(&mut buffer.logs)
};
let total = logs.len();
if total == 0 {
return Ok(());
}

// Flush in memory-bounded chunks so no single INSERT can be too large.
// On failure the unstored chunks are dropped, but they are re-collected
// next round: the ClickHouse watermark is MAX(worker_timestamp) per worker
// (see get_last_stored), and a single worker's logs are appended to the
// buffer in ascending-timestamp order (collect_logs in server.rs requests
// pages sequentially per worker). Chunks split the buffer at contiguous
// positions, so any un-stored row has a timestamp >= the advanced
// watermark and will be requested again. This ordering invariant is what
// makes partial-failure safe; it breaks if collection is ever parallelized
// within a single worker.
let mut stored = 0;
let mut chunk: Vec<QueryExecutedRow> = Vec::new();
let mut chunk_size = 0;
for row in logs {
let row_size = row.estimated_size();
if chunk_size + row_size > self.max_batch_size {
if chunk.is_empty() {
// A single row exceeds the batch limit. Sending it alone is the
// best we can do; warn so a persistently-failing oversized row
// (which would stall this worker) is visible.
log::warn!(
"Single log row from {} is {row_size} bytes, exceeding the \
{} byte batch limit",
row.worker_id(),
self.max_batch_size
);
} else {
let count = chunk.len();
self.flush_chunk(std::mem::take(&mut chunk), stored, total)
.await?;
stored += count;
chunk_size = 0;
}
}
chunk_size += row_size;
chunk.push(row);
}
if !chunk.is_empty() {
let count = chunk.len();
self.flush_chunk(chunk, stored, total).await?;
stored += count;
}

log::info!("Dumped {stored} logs to storage");
Ok(())
}

async fn flush_chunk(
&self,
chunk: Vec<QueryExecutedRow>,
stored: usize,
total: usize,
) -> anyhow::Result<()> {
self.storage
.store_logs(chunk.into_iter())
.await
.inspect_err(|e| log::warn!("Stored {stored}/{total} logs before failure: {e:?}"))
}

pub async fn last_timestamps(&mut self) -> anyhow::Result<HashMap<String, u64>> {
let timestamps = self.storage.get_last_stored().await?;
Ok(timestamps)
}
}

fn env_size(var: &str, default: usize) -> usize {
match std::env::var(var) {
Ok(value) => match value.parse() {
Ok(parsed) => parsed,
Err(e) => {
log::warn!("Invalid {var}={value:?}: {e}; using default {default}");
default
}
},
Err(_) => default,
}
}
Loading