Lineage Use Cases
This guide covers common use cases and practical examples for the NPipeline Lineage extension.
Data Governance
Maintain complete audit trails for regulatory compliance:
Compliance Tracking
Track all data transformations for GDPR, HIPAA, and SOX compliance:
// Register DatabaseLineageSink as the lineage sink (assumed to persist records durably - confirm).
services.AddNPipelineLineage<DatabaseLineageSink>();
// In pipeline
builder.EnableItemLevelLineage(options =>
{
options.SampleEvery = 1; // 100% for compliance
options.DeterministicSampling = true; // Consistent tracking
options.RedactData = false; // Keep data for audit
// NOTE(review): with redaction off, the lineage store also holds the item data itself;
// confirm that retaining it is permitted by the regulation you are complying with.
options.MaterializationCap = int.MaxValue; // Complete records
// NOTE(review): int.MaxValue effectively removes the cap, and the Materialize overflow policy
// appears to keep materializing past it - confirm memory/storage headroom for large runs.
options.OverflowPolicy = LineageOverflowPolicy.Materialize;
});
Why This Works:
- Complete audit trail of all data movements
- Immutable records for legal requirements
- Timestamped transformations for accountability
- Queryable lineage for audits
Audit Report Generation
Generate compliance reports from lineage data:
/// <summary>
/// Builds a <see cref="ComplianceReport"/> that summarizes the lineage records held by an
/// <see cref="ILineageCollector"/>.
/// </summary>
public sealed class ComplianceReportGenerator
{
    private readonly ILineageCollector _collector;

    /// <param name="collector">Source of lineage records; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="collector"/> is null.</exception>
    public ComplianceReportGenerator(ILineageCollector collector)
    {
        _collector = collector ?? throw new ArgumentNullException(nameof(collector));
    }

    /// <summary>
    /// Generates a report labeled with the given reporting period.
    /// </summary>
    /// <param name="startDate">Start of the reporting period.</param>
    /// <param name="endDate">End of the reporting period; must not precede <paramref name="startDate"/>.</param>
    /// <returns>
    /// Item count, total hop count, and number of distinct nodes across all collected lineage.
    /// </returns>
    /// <exception cref="ArgumentException"><paramref name="endDate"/> is earlier than <paramref name="startDate"/>.</exception>
    public ComplianceReport GenerateReport(DateTime startDate, DateTime endDate)
    {
        if (endDate < startDate)
        {
            throw new ArgumentException("endDate must not be earlier than startDate.", nameof(endDate));
        }

        // NOTE(review): the counts below cover every record the collector holds. They are NOT
        // restricted to [startDate, endDate] because no per-record timestamp is visible here.
        // Filter allLineage by timestamp if the lineage model exposes one.
        var allLineage = _collector.GetAllLineageInfo();

        return new ComplianceReport
        {
            PeriodStart = startDate,
            PeriodEnd = endDate,
            TotalItemsProcessed = allLineage.Count,
            TransformationsApplied = allLineage.Sum(li => li.LineageHops.Count),
            UniqueNodesInvolved = allLineage
                .SelectMany(li => li.TraversalPath)
                .Distinct()
                .Count()
        };
    }
}
Sensitive Data Handling
Track sensitive data without storing actual values:
// Record lineage for every item while keeping item values out of the lineage records.
builder.EnableItemLevelLineage(options =>
{
options.RedactData = true; // Don't store PII
options.SampleEvery = 1; // Track all items
});
Benefits:
- Compliance with data protection regulations
- Audit trail without exposing sensitive data
- Reduced memory footprint
- Maintains transformation history
Debugging
Quickly identify which node introduced an issue:
Root Cause Analysis
Trace problems back to their source:
/// <summary>
/// Writes a hop-by-hop report of a single item's lineage to the log.
/// </summary>
public sealed class LineageDebugger
{
    private readonly ILineageCollector _collector;
    private readonly ILogger _logger;

    /// <param name="collector">Source of lineage records; must not be null.</param>
    /// <param name="logger">Destination for the report; must not be null.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public LineageDebugger(ILineageCollector collector, ILogger<LineageDebugger> logger)
    {
        _collector = collector ?? throw new ArgumentNullException(nameof(collector));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Logs the traversal path and each hop's outcome, cardinality, and counts for
    /// <paramref name="lineageId"/>. Hops whose outcome is not "Success" are logged as errors.
    /// Logs a warning and returns if no lineage exists for the ID.
    /// </summary>
    /// <param name="lineageId">Lineage ID to inspect.</param>
    public void DebugItem(Guid lineageId)
    {
        var lineageInfo = _collector.GetLineageInfo(lineageId);
        if (lineageInfo is null)
        {
            _logger.LogWarning("Lineage {LineageId} not found", lineageId);
            return;
        }

        _logger.LogInformation("=== Lineage Debug Report ===");
        _logger.LogInformation("Lineage ID: {LineageId}", lineageInfo.LineageId);
        _logger.LogInformation("Traversal Path: {Path}",
            string.Join(" → ", lineageInfo.TraversalPath));

        var hops = lineageInfo.LineageHops;
        // Indexed loop: calling IndexOf(hop) per iteration was O(n^2), and it could report
        // the wrong position when two hops compare equal.
        for (var index = 0; index < hops.Count; index++)
        {
            var hop = hops[index];
            _logger.LogInformation("Hop {Index}: {NodeId}", index, hop.NodeId);
            _logger.LogInformation(" Outcome: {Outcome}", hop.Outcome);
            _logger.LogInformation(" Cardinality: {Cardinality}", hop.Cardinality);
            _logger.LogInformation(" Input Count: {InputCount}", hop.InputCount);
            _logger.LogInformation(" Output Count: {OutputCount}", hop.OutputCount);
            if (hop.Outcome != "Success")
            {
                _logger.LogError(" ⚠️ Issue detected at this hop!");
            }
        }
    }
}
Identify Problematic Nodes
Find nodes with high failure rates:
/// <summary>
/// Aggregates per-node success and failure counts across every recorded lineage hop.
/// </summary>
public sealed class NodeHealthAnalyzer
{
    private readonly ILineageCollector _collector;

    /// <param name="collector">Source of lineage records; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="collector"/> is null.</exception>
    public NodeHealthAnalyzer(ILineageCollector collector)
    {
        _collector = collector ?? throw new ArgumentNullException(nameof(collector));
    }

    /// <summary>
    /// Computes a <see cref="NodeHealth"/> for every node that appears in at least one hop.
    /// A hop counts as a success when its Outcome equals "Success"; any other outcome is a failure.
    /// </summary>
    /// <returns>Health statistics keyed by node ID.</returns>
    public Dictionary<string, NodeHealth> AnalyzeNodeHealth()
    {
        // NodeHealth is a positional record: it has no parameterless constructor and its
        // properties are init-only. Tally the counts first, then build each record once.
        var tallies = new Dictionary<string, (int Success, int Failure)>();

        foreach (var lineage in _collector.GetAllLineageInfo())
        {
            foreach (var hop in lineage.LineageHops)
            {
                tallies.TryGetValue(hop.NodeId, out var tally);
                tallies[hop.NodeId] = hop.Outcome == "Success"
                    ? (tally.Success + 1, tally.Failure)
                    : (tally.Success, tally.Failure + 1);
            }
        }

        var nodeHealth = new Dictionary<string, NodeHealth>(tallies.Count);
        foreach (var entry in tallies)
        {
            var (success, failure) = entry.Value;
            var total = success + failure; // >= 1: a node is only tallied once it has a hop
            nodeHealth[entry.Key] = new NodeHealth(
                NodeId: entry.Key,
                TotalHops: total,
                SuccessCount: success,
                FailureCount: failure,
                SuccessRate: (double)success / total);
        }

        return nodeHealth;
    }
}
/// <summary>Outcome statistics for a single pipeline node.</summary>
/// <param name="NodeId">Identifier of the node.</param>
/// <param name="TotalHops">Number of hops recorded at the node.</param>
/// <param name="SuccessCount">Hops whose outcome was a success.</param>
/// <param name="FailureCount">Hops with any other outcome.</param>
/// <param name="SuccessRate">SuccessCount divided by TotalHops.</param>
public sealed record NodeHealth(string NodeId, int TotalHops, int SuccessCount, int FailureCount, double SuccessRate);
Debug Specific Data Items
Trace the exact journey of a problematic item:
// After pipeline execution
var collector = serviceProvider.GetRequiredService<ILineageCollector>();
// Find lineage for a specific item. Replace the placeholder with a real lineage ID:
// Guid.Parse throws FormatException on the literal text below.
var problematicItemId = Guid.Parse("your-item-id-here");
var lineageInfo = collector.GetLineageInfo(problematicItemId);
if (lineageInfo is null)
{
    Console.WriteLine($"No lineage recorded for {problematicItemId}");
}
else if (lineageInfo.LineageHops.Count == 0)
{
    // Guard before indexing LineageHops[0], which would throw on an empty hop list.
    Console.WriteLine($"Lineage {problematicItemId} has no recorded hops");
}
else
{
    Console.WriteLine($"Item entered at: {lineageInfo.LineageHops[0].NodeId}");
    foreach (var hop in lineageInfo.LineageHops)
    {
        Console.WriteLine($" → {hop.NodeId}");
        Console.WriteLine($" Outcome: {hop.Outcome}");
        Console.WriteLine($" Cardinality: {hop.Cardinality}");
        Console.WriteLine($" Input: {hop.InputCount}, Output: {hop.OutputCount}");
    }
}
Impact Analysis
Understand dependencies before making changes:
Find Affected Downstream Processes
Identify all items that passed through a specific node:
/// <summary>
/// Reports which lineage records pass through a node and which nodes immediately follow it.
/// </summary>
public sealed class ImpactAnalyzer
{
    private readonly ILineageCollector _collector;

    /// <param name="collector">Source of lineage records; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="collector"/> is null.</exception>
    public ImpactAnalyzer(ILineageCollector collector)
    {
        _collector = collector ?? throw new ArgumentNullException(nameof(collector));
    }

    /// <summary>
    /// Finds every lineage record whose traversal path contains <paramref name="nodeId"/>.
    /// </summary>
    /// <param name="nodeId">Node to analyze.</param>
    /// <returns>Affected record count and IDs, plus the nodes that directly follow <paramref name="nodeId"/>.</returns>
    public ImpactReport AnalyzeImpact(string nodeId)
    {
        var allLineage = _collector.GetAllLineageInfo();
        var affectedItems = allLineage
            .Where(li => li.TraversalPath.Contains(nodeId))
            .ToList();

        // ImpactReport is a positional record, so build it through its primary constructor
        // (an object initializer does not compile against it).
        return new ImpactReport(
            NodeId: nodeId,
            AffectedItemCount: affectedItems.Count,
            AffectedLineageIds: affectedItems.Select(li => li.LineageId).ToList(),
            DownstreamNodes: GetDownstreamNodes(nodeId, allLineage));
    }

    /// <summary>Collects the distinct nodes that appear immediately after <paramref name="nodeId"/> in any path.</summary>
    private List<string> GetDownstreamNodes(string nodeId, IReadOnlyList<ILineageInfo> allLineage)
    {
        var downstream = new HashSet<string>();
        foreach (var lineage in allLineage)
        {
            var path = lineage.TraversalPath;
            // Check every position, not just the first match: a path may visit nodeId more than once.
            for (var i = 0; i < path.Count - 1; i++)
            {
                if (path[i] == nodeId)
                {
                    downstream.Add(path[i + 1]);
                }
            }
        }
        return downstream.ToList();
    }
}
/// <summary>Result of an impact analysis for one node.</summary>
/// <param name="NodeId">The analyzed node.</param>
/// <param name="AffectedItemCount">Number of lineage records whose path includes the node.</param>
/// <param name="AffectedLineageIds">IDs of those lineage records.</param>
/// <param name="DownstreamNodes">Nodes observed immediately after the analyzed node.</param>
public sealed record ImpactReport(string NodeId, int AffectedItemCount, List<Guid> AffectedLineageIds, List<string> DownstreamNodes);
Before Change Analysis
Assess impact before modifying a node:
// Example: run the pipeline, then assess the impact of changing one node.
// NOTE(review): this snippet does not declare `serviceProvider` or `RunPipelineWithLineage`.
// It presumably needs the same IServiceProvider that ran the pipeline (so its collector holds the
// lineage), with ImpactAnalyzer registered in it (e.g. services.AddSingleton<ImpactAnalyzer>());
// otherwise GetRequiredService throws. Confirm against your setup.
public class Program
{
public static async Task Main(string[] args)
{
// Run pipeline with lineage
await RunPipelineWithLineage();
// Analyze impact of changing "ValidationNode"
var analyzer = serviceProvider.GetRequiredService<ImpactAnalyzer>();
var impact = analyzer.AnalyzeImpact("ValidationNode");
Console.WriteLine($"Impact Analysis for {impact.NodeId}:");
Console.WriteLine($" Affected Items: {impact.AffectedItemCount}");
Console.WriteLine($" Downstream Nodes: {string.Join(", ", impact.DownstreamNodes)}");
// 10,000 is an illustrative threshold for flagging a high-impact change.
if (impact.AffectedItemCount > 10000)
{
Console.WriteLine(" ⚠️ High impact - consider careful testing");
}
}
}
Dependency Mapping
Build a complete dependency graph:
/// <summary>
/// Reconstructs node-to-node edges from every recorded traversal path.
/// </summary>
public sealed class DependencyMapper
{
    private readonly ILineageCollector _collector;

    public DependencyMapper(ILineageCollector collector) => _collector = collector;

    /// <summary>Adds one edge for each pair of consecutive nodes in each traversal path.</summary>
    /// <returns>The assembled dependency graph.</returns>
    public DependencyGraph BuildDependencyGraph()
    {
        var records = _collector.GetAllLineageInfo();
        var result = new DependencyGraph();

        foreach (var record in records)
        {
            var path = record.TraversalPath;
            for (var next = 1; next < path.Count; next++)
            {
                result.AddEdge(path[next - 1], path[next]);
            }
        }

        return result;
    }
}
/// <summary>
/// Directed graph of node dependencies keyed by node ID. Duplicate edges are stored once.
/// </summary>
public sealed class DependencyGraph
{
    private readonly Dictionary<string, HashSet<string>> _edges = new();

    /// <summary>Records a directed edge from <paramref name="source"/> to <paramref name="target"/>.</summary>
    public void AddEdge(string source, string target)
    {
        if (!_edges.TryGetValue(source, out var targets))
        {
            targets = new HashSet<string>();
            _edges[source] = targets;
        }
        targets.Add(target);
    }

    /// <summary>
    /// Returns the direct successors of <paramref name="node"/> (empty if none).
    /// The returned set is a copy, so modifying it does not alter the graph.
    /// </summary>
    public HashSet<string> GetDownstream(string node)
    {
        return _edges.TryGetValue(node, out var targets)
            ? new HashSet<string>(targets)
            : new HashSet<string>();
    }

    /// <summary>Returns every node that has a direct edge to <paramref name="node"/>.</summary>
    public HashSet<string> GetUpstream(string node)
    {
        var upstream = new HashSet<string>();
        foreach (var kvp in _edges)
        {
            if (kvp.Value.Contains(node))
            {
                upstream.Add(kvp.Key);
            }
        }
        return upstream;
    }
}
Performance Monitoring
Identify bottlenecks in complex pipelines:
Find Slow Transformations
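Analyze hop durations to find performance issues. Note that the example below uses a placeholder duration, because per-hop durations are not tracked on `LineageHop` in this sample: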
/// <summary>
/// Aggregates hop counts and (placeholder) durations per node, slowest nodes first.
/// </summary>
public sealed class PerformanceAnalyzer
{
    private readonly ILineageCollector _collector;

    /// <param name="collector">Source of lineage records; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="collector"/> is null.</exception>
    public PerformanceAnalyzer(ILineageCollector collector)
    {
        _collector = collector ?? throw new ArgumentNullException(nameof(collector));
    }

    /// <summary>Builds one <see cref="NodePerformance"/> per node, ordered by total duration descending.</summary>
    public List<NodePerformance> AnalyzePerformance()
    {
        // NodePerformance is an immutable positional record, so accumulate into tuples first.
        var totals = new Dictionary<string, (int Hops, double DurationMs)>();

        foreach (var lineage in _collector.GetAllLineageInfo())
        {
            foreach (var hop in lineage.LineageHops)
            {
                // Note: Duration would need to be tracked in LineageHop.
                // This is a placeholder for the concept (would be hop.DurationMs).
                const double hopDurationMs = 0;

                totals.TryGetValue(hop.NodeId, out var total);
                totals[hop.NodeId] = (total.Hops + 1, total.DurationMs + hopDurationMs);
            }
        }

        return totals
            .Select(entry => new NodePerformance(entry.Key, entry.Value.Hops, entry.Value.DurationMs))
            .OrderByDescending(p => p.TotalDurationMs)
            .ToList();
    }
}
/// <summary>Timing totals for a single pipeline node.</summary>
/// <param name="NodeId">Identifier of the node.</param>
/// <param name="TotalHops">Number of hops recorded at the node.</param>
/// <param name="TotalDurationMs">Sum of hop durations, in milliseconds.</param>
public sealed record NodePerformance(string NodeId, int TotalHops, double TotalDurationMs)
{
    /// <summary>
    /// Mean duration per hop in milliseconds, or 0 when no hops were recorded.
    /// </summary>
    /// <remarks>
    /// Computed members cannot appear in a positional parameter list, so this lives in the body.
    /// </remarks>
    public double AverageDurationMs => TotalHops > 0 ? TotalDurationMs / TotalHops : 0;
}
Throughput Analysis
Measure processing rates across nodes:
/// <summary>
/// Sums input and output counts per node across every recorded lineage hop.
/// </summary>
public sealed class ThroughputAnalyzer
{
    private readonly ILineageCollector _collector;

    /// <param name="collector">Source of lineage records; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="collector"/> is null.</exception>
    public ThroughputAnalyzer(ILineageCollector collector)
    {
        _collector = collector ?? throw new ArgumentNullException(nameof(collector));
    }

    /// <summary>Builds one <see cref="NodeThroughput"/> per node that appears in any hop.</summary>
    public List<NodeThroughput> AnalyzeThroughput()
    {
        // NodeThroughput is an immutable positional record, so sum into tuples first.
        // The totals are 64-bit to avoid overflowing across many hops.
        var totals = new Dictionary<string, (long Input, long Output)>();

        foreach (var lineage in _collector.GetAllLineageInfo())
        {
            foreach (var hop in lineage.LineageHops)
            {
                totals.TryGetValue(hop.NodeId, out var total);
                totals[hop.NodeId] = (total.Input + hop.InputCount, total.Output + hop.OutputCount);
            }
        }

        var result = new List<NodeThroughput>(totals.Count);
        foreach (var entry in totals)
        {
            result.Add(new NodeThroughput(entry.Key, entry.Value.Input, entry.Value.Output));
        }

        return result;
    }
}
/// <summary>Aggregate input and output counts for a single pipeline node.</summary>
/// <param name="NodeId">Identifier of the node.</param>
/// <param name="TotalInputCount">Sum of hop input counts.</param>
/// <param name="TotalOutputCount">Sum of hop output counts.</param>
public sealed record NodeThroughput(string NodeId, long TotalInputCount, long TotalOutputCount)
{
    /// <summary>
    /// Output-to-input ratio, or 0 when nothing was input. Values below 1 mean items were dropped;
    /// values above 1 mean the node produced more outputs than inputs.
    /// </summary>
    /// <remarks>
    /// Computed members cannot appear in a positional parameter list, so this lives in the body.
    /// </remarks>
    public double FilterRatio => TotalInputCount > 0 ? (double)TotalOutputCount / TotalInputCount : 0;
}
Cardinality Analysis
Understand data transformation patterns:
/// <summary>
/// Counts, per node, how often each <see cref="Cardinality"/> was observed on its hops.
/// </summary>
public sealed class CardinalityAnalyzer
{
    private readonly ILineageCollector _collector;

    /// <param name="collector">Source of lineage records; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="collector"/> is null.</exception>
    public CardinalityAnalyzer(ILineageCollector collector)
    {
        _collector = collector ?? throw new ArgumentNullException(nameof(collector));
    }

    /// <summary>Builds one <see cref="CardinalityStats"/> per node that appears in any hop.</summary>
    /// <returns>Statistics keyed by node ID.</returns>
    public Dictionary<string, CardinalityStats> AnalyzeCardinality()
    {
        // CardinalityStats is a positional record with init-only properties, so tally
        // hop totals and per-cardinality counts first, then build each record once.
        // NOTE(review): assumes hop.Cardinality is the Cardinality enum used by CardinalityStats.
        var hopTotals = new Dictionary<string, int>();
        var countsByNode = new Dictionary<string, Dictionary<Cardinality, int>>();

        foreach (var lineage in _collector.GetAllLineageInfo())
        {
            foreach (var hop in lineage.LineageHops)
            {
                hopTotals.TryGetValue(hop.NodeId, out var hops);
                hopTotals[hop.NodeId] = hops + 1;

                if (!countsByNode.TryGetValue(hop.NodeId, out var counts))
                {
                    counts = new Dictionary<Cardinality, int>();
                    countsByNode[hop.NodeId] = counts;
                }

                counts.TryGetValue(hop.Cardinality, out var seen);
                counts[hop.Cardinality] = seen + 1;
            }
        }

        var stats = new Dictionary<string, CardinalityStats>(hopTotals.Count);
        foreach (var entry in hopTotals)
        {
            stats[entry.Key] = new CardinalityStats(entry.Key, entry.Value, countsByNode[entry.Key]);
        }

        return stats;
    }
}
/// <summary>Cardinality distribution observed at a single pipeline node.</summary>
/// <param name="NodeId">Identifier of the node.</param>
/// <param name="TotalHops">Number of hops recorded at the node.</param>
/// <param name="CardinalityCounts">Number of hops seen per cardinality.</param>
public sealed record CardinalityStats(string NodeId, int TotalHops, Dictionary<Cardinality, int> CardinalityCounts);
/// <summary>Relationship between a hop's input and output item counts.</summary>
/// <remarks>
/// NOTE(review): if NPipeline already exposes a cardinality type on lineage hops, prefer that
/// type over redeclaring this enum - confirm.
/// </remarks>
public enum Cardinality
{
    /// <summary>1 input → 1 output.</summary>
    OneToOne = 0,

    /// <summary>1 input → N outputs.</summary>
    OneToMany = 1,

    /// <summary>N inputs → 1 output.</summary>
    ManyToOne = 2,

    /// <summary>N inputs → N outputs.</summary>
    ManyToMany = 3,
}
Data Science and Analytics
Support reproducibility and data cataloging:
Dataset Provenance
Document exactly how datasets were created:
/// <summary>
/// Summarizes the source nodes and successful transformations recorded in lineage.
/// </summary>
public sealed class DatasetProvenanceTracker
{
    private readonly ILineageCollector _collector;

    /// <param name="collector">Source of lineage records; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="collector"/> is null.</exception>
    public DatasetProvenanceTracker(ILineageCollector collector)
    {
        _collector = collector ?? throw new ArgumentNullException(nameof(collector));
    }

    /// <summary>
    /// Builds a provenance summary labeled with <paramref name="datasetName"/>.
    /// </summary>
    /// <param name="datasetName">
    /// Label stored on the result. It is not used to select records: every record the collector
    /// holds is summarized.
    /// </param>
    /// <returns>
    /// Distinct nodes with a "Source" outcome, per-node counts of "Success" hops, and the record count.
    /// </returns>
    public DatasetProvenance GetProvenance(string datasetName)
    {
        var allLineage = _collector.GetAllLineageInfo();
        // Flatten once; both summaries read the same hop sequence.
        var hops = allLineage.SelectMany(li => li.LineageHops).ToList();

        var sourceNodes = hops
            .Where(h => h.Outcome == "Source")
            .Select(h => h.NodeId)
            .Distinct()
            .ToList();

        var transformations = hops
            .Where(h => h.Outcome == "Success")
            .GroupBy(h => h.NodeId)
            .Select(g => new TransformationSummary(NodeId: g.Key, ApplicationCount: g.Count()))
            .ToList();

        // DatasetProvenance is a positional record, so build it through its primary constructor.
        return new DatasetProvenance(
            DatasetName: datasetName,
            CreationTimestamp: DateTime.UtcNow, // time this summary was generated
            SourceNodes: sourceNodes,
            TransformationsApplied: transformations,
            TotalItemsProcessed: allLineage.Count);
    }
}
/// <summary>Provenance summary for a named dataset.</summary>
/// <param name="DatasetName">Label of the dataset.</param>
/// <param name="CreationTimestamp">When the summary was created.</param>
/// <param name="SourceNodes">Distinct source node IDs.</param>
/// <param name="TransformationsApplied">Per-node transformation counts.</param>
/// <param name="TotalItemsProcessed">Number of lineage records considered.</param>
public sealed record DatasetProvenance(string DatasetName, DateTime CreationTimestamp, List<string> SourceNodes, List<TransformationSummary> TransformationsApplied, int TotalItemsProcessed);
/// <summary>How many times one node's transformation was applied.</summary>
/// <param name="NodeId">Identifier of the node.</param>
/// <param name="ApplicationCount">Number of applications.</param>
public sealed record TransformationSummary(string NodeId, int ApplicationCount);
Model Training Lineage
Understand the provenance of training data:
/// <summary>
/// Describes the lineage of the data used to train a model.
/// </summary>
public sealed class ModelTrainingLineage
{
    private readonly ILineageCollector _collector;

    /// <param name="collector">Source of lineage records; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="collector"/> is null.</exception>
    public ModelTrainingLineage(ILineageCollector collector)
    {
        _collector = collector ?? throw new ArgumentNullException(nameof(collector));
    }

    /// <summary>Builds a training-data lineage report labeled with <paramref name="modelName"/>.</summary>
    /// <param name="modelName">Label stored on the report; not used to select records.</param>
    public ModelTrainingReport GenerateReport(string modelName)
    {
        var allLineage = _collector.GetAllLineageInfo();
        var hops = allLineage.SelectMany(li => li.LineageHops).ToList();

        var sources = hops
            .Where(h => h.Outcome == "Source")
            .Select(h => h.NodeId)
            .Distinct()
            .ToList();

        // DataTransformations is List<object>, and List<T> is invariant, so each anonymous
        // summary is cast to object (a List<anonymous type> is not assignable to it).
        var transformations = hops
            .GroupBy(h => h.NodeId)
            .Select(g => (object)new
            {
                NodeId = g.Key,
                TransformCount = g.Count(),
                SuccessRate = g.Count(h => h.Outcome == "Success") / (double)g.Count()
            })
            .ToList();

        // ModelTrainingReport is a positional record, so build it through its primary constructor.
        return new ModelTrainingReport(
            ModelName: modelName,
            TrainingDataSources: sources,
            DataTransformations: transformations,
            SampleSize: allLineage.Count,
            DataQualityMetrics: CalculateQualityMetrics(allLineage));
    }

    /// <summary>
    /// Computes the fraction of records whose hops all succeeded, and the mean hop count.
    /// Returns zeros for an empty input.
    /// </summary>
    private DataQualityMetrics CalculateQualityMetrics(IReadOnlyList<ILineageInfo> lineage)
    {
        // On an empty input, Average() throws and the success rate would be 0/0 (NaN).
        if (lineage.Count == 0)
        {
            return new DataQualityMetrics(SuccessRate: 0, AverageHopCount: 0);
        }

        return new DataQualityMetrics(
            SuccessRate: lineage.Count(li => li.LineageHops.All(h => h.Outcome == "Success")) / (double)lineage.Count,
            AverageHopCount: lineage.Average(li => li.LineageHops.Count));
    }
}
/// <summary>Lineage report for the data used to train a model.</summary>
/// <param name="ModelName">Label of the model.</param>
/// <param name="TrainingDataSources">Distinct source node IDs.</param>
/// <param name="DataTransformations">Per-node transformation summaries.</param>
/// <param name="SampleSize">Number of lineage records considered.</param>
/// <param name="DataQualityMetrics">Aggregate quality figures.</param>
public sealed record ModelTrainingReport(string ModelName, List<string> TrainingDataSources, List<object> DataTransformations, int SampleSize, DataQualityMetrics DataQualityMetrics);
/// <summary>Aggregate quality figures derived from lineage.</summary>
/// <param name="SuccessRate">Fraction of records whose hops all succeeded.</param>
/// <param name="AverageHopCount">Mean number of hops per record.</param>
public sealed record DataQualityMetrics(double SuccessRate, double AverageHopCount);
Data Cataloging
Build a comprehensive catalog of data sources and transformations:
/// <summary>
/// Builds a catalog of data sources, transformations, and per-record data flows from lineage.
/// </summary>
public sealed class DataCatalogBuilder
{
    private readonly ILineageCollector _collector;

    /// <param name="collector">Source of lineage records; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="collector"/> is null.</exception>
    public DataCatalogBuilder(ILineageCollector collector)
    {
        _collector = collector ?? throw new ArgumentNullException(nameof(collector));
    }

    /// <summary>Builds the catalog from every record the collector holds.</summary>
    public DataCatalog BuildCatalog()
    {
        var allLineage = _collector.GetAllLineageInfo();

        // DataCatalog is a positional record, so build it through its primary constructor.
        return new DataCatalog(
            DataSources: ExtractDataSources(allLineage),
            Transformations: ExtractTransformations(allLineage),
            DataFlows: ExtractDataFlows(allLineage));
    }

    /// <summary>Counts hops with a "Source" outcome per node.</summary>
    private List<DataSource> ExtractDataSources(IReadOnlyList<ILineageInfo> lineage)
    {
        return lineage
            .SelectMany(li => li.LineageHops)
            .Where(h => h.Outcome == "Source")
            .GroupBy(h => h.NodeId)
            .Select(g => new DataSource(NodeId: g.Key, UsageCount: g.Count()))
            .ToList();
    }

    /// <summary>Counts "Success" hops per node, with the per-cardinality breakdown of those hops.</summary>
    private List<DataTransformation> ExtractTransformations(IReadOnlyList<ILineageInfo> lineage)
    {
        return lineage
            .SelectMany(li => li.LineageHops)
            .Where(h => h.Outcome == "Success")
            .GroupBy(h => h.NodeId)
            .Select(g => new DataTransformation(
                NodeId: g.Key,
                ApplicationCount: g.Count(),
                // The record requires List<object>; List<T> is invariant, so cast each anonymous entry.
                CardinalityDistribution: g.GroupBy(h => h.Cardinality)
                    .Select(cg => (object)new { Cardinality = cg.Key, Count = cg.Count() })
                    .ToList()))
            .ToList();
    }

    /// <summary>Projects each lineage record into its traversal path and hop count.</summary>
    private List<DataFlow> ExtractDataFlows(IReadOnlyList<ILineageInfo> lineage)
    {
        return lineage
            .Select(li => new DataFlow(
                LineageId: li.LineageId,
                FlowPath: li.TraversalPath,
                HopCount: li.LineageHops.Count))
            .ToList();
    }
}
/// <summary>Catalog of data sources, transformations, and flows derived from lineage.</summary>
/// <param name="DataSources">Source nodes and their usage counts.</param>
/// <param name="Transformations">Transformation nodes and their application counts.</param>
/// <param name="DataFlows">One entry per lineage record.</param>
public sealed record DataCatalog(List<DataSource> DataSources, List<DataTransformation> Transformations, List<DataFlow> DataFlows);
/// <summary>A source node and how many source hops it produced.</summary>
/// <param name="NodeId">Identifier of the node.</param>
/// <param name="UsageCount">Number of source hops at the node.</param>
public sealed record DataSource(string NodeId, int UsageCount);
/// <summary>A transformation node, how often it ran, and its cardinality breakdown.</summary>
/// <param name="NodeId">Identifier of the node.</param>
/// <param name="ApplicationCount">Number of successful applications.</param>
/// <param name="CardinalityDistribution">Per-cardinality counts (loosely typed).</param>
public sealed record DataTransformation(string NodeId, int ApplicationCount, List<object> CardinalityDistribution);
/// <summary>The path one lineage record took through the pipeline.</summary>
/// <param name="LineageId">Identifier of the lineage record.</param>
/// <param name="FlowPath">Ordered node IDs visited.</param>
/// <param name="HopCount">Number of recorded hops.</param>
public sealed record DataFlow(Guid LineageId, IReadOnlyList<string> FlowPath, int HopCount);
Complete Examples
Example 1: ETL Pipeline with Lineage
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Lineage;
using NPipeline.Lineage.DependencyInjection;
/// <summary>
/// Four-stage ETL pipeline: database source, validation, transformation, data-warehouse sink.
/// </summary>
public class EtlPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        // Stage handles, declared in data-flow order.
        var extract = builder.AddSource<DatabaseSource, RawData>("source");
        var validation = builder.AddTransform<ValidationTransform, RawData, ValidatedData>("validate");
        var processing = builder.AddTransform<DataTransform, ValidatedData, ProcessedData>("transform");
        var load = builder.AddSink<DataWarehouseSink, ProcessedData>("sink");

        // Wire each stage to the next one.
        builder.Connect(extract, validation);
        builder.Connect(validation, processing);
        builder.Connect(processing, load);
    }
}
/// <summary>
/// Runs <see cref="EtlPipeline"/> with full item-level lineage recorded to a database sink.
/// </summary>
public class Program
{
    public static async Task Main(string[] args)
    {
        // Container: database lineage sink for compliance, plus NPipeline itself.
        var serviceCollection = new ServiceCollection();
        serviceCollection.AddNPipelineLineage<DatabaseLineageSink>();
        serviceCollection.AddNPipeline(typeof(Program).Assembly);
        var provider = serviceCollection.BuildServiceProvider();

        var pipelineBuilder = new PipelineBuilder("EtlPipeline");

        // Compliance profile: every item, deterministic sampling, unredacted data for audit.
        pipelineBuilder.EnableItemLevelLineage(lineageOptions =>
        {
            lineageOptions.SampleEvery = 1;
            lineageOptions.DeterministicSampling = true;
            lineageOptions.RedactData = false;
        });

        new EtlPipeline().Define(pipelineBuilder, new PipelineContext());

        var definition = pipelineBuilder.Build();
        await provider.RunPipelineAsync(definition);
    }
}
Example 2: Debugging with Lineage
/// <summary>
/// Runs DebugPipeline with full, unredacted lineage, then logs the lineage of one item.
/// </summary>
public class Program
{
    public static async Task Main(string[] args)
    {
        var services = new ServiceCollection();
        services.AddNPipelineLineage(); // Use logging sink
        services.AddNPipeline(typeof(Program).Assembly);
        // NOTE(review): LineageDebugger needs an ILogger<LineageDebugger> from the container.
        // This assumes logging with at least one provider is registered (for example,
        // services.AddLogging(b => b.AddConsole())). Confirm for your setup.
        var serviceProvider = services.BuildServiceProvider();
        var builder = new PipelineBuilder("DebugPipeline");

        // Enable lineage for debugging
        builder.EnableItemLevelLineage(options =>
        {
            options.SampleEvery = 1; // Track everything
            options.RedactData = false; // Keep data for inspection
        });

        var pipeline = new DebugPipeline();
        pipeline.Define(builder, new PipelineContext());

        // Run pipeline
        await serviceProvider.RunPipelineAsync(builder.Build());

        // Analyze lineage for debugging
        var collector = serviceProvider.GetRequiredService<ILineageCollector>();
        var logger = serviceProvider.GetRequiredService<Microsoft.Extensions.Logging.ILogger<LineageDebugger>>();
        var debugger = new LineageDebugger(collector, logger);

        // Debug a specific problematic item. Replace the placeholder with a real lineage ID:
        // Guid.Parse throws FormatException on the literal text below.
        var problematicItemId = Guid.Parse("your-item-id");
        debugger.DebugItem(problematicItemId);
    }
}
Best Practices
1. Use Appropriate Sampling
Choose a sampling rate based on the use case (`SampleEvery = 1` samples 100% of items):
| Use Case | Sampling Rate | Reasoning |
|---|---|---|
| Compliance | 100% | Complete audit trail required |
| Debugging | 100% | Need complete visibility |
| Monitoring | 1-10% | Representative samples sufficient |
| Analytics | 0.1-1% | Minimal overhead needed |
2. Enable Redaction for Sensitive Data
Always enable redaction when items contain PII, financial data, or health records:
// Item values are kept out of lineage records; the transformation history is still recorded.
options.RedactData = true;
3. Use Deterministic Sampling for Reproducibility
When you need consistent, repeatable sampling behavior:
// Sampling decisions are made deterministically so that runs are reproducible
// (presumably only relevant when SampleEvery > 1 - confirm).
options.DeterministicSampling = true;
4. Implement Custom Sinks for Production
Use a database or other external-system sink:
// Registers DatabaseLineageSink as the lineage sink; substitute your own sink type here.
services.AddNPipelineLineage<DatabaseLineageSink>();
5. Analyze Lineage Regularly
Build tools to analyze lineage data:
// Ask which lineage records pass through ValidationNode and what follows it.
var impact = new ImpactAnalyzer(collector).AnalyzeImpact("ValidationNode");
Related Topics
- Getting Started - Installation and basic setup
- Configuration - Configuration options and settings
- Architecture - Internal architecture and design decisions
- Performance - Performance characteristics and optimization