RFC: Log Streaming & SIEM Integration #522

@lakhansamani

Phase: 5 — Advanced Security & Enterprise
Priority: P3 — Medium
Estimated Effort: Medium
Depends on: Audit Logs (#505)


Problem Statement

Enterprise customers need to feed auth events into their existing security tooling (Datadog, Splunk, S3, and other SIEM platforms). The audit log system (#505) stores events in Authorizer's database, but enterprises also need real-time streaming to external destinations for centralized monitoring and compliance. WorkOS Log Streams offers a comparable capability.


Proposed Solution

1. Log Stream Configuration

New schema: internal/storage/schemas/log_stream.go

type LogStream struct {
    ID              string `json:"id" gorm:"primaryKey;type:char(36)"`
    Name            string `json:"name" gorm:"type:varchar(256)"`
    DestinationType string `json:"destination_type" gorm:"type:varchar(50)"`   // webhook | s3 | datadog | splunk
    Config          string `json:"config" gorm:"type:text"`                     // JSON destination-specific config
    EventFilter     string `json:"event_filter" gorm:"type:text"`              // JSON: which event types to stream
    IsActive        bool   `json:"is_active" gorm:"type:bool;default:true"`
    OrganizationID  string `json:"organization_id" gorm:"type:char(36);index"` // optional org scoping
    CreatedAt       int64  `json:"created_at" gorm:"autoCreateTime"`
    UpdatedAt       int64  `json:"updated_at" gorm:"autoUpdateTime"`
}
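
Because Config is stored as JSON text, each destination has to decode and validate it before use. The following is a minimal sketch of that step for the webhook destination. WebhookConfig and parseWebhookConfig are hypothetical names, and the field names are taken from the destination table in section 2; the RFC does not define a typed config.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WebhookConfig is a hypothetical typed view of LogStream.Config for
// destination_type "webhook". Field names follow the RFC's destination table.
type WebhookConfig struct {
	URL        string            `json:"url"`
	Headers    map[string]string `json:"headers"`
	AuthHeader string            `json:"auth_header"`
}

// parseWebhookConfig decodes the JSON stored in LogStream.Config and checks
// that the one field a webhook cannot work without (url) is present.
func parseWebhookConfig(raw string) (*WebhookConfig, error) {
	var cfg WebhookConfig
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		return nil, fmt.Errorf("invalid webhook config: %w", err)
	}
	if cfg.URL == "" {
		return nil, fmt.Errorf("webhook config: url is required")
	}
	return &cfg, nil
}
```

Validating at create/update time, rather than on first delivery, would surface bad configs to the admin immediately; that choice is left open by the RFC.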

2. Destination Providers

Interface: internal/log_stream/

type Destination interface {
    Send(ctx context.Context, events []AuditEvent) error
    HealthCheck(ctx context.Context) error
    Name() string
}

Supported destinations:

| Destination | Config Fields | Format |
|---|---|---|
| HTTP Webhook | url, headers, auth_header | JSON POST |
| AWS S3 | bucket, prefix, region, access_key, secret_key | JSONL files |
| Datadog | api_key, site (us1/eu1/etc) | Datadog Log API |
| Splunk HEC | url, token | Splunk HTTP Event Collector |

3. Structured Event Format

All destinations receive events in a consistent JSON format:

{
    "id": "evt_a1b2c3d4",
    "timestamp": "2026-03-30T10:15:30Z",
    "type": "user.login_success",
    "version": "1.0",
    "actor": {
        "id": "usr_123",
        "type": "user",
        "email": "john@example.com",
        "ip_address": "203.0.113.42",
        "user_agent": "Mozilla/5.0..."
    },
    "target": {
        "type": "session",
        "id": "sess_456"
    },
    "metadata": {
        "method": "password",
        "mfa_used": true,
        "device_hash": "abc123..."
    },
    "organization_id": "org_789",
    "environment": {
        "authorizer_version": "2.0.0",
        "instance_id": "inst_..."
    }
}

4. Streaming Architecture

Streaming builds on the audit log system (#505): the audit log's buffered channel feeds both database writes and log streams.

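func (a *auditProvider) flushLoop() {
    // batchSize and flushInterval are assumed to come from the
    // --log-stream-batch-size and --log-stream-flush-interval flags.
    batch := make([]AuditEvent, 0, batchSize)
    ticker := time.NewTicker(flushInterval)
    defer ticker.Stop()

    flush := func() {
        if len(batch) == 0 {
            return
        }
        // Copy the batch: streamBatch sends asynchronously, so the
        // backing array must not be reused while sends are in flight.
        out := append([]AuditEvent(nil), batch...)
        // Write to database
        a.writeBatch(out)
        // Fan out to active log streams
        a.streamBatch(out)
        batch = batch[:0]
    }

    for {
        select {
        case event := <-a.eventChan:
            batch = append(batch, event)
            if len(batch) >= batchSize {
                flush()
            }
        case <-ticker.C:
            flush()
        }
    }
}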

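func (a *auditProvider) streamBatch(events []AuditEvent) {
    // The flush loop has no request context, so use a background one.
    ctx := context.Background()
    streams, err := a.store.ListActiveLogStreams(ctx)
    if err != nil {
        a.log.Error().Err(err).Msg("failed to list active log streams")
        return
    }
    for _, stream := range streams {
        // Filter events based on the stream's event filter
        filtered := filterEvents(events, stream.EventFilter)
        if len(filtered) == 0 {
            continue
        }

        // Send asynchronously with retry
        go a.sendWithRetry(stream, filtered)
    }
}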
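
The filterEvents helper used above is not spelled out in the RFC. A minimal sketch follows, assuming EventFilter holds a JSON array of event type strings (matching the GraphQL `event_filter: [String!]` field) and that an empty filter means "all events". Returning nothing for a malformed filter is an assumption; the AuditEvent type here is a stand-in for the one from #505.

```go
package main

import "encoding/json"

// AuditEvent is a minimal stand-in for the audit event type from #505.
type AuditEvent struct {
	ID   string
	Type string
}

// filterEvents returns the events whose Type appears in rawFilter, a JSON
// array of event type strings. An empty filter matches every event. The
// result is always a fresh slice, so callers may hand it to a goroutine.
func filterEvents(events []AuditEvent, rawFilter string) []AuditEvent {
	var types []string
	if rawFilter != "" {
		if err := json.Unmarshal([]byte(rawFilter), &types); err != nil {
			// Assumption: a malformed filter streams nothing rather than everything.
			return nil
		}
	}
	out := make([]AuditEvent, 0, len(events))
	if len(types) == 0 {
		return append(out, events...)
	}
	allowed := make(map[string]struct{}, len(types))
	for _, t := range types {
		allowed[t] = struct{}{}
	}
	for _, e := range events {
		if _, ok := allowed[e.Type]; ok {
			out = append(out, e)
		}
	}
	return out
}
```

Parsing the filter on every batch is simple but repeats work; caching the parsed set per stream would be a natural optimization once the stream list itself is cached.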

5. Batching and Retry

  • Batch size: configurable (default: 100 events or 5 seconds, whichever comes first)
  • Retry: exponential backoff starting at 1s and doubling on each attempt (1s, 2s, 4s, 8s, ...), capped at 60s; max 5 retries
  • Dead letter: after max retries, log failure and store in dead letter table for manual retry
  • Backpressure: if destination is consistently slow, buffer up to max buffer size, then drop oldest events with warning
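
func (a *auditProvider) sendWithRetry(stream *schemas.LogStream, events []AuditEvent) {
    // Runs in its own goroutine, detached from any request context.
    ctx := context.Background()
    dest := a.getDestination(stream)
    var err error
    for attempt := 0; attempt < maxRetries; attempt++ {
        if err = dest.Send(ctx, events); err == nil {
            return
        }
        // Back off between attempts, but not after the final one.
        if attempt < maxRetries-1 {
            time.Sleep(backoff(attempt))
        }
    }
    // Dead letter: keep the batch for manual retry and log the last error
    a.storeDeadLetter(stream.ID, events)
    a.log.Error().Err(err).Str("stream", stream.Name).Msg("log stream delivery failed after retries")
}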

6. GraphQL Admin API

type LogStream {
    id: ID!
    name: String!
    destination_type: String!
    config: Map                        # masked sensitive fields
    event_filter: [String!]
    is_active: Boolean!
    organization_id: String
    created_at: Int64!
}

type Mutation {
    _create_log_stream(params: CreateLogStreamInput!): LogStream!
    _update_log_stream(params: UpdateLogStreamInput!): LogStream!
    _delete_log_stream(id: ID!): Response!
    _test_log_stream(id: ID!): Response!       # Send test event to verify connectivity
}

type Query {
    _log_streams(params: PaginatedInput): LogStreams!
    _log_stream(id: ID!): LogStream!
}

input CreateLogStreamInput {
    name: String!
    destination_type: String!          # webhook | s3 | datadog | splunk
    config: Map!                       # destination-specific config
    event_filter: [String!]            # empty = all events
    organization_id: String            # optional
}
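
The `config` field on LogStream is described as returning masked sensitive fields. A minimal sketch of such masking is shown below. The set of sensitive keys is an assumption derived from the destination table (access_key, secret_key, api_key, token, auth_header), and replacing values with a fixed placeholder is just one possible policy.

```go
package main

// sensitiveKeys lists config fields assumed to hold secrets. The list is
// inferred from the RFC's destination table and is not exhaustive.
var sensitiveKeys = map[string]bool{
	"access_key":  true,
	"secret_key":  true,
	"api_key":     true,
	"token":       true,
	"auth_header": true,
}

// maskedValue replaces every sensitive value in API responses.
const maskedValue = "********"

// maskConfig returns a shallow copy of cfg with sensitive values replaced.
// The input map is left untouched so it can still be used for delivery.
func maskConfig(cfg map[string]any) map[string]any {
	masked := make(map[string]any, len(cfg))
	for k, v := range cfg {
		if sensitiveKeys[k] {
			masked[k] = maskedValue
			continue
		}
		masked[k] = v
	}
	return masked
}
```

Note that this only inspects top-level keys; a webhook's `headers` map could also carry credentials and may need the same treatment.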

CLI Configuration Flags

--log-stream-batch-size=100                # Events per batch
--log-stream-flush-interval=5s             # Max time before flush
--log-stream-max-retries=5                 # Retry attempts
--log-stream-buffer-size=10000             # Max buffered events before dropping
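
The buffer-size flag ties into the backpressure rule from section 5 (buffer up to a maximum, then drop the oldest events). One possible shape for that buffer is sketched below; boundedBuffer and its methods are hypothetical, and a production version would likely use a ring buffer to avoid re-slicing.

```go
package main

import "sync"

// AuditEvent is a minimal stand-in for the audit event type from #505.
type AuditEvent struct {
	ID string
}

// boundedBuffer is a hypothetical FIFO that drops the oldest event once it
// holds max events, matching the "drop oldest" backpressure rule.
type boundedBuffer struct {
	mu      sync.Mutex
	events  []AuditEvent
	max     int
	dropped int
}

func newBoundedBuffer(max int) *boundedBuffer {
	if max < 1 {
		max = 1 // a buffer must hold at least one event
	}
	return &boundedBuffer{max: max}
}

// Push appends ev and reports whether an older event had to be dropped,
// so the caller can emit the warning the RFC asks for.
func (b *boundedBuffer) Push(ev AuditEvent) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	droppedOne := false
	if len(b.events) >= b.max {
		b.events = b.events[1:] // discard the oldest event
		b.dropped++
		droppedOne = true
	}
	b.events = append(b.events, ev)
	return droppedOne
}

// Drain removes and returns up to n of the oldest buffered events.
func (b *boundedBuffer) Drain(n int) []AuditEvent {
	b.mu.Lock()
	defer b.mu.Unlock()
	if n > len(b.events) {
		n = len(b.events)
	}
	out := make([]AuditEvent, n)
	copy(out, b.events[:n])
	b.events = b.events[n:]
	return out
}

// Dropped returns how many events have been discarded so far.
func (b *boundedBuffer) Dropped() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.dropped
}
```

Exposing the drop count makes it straightforward to surface data loss as a metric in addition to the log warning.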

Testing Plan

  • Unit tests for each destination provider (mock HTTP)
  • Integration test: create stream → trigger events → verify delivery
  • Test batching (events arrive in batches)
  • Test retry with exponential backoff
  • Test dead letter storage after max retries
  • Test event filtering (only configured event types streamed)
  • Test _test_log_stream sends test event
