Node Definition Structure
NPipeline uses a NodeDefinition record to represent nodes in the pipeline graph. This record has been refactored to use a nested configuration structure that organizes node properties logically and makes the API more maintainable.
Overview
The NodeDefinition record is composed of several nested configuration records, each responsible for a specific aspect of node configuration:
- NodeIdentity: Contains identification information (ID and name)
- NodeTypeSystem: Contains type system information (node type, kind, input/output types)
- NodeExecutionConfig: Contains execution-related configuration (strategy, error handler, cardinality)
- NodeMergeConfig: Contains merge-related configuration (strategy, custom merge logic)
- NodeLineageConfig: Contains lineage-related configuration (adapters, mappers, unwrappers)
Node Configuration Hierarchy
Nested Configuration Records
NodeIdentity
/// <summary>
/// Represents the identity information of a node.
/// </summary>
/// <param name="Id">The unique identifier for the node.</param>
/// <param name="Name">A descriptive name for the node.</param>
public sealed record NodeIdentity(
string Id,
string Name);
NodeTypeSystem
/// <summary>
/// Represents the type system information of a node.
/// </summary>
/// <param name="NodeType">The type of the pipeline node, which must implement INode.</param>
/// <param name="Kind">The kind of node (Source, Transform, Sink, Join, or Aggregate).</param>
/// <param name="InputType">The type of input data for the node.</param>
/// <param name="OutputType">The type of output data for the node.</param>
public sealed record NodeTypeSystem(
Type NodeType,
NodeKind Kind,
Type? InputType = null,
Type? OutputType = null);
NodeExecutionConfig
/// <summary>
/// Represents the execution configuration of a node.
/// </summary>
/// <param name="ExecutionStrategy">The execution strategy for the node.</param>
/// <param name="ErrorHandlerType">The type of the error handler for the node.</param>
/// <param name="DeclaredCardinality">The declared cardinality transformation of the node.</param>
public sealed record NodeExecutionConfig(
IExecutionStrategy? ExecutionStrategy = null,
Type? ErrorHandlerType = null,
TransformCardinality? DeclaredCardinality = null);
NodeMergeConfig
/// <summary>
/// Represents the merge configuration of a node.
/// </summary>
/// <param name="MergeStrategy">The merge strategy for nodes that combine multiple inputs.</param>
/// <param name="HasCustomMerge">Whether this node has custom merge logic.</param>
/// <param name="IsJoin">Whether this node is a join node.</param>
/// <param name="CustomMerge">Optional custom merge delegate for merging multiple input streams.</param>
public sealed record NodeMergeConfig(
MergeType? MergeStrategy = null,
bool HasCustomMerge = false,
bool IsJoin = false,
CustomMergeDelegate? CustomMerge = null);
NodeLineageConfig
/// <summary>
/// Represents the lineage configuration of a node.
/// </summary>
/// <param name="LineageAdapter">Optional delegate for transforming lineage data.</param>
/// <param name="LineageMapperType">The type of the lineage mapper for non-1:1 transformations.</param>
/// <param name="SinkLineageUnwrap">Optional delegate for unwrapping lineage in sink nodes.</param>
public sealed record NodeLineageConfig(
LineageAdapterDelegate? LineageAdapter = null,
Type? LineageMapperType = null,
SinkLineageUnwrapDelegate? SinkLineageUnwrap = null);
Benefits of Nested Structure
The nested configuration structure provides several benefits:
- Logical Organization: Related properties are grouped together, making the API more intuitive
- Type Safety: Each configuration record has a focused set of related properties
- Immutability: Records provide built-in immutability and value-based equality
- Maintainability: Changes to specific aspects of node configuration are isolated
- Extensibility: New configuration aspects can be added without affecting existing code
Accessing Properties
The NodeDefinition record provides convenience properties that delegate to the nested configuration records:
// Accessing execution strategy through the nested structure
var strategy = nodeDefinition.ExecutionConfig.ExecutionStrategy;
// Accessing merge configuration
var mergeStrategy = nodeDefinition.MergeConfig.MergeStrategy;
var hasCustomMerge = nodeDefinition.MergeConfig.HasCustomMerge;
// Accessing lineage configuration
var lineageAdapter = nodeDefinition.LineageConfig.LineageAdapter;
Creating NodeDefinition Instances
When creating NodeDefinition instances, you can use either the constructor with individual parameters or the nested structure approach:
Using Individual Parameters (Backward Compatible)
var nodeDefinition = new NodeDefinition(
"myNodeId",
"My Node",
typeof(MyNode),
NodeKind.Transform,
executionStrategy: new MyExecutionStrategy(),
errorHandlerType: typeof(MyErrorHandler),
inputType: typeof(string),
outputType: typeof(int),
mergeStrategy: MergeType.Concat,
hasCustomMerge: false,
declaredCardinality: TransformCardinality.OneToOne,
lineageAdapter: null,
lineageMapperType: null,
isJoin: false,
customMerge: null,
sinkLineageUnwrap: null
);
Using Nested Configuration Records (Recommended)
var nodeDefinition = new NodeDefinition(
new NodeIdentity("myNodeId", "My Node"),
new NodeTypeSystem(typeof(MyNode), NodeKind.Transform, typeof(string), typeof(int)),
new NodeExecutionConfig(new MyExecutionStrategy(), typeof(MyErrorHandler), TransformCardinality.OneToOne),
new NodeMergeConfig(MergeType.Concat, false, false, null),
new NodeLineageConfig(null, null, null)
);
Updating NodeDefinition
The nested structure makes it easier to update specific aspects of node configuration using the with expression:
// Update execution strategy
var updatedNode = nodeDefinition with {
ExecutionConfig = nodeDefinition.ExecutionConfig with {
ExecutionStrategy = newExecutionStrategy
}
};
// Update merge configuration
var withCustomMerge = nodeDefinition with {
MergeConfig = nodeDefinition.MergeConfig with {
CustomMerge = myCustomMergeDelegate,
HasCustomMerge = true
}
};
Migrated Delegate Types
The following delegate types have been moved to the NPipeline.Graph.PipelineDelegates namespace:
CustomMergeDelegate: For custom merge logic in join nodesSinkLineageUnwrapDelegate: For unwrapping lineage in sink nodesLineageAdapterDelegate: For transforming lineage dataJoinKeySelectorDelegate: For extracting join keys
When using these delegates, ensure you import the correct namespace:
using NPipeline.Graph.PipelineDelegates;
// Now you can use the delegates
CustomMergeDelegate? myCustomMerge = (node, dataPipes, cancellationToken) => {
// Custom merge implementation
return mergedDataPipe;
};
See Also
- Defining Pipelines - Learn how to define pipelines with both fluent and class-based approaches
- Node Types - Overview of different node types
- Execution Strategies - Configure node execution