Streaming Pipeline

The streaming pipeline enables real-time output of generated records through the PhaseSink trait and StreamPipeline orchestrator.

Architecture

Generator -> PhaseSink -> StreamTarget -> Output
               |
          Buffer (configurable)
               |
     Backpressure Strategy

PhaseSink Trait

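The PhaseSink trait provides phase-aware streaming: a sink is notified when each generation phase starts and ends, receives every record tagged with its phase name, and can be flushed explicitly: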

pub trait PhaseSink: Send + Sync {
    /// Called when a generation phase begins.
    fn on_phase_start(&self, phase: &str) -> Result<(), SynthError>;
    /// Called for each record produced during `phase`.
    fn on_record(&self, phase: &str, record: &serde_json::Value) -> Result<(), SynthError>;
    /// Called when a generation phase ends.
    fn on_phase_end(&self, phase: &str) -> Result<(), SynthError>;
    /// Flushes any buffered output.
    fn flush(&self) -> Result<(), SynthError>;
}
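As a rough illustration of how a sink might implement these callbacks, the sketch below counts records per phase. It is not taken from the project: `CountingSink` is a hypothetical name, `SynthError` is replaced by a local stand-in, and records are passed as pre-serialized JSON text instead of `serde_json::Value` so that the example compiles with the standard library alone.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Stand-in for the crate's error type (assumption: the real SynthError is not shown here).
#[derive(Debug)]
pub struct SynthError(pub String);

// Same shape as the trait above, except that `record` is JSON text rather than
// `serde_json::Value`, which keeps this sketch free of external crates.
pub trait PhaseSink: Send + Sync {
    fn on_phase_start(&self, phase: &str) -> Result<(), SynthError>;
    fn on_record(&self, phase: &str, record: &str) -> Result<(), SynthError>;
    fn on_phase_end(&self, phase: &str) -> Result<(), SynthError>;
    fn flush(&self) -> Result<(), SynthError>;
}

/// Hypothetical sink that only counts how many records each phase produced.
#[derive(Default)]
pub struct CountingSink {
    // The trait methods take `&self`, so shared state needs interior mutability.
    counts: Mutex<HashMap<String, usize>>,
}

impl CountingSink {
    /// Number of records seen so far for `phase` (0 if the phase is unknown).
    pub fn count(&self, phase: &str) -> usize {
        let counts = self.counts.lock().expect("counter lock poisoned");
        counts.get(phase).copied().unwrap_or(0)
    }
}

impl PhaseSink for CountingSink {
    fn on_phase_start(&self, phase: &str) -> Result<(), SynthError> {
        let mut counts = self.counts.lock().map_err(|e| SynthError(e.to_string()))?;
        counts.entry(phase.to_string()).or_insert(0);
        Ok(())
    }

    fn on_record(&self, phase: &str, _record: &str) -> Result<(), SynthError> {
        let mut counts = self.counts.lock().map_err(|e| SynthError(e.to_string()))?;
        *counts.entry(phase.to_string()).or_insert(0) += 1;
        Ok(())
    }

    fn on_phase_end(&self, _phase: &str) -> Result<(), SynthError> {
        Ok(())
    }

    fn flush(&self) -> Result<(), SynthError> {
        // Nothing is buffered in this sketch, so flushing is a no-op.
        Ok(())
    }
}
```

Because every method takes `&self` and the trait requires `Send + Sync`, a sink with mutable state has to guard it with something like a `Mutex` or atomics, which is the design choice the sketch illustrates.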

Stream Targets

| Target | Description |
|--------|-------------|
| File | Writes JSONL to a file path |
| HTTP | Posts records to an HTTP endpoint |
| NoOp | Discards output (benchmarking) |

JSONL Format

Each line is a self-contained JSON object with phase, record, and timestamp fields:

{"phase":"journal_entries","record":{"id":"JE-001","date":"2024-01-15"},"timestamp":"2024-01-15T10:30:00Z"}
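To make the envelope layout concrete, here is a minimal sketch that builds one such line by hand. The function names `envelope_line` and `escape_json` are hypothetical, and the record is assumed to arrive as already-valid JSON text. A real implementation would more likely serialize with a JSON library.

```rust
/// Escape a string for use inside a JSON string literal
/// (quotes, backslashes, and control characters).
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Build one JSONL line with the `phase`, `record`, and `timestamp` fields.
/// `record_json` must already be valid JSON and must not contain raw newlines.
pub fn envelope_line(phase: &str, record_json: &str, timestamp: &str) -> String {
    format!(
        "{{\"phase\":\"{}\",\"record\":{},\"timestamp\":\"{}\"}}",
        escape_json(phase),
        record_json,
        escape_json(timestamp)
    )
}
```

Calling `envelope_line("journal_entries", r#"{"id":"JE-001","date":"2024-01-15"}"#, "2024-01-15T10:30:00Z")` reproduces the example line shown above. Keeping each object on a single line is what lets consumers process the stream one line at a time.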

Backpressure Strategies

| Strategy | Behavior |
|----------|----------|
| block | Block producer when buffer full (default, no data loss) |
| drop_oldest | Drop oldest buffered items |
| drop_newest | Drop incoming items |
| buffer | Unbounded buffer growth (use with caution) |
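The four strategies differ only in what happens when a record arrives at a full buffer. The sketch below models that decision on a single-threaded queue. `BoundedBuffer`, `Backpressure`, and `PushOutcome` are hypothetical names, not the project's types, and `block` is represented by handing the item back to the caller rather than by actually parking a thread.

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Backpressure {
    Block,
    DropOldest,
    DropNewest,
    Buffer,
}

#[derive(Debug, PartialEq)]
pub enum PushOutcome<T> {
    /// The item was stored and nothing was lost.
    Accepted,
    /// The item was stored; the oldest buffered item was evicted and is returned.
    Evicted(T),
    /// The incoming item was discarded and is returned.
    Dropped(T),
    /// The buffer is full under `block`; the caller should wait and retry.
    WouldBlock(T),
}

pub struct BoundedBuffer<T> {
    items: VecDeque<T>,
    capacity: usize,
    strategy: Backpressure,
}

impl<T> BoundedBuffer<T> {
    pub fn new(capacity: usize, strategy: Backpressure) -> Self {
        Self { items: VecDeque::new(), capacity, strategy }
    }

    pub fn push(&mut self, item: T) -> PushOutcome<T> {
        // `buffer` ignores the capacity entirely, so memory use is unbounded.
        if self.items.len() < self.capacity || self.strategy == Backpressure::Buffer {
            self.items.push_back(item);
            return PushOutcome::Accepted;
        }
        match self.strategy {
            Backpressure::Block => PushOutcome::WouldBlock(item),
            Backpressure::DropNewest => PushOutcome::Dropped(item),
            Backpressure::DropOldest => match self.items.pop_front() {
                Some(oldest) => {
                    self.items.push_back(item);
                    PushOutcome::Evicted(oldest)
                }
                // With capacity 0 there is nothing to evict, so drop the newcomer.
                None => PushOutcome::Dropped(item),
            },
            Backpressure::Buffer => unreachable!("handled before the match"),
        }
    }

    pub fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }
}
```

Only `block` and `buffer` preserve every record. `block` does so by slowing the producer, `buffer` by spending memory. The two drop strategies keep the producer running at full speed and accept data loss.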

CLI Usage

# Stream to JSONL file
datasynth-data generate --config config.yaml --stream-file ./output/stream.jsonl --output ./output

Python Usage

from datasynth_py.config import blueprints

config = blueprints.retail_small()
config = blueprints.with_streaming(config, buffer_size=5000, backpressure="block")

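# Or pass the stream file directly to generate().
# `synth` is assumed to be an already-constructed datasynth_py client; its setup is not shown here.
result = synth.generate(config, stream_file="./output/stream.jsonl")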

Configuration

streaming:
  enabled: true
  target: file
  file_path: ./output/stream.jsonl
  buffer_size: 1000
  backpressure: block
  phase_filters:
    master_data: true
    journal_entries: true
    document_flows: true
    anomaly_injection: true
    ocpm: true
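
One way to picture how `phase_filters` could be applied is a per-phase lookup consulted before a record is forwarded to the sink. The sketch below is an assumption-laden illustration: `PhaseFilters` is a hypothetical type, and because this page does not say what happens to phases missing from the map, the fallback is left as an explicit `default_enabled` parameter.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory form of the `phase_filters` map.
pub struct PhaseFilters {
    flags: HashMap<String, bool>,
    // Assumption: behavior for unlisted phases is not documented, so it is configurable here.
    default_enabled: bool,
}

impl PhaseFilters {
    pub fn new(default_enabled: bool) -> Self {
        Self { flags: HashMap::new(), default_enabled }
    }

    /// Builder-style setter mirroring one `phase: bool` entry of the YAML map.
    pub fn set(mut self, phase: &str, enabled: bool) -> Self {
        self.flags.insert(phase.to_string(), enabled);
        self
    }

    /// Whether records from `phase` should be streamed.
    pub fn allows(&self, phase: &str) -> bool {
        self.flags.get(phase).copied().unwrap_or(self.default_enabled)
    }
}
```

A pipeline using such a filter would call `allows(phase)` before each `on_record` and skip the sink entirely for disabled phases.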