Materialization and Buffering
Materialization is the process of buffering incoming items before processing them, which enables replay functionality when a node needs to restart. This capability is essential for the PipelineErrorDecision.RestartNode feature and is a critical component of NPipeline's resilience framework.
What is Materialization?
In NPipeline, data flows through nodes as streams. By default, streaming data is processed once and discarded - it cannot be replayed. Materialization changes this by:
- Buffering incoming items in memory before processing
- Maintaining a replayable buffer that can be re-enumerated
- Enabling restart functionality by replaying buffered items after failures
Figure: Materialization enables replay functionality by buffering items before processing.
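As a rough illustration of the difference between a one-shot stream and a materialized buffer, the sketch below uses plain .NET collections rather than NPipeline types. The names `ReplayDemo` and `Drain` are hypothetical and exist only for this example.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class ReplayDemo
{
    // A single-pass source: reading an item removes it from the queue.
    public static IEnumerable<int> Drain(Queue<int> queue)
    {
        while (queue.TryDequeue(out var item))
        {
            yield return item;
        }
    }

    // Enumerates the raw stream twice and returns how many items each pass saw.
    public static (int First, int Second) CountStreamTwice()
    {
        IEnumerable<int> stream = Drain(new Queue<int>(new[] { 1, 2, 3 }));
        int first = stream.Count();
        int second = stream.Count(); // the queue is already empty here
        return (first, second);
    }

    // Materializes the stream into a list first, then enumerates the list twice.
    public static (int First, int Second) CountMaterializedTwice()
    {
        IEnumerable<int> stream = Drain(new Queue<int>(new[] { 1, 2, 3 }));
        List<int> buffer = stream.ToList(); // buffer every item once
        int first = buffer.Count();
        int second = buffer.Count(); // the list can be read again
        return (first, second);
    }
}
```

In this sketch `CountStreamTwice()` returns `(3, 0)` because the second pass finds the source drained, while `CountMaterializedTwice()` returns `(3, 3)`. The cost of the second behaviour is that every item stays in memory, which is what the rest of this page is about.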
The Critical Role of MaxMaterializedItems
The MaxMaterializedItems parameter in PipelineRetryOptions controls the materialization behavior:
- When MaxMaterializedItems is null (default): Unbounded materialization - all items are buffered
- When MaxMaterializedItems has a value: Limited materialization - only the specified number of items are buffered
This parameter is critical because it determines:
- How many items can be replayed during a node restart
- Memory usage for the buffer
- Whether restart functionality works at all for streaming inputs
⚠️ Critical Warning: Setting MaxMaterializedItems to null (unbounded) silently disables node restart functionality. For detailed explanation of why unbounded buffers break resilience guarantees, see the Node Restart Quick Start Checklist.
CappedReplayableDataPipe Implementation
NPipeline uses the CappedReplayableDataPipe to implement materialization. The CappedReplayableDataPipe provides:
- Replay capability: Can be re-enumerated multiple times
- Memory limits: Enforces MaxMaterializedItems when specified
- Overflow protection: Throws exceptions when buffer limits are exceeded
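The three behaviours above can be sketched with a small buffer class. This is a hypothetical illustration and not the source of CappedReplayableDataPipe: the `CappedReplayBuffer<T>` name, its members, and the exception type it throws are all assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; NOT NPipeline's CappedReplayableDataPipe.
public sealed class CappedReplayBuffer<T>
{
    private readonly List<T> _items = new List<T>();
    private readonly int? _maxItems; // null means "no limit" in this sketch

    public CappedReplayBuffer(int? maxItems)
    {
        if (maxItems < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxItems));
        }
        _maxItems = maxItems;
    }

    public int Count => _items.Count;

    // Memory limit and overflow protection: reject items beyond the cap.
    public void Add(T item)
    {
        if (_maxItems.HasValue && _items.Count >= _maxItems.Value)
        {
            throw new InvalidOperationException(
                $"Buffer limit of {_maxItems.Value} items exceeded.");
        }
        _items.Add(item);
    }

    // Replay capability: each call returns a fresh snapshot of the buffered items.
    public IReadOnlyList<T> Replay()
    {
        return _items.ToArray();
    }
}
```

A buffer created with `new CappedReplayBuffer<string>(2)` accepts two items, can be replayed any number of times, and throws on the third `Add`. Failing loudly at the cap is a design choice in this sketch: the alternative, silently dropping old items, would make a later replay incomplete.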
Memory vs. Durability Trade-offs
Materialization involves a fundamental trade-off between memory usage and resilience capabilities:
Unbounded Materialization (Default)
var unboundedOptions = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: null // No limit
);
Pros:
- Maximum resilience - all items can be replayed
- Simple configuration - no need to estimate buffer size
- Works for any data volume within memory constraints
Cons:
- Potential memory exhaustion with large datasets
- OutOfMemoryException risk in production
- Not suitable for long-running pipelines with high throughput
- Silently disables restart functionality - see Node Restart Quick Start Checklist for details
Bounded Materialization
var boundedOptions = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: 1000 // Limit to 1000 items
);
Pros:
- Predictable memory usage
- Protection against memory exhaustion
- Suitable for production environments
- Enables restart functionality
Cons:
- Limited replay capability
- Buffer overflow exceptions if limits exceeded
- Requires careful capacity planning
Practical Guidance for Setting Buffer Limits
Factors to Consider
- Item Size: Larger items require more memory per item
- Failure Window: How many items might be processed between failures
- Memory Constraints: Available memory for the buffer
- Throughput: Items processed per second
- Recovery Requirements: How far back you need to replay
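One way to combine these factors is to take the smaller of two numbers: the items you would like to be able to replay (throughput × failure window) and the items the memory budget can hold (budget ÷ item size). The helper below is only a sketch of that arithmetic; `BufferSizing.EstimateLimit` and its parameters are hypothetical and not part of NPipeline.

```csharp
using System;

public static class BufferSizing
{
    // Hypothetical helper: all parameters are illustrative inputs, not NPipeline settings.
    public static int EstimateLimit(
        int itemsPerSecond,
        int failureWindowSeconds,
        long itemSizeBytes,
        long memoryBudgetBytes)
    {
        if (itemsPerSecond <= 0) throw new ArgumentOutOfRangeException(nameof(itemsPerSecond));
        if (failureWindowSeconds <= 0) throw new ArgumentOutOfRangeException(nameof(failureWindowSeconds));
        if (itemSizeBytes <= 0) throw new ArgumentOutOfRangeException(nameof(itemSizeBytes));
        if (memoryBudgetBytes <= 0) throw new ArgumentOutOfRangeException(nameof(memoryBudgetBytes));

        // Items we would like to replay after a failure.
        long wanted = (long)itemsPerSecond * failureWindowSeconds;

        // Items that fit in the memory budget.
        long affordable = memoryBudgetBytes / itemSizeBytes;

        // Take the smaller value; keep at least 1 so the result is a usable limit.
        long limit = Math.Min(wanted, affordable);
        return (int)Math.Clamp(limit, 1L, (long)int.MaxValue);
    }
}
```

For 1,000 items per second, a 30-second window, 1 KB items and a 512 MB budget, the helper returns 30,000 because the window is the binding constraint. For 10 items per second, a 60-second window, 10 MB items and a 1,000 MB budget, it returns 100 because memory is the binding constraint. The result could then be passed as MaxMaterializedItems.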
Calculation Examples
Example 1: Small Items with High Throughput
// Scenario: Processing 1000 small JSON objects per second
// Each object ~1KB, want to buffer 30 seconds of data
var itemsPerSecond = 1000;
var itemSizeKB = 1; // approximate size of one item
var secondsToBuffer = 30;
var calculatedLimit = itemsPerSecond * secondsToBuffer; // 30,000 items
var memoryUsageMB = (calculatedLimit * itemSizeKB) / 1024; // ~29 MB (integer division)
var options = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: calculatedLimit // 30,000 items
);
Example 2: Large Items with Low Throughput
// Scenario: Processing 10 large documents per second
// Each document ~10MB, want to buffer 60 seconds of data
var itemsPerSecond = 10;
var itemSizeMB = 10; // approximate size of one document
var secondsToBuffer = 60;
var calculatedLimit = itemsPerSecond * secondsToBuffer; // 600 items
var memoryUsageMB = calculatedLimit * itemSizeMB; // 6,000 MB (~6 GB)
// Since ~6 GB is too much, we need to compromise
var realisticLimit = 100; // 100 items × 10 MB = 1,000 MB (~1 GB)
var realisticSecondsBuffered = realisticLimit / itemsPerSecond; // 10 seconds
var options = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: realisticLimit // 100 items
);
Configuration Examples
Basic Materialization Setup
using NPipeline;
using NPipeline.ErrorHandling;
using NPipeline.Execution.Strategies;
using NPipeline.Pipeline;
public class BufferingPipelineDefinition : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var sourceHandle = builder.AddSource<StreamingDataSource, Data>("source");

        // Wrap the transform in a resilient strategy so it can be restarted
        var transformHandle = builder
            .AddTransform<DataTransform, Data, ProcessedData>("transform")
            .WithExecutionStrategy(builder, new ResilientExecutionStrategy(
                new SequentialExecutionStrategy()
            ));

        var sinkHandle = builder.AddSink<DataSink, ProcessedData>("sink");

        builder.Connect(sourceHandle, transformHandle);
        builder.Connect(transformHandle, sinkHandle);

        builder.AddPipelineErrorHandler<DefaultPipelineErrorHandler>();
        builder.WithRetryOptions(new PipelineRetryOptions(
            MaxItemRetries: 3,
            MaxNodeRestartAttempts: 2,
            MaxSequentialNodeAttempts: 5,
            MaxMaterializedItems: 1000 // Bounded replay buffer (see the warning above about null)
        ));
    }
}
Per-Node Materialization Configuration
public class PerNodeMaterializationPipelineDefinition : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var sourceHandle = builder.AddSource<StreamingDataSource, Data>("source");

        var criticalHandle = builder
            .AddTransform<CriticalTransform, Data, ProcessedData>("criticalNode")
            .WithExecutionStrategy(builder, new ResilientExecutionStrategy(
                new SequentialExecutionStrategy()
            ))
            .WithRetryOptions(builder, new PipelineRetryOptions(
                MaxItemRetries: 5,
                MaxNodeRestartAttempts: 5,
                MaxSequentialNodeAttempts: 10
            ));

        var nonCriticalHandle = builder
            .AddTransform<NonCriticalTransform, ProcessedData, FinalData>("nonCriticalNode")
            .WithExecutionStrategy(builder, new ResilientExecutionStrategy(
                new SequentialExecutionStrategy()
            ))
            .WithRetryOptions(builder, new PipelineRetryOptions(
                MaxItemRetries: 2,
                MaxNodeRestartAttempts: 1,
                MaxSequentialNodeAttempts: 3
            ));

        var sinkHandle = builder.AddSink<DataSink, FinalData>("sink");

        builder.Connect(sourceHandle, criticalHandle);
        builder.Connect(criticalHandle, nonCriticalHandle);
        builder.Connect(nonCriticalHandle, sinkHandle);

        builder.AddPipelineErrorHandler<DefaultPipelineErrorHandler>();
    }
}
Monitoring and Observability
Buffer Usage Metrics
Monitor materialization buffer usage to detect potential issues:
// Custom observer to track buffer usage
public class MaterializationObserver : IExecutionObserver
{
    public void OnRetry(NodeRetryEvent retryEvent)
    {
        // Log only full node restarts
        if (retryEvent.RetryKind == RetryKind.NodeRestart)
        {
            Console.WriteLine($"Node restart: {retryEvent.NodeId}, Attempt: {retryEvent.Attempt}");
        }
    }

    public void OnBufferUsage(string nodeId, int currentItems, int maxItems)
    {
        // Skip the calculation when no positive limit is given (avoids division by zero)
        if (maxItems <= 0)
        {
            return;
        }

        // Use long arithmetic so currentItems * 100 cannot overflow
        var usagePercent = (int)((long)currentItems * 100 / maxItems);
        if (usagePercent > 80)
        {
            Console.WriteLine($"Warning: Node {nodeId} buffer at {usagePercent}% capacity");
        }
    }
}

// Register the observer
var context = PipelineContext.Default;
context.ExecutionObserver = new MaterializationObserver();
Detecting Buffer Overflow
When MaxMaterializedItems is set, monitor for overflow exceptions:
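public class OverflowAwareErrorHandler : IPipelineErrorHandler
{
    public async Task<PipelineErrorDecision> HandleNodeFailureAsync(
        string nodeId,
        Exception error,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        // Detect buffer overflow by its message text (ordinal match; brittle if the wording changes)
        if (error.Message.Contains("Resilience materialization exceeded MaxMaterializedItems", StringComparison.Ordinal))
        {
            Console.WriteLine($"Buffer overflow detected for node {nodeId}");
            // Consider alternative recovery strategy
            return PipelineErrorDecision.ContinueWithoutNode;
        }

        // Normal error handling logic
        return await HandleNormalFailure(nodeId, error, context, cancellationToken);
    }

    // Placeholder for the application's regular failure policy; replace with your own logic
    private static Task<PipelineErrorDecision> HandleNormalFailure(
        string nodeId, Exception error, PipelineContext context, CancellationToken cancellationToken)
        => Task.FromResult(PipelineErrorDecision.RestartNode);
}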
Best Practices
1. Start with Conservative Limits
// Conservative starting point
var conservativeOptions = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: 1000 // Start with 1000 items
);
2. Monitor and Adjust
// Production monitoring approach
public class ProductionBufferMonitor
{
    // BufferStats and AlertHighBufferUsage are application-defined and not shown here
    private readonly Dictionary<string, BufferStats> _stats = new();

    public void RecordBufferUsage(string nodeId, int currentItems, int maxItems)
    {
        // Look up (or create) the stats entry once instead of indexing repeatedly
        if (!_stats.TryGetValue(nodeId, out var stats))
        {
            stats = new BufferStats();
            _stats[nodeId] = stats;
        }

        stats.Update(currentItems, maxItems);

        // Alert if consistently high usage
        if (stats.AverageUsagePercent > 80)
        {
            AlertHighBufferUsage(nodeId, stats);
        }
    }
}
3. Plan for Growth
// Growth-aware configuration
var growthFactor = 1.5; // 50% growth buffer
var baselineItems = 1000;
var growthAwareLimit = (int)(baselineItems * growthFactor);
var growthAwareOptions = new PipelineRetryOptions(
MaxItemRetries: 3,
MaxNodeRestartAttempts: 2,
MaxSequentialNodeAttempts: 5,
MaxMaterializedItems: growthAwareLimit
);
Common Pitfalls
Pitfall 1: Underestimating Item Size
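// WRONG: Choosing a limit without measuring item size
var wrongOptions = new PipelineRetryOptions(
MaxMaterializedItems: 10000 // 10,000 items; at 1 MB each that is already ~10 GB
);
// CORRECT: Accounting for actual item size
// EstimateItemSize() is a placeholder for your own measurement, assumed to return whole KB as an int
var itemSizeKB = Math.Max(1, EstimateItemSize()); // avoid dividing by zero
var memoryBudgetMB = 500; // 500MB budget
var calculatedLimit = (memoryBudgetMB * 1024) / itemSizeKB;
var correctOptions = new PipelineRetryOptions(
MaxMaterializedItems: calculatedLimit
);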
Pitfall 2: Ignoring Memory Pressure
// WRONG: No monitoring
public class UnmonitoredPipelineDefinition : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var nodeHandle = builder
            .AddTransform<MyTransform, Input, Output>("node")
            .WithExecutionStrategy(builder, new ResilientExecutionStrategy(
                new SequentialExecutionStrategy()
            ));
    }
}

// CORRECT: With memory monitoring
public class MonitoredPipelineDefinition : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var nodeHandle = builder
            .AddTransform<MyTransform, Input, Output>("node")
            .WithExecutionStrategy(builder, new ResilientExecutionStrategy(
                new SequentialExecutionStrategy()
            ));

        // MemoryAwareErrorHandler is an application-defined IPipelineErrorHandler (not shown)
        builder.AddPipelineErrorHandler<MemoryAwareErrorHandler>();
    }
}
Next Steps
- Node Restart Quick Start Checklist: Complete step-by-step configuration guide for node restart functionality
- Dependency Chains: Understand the critical prerequisite relationships
- Configuration Guide: Get practical implementation guidance
- Troubleshooting: Learn to diagnose and resolve materialization issues