Thread Safety Guidelines
NPipeline is designed for high-performance streaming data processing. This document clarifies thread safety requirements and best practices.
Overview
NPipeline is primarily designed for single-threaded pipeline execution. However, it provides extensive support for parallel processing of individual data items. Understanding the distinction is critical.
Critical Concept
NPipeline separates pipeline-level execution from item-level parallelism:
- Pipeline-level: Single-threaded (one pipeline instance, one context)
- Item-level: Can be parallel (different items processed by different threads)
- Shared state: Only the node instances are shared; items are always independent
Single-Pipeline Execution (Default)
In the most common scenario, a single pipeline processes a stream of data items sequentially:
```csharp
var context = new PipelineContext();
var pipeline = builder.Build();
await pipeline.ExecuteAsync(dataSource, context);
```
In this case:
- No synchronization is needed
- Direct access to context.Items, context.Parameters, and context.Properties is safe
- All operations are single-threaded
- Maximum performance with zero synchronization overhead
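The following BCL-only sketch (it does not use NPipeline; a plain Dictionary stands in for context.Items) illustrates why sequential execution needs no locks: each await may resume on a different thread-pool thread, but the steps run one after another, never at the same time, so every update is seen by the next step.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for context.Items in a sequential (single-pipeline) run.
var items = new Dictionary<string, int> { ["processed"] = 0 };

// Hypothetical per-item step: awaits, then updates shared state.
async Task ProcessSequentiallyAsync(int item)
{
    await Task.Yield();       // the continuation may run on another thread...
    items["processed"] += 1;  // ...but no other step is running at the same time
}

for (var i = 0; i < 1_000; i++)
{
    await ProcessSequentiallyAsync(i); // each item completes before the next starts
}

Console.WriteLine(items["processed"]); // 1000
```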
Parallel Node Execution
When using parallel execution strategies, NPipeline processes independent data items concurrently:
```csharp
var context = new PipelineContext();
var pipeline = builder
    .AddNode(sourceNode)
    .AddNode(transformNode)
    .ConfigureParallel(parallelNode, options => options.WithMaxDegreeOfParallelism(4))
    .AddNode(sinkNode)
    .Build();
```
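The ConfigureParallel call above limits how many items are processed at once. The sketch below does not call NPipeline; it assumes WithMaxDegreeOfParallelism(4) behaves like the BCL's ParallelOptions.MaxDegreeOfParallelism and uses Parallel.ForEachAsync to show the effect: at most four items are in flight at any moment, which the code measures with Interlocked.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var inFlight = 0; // items currently being processed
var peak = 0;     // highest value inFlight ever reached

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

await Parallel.ForEachAsync(Enumerable.Range(0, 100), options, async (item, ct) =>
{
    var now = Interlocked.Increment(ref inFlight);

    // Record the highest observed concurrency with a lock-free compare-and-swap loop.
    int seen;
    while (now > (seen = Volatile.Read(ref peak)) &&
           Interlocked.CompareExchange(ref peak, now, seen) != seen)
    {
    }

    await Task.Delay(5, ct); // simulate per-item work
    Interlocked.Decrement(ref inFlight);
});

Console.WriteLine($"Peak concurrency: {peak}"); // never more than 4
```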
Critical Distinction
Each worker thread processes different data items, not shared state. The PipelineContext itself is not shared across threads; only node instances and immutable configuration are shared.
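A BCL-only sketch of the same split (not NPipeline code): one shared, stateless function, the hypothetical Square standing in for a node instance, is applied to many independent items in parallel. Each worker reads only its own input and writes only its own slot of the results array, so no synchronization is needed.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Shared "node": stateless, so many threads can call it at once.
static int Square(int x) => x * x;

var inputs = Enumerable.Range(1, 10).ToArray();
var results = new int[inputs.Length];

// Each iteration touches only inputs[i] and results[i]: no shared mutable state.
Parallel.For(0, inputs.Length, new ParallelOptions { MaxDegreeOfParallelism = 4 }, i =>
{
    results[i] = Square(inputs[i]);
});

Console.WriteLine(string.Join(", ", results)); // 1, 4, 9, 16, 25, 36, 49, 64, 81, 100
```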
Shared State During Parallel Execution
If you need shared mutable state during parallel execution, DO NOT access context.Items or context.Parameters directly; use one of the options below instead.
Warning
Do NOT modify context.Items or context.Parameters during parallel item processing. These dictionaries are not thread-safe. Use IPipelineStateManager or node-level synchronization instead.
Option 1: Use IPipelineStateManager (Recommended)
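```csharp
// During context setup: register your IPipelineStateManager implementation
var stateManager = new MyThreadSafeStateManager();
context.Properties[PipelineContextKeys.StateManager] = stateManager;

// In your transform node
public override async ValueTask<TOut> TransformAsync(TIn input, PipelineContext context, CancellationToken ct)
{
    var stateManager = context.StateManager;
    if (stateManager != null)
    {
        // Assuming a thread-safe implementation, each call is safe on its own, but this
        // get-then-update pair is NOT atomic: two workers can read the same state and one
        // update can overwrite the other. Prefer an atomic update if your manager offers one.
        var state = await stateManager.GetStateAsync("myKey", ct);
        var newState = ComputeNewState(state, input); // your logic (placeholder)
        await stateManager.UpdateStateAsync("myKey", newState, ct);
    }

    var output = ProcessItem(input); // your logic (placeholder)
    return output;
}
```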
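The IPipelineStateManager interface is not shown in this guide, so the sketch below does not implement it. It uses a ConcurrentDictionary as a hypothetical backing store to show why atomicity matters: a separate read followed by a write can lose updates when workers interleave, whereas a single atomic operation such as ConcurrentDictionary.AddOrUpdate does not.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical backing store for a thread-safe state manager.
var state = new ConcurrentDictionary<string, long>();

// Atomic read-modify-write: under contention the update delegate may be invoked
// more than once, but each Increment call commits exactly one +1, so nothing is lost.
void Increment(string key) => state.AddOrUpdate(key, 1, (_, current) => current + 1);

Parallel.For(0, 10_000, _ => Increment("processed_count"));

Console.WriteLine(state["processed_count"]); // 10000
```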
Option 2: Node-Level Synchronization
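```csharp
public class ThreadSafeTransform<T> : TransformNode<T, T>
{
    // One lock per node instance; parallel workers share the node instance, so they share this lock.
    private readonly object _syncLock = new();

    // Not 'async': the method never awaits, and 'await' is not allowed inside a lock block.
    public override ValueTask<T> TransformAsync(T input, PipelineContext context, CancellationToken ct)
    {
        lock (_syncLock)
        {
            // The read-modify-write below is atomic with respect to other workers of THIS node.
            // Code outside this lock (including other nodes) touching context.Items is not protected.
            var value = context.Items.TryGetValue("key", out var v) ? v : null;
            context.Items["key"] = UpdateSharedState(value); // your update logic (placeholder)
        }

        return new ValueTask<T>(input);
    }
}
```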
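C# does not allow await inside a lock block. If the critical section itself must await (for example, an asynchronous write to a shared store), SemaphoreSlim with a count of 1 is the usual BCL alternative. A minimal sketch, with a plain Dictionary standing in for the shared state and a hypothetical UpdateSharedStateAsync helper:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var shared = new Dictionary<string, int> { ["key"] = 0 };
var gate = new SemaphoreSlim(1, 1); // at most one holder at a time

async Task UpdateSharedStateAsync(CancellationToken ct)
{
    await gate.WaitAsync(ct);
    try
    {
        var value = shared["key"];
        await Task.Yield();        // awaiting inside the critical section is allowed here
        shared["key"] = value + 1; // read-modify-write stays atomic with respect to other holders
    }
    finally
    {
        gate.Release();            // always release, even if the update throws
    }
}

await Task.WhenAll(Enumerable.Range(0, 1_000).Select(_ => UpdateSharedStateAsync(CancellationToken.None)));

Console.WriteLine(shared["key"]); // 1000
```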
Option 3: Atomic Operations for Simple Counters
For simple counters or flags, use System.Threading.Interlocked:
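```csharp
public class CountingTransform : TransformNode<int, int>
{
    private long _processedCount;

    // Safe to read from any thread while workers are still incrementing.
    public long ProcessedCount => Interlocked.Read(ref _processedCount);

    public override ValueTask<int> TransformAsync(int input, PipelineContext context, CancellationToken ct)
    {
        // Atomic increment: no lock needed, and no updates are lost under parallel execution.
        Interlocked.Increment(ref _processedCount);
        return new ValueTask<int>(input);
    }
}
```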
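A standalone BCL-only sketch of the same technique: Parallel.For stands in for the parallel workers, Interlocked.Increment updates the counter, and Interlocked.Read reads the 64-bit value safely. The flag case uses Interlocked.Exchange so that exactly one worker "claims" it; the variable names are illustrative.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

long processedCount = 0;
int firstItemClaimed = 0; // 0 = not yet claimed, 1 = claimed
long winners = 0;

Parallel.For(0, 10_000, i =>
{
    Interlocked.Increment(ref processedCount);

    // Only the first thread to flip the flag gets back the old value 0.
    if (Interlocked.Exchange(ref firstItemClaimed, 1) == 0)
    {
        Interlocked.Increment(ref winners);
    }
});

Console.WriteLine(Interlocked.Read(ref processedCount)); // 10000
Console.WriteLine(winners);                             // 1
```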
Context Dictionary Thread Safety
Parameters Dictionary
- Thread Safe? NO
- When Safe? During initialization (before pipeline execution)
- Use Case? Configuration values that don't change during execution
- Recommendation? Populate this during setup phase only
Items Dictionary
- Thread Safe? NO
- When Safe? Single-threaded pipeline execution
- Use Case? Node-to-node communication, metrics storage
- Recommendation? In parallel scenarios, use IPipelineStateManager
Properties Dictionary
- Thread Safe? NO
- When Safe? Single-threaded pipeline execution
- Use Case? Extension points, plugin configuration
- Recommendation? Store thread-safe objects (like IPipelineStateManager) here
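One way to follow the Parameters recommendation, sketched with the BCL only (the parameter names are hypothetical): fill an ordinary Dictionary during setup, then give workers a read-only view over a private copy. .NET documents that a Dictionary supports multiple concurrent readers as long as nothing modifies it, and an IReadOnlyDictionary view has no setter, so accidental writes do not compile.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading;
using System.Threading.Tasks;

// Setup phase: single-threaded writes (hypothetical parameter names).
var setup = new Dictionary<string, int>
{
    ["batchSize"] = 50,
    ["multiplier"] = 3,
};

// Execution phase: workers get a read-only view over a private copy,
// so later writes to 'setup' cannot race with readers.
IReadOnlyDictionary<string, int> parameters =
    new ReadOnlyDictionary<string, int>(new Dictionary<string, int>(setup));

long total = 0;
Parallel.For(0, 100, i =>
{
    // Concurrent reads only: safe because nothing mutates the copy.
    var value = i * parameters["multiplier"];
    Interlocked.Add(ref total, value);
});

Console.WriteLine(total); // 14850
```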
Why Not ConcurrentDictionary?
You might wonder: "Why not just use ConcurrentDictionary for thread safety?"
Design Rationale:
- Performance: Thread-safe operations add overhead (locks, memory barriers, allocations)
- Common Case: ~99% of pipelines run single-threaded; paying the overhead for all is wasteful
- Philosophy: NPipeline follows "pay only for what you use"
- Alternatives: When thread-safe state IS needed, IPipelineStateManager is more purpose-built
Parallel Execution Best Practices
DO
- Use IPipelineStateManager for shared state in parallel scenarios
- Process independent data items in parallel workers
- Store immutable configuration in context properties
- Use atomic operations for simple counters
- Synchronize access to shared mutable state
DON'T
- Directly modify context.Items from multiple threads without synchronization
- Assume context dictionaries are thread-safe
- Share mutable state between parallel workers without explicit synchronization
- Access context.Parameters after pipeline execution has started (in parallel scenarios)
Example: Safe Parallel Processing
```csharp
public class SafeParallelTransform : TransformNode<DataItem, ProcessedItem>
{
    public override async ValueTask<ProcessedItem> TransformAsync(
        DataItem input,
        PipelineContext context,
        CancellationToken ct)
    {
        // Process the individual item (independent work)
        var processed = ProcessItem(input);

        // If you need to share state across parallel workers:
        var stateManager = context.StateManager;
        if (stateManager != null)
        {
            // Use the thread-safe state manager
            await stateManager.UpdateMetricsAsync("processed_count", 1, ct);
        }

        return processed;
    }

    private ProcessedItem ProcessItem(DataItem input)
    {
        // Independent processing: no shared state access
        return new ProcessedItem { /* ... */ };
    }
}
```