Skip to main content

AWS SQS Connector

AWS SQS Connector

The NPipeline.Connectors.Aws.Sqs package provides specialized source and sink nodes for working with Amazon Simple Queue Service (SQS). This allows you to easily integrate SQS message processing into your pipelines as an input source or an output destination.

This connector uses the robust AWSSDK.SQS library under the hood, so it is powerful and highly configurable for production workloads.

Installation

To use the AWS SQS connector, install the NPipeline.Connectors.Aws.Sqs NuGet package:

dotnet add package NPipeline.Connectors.Aws.Sqs

For the core NPipeline package and other available extensions, see the Installation Guide.

AWS Credentials

The SQS connector supports multiple credential methods to provide flexibility across different deployment scenarios:

Access Key and Secret Key

var config = new SqsConfiguration
{
AccessKeyId = "AKIAIOSFODNN7EXAMPLE",
SecretAccessKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
Region = "us-east-1"
};

AWS Profile

var config = new SqsConfiguration
{
ProfileName = "default",
Region = "us-east-1"
};

Default Credential Chain

var config = new SqsConfiguration
{
Region = "us-east-1"
};

Why multiple credential methods: The default credential chain provides the most flexibility for deployment scenarios (EC2, ECS, Lambda), while explicit credentials are useful for local development and testing. The connector automatically falls back to the default AWS credential chain when no explicit credentials are provided.

SqsSourceNode<T>

The SqsSourceNode<T> continuously polls an SQS queue and emits each deserialized message as an item of type SqsMessage<T>.

Source Configuration

The constructor for SqsSourceNode<T> takes configuration for connecting to SQS:

public SqsSourceNode(SqsConfiguration configuration)
public SqsSourceNode(IAmazonSQS sqsClient, SqsConfiguration configuration)
  • configuration: The SqsConfiguration object with queue URL, polling settings, and AWS credentials.
  • sqsClient: (Optional) A custom IAmazonSQS client. If not provided, one is created from the configuration.

Example: Reading from an SQS Queue

using NPipeline.Connectors.AwsSqs.Configuration;
using NPipeline.Connectors.AwsSqs.Nodes;
using NPipeline.DataFlow.DataPipes;
using NPipeline.DataFlow;
using NPipeline.Execution;
using NPipeline.Nodes;
using NPipeline.Pipeline;

public sealed record OrderMessage(string OrderId, string CustomerId, decimal Amount, DateTime OrderDate);

public sealed class SqsReaderPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var config = new SqsConfiguration
{
Region = "us-east-1",
SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/orders",
MaxNumberOfMessages = 10,
WaitTimeSeconds = 20,
VisibilityTimeout = 30
};

var sourceNode = new SqsSourceNode<OrderMessage>(config);
var source = builder.AddSource(sourceNode, "sqs_source");
var sink = builder.AddSink<ConsoleSinkNode, SqsMessage<OrderMessage>>("console_sink");

builder.Connect(source, sink);
}
}

public sealed class ConsoleSinkNode : SinkNode<SqsMessage<OrderMessage>>
{
public override async Task ExecuteAsync(
IDataPipe<SqsMessage<OrderMessage>> input,
PipelineContext context,
IPipelineActivity parentActivity,
CancellationToken cancellationToken)
{
await foreach (var message in input.WithCancellation(cancellationToken))
{
Console.WriteLine($"Received: {message.Body}");
}
}
}

Polling Configuration

The source node supports configurable polling behavior:

PropertyTypeDefaultDescription
SourceQueueUrlstringRequiredSQS queue URL to poll from
MaxNumberOfMessagesint10Maximum messages per poll (1-10, SQS API limit)
WaitTimeSecondsint20Long polling wait time (0-20 seconds)
VisibilityTimeoutint30Message visibility timeout in seconds
PollingIntervalMsint1000Polling interval when queue is empty (milliseconds)

Why long polling: Long polling (WaitTimeSeconds > 0) reduces cost and empty responses by keeping the request open until messages arrive or the timeout expires. The default of 20 seconds maximizes cost efficiency by minimizing the number of empty poll requests.

SqsSinkNode<T>

