feat(connectors): add S3 sink connector #3103

atharvalade wants to merge 5 commits into apache:master
Conversation
Codecov Report

Additional details and impacted files:

```diff
@@             Coverage Diff              @@
##             master    #3103       +/-   ##
=============================================
- Coverage     74.08%   19.27%    -54.82%
  Complexity      943      943
=============================================
  Files          1159     1163         +4
  Lines        102033    91064     -10969
  Branches      79084    68132     -10952
=============================================
- Hits          75593    17550     -58043
- Misses        23770    73103     +49333
+ Partials      2670       411      -2259
```
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR. Thank you for your contribution!
slbotbm left a comment:
I left some comments.
Also, do you plan to support using parquet files as well in the future?
| "S3 sink ID: {id} invalid output_format '{}': {e}, defaulting to json_lines", | ||
| config.output_format, | ||
| ); | ||
| OutputFormat::JsonLines | ||
| } | ||
| }; | ||
|
|
||
```rust
let max_file_size_bytes = match parse_file_size(&config.max_file_size) {
    Ok(size) => size,
    Err(e) => {
        tracing::warn!(
            "S3 sink ID: {id} invalid max_file_size '{}': {e}, defaulting to 8 MiB",
            config.max_file_size,
        );
        8 * 1024 * 1024
    }
};
```
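(`parse_file_size` itself is not shown in this excerpt; purely as a point of reference, a hypothetical sketch of such a helper, with binary-unit handling assumed, might look like:)

```rust
// Hypothetical sketch only: the PR's actual parse_file_size is not shown here.
// Parses strings like "512KiB" or "8MiB" into a byte count.
fn parse_file_size(s: &str) -> Result<u64, String> {
    let s = s.trim();
    let (digits, multiplier) = if let Some(n) = s.strip_suffix("GiB") {
        (n, 1024 * 1024 * 1024)
    } else if let Some(n) = s.strip_suffix("MiB") {
        (n, 1024 * 1024)
    } else if let Some(n) = s.strip_suffix("KiB") {
        (n, 1024)
    } else {
        (s, 1) // bare number: interpret as bytes
    };
    digits
        .trim()
        .parse::<u64>()
        .map(|v| v * multiplier)
        .map_err(|e| format!("invalid file size '{s}': {e}"))
}
```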
```rust
let delay_str = config.retry_delay.as_deref().unwrap_or(DEFAULT_RETRY_DELAY);
let retry_delay = match humantime::Duration::from_str(delay_str) {
    Ok(d) => d.into(),
    Err(e) => {
        tracing::warn!(
            "S3 sink ID: {id} invalid retry_delay '{delay_str}': {e}, defaulting to 1s",
        );
        std::time::Duration::from_secs(1)
    }
};
```
Here, if an error occurs, the connector picks the default option, which may not be what all users want. Would it be better to gate the use of defaults behind some kind of flag?

Good point, I'll change these to return hard errors on invalid config.
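Roughly something like this (an illustrative sketch, not the final implementation; `config`, `id`, `Error::InvalidConfigValue`, and `parse_file_size` are assumed from the surrounding `open()` context above):

```rust
// Sketch: propagate config errors instead of silently falling back to defaults.
let output_format = OutputFormat::from_str_config(&config.output_format)?;

let max_file_size_bytes = parse_file_size(&config.max_file_size).map_err(|e| {
    Error::InvalidConfigValue(format!(
        "S3 sink ID: {id} invalid max_file_size '{}': {e}",
        config.max_file_size
    ))
})?;

let delay_str = config.retry_delay.as_deref().unwrap_or(DEFAULT_RETRY_DELAY);
let retry_delay: std::time::Duration = humantime::Duration::from_str(delay_str)
    .map_err(|e| {
        Error::InvalidConfigValue(format!(
            "S3 sink ID: {id} invalid retry_delay '{delay_str}': {e}"
        ))
    })?
    .into();
```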
```rust
let max_messages = self.config.max_messages_per_file.unwrap_or(u64::MAX);
```
If someone chooses message count for file rotation but forgets to define max_messages_per_file, max messages will silently be set to u64::MAX. That means something other than the connector will have to force the flush to S3, which I believe is not an ideal situation.
I'll add a validation in open() that returns an error if file_rotation = "messages" but max_messages_per_file is not set. Do you think this is a good approach?
I think that is a good approach. In addition, how about erroring hard in `let max_messages = self.config.max_messages_per_file.unwrap_or(u64::MAX);` instead of passing u64::MAX as a fallback?
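Sketching how those two suggestions might combine (the `FileRotation` enum and the exact field names here are guesses for illustration, not the PR's actual types):

```rust
// Hypothetical sketch: validate rotation config up front in open(),
// before any buffering starts.
fn validate_rotation(config: &S3SinkConfig) -> Result<(), Error> {
    if matches!(config.file_rotation, FileRotation::Messages)
        && config.max_messages_per_file.is_none()
    {
        return Err(Error::InvalidConfigValue(
            "file_rotation = \"messages\" requires max_messages_per_file to be set"
                .to_owned(),
        ));
    }
    Ok(())
}
```

With that check in place, the later `unwrap_or(u64::MAX)` could become a hard error on the message-based path, since a missing value would already have been rejected at open().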
```rust
pub fn validate_credentials(config: &S3SinkConfig) -> Result<(), Error> {
    match (&config.access_key_id, &config.secret_access_key) {
        (Some(_), Some(_)) | (None, None) => Ok(()),
        _ => Err(Error::InvalidConfigValue(
            "Partially configured credentials. You must provide both access_key_id \
             and secret_access_key, or omit both."
                .to_owned(),
        )),
    }
}
```
```rust
pub async fn create_bucket(config: &S3SinkConfig) -> Result<Box<Bucket>, Error> {
    validate_credentials(config)?;
```
Wouldn't it be better to inline this function? As far as I can see, it is being used in only one place. You could add a comment instead saying that this piece of code validates the creds.
Yeah, fair enough.
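The inlined version might look roughly like this (the body after the check is elided; `S3SinkConfig`, `Bucket`, and `Error` are as in the hunks above):

```rust
pub async fn create_bucket(config: &S3SinkConfig) -> Result<Box<Bucket>, Error> {
    // Validate the credentials: either both access_key_id and
    // secret_access_key must be provided, or neither.
    if config.access_key_id.is_some() != config.secret_access_key.is_some() {
        return Err(Error::InvalidConfigValue(
            "Partially configured credentials. You must provide both access_key_id \
             and secret_access_key, or omit both."
                .to_owned(),
        ));
    }
    // ... bucket construction continues as before
}
```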
```rust
pub fn from_str_config(s: &str) -> Result<Self, Error> {
    match s.to_lowercase().as_str() {
        "json_lines" | "jsonl" | "jsonlines" => Ok(OutputFormat::JsonLines),
        "json_array" | "json" => Ok(OutputFormat::JsonArray),
        "raw" => Ok(OutputFormat::Raw),
        other => Err(Error::InvalidConfigValue(format!(
            "Unknown output format: '{other}'. Expected: json_lines, json_array, or raw"
        ))),
    }
}
```
Wouldn't using `From` for `OutputFormat` make more sense here? (Though the current version also works just fine.)
Since the conversion can fail, I'll implement `TryFrom<&str>` for `OutputFormat`, which is more idiomatic.
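For reference, a sketch of that `TryFrom` version (assuming the crate's existing `Error` type):

```rust
impl TryFrom<&str> for OutputFormat {
    type Error = Error;

    fn try_from(s: &str) -> Result<Self, Self::Error> {
        match s.to_lowercase().as_str() {
            "json_lines" | "jsonl" | "jsonlines" => Ok(OutputFormat::JsonLines),
            "json_array" | "json" => Ok(OutputFormat::JsonArray),
            "raw" => Ok(OutputFormat::Raw),
            other => Err(Error::InvalidConfigValue(format!(
                "Unknown output format: '{other}'. Expected: json_lines, json_array, or raw"
            ))),
        }
    }
}
```

Call sites then become `OutputFormat::try_from(config.output_format.as_str())?`.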
I also feel the possibility of data loss when the maximum number of retries is exceeded should be mentioned in README.md as a precaution.
Oh yes, absolutely. Parquet support is on the roadmap as a future enhancement.
I agree, I'll add that.
Write Iggy stream messages to Amazon S3 and S3-compatible stores with buffered uploads, configurable rotation, and deterministic offset-based keys.
Which issue does this PR close?
Closes #2956
Rationale
Iggy lacks a native way to write stream messages to Amazon S3 and S3-compatible stores (MinIO, Cloudflare R2, Backblaze B2, DigitalOcean Spaces). This is a frequently requested capability for data lake ingestion and long-term archival pipelines.
What changed?
There was no connector for persisting Iggy messages to object storage. Users had to build custom consumers and upload logic to get data into S3.
This PR adds a new `iggy_connector_s3_sink` crate that implements the `Sink` trait. It buffers messages in-memory per stream/topic/partition, rotates files by size or message count, renders S3 keys from a configurable path template (`{stream}/{topic}/{date}/{hour}/...`), and uploads with retry + exponential backoff. It supports `json_lines`, `json_array`, and `raw` output formats with optional Iggy metadata and header embedding, and uses `rust-s3` (already in the workspace) with path-style addressing auto-enabled for custom endpoints.

Key implementation details:

- `lib.rs` (config + entry point)
- `client.rs` (S3 client init + bucket verification)
- `buffer.rs` (in-memory accumulation + rotation logic)
- `formatter.rs` (JSON/raw output + metadata/header inclusion)
- `path.rs` (template engine for S3 keys with offset-based filenames; see the sketch below)
- `sink.rs` (Sink trait: open/consume/close lifecycle)
- `_build_rust_artifacts.yml` and `edge-release.yml` updated for cdylib plugin builds and release notes
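To make the key rendering concrete, here is a simplified sketch (not the actual `path.rs`; the placeholder set, `chrono` usage, and zero-padded offset width are assumptions for illustration):

```rust
use chrono::Utc;

// Simplified illustration of template-based S3 key rendering with a
// deterministic, offset-based filename. The real path.rs is more complete.
fn render_key(template: &str, stream: &str, topic: &str, start_offset: u64) -> String {
    let now = Utc::now();
    let dir = template
        .replace("{stream}", stream)
        .replace("{topic}", topic)
        .replace("{date}", &now.format("%Y-%m-%d").to_string())
        .replace("{hour}", &now.format("%H").to_string());
    // Zero-padded offsets sort lexicographically and make re-uploads idempotent.
    format!("{dir}/{start_offset:020}.jsonl")
}
```

With the template `{stream}/{topic}/{date}/{hour}`, a buffer starting at offset 42 for stream `application_logs` and topic `api_requests` would land at something like `application_logs/api_requests/2025-01-01/13/00000000000000000042.jsonl`, matching the path structure in the screenshots below.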
Local Execution

- `cargo fmt --check`: pass
- `cargo clippy --tests -D warnings`: pass (zero warnings)
- `cargo test -p iggy_connector_s3_sink`: 36/36 pass
- `markdownlint --check`: pass
- `trailing-whitespace`: pass
- `trailing-newline`: pass
- `license-headers`: pass

AI Usage
[…] `-D warnings`, and end-to-end testing with MinIO Docker + Iggy server + CLI producer + connector runtime.

Here are all the relevant screenshots:
- The `iggy-test` bucket
- Stream `application_logs` and topic `api_requests`
- A `.jsonl` file in the correct path structure (`application_logs/api_requests/{date}/{hour}/`)
- `cargo clippy --tests -D warnings` passing with zero warnings