Pipeline Composition Extension
Overview
The NPipeline.Extensions.Composition extension lets you build hierarchical, modular pipelines by treating an entire pipeline as a reusable transform node. Complex data processing workflows can then be assembled from simpler, well-tested building blocks.
Key Features
- Modular Design: Break complex pipelines into smaller, reusable sub-pipelines
- Type Safety: Full compile-time type checking across pipeline boundaries
- Context Control: Fine-grained control over what data flows between parent and sub-pipelines
- Isolation: Sub-pipelines execute in isolated contexts, preventing unintended side effects
- Nested Composition: Unlimited nesting depth for hierarchical pipeline structures
- High Performance: Minimal overhead with scoped runner resolution and optimized context creation
- Observability Inheritance: Control whether child pipelines inherit observers, lineage sinks, and dead letter handlers
- Pipeline Identity: Lineage and metrics carry pipeline names for unambiguous nested node identification
Installation
dotnet add package NPipeline.Extensions.Composition
Quick Start
Basic Composition
using NPipeline.Extensions.Composition;
using NPipeline.Pipeline;

// Define a sub-pipeline for validation
public class ValidationPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var input = builder.AddSource<PipelineInputSource<Customer>, Customer>("input");
        var validate = builder.AddTransform<ValidatorNode, Customer, ValidatedCustomer>("validate");
        var output = builder.AddSink<PipelineOutputSink<ValidatedCustomer>, ValidatedCustomer>("output");

        builder.Connect(input, validate);
        builder.Connect(validate, output);
    }
}

// Use in parent pipeline
public class DataProcessingPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var source = builder.AddSource<CustomerSource, Customer>("customers");
        // Add validation as a composite node
        var validate = builder.AddComposite<Customer, ValidatedCustomer, ValidationPipeline>("validate");
        var sink = builder.AddSink<DatabaseSink, ValidatedCustomer>("database");

        builder.Connect(source, validate);
        builder.Connect(validate, sink);
    }
}
Core Concepts
Composite Transform Node
A composite node is a special transform node that executes an entire sub-pipeline for each input item. It:
- Receives an input item from the parent pipeline
- Creates an isolated sub-pipeline context
- Passes the input to the sub-pipeline
- Executes the sub-pipeline
- Retrieves the output from the sub-pipeline
- Returns the output to the parent pipeline
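The six steps above can be pictured with a small, self-contained model. This is only a conceptual sketch: `SketchContext`, `CompositeSketch`, and the two key names are hypothetical stand-ins invented for illustration, not NPipeline types, and the real composite node may work differently internally.

```csharp
using System;
using System.Collections.Generic;

// Conceptual model only: hypothetical types, NOT NPipeline's implementation.
public sealed class SketchContext
{
    public Dictionary<string, object> Items { get; } = new();
}

public static class CompositeSketch
{
    // Hypothetical key names used to hand items across the boundary.
    private const string InputKey = "sketch.input";
    private const string OutputKey = "sketch.output";

    // Runs `subPipeline` once for a single item inside a fresh context.
    public static TOut Process<TIn, TOut>(TIn item, Action<SketchContext> subPipeline)
        where TIn : notnull
    {
        var child = new SketchContext();      // step 2: isolated context
        child.Items[InputKey] = item;         // step 3: pass the input
        subPipeline(child);                   // step 4: execute

        // step 5: retrieve the output, failing loudly if none was produced
        if (!child.Items.TryGetValue(OutputKey, out var result))
            throw new InvalidOperationException("Sub-pipeline did not produce an output item");

        return (TOut)result;                  // step 6: return to the parent
    }

    public static void Main()
    {
        // A stand-in "sub-pipeline" that maps a string to its length.
        var length = Process<string, int>("hello", ctx =>
        {
            var input = (string)ctx.Items[InputKey];
            ctx.Items[OutputKey] = input.Length;
        });

        Console.WriteLine(length); // prints 5
    }
}
```

In this model the child context is created per item and discarded afterwards, which is why nothing the sub-pipeline writes can leak into the caller.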
Sub-Pipeline Structure
Sub-pipelines must follow a specific structure:
- Input: Use AddCompositeInput<T>() (preferred) or AddSource<PipelineInputSource<T>, T>() to receive data from the parent
- Processing: Use any standard NPipeline nodes (transforms, filters, etc.)
- Output: Use AddCompositeOutput<T>() (preferred) or AddSink<PipelineOutputSink<T>, T>() to return data to the parent
Using the dedicated AddCompositeInput / AddCompositeOutput helpers registers bridge nodes with
NodeKind.CompositeInput and NodeKind.CompositeOutput so tools like NPipeline Studio can distinguish
them from regular source/sink nodes.
public class MySubPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        // Input node: receives from parent (CompositeInput kind)
        var input = builder.AddCompositeInput<TInput>("input");

        // Processing nodes
        var transform = builder.AddTransform<MyTransform, TInput, TOutput>("process");

        // Output node: returns to parent (CompositeOutput kind)
        var output = builder.AddCompositeOutput<TOutput>("output");

        builder.Connect(input, transform);
        builder.Connect(transform, output);
    }
}
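Because a sub-pipeline is an ordinary IPipelineDefinition, it can itself contain composite nodes, which is how nested composition arises. The sketch below uses only calls shown elsewhere in this document. `OnboardingPipeline` and `ParentPipeline` are hypothetical names, and `Customer`, `ValidatedCustomer`, `CustomerSource`, `DatabaseSink`, and `ValidationPipeline` are assumed to be the types from the Quick Start, so the snippet compiles only alongside them.

```csharp
using NPipeline.Extensions.Composition;
using NPipeline.Pipeline;

// Hypothetical middle layer: a sub-pipeline that contains a composite node of its own.
public class OnboardingPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var input = builder.AddCompositeInput<Customer>("input");

        // Nested composite: reuses ValidationPipeline from the Quick Start.
        var validate = builder.AddComposite<Customer, ValidatedCustomer, ValidationPipeline>("validate");

        var output = builder.AddCompositeOutput<ValidatedCustomer>("output");

        builder.Connect(input, validate);
        builder.Connect(validate, output);
    }
}

// Hypothetical top-level pipeline that treats OnboardingPipeline as a single node.
public class ParentPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var source = builder.AddSource<CustomerSource, Customer>("customers");
        var onboard = builder.AddComposite<Customer, ValidatedCustomer, OnboardingPipeline>("onboard");
        var sink = builder.AddSink<DatabaseSink, ValidatedCustomer>("database");

        builder.Connect(source, onboard);
        builder.Connect(onboard, sink);
    }
}
```

Each level runs in its own context, so context inheritance has to be configured separately at every AddComposite call that should pass parent data down.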
Context Configuration
Control what data the sub-pipeline inherits from the parent:
// No inheritance (default)
builder.AddComposite<TIn, TOut, SubPipeline>(
    contextConfiguration: CompositeContextConfiguration.Default);

// Inherit everything
builder.AddComposite<TIn, TOut, SubPipeline>(
    contextConfiguration: CompositeContextConfiguration.InheritAll);

// Custom inheritance
builder.AddComposite<TIn, TOut, SubPipeline>(
    contextConfiguration: new CompositeContextConfiguration
    {
        InheritParentParameters = true,
        InheritParentItems = false,
        InheritParentProperties = true
    });

// Using configuration action
builder.AddComposite<TIn, TOut, SubPipeline>(
    configureContext: config =>
    {
        config.InheritParentParameters = true;
        config.InheritParentItems = true;
    });
Observability and Lineage Inheritance
Control whether child pipelines inherit instrumentation from the parent:
builder.AddComposite<TIn, TOut, SubPipeline>(
    contextConfiguration: new CompositeContextConfiguration
    {
        InheritExecutionObserver = true,   // Child node events flow through parent observer
        InheritLineageSink = true,         // Child lineage uses parent sink
        InheritDeadLetterDecorator = true, // Child dead letters use parent handler
        InheritRunIdentity = true,         // Shared run identity
    });
Preconfigured Node Override
Runtime systems can replace preconfigured node instances (e.g., to inject run-scoped composite nodes):
// Replace an existing preconfigured instance
builder.SetPreconfiguredNodeInstance(nodeId, newInstance, replaceExisting: true);
Architecture
Data Flow
Parent Pipeline:
[Source] → [Composite Node] → [Sink]
                  ↓
Sub-Pipeline:
[PipelineInputSource] → [Transform] → [PipelineOutputSink]
Context Isolation
Sub-pipelines execute in isolated contexts:
- Isolated by Default: Changes to sub-pipeline context don't affect parent
- Optional Inheritance: Parent context data can be copied to sub-pipeline
- Thread-Safe: Multiple composite nodes can execute concurrently
- Resource Management: Sub-pipeline resources are properly disposed
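The "isolated by default, optionally copied" behavior can be illustrated with a small stand-alone model. Everything here (`Ctx`, `InheritFlags`, `CreateChild`) is a hypothetical stand-in written for this example. It assumes inheritance means copying dictionary entries into a new child context, as the list above describes, and does not reproduce NPipeline's actual code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration; these are not the library's types.
public sealed record InheritFlags(bool Parameters, bool Items);

public sealed class Ctx
{
    public Dictionary<string, object> Parameters { get; } = new();
    public Dictionary<string, object> Items { get; } = new();
}

public static class IsolationSketch
{
    // Builds a child context, copying only the dictionaries the flags allow.
    public static Ctx CreateChild(Ctx parent, InheritFlags flags)
    {
        var child = new Ctx();

        if (flags.Parameters)
            foreach (var kv in parent.Parameters)
                child.Parameters[kv.Key] = kv.Value;

        if (flags.Items)
            foreach (var kv in parent.Items)
                child.Items[kv.Key] = kv.Value;

        return child;
    }

    public static void Main()
    {
        var parent = new Ctx();
        parent.Parameters["region"] = "eu";
        parent.Items["counter"] = 1;

        var child = CreateChild(parent, new InheritFlags(Parameters: true, Items: false));
        child.Parameters["region"] = "us"; // mutates the copy only

        Console.WriteLine(parent.Parameters["region"]);        // prints eu
        Console.WriteLine(child.Items.ContainsKey("counter")); // prints False
    }
}
```

Note that a copy like this is shallow: replacing an entry in the child leaves the parent untouched, but a mutable object stored as a value would still be shared between the two contexts.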
Performance Characteristics
- Single-Item Processing: Each item is processed independently
- Minimal Overhead: Runner resolved from DI when available, with a fallback static runner
- Memory Efficient: Only input/output items in memory at once
- No Buffering: Items flow directly through the pipeline hierarchy
Node Kinds
Composite pipelines introduce three dedicated NodeKind values:
| Kind | Description |
|---|---|
| Composite | A transform node that executes a sub-pipeline. Registered via AddComposite<>(). |
| CompositeInput | A bridge source inside a sub-pipeline that reads from the parent context. Registered via AddCompositeInput<T>(). |
| CompositeOutput | A bridge sink inside a sub-pipeline that writes back to the parent context. Registered via AddCompositeOutput<T>(). |
These kinds allow tools like NPipeline Studio to distinguish composite nodes from regular transforms/sources/sinks without fragile type-hierarchy scanning.
Child Definition Type & Metadata
Each composite node stores the child IPipelineDefinition type directly on NodeDefinition.ChildDefinitionType. This eliminates the need for reflection-based type detection:
var compositeNode = graph.Nodes.First(n => n.Kind == NodeKind.Composite);
var childType = compositeNode.ChildDefinitionType; // e.g., typeof(ValidationPipeline)
You can also attach arbitrary metadata to any node using SetNodeMetadata:
builder.SetNodeMetadata(handle.Id, "Description", "Validates customer records");
builder.SetNodeMetadata(handle.Id, "Version", 2);
Child Graphs
When a pipeline is built, the builder automatically extracts and attaches child sub-pipeline graphs for all composite nodes. Access them via PipelineGraph.ChildGraphs:
var pipeline = builder.Build();
// Child graphs keyed by composite node ID
if (pipeline.Graph.ChildGraphs is not null)
{
    foreach (var (compositeNodeId, childGraph) in pipeline.Graph.ChildGraphs)
    {
        Console.WriteLine($"Composite '{compositeNodeId}' has {childGraph.Nodes.Length} child nodes");
    }
}
This eliminates the need for consumers to separately instantiate child definitions and build their graphs.
Node ID Namespacing
When multiple composite nodes contain sub-pipelines with the same node names (e.g., "input", "output"), use CompositeNaming to create globally unique IDs:
// Create namespaced node IDs
var id = CompositeNaming.PrefixNodeId("validate", "input"); // "validate::input"
// Parse namespaced IDs
CompositeNaming.IsNamespaced("validate::input"); // true
CompositeNaming.GetParentNodeId("validate::input"); // "validate"
CompositeNaming.GetChildNodeId("validate::input"); // "input"
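One place namespacing is useful is when flattening child graphs into a single listing. The fragment below is a sketch that combines PipelineGraph.ChildGraphs with CompositeNaming.PrefixNodeId. It assumes `pipeline` is the result of `builder.Build()` and that each node in `childGraph.Nodes` exposes a string `Id` property. That property is an assumption, since this document only shows `Kind` and `ChildDefinitionType` on nodes.

```csharp
// Fragment: `pipeline` comes from builder.Build() as in the Child Graphs section.
// Assumption: each node in childGraph.Nodes has a string `Id` property.
if (pipeline.Graph.ChildGraphs is not null)
{
    foreach (var (compositeNodeId, childGraph) in pipeline.Graph.ChildGraphs)
    {
        foreach (var node in childGraph.Nodes)
        {
            // Combine the composite node's ID with the child node's ID.
            var globalId = CompositeNaming.PrefixNodeId(compositeNodeId, node.Id);
            Console.WriteLine(globalId); // e.g. "validate::input"
        }
    }
}
```

With this scheme, two composites that both contain a node named "input" yield distinct IDs such as "validate::input" and "enrich::input".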
Dependency Injection Support
Composite nodes can use DI-resolved child pipeline definitions by passing an IServiceProvider:
// Register child definition in DI container
services.AddTransient<ValidationPipeline>();
// Pass service provider when adding composite node
builder.AddComposite<Customer, ValidatedCustomer, ValidationPipeline>(
    name: "validate",
    serviceProvider: serviceProvider);
You can also run pre-instantiated definitions directly:
var runner = PipelineRunner.Create();
var definition = new MyPipelineDefinition(injectedService);
await runner.RunAsync(definition, context);
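The two snippets above can be combined into one end-to-end wiring sketch. It assumes the standard Microsoft.Extensions.DependencyInjection container and a hypothetical parent definition, `InjectedDataProcessingPipeline`, that receives the IServiceProvider through its constructor. The Quick Start types are reused, and the namespace that provides PipelineRunner is not shown in this document, so an extra using directive may be needed. Whether source, transform, and sink nodes also have to be registered in the container is not covered here.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Extensions.Composition;
using NPipeline.Pipeline;

// Hypothetical parent definition that is handed the container at construction time.
public class InjectedDataProcessingPipeline : IPipelineDefinition
{
    private readonly IServiceProvider _serviceProvider;

    public InjectedDataProcessingPipeline(IServiceProvider serviceProvider)
        => _serviceProvider = serviceProvider;

    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var source = builder.AddSource<CustomerSource, Customer>("customers");

        // The child definition is resolved from the container instead of via new().
        var validate = builder.AddComposite<Customer, ValidatedCustomer, ValidationPipeline>(
            name: "validate",
            serviceProvider: _serviceProvider);

        var sink = builder.AddSink<DatabaseSink, ValidatedCustomer>("database");

        builder.Connect(source, validate);
        builder.Connect(validate, sink);
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Register the child definition, then build the container.
        var services = new ServiceCollection();
        services.AddTransient<ValidationPipeline>();
        using var serviceProvider = services.BuildServiceProvider();

        // Run a pre-instantiated parent definition, as shown above.
        var runner = PipelineRunner.Create();
        var definition = new InjectedDataProcessingPipeline(serviceProvider);
        await runner.RunAsync(definition, new PipelineContext());
    }
}
```

Registering the child as transient gives each resolution a fresh definition instance. A different lifetime may suit definitions that hold shared state.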
Advanced Topics
See the following guides for detailed information:
- Context Inheritance - Detailed guide on context configuration
- Nested Composition - Building deep pipeline hierarchies
- Error Handling - Managing errors across pipeline boundaries
- Performance Optimization - Best practices for high-performance scenarios
- Testing Strategies - How to test composite pipelines effectively
Examples
Complete examples are available in the samples directory:
- Basic composition with simple sub-pipelines
- Context inheritance patterns
- Nested composition scenarios
- Error handling across boundaries
- Complex multi-stage processing
Best Practices
1. Keep Sub-Pipelines Focused
Each sub-pipeline should have a single, well-defined responsibility:
✅ Good: ValidationPipeline, EnrichmentPipeline, TransformationPipeline
❌ Bad: DoEverythingPipeline
2. Use Meaningful Names
Name composite nodes and sub-pipelines descriptively:
✅ Good:
builder.AddComposite<Customer, ValidatedCustomer, ValidationPipeline>("validate-customer");
❌ Bad:
builder.AddComposite<Customer, ValidatedCustomer, ValidationPipeline>("node1");
3. Minimize Context Inheritance
Only inherit what you need:
✅ Good:
new CompositeContextConfiguration
{
    InheritParentParameters = true // Only parameters needed
}
❌ Bad:
CompositeContextConfiguration.InheritAll // Unless you really need everything
4. Test Sub-Pipelines Independently
Test each sub-pipeline in isolation before composing:
[Fact]
public async Task ValidationPipeline_WithInvalidData_ShouldProduceErrors()
{
    var runner = PipelineRunner.Create();
    var context = new PipelineContext();

    // Test the sub-pipeline directly
    await runner.RunAsync<ValidationPipeline>(context);

    // Assert expected behavior
}
5. Document Input/Output Contracts
Clearly document what each sub-pipeline expects and produces:
/// <summary>
/// Validates customer data and returns validation results.
/// </summary>
/// <remarks>
/// Input: Customer with Id, Name, Email
/// Output: ValidatedCustomer with IsValid flag and error list
/// </remarks>
public class ValidationPipeline : IPipelineDefinition
{
    // ...
}
API Reference
Extension Methods
AddComposite<TIn, TOut, TDefinition>
Adds a composite node to the pipeline.
public static TransformNodeHandle<TIn, TOut> AddComposite<TIn, TOut, TDefinition>(
    this PipelineBuilder builder,
    string? name = null,
    CompositeContextConfiguration? contextConfiguration = null)
    where TDefinition : IPipelineDefinition, new()
Parameters:
- builder: The pipeline builder
- name: Optional node name (defaults to type name)
- contextConfiguration: Optional context configuration
Returns: Handle to the composite node
AddComposite<TIn, TOut, TDefinition> (with configuration action)
Adds a composite node with a configuration action.
public static TransformNodeHandle<TIn, TOut> AddComposite<TIn, TOut, TDefinition>(
    this PipelineBuilder builder,
    Action<CompositeContextConfiguration> configureContext,
    string? name = null)
    where TDefinition : IPipelineDefinition, new()
Parameters:
- builder: The pipeline builder
- configureContext: Action to configure context inheritance
- name: Optional node name
Returns: Handle to the composite node
Classes
CompositeContextConfiguration
Configuration for sub-pipeline context inheritance.
Properties:
- InheritParentParameters: Copy parent Parameters dictionary
- InheritParentItems: Copy parent Items dictionary
- InheritParentProperties: Copy parent Properties dictionary
Static Properties:
- Default: No inheritance (all flags false)
- InheritAll: Full inheritance (all flags true)
PipelineInputSource<T>
Source node that retrieves input from parent context.
Type Parameters:
T: Type of input item
PipelineOutputSink<T>
Sink node that stores output in parent context.
Type Parameters:
T: Type of output item
CompositeContextKeys
Well-known context keys for composite nodes.
Constants:
- InputItem: Key for input item storage
- OutputItem: Key for output item storage
Troubleshooting
Common Issues
"Sub-pipeline did not produce an output item"
Cause: The sub-pipeline has no output node (AddCompositeOutput<T>() or PipelineOutputSink<T>), or no item reached that node.
Solution: Ensure your sub-pipeline has:
- A PipelineOutputSink<T> as the final node
- Data flowing through the pipeline to the sink
// ✅ Correct
public void Define(PipelineBuilder builder, PipelineContext context)
{
    var input = builder.AddSource<PipelineInputSource<T>, T>("input");
    var output = builder.AddSink<PipelineOutputSink<T>, T>("output");
    builder.Connect(input, output);
}
"No input item found in pipeline context"
Cause: The sub-pipeline has no input node, or it accesses the context incorrectly.
Solution: Start every sub-pipeline with AddCompositeInput<T>() (preferred) or PipelineInputSource<T> as the first node.
Type Mismatch Errors
Cause: Sub-pipeline output type doesn't match composite node's TOut type parameter.
Solution: Ensure type consistency:
// ✅ Correct - types match
builder.AddComposite<Customer, ValidatedCustomer, ValidationPipeline>(...);
// In ValidationPipeline:
var output = builder.AddSink<PipelineOutputSink<ValidatedCustomer>, ValidatedCustomer>("output");