The SqsSinkNode<T> writes items from the pipeline to an SQS queue by serializing them to JSON.

Sink Configuration

The constructor for SqsSinkNode<T> takes configuration for connecting to SQS:

public SqsSinkNode(SqsConfiguration configuration)
public SqsSinkNode(IAmazonSQS sqsClient, SqsConfiguration configuration)
  • configuration: The SqsConfiguration object with queue URL, batch settings, and AWS credentials.
  • sqsClient: (Optional) A custom IAmazonSQS client. If not provided, one is created from the configuration.

Example: Writing to an SQS Queue

using NPipeline.Connectors.AwsSqs.Configuration;
using NPipeline.Connectors.AwsSqs.Nodes;
using NPipeline.Execution;
using NPipeline.Extensions.Testing;
using NPipeline.Nodes;
using NPipeline.Pipeline;

public sealed record ProcessedOrder(string OrderId, string CustomerId, decimal Amount, bool IsValid);

public sealed class SqsWriterPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var config = new SqsConfiguration
{
Region = "us-east-1",
SinkQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/processed-orders",
BatchSize = 10
};

var source = builder.AddSource<InMemorySourceNode<ProcessedOrder>, ProcessedOrder>("source");
var sinkNode = new SqsSinkNode<ProcessedOrder>(config);
var sink = builder.AddSink(sinkNode, "sqs_sink");

builder.Connect(source, sink);
}
}

Sending Configuration

The sink node supports configurable sending behavior:

PropertyTypeDefaultDescription
SinkQueueUrlstringRequiredSQS queue URL to send messages to
BatchSizeint10Batch size for sending messages (1-10)
DelaySecondsint0Message delivery delay (0-900 seconds)
MessageAttributesIDictionary<string, MessageAttributeValue>?nullMessage attributes to add to all outgoing messages

Acknowledgment Strategies

The SQS connector provides multiple acknowledgment strategies to handle different processing scenarios:

AutoOnSinkSuccess (Default)

Messages are automatically acknowledged immediately after successful sink processing:

var config = new SqsConfiguration
{
SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/input-queue",
SinkQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/output-queue",
AcknowledgmentStrategy = AcknowledgmentStrategy.AutoOnSinkSuccess
};

Why this is default: Provides the best developer experience with automatic message cleanup after successful processing, reducing the risk of duplicate message handling.

Manual

Messages are sent to the sink but not acknowledged. You must manually call AcknowledgeAsync():

var config = new SqsConfiguration
{
AcknowledgmentStrategy = AcknowledgmentStrategy.Manual
};

// In a transform node
public class ManualAckTransform : ITransformNode<SqsMessage<OrderMessage>, SqsMessage<OrderMessage>>
{
public async Task<SqsMessage<OrderMessage>> ExecuteAsync(
SqsMessage<OrderMessage> input,
PipelineContext context,
CancellationToken cancellationToken)
{
// Process the message
var processed = ProcessOrder(input.Body);

// Manually acknowledge when ready
await input.AcknowledgeAsync(cancellationToken);

return input.WithBody(processed);
}
}

When to use: Use when you need fine-grained control over acknowledgment timing, such as when processing depends on external systems with their own transaction boundaries.

Delayed

Messages are acknowledged after a configurable delay:

var config = new SqsConfiguration
{
AcknowledgmentStrategy = AcknowledgmentStrategy.Delayed,
AcknowledgmentDelayMs = 5000 // 5 seconds
};

When to use: Useful when downstream systems need time to process messages before they are removed from the queue, providing a window for recovery if processing fails.

None

Messages are never acknowledged automatically:

var config = new SqsConfiguration
{
AcknowledgmentStrategy = AcknowledgmentStrategy.None
};

Warning: Messages remain in the queue until their visibility timeout expires and become available for reprocessing. Use with caution.

Batch Acknowledgment

Batch acknowledgment improves performance by reducing the number of SQS API calls:

