
Thread Safety Guidelines

NPipeline is designed for high-performance streaming data processing. This document clarifies thread safety requirements and best practices.

Overview

NPipeline is primarily designed for single-threaded pipeline execution. However, it provides extensive support for parallel processing of individual data items. Understanding the distinction is critical.

🔑 Critical Concept

NPipeline separates pipeline-level execution from item-level parallelism:

  • Pipeline-level: Single-threaded (one pipeline instance, one context)
  • Item-level: Can be parallel (different items processed by different threads)
  • Shared state: Only the node instances are shared; items are always independent

Single-Pipeline Execution (Default)

In the most common scenario, a single pipeline processes a stream of data items sequentially:

var context = new PipelineContext();
var pipeline = builder.Build();
await pipeline.ExecuteAsync(dataSource, context);

In this case:

  • No synchronization needed
  • Direct access to context.Items, context.Parameters, and context.Properties is safe
  • All operations are single-threaded
  • Maximum performance, with no synchronization overhead
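Here is a plain .NET illustration of why no locking is needed. It is not NPipeline code. A `Dictionary<string, object>` stands in for `context.Items`, and the keys and values are assumptions made for the sketch. Because a single thread performs every read-modify-write, the dictionary needs no lock:

```csharp
using System;
using System.Collections.Generic;

// Stand-in for context.Items: a plain, non-thread-safe dictionary.
var items = new Dictionary<string, object>();

// Sequential processing: one thread handles every item in order,
// so read-modify-write on the dictionary needs no synchronization.
foreach (var value in new[] { 3, 5, 7 })
{
    var total = items.TryGetValue("total", out var current) ? (int)current : 0;
    items["total"] = total + value;
}

Console.WriteLine(items["total"]); // prints 15
```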

Parallel Node Execution

When using parallel execution strategies, NPipeline processes independent data items concurrently:

var context = new PipelineContext();
var pipeline = builder
    .AddNode(sourceNode)
    .AddNode(transformNode)
    .ConfigureParallel(parallelNode, options => options.WithMaxDegreeOfParallelism(4))
    .AddNode(sinkNode)
    .Build();

Critical Distinction

Each worker thread processes different data items, not shared state. The PipelineContext itself is not shared across threads; only node instances and immutable configuration are shared.
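The following sketch illustrates the distinction with PLINQ instead of NPipeline. A stateless function plays the role of a shared node instance, and `WithDegreeOfParallelism(4)` stands in for `WithMaxDegreeOfParallelism(4)`. This is only an analogy under those assumptions. It does not show how NPipeline schedules work internally:

```csharp
using System;
using System.Linq;

// A stateless "node": it keeps no mutable fields, so every worker can share it.
static int Square(int item) => item * item;

var inputs = Enumerable.Range(1, 10).ToArray();

// Up to 4 workers process items concurrently; each worker only ever sees
// its own item. AsOrdered keeps the output in input order.
int[] results = inputs
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(4)
    .Select(Square)
    .ToArray();

Console.WriteLine(string.Join(", ", results)); // prints 1, 4, 9, 16, 25, 36, 49, 64, 81, 100
```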

Shared State During Parallel Execution

If you need shared mutable state during parallel execution, DO NOT access context.Items or context.Parameters directly. Instead, use one of the options below.

⚠️ Warning

Do NOT modify context.Items or context.Parameters during parallel item processing. These dictionaries are not thread-safe. Use IPipelineStateManager or node-level synchronization instead.

Option 1: IPipelineStateManager

// During context setup: register a thread-safe state manager
var stateManager = new MyThreadSafeStateManager();
context.Properties[PipelineContextKeys.StateManager] = stateManager;

// In your transform node
public override async ValueTask<TOut> TransformAsync(TIn input, PipelineContext context, CancellationToken ct)
{
    var stateManager = context.StateManager;
    if (stateManager != null)
    {
        // Thread-safe operations: the state manager handles synchronization
        var state = await stateManager.GetStateAsync("myKey", ct);
        // ... process the item and compute newState from state ...
        await stateManager.UpdateStateAsync("myKey", newState, ct);
    }

    // output is the transformed item produced by your own logic (elided here)
    return output;
}
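This guide does not show MyThreadSafeStateManager. The hedged sketch below shows the kind of primitive such a component might use internally: `ConcurrentDictionary.AddOrUpdate`, which performs atomic read-modify-write updates. It does not implement NPipeline's IPipelineStateManager interface, whose members are not covered here. The `"processed"` key and the counter are hypothetical:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical backing store for a thread-safe state manager (NOT NPipeline's
// IPipelineStateManager): shared state lives in a collection built for concurrency.
var state = new ConcurrentDictionary<string, long>();

// Atomic read-modify-write: AddOrUpdate retries the update delegate if another
// thread changed the value in between, so no increment is lost.
long Increment(string key) => state.AddOrUpdate(key, 1, (_, current) => current + 1);

// Simulate 1,000 items processed by parallel workers, each bumping a shared count.
Parallel.For(0, 1000, _ => Increment("processed"));

Console.WriteLine(state["processed"]); // prints 1000
```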

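Option 2: Node-Level Synchronization

public class ThreadSafeTransform<T> : TransformNode<T, T>
{
    // Guards shared state across all parallel workers that use this node instance.
    // Only effective if every reader and writer of the shared key takes this same lock.
    private readonly object _syncLock = new();

    public override ValueTask<T> TransformAsync(T input, PipelineContext context, CancellationToken ct)
    {
        lock (_syncLock)
        {
            // Synchronize the read-modify-write on shared state
            var value = context.Items.TryGetValue("key", out var v) ? v : null;
            context.Items["key"] = UpdateSharedState(value); // UpdateSharedState: your own helper
        }

        // Nothing is awaited, so return a completed ValueTask instead of marking the method async
        return new ValueTask<T>(input);
    }
}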
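The locking pattern can also be tried as a standalone program. Here a plain `Dictionary<string, object>` stands in for `context.Items`, which is an assumption made for illustration. The example shows the key requirement: the read and the write must both happen under the same lock. C# does not allow `await` inside a `lock` block, so any asynchronous work has to stay outside it:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for context.Items: a plain Dictionary, which is NOT thread-safe.
var items = new Dictionary<string, object>();

// One lock object guards the dictionary. Every code path that touches
// `items` must take this same lock, or the protection does not hold.
var syncLock = new object();

Parallel.For(0, 1000, _ =>
{
    lock (syncLock)
    {
        // Read-modify-write happens entirely inside the lock.
        var current = items.TryGetValue("key", out var v) ? (int)v : 0;
        items["key"] = current + 1;
    }
});

Console.WriteLine(items["key"]); // prints 1000
```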

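Option 3: Atomic Operations for Simple Counters

For simple counters or flags, use System.Threading.Interlocked:

public class CountingTransform : TransformNode<int, int>
{
    private long _processedCount;

    // Atomic read of the 64-bit counter, safe on any platform
    public long ProcessedCount => Interlocked.Read(ref _processedCount);

    public override ValueTask<int> TransformAsync(int input, PipelineContext context, CancellationToken ct)
    {
        // Atomic increment: safe when multiple workers call this node concurrently
        Interlocked.Increment(ref _processedCount);
        return new ValueTask<int>(input);
    }
}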
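To see Interlocked at work outside a pipeline, the plain .NET sketch below increments one counter from many parallel iterations. `Parallel.For` takes the place of NPipeline's parallel workers:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

long processedCount = 0;

// Each increment is a single atomic operation, so no lock is needed.
Parallel.For(0, 10_000, _ => Interlocked.Increment(ref processedCount));

// Interlocked.Read gives an atomic 64-bit read, which matters on 32-bit platforms.
Console.WriteLine(Interlocked.Read(ref processedCount)); // prints 10000
```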

Context Dictionary Thread Safety

Parameters Dictionary

  • Thread-safe: No
  • Safe when: During initialization, before pipeline execution starts
  • Use case: Configuration values that don't change during execution
  • Recommendation: Populate only during the setup phase

Items Dictionary

  • Thread-safe: No
  • Safe when: Single-threaded pipeline execution
  • Use case: Node-to-node communication, metrics storage
  • Recommendation: In parallel scenarios, use IPipelineStateManager

Properties Dictionary

  • Thread-safe: No
  • Safe when: Single-threaded pipeline execution
  • Use case: Extension points, plugin configuration
  • Recommendation: Store thread-safe objects (such as IPipelineStateManager) here
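The Parameters recommendation is to populate during setup and then leave the dictionary untouched. The sketch below shows that in plain .NET, assuming the context dictionaries behave like a standard `Dictionary<string, object>`. .NET documents that a Dictionary supports multiple concurrent readers as long as it is not modified. A fully populated configuration can therefore be read from parallel workers. The keys and values here are made up for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading.Tasks;

// Setup phase (single-threaded): populate configuration once.
var parameters = new Dictionary<string, object>
{
    ["batchSize"] = 50,
    ["region"] = "eu-west",
};

// Give workers only a read-only view. Concurrent reads are safe solely
// because nothing writes to `parameters` after this point.
IReadOnlyDictionary<string, object> config = new ReadOnlyDictionary<string, object>(parameters);

var seen = new int[8];
Parallel.For(0, seen.Length, i =>
{
    // Each worker reads the shared config and writes only its own array slot.
    seen[i] = (int)config["batchSize"];
});

Console.WriteLine(seen.All(v => v == 50)); // prints True
```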

Why Not ConcurrentDictionary?

You might wonder: "Why not just use ConcurrentDictionary for thread safety?"

Design Rationale:

  1. Performance: Thread-safe collections add overhead (locks, memory barriers, extra allocations) to every operation
  2. Common case: An estimated ~99% of pipelines run single-threaded, so making every pipeline pay that overhead would be wasteful
  3. Philosophy: NPipeline follows a "pay only for what you use" approach
  4. Alternatives: When thread-safe state IS needed, IPipelineStateManager is a purpose-built option

Parallel Execution Best Practices

DO

  • Use IPipelineStateManager for shared state in parallel scenarios
  • Process independent data items in parallel workers
  • Store immutable configuration in context properties
  • Use atomic operations for simple counters
  • Synchronize access to shared mutable state

DON'T

  • Directly modify context.Items from multiple threads without synchronization
  • Assume context dictionaries are thread-safe
  • Share mutable state between parallel workers without explicit synchronization
  • Modify context.Parameters after pipeline execution has started (in parallel scenarios)

Example: Safe Parallel Processing

public class SafeParallelTransform : TransformNode<DataItem, ProcessedItem>
{
    public override async ValueTask<ProcessedItem> TransformAsync(
        DataItem input,
        PipelineContext context,
        CancellationToken ct)
    {
        // Process the individual item (independent work)
        var processed = ProcessItem(input);

        // If you need to share state across parallel workers:
        var stateManager = context.StateManager;
        if (stateManager != null)
        {
            // Use the thread-safe state manager
            await stateManager.UpdateMetricsAsync("processed_count", 1, ct);
        }

        return processed;
    }

    private ProcessedItem ProcessItem(DataItem input)
    {
        // Independent processing: no shared state access
        return new ProcessedItem { /* ... */ };
    }
}
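The same pattern can be reproduced end to end without NPipeline. This sketch requires .NET 6 or later for `Parallel.ForEachAsync` and uses hypothetical stand-ins. `int` and `string` replace DataItem and ProcessedItem. A `ConcurrentDictionary` of counters plays the role of the state manager's metrics. Per-item work stays pure, and the only shared write goes through a collection built for concurrent access:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the state manager's metrics store.
var metrics = new ConcurrentDictionary<string, long>();
var outputs = new ConcurrentBag<string>();

// Pure per-item work: depends only on its input and touches no shared state.
static string ProcessItem(int input) => $"item-{input}";

// Async transform mirroring the shape of SafeParallelTransform.TransformAsync.
async ValueTask<string> TransformAsync(int input, CancellationToken ct)
{
    ct.ThrowIfCancellationRequested();
    var processed = ProcessItem(input);
    await Task.Yield(); // placeholder for real asynchronous work
    metrics.AddOrUpdate("processed_count", 1, (_, n) => n + 1);
    return processed;
}

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
await Parallel.ForEachAsync(Enumerable.Range(0, 200), options, async (item, ct) =>
{
    outputs.Add(await TransformAsync(item, ct));
});

var processedCount = metrics["processed_count"];
Console.WriteLine($"{outputs.Count} {processedCount}"); // prints 200 200
```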

See Also