var config = new SqsConfiguration
{
SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/input-queue",
SinkQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/output-queue",
AcknowledgmentStrategy = AcknowledgmentStrategy.AutoOnSinkSuccess,

BatchAcknowledgment = new BatchAcknowledgmentOptions
{
// Maximum messages per batch (1-10)
BatchSize = 10,

// Maximum wait time before flushing partial batch (milliseconds)
FlushTimeoutMs = 1000,

// Enable automatic batching
EnableAutomaticBatching = true,

// Maximum concurrent batch operations
MaxConcurrentBatches = 3
}
};

Why batch acknowledgment: Reduces SQS API calls and costs by acknowledging multiple messages in a single DeleteMessageBatch operation. The timeout-based flush ensures messages are acknowledged even when the batch size is not reached.

Batch Acknowledgment Options

PropertyTypeDefaultDescription
BatchSizeint10Maximum messages per batch (1-10)
FlushTimeoutMsint1000Maximum wait before flushing partial batch (milliseconds)
EnableAutomaticBatchingbooltrueEnable automatic batch acknowledgment
MaxConcurrentBatchesint3Maximum concurrent batch operations

Parallel Processing

Enable parallel processing for high-throughput scenarios:

var config = new SqsConfiguration
{
SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/input-queue",
SinkQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/output-queue",

EnableParallelProcessing = true,
MaxDegreeOfParallelism = 4
};

When to use: Ideal for CPU-intensive transformations or when processing speed is critical and message order is not important. For ordered processing, keep parallelism disabled.

JSON Serialization

The connector uses System.Text.Json for serialization with configurable options:

PropertyTypeDefaultDescription
PropertyNamingPolicyJsonPropertyNamingPolicyCamelCaseJSON property naming policy
PropertyNameCaseInsensitivebooltrueCase-insensitive property matching

JsonPropertyNamingPolicy

ValueDescription
LowerCaseProperty names converted to lowercase
CamelCaseProperty names converted to camelCase (default)
SnakeCaseProperty names converted to snake_case
PascalCaseProperty names converted to PascalCase
AsIsProperty names used as-is

Error Handling

Retry Logic

The connector automatically retries on transient errors with exponential backoff:

var config = new SqsConfiguration
{
MaxRetries = 3,
RetryBaseDelayMs = 1000
};

Retry behavior: Transient errors (ServiceUnavailable, TooManyRequests, InternalServerError) trigger exponential backoff with jitter to avoid thundering herd problems.

Message Error Handler

Handle deserialization errors per message:

var config = new SqsConfiguration
{
SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/input-queue",

// Return true to skip the message, false to fail the pipeline
MessageErrorHandler = (exception, message) =>
{
Console.WriteLine($"Failed to deserialize message {message.MessageId}: {exception.Message}");
return true; // Skip and continue
},

ContinueOnError = true
};

Continue on Error

Configure whether to continue processing on errors:

var config = new SqsConfiguration
{
ContinueOnError = true // Continue processing on send failures
};

SqsMessage<T>

The SqsMessage<T> wraps deserialized messages with acknowledgment capability:

public sealed class SqsMessage<T> : IAcknowledgableMessage<T>
{
// The deserialized message body
public T Body { get; }

// SQS message ID
public string MessageId { get; }

// Receipt handle for deletion
public string ReceiptHandle { get; }

// Message attributes/metadata
public IDictionary<string, MessageAttributeValue> Attributes { get; }

// Message timestamp
public DateTime Timestamp { get; }

// Whether the message has been acknowledged
public bool IsAcknowledged { get; }

// Acknowledge the message (deletes from queue)
public Task AcknowledgeAsync(CancellationToken cancellationToken = default);

// Create a new message with different body but same acknowledgment behavior
public IAcknowledgableMessage<TNew> WithBody<TNew>(TNew body);
}

Why this wrapper: Preserves acknowledgment context through transformations, allowing you to modify message content while maintaining the ability to delete the original message from the queue.

Complete Pipeline Example

using NPipeline.Connectors.AwsSqs.Configuration;
using NPipeline.Connectors.AwsSqs.Nodes;
using NPipeline.Pipeline;

public sealed record OrderMessage(string OrderId, string CustomerId, decimal Amount, DateTime OrderDate);

public sealed record ProcessedOrder(string OrderId, string CustomerId, decimal Amount, bool IsValid, DateTime ProcessedAt);

public sealed class SqsProcessingPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var config = new SqsConfiguration
{
Region = "us-east-1",
SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/orders",
SinkQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/processed-orders",
MaxNumberOfMessages = 10,
WaitTimeSeconds = 20,
AcknowledgmentStrategy = AcknowledgmentStrategy.AutoOnSinkSuccess,
BatchAcknowledgment = new BatchAcknowledgmentOptions
{
BatchSize = 10,
FlushTimeoutMs = 1000
}
};

// Add SQS source
var source = builder.AddSource(
new SqsSourceNode<OrderMessage>(config),
"sqs-source");

// Add transform to validate and process orders
var transform = builder.AddTransform<OrderTransform, SqsMessage<OrderMessage>, ProcessedOrder>("transform");

// Add SQS sink
var sink = builder.AddSink(
new SqsSinkNode<ProcessedOrder>(config),
"sqs-sink");

// Connect the nodes
builder.Connect(source, transform);
builder.Connect(transform, sink);
}
}

public sealed class OrderTransform : ITransformNode<SqsMessage<OrderMessage>, ProcessedOrder>
{
public Task<ProcessedOrder> ExecuteAsync(
SqsMessage<OrderMessage> input,
PipelineContext context,
CancellationToken cancellationToken)
{
var order = input.Body;

return Task.FromResult(new ProcessedOrder
{
OrderId = order.OrderId,
CustomerId = order.CustomerId,
Amount = order.Amount,
IsValid = order.Amount > 0,
ProcessedAt = DateTime.UtcNow
});
}
}

Configuration Reference

SqsConfiguration

PropertyTypeDefaultDescription
AccessKeyIdstring?nullAWS access key ID
SecretAccessKeystring?nullAWS secret access key
Regionstring"us-east-1"AWS region
ProfileNamestring?nullAWS profile name from ~/.aws/credentials
SourceQueueUrlstring""SQS queue URL for source
SinkQueueUrlstring""SQS queue URL for sink
MaxNumberOfMessagesint10Maximum messages per poll (1-10)
WaitTimeSecondsint20Long polling wait time (0-20)
VisibilityTimeoutint30Message visibility timeout (seconds)
PollingIntervalMsint1000Polling interval when empty (ms)
BatchSizeint10Batch size for sending (1-10)
DelaySecondsint0Message delivery delay (0-900)
MessageAttributesIDictionary<string, MessageAttributeValue>?nullMessage attributes for outgoing messages
PropertyNamingPolicyJsonPropertyNamingPolicyCamelCaseJSON property naming policy
PropertyNameCaseInsensitivebooltrueCase-insensitive property matching
MaxRetriesint3Maximum retry attempts for transient errors
RetryBaseDelayMsint1000Base delay for retry backoff (ms)
ContinueOnErrorbooltrueContinue processing on errors
MessageErrorHandlerFunc<Exception, SqsMessage<object>, bool>?nullHandler for message mapping errors
AcknowledgmentStrategyAcknowledgmentStrategyAutoOnSinkSuccessMessage acknowledgment strategy
AcknowledgmentDelayMsint5000Delay for delayed acknowledgment (ms)
BatchAcknowledgmentBatchAcknowledgmentOptions?nullBatch acknowledgment options
MaxConnectionPoolSizeint10Maximum SQS client connections to pool
MaxDegreeOfParallelismint1Maximum degree of parallelism
EnableParallelProcessingboolfalseEnable parallel message processing

AcknowledgmentStrategy

ValueDescription
AutoOnSinkSuccessAcknowledge immediately after successful sink processing (default)
ManualManual acknowledgment via AcknowledgeAsync()
DelayedAcknowledge after configurable delay
NoneNever acknowledge automatically

BatchAcknowledgmentOptions

PropertyTypeDefaultDescription
BatchSizeint10Maximum messages per batch (1-10)
FlushTimeoutMsint1000Maximum wait before flushing partial batch (ms)
EnableAutomaticBatchingbooltrueEnable automatic batch acknowledgment
MaxConcurrentBatchesint3Maximum concurrent batch operations

For more advanced configuration, refer to the AWS SDK for .NET documentation.