C# Resilient Pipeline with Compensation: Build Robust Workflows


Why Pipelines?

Distributed systems and workflows depend on sequences of operations:

  • fetch data,
  • transform it,
  • persist to storage,
  • notify another service.

The complexity appears when one of these steps fails.

  • What if step 3 writes to the database successfully, but step 4 fails?
  • Do you retry? Roll back? Ignore?

This is where compensating pipelines help: they let you run steps and undo them if a later one fails.


Introducing PipelineExecutor

PipelineExecutor is a small C# repository I’ve been experimenting with.

It’s not a full-blown framework (yet), but rather a codebase that demonstrates how you can:

  • Define pipelines made of multiple steps.
  • Attach compensation handlers to those steps.
  • Execute pipelines with resilience, so that failures don’t cascade into chaos.

If there’s interest, this could evolve into a proper NuGet package — for now, it’s a repo you can explore, fork, or experiment with.


Forward Execution vs Compensation

Here’s the idea: each pipeline step has two parts:

  1. Execute — what the step normally does.
  2. Compensate — how to undo it if a later step fails.

Think of it like a mini transaction log for your workflow.
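The execute/compensate pairing can be sketched in a few lines. This is a minimal illustration, not PipelineExecutor's actual code: ISketchStep, SketchStep, and SketchRunner are hypothetical names, and the "transaction log" is simply a stack of the steps that have completed so far.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical step shape for illustration; not PipelineExecutor's real interface.
public interface ISketchStep
{
    string Name { get; }
    Task ExecuteAsync(List<string> log);
    Task CompensateAsync(List<string> log);
}

public sealed class SketchStep : ISketchStep
{
    private readonly bool _fail;

    public SketchStep(string name, bool fail = false)
    {
        Name = name;
        _fail = fail;
    }

    public string Name { get; }

    public Task ExecuteAsync(List<string> log)
    {
        if (_fail) throw new InvalidOperationException($"{Name} failed");
        log.Add($"execute:{Name}");
        return Task.CompletedTask;
    }

    public Task CompensateAsync(List<string> log)
    {
        log.Add($"compensate:{Name}");
        return Task.CompletedTask;
    }
}

public static class SketchRunner
{
    // Runs steps in order. If one throws, the steps that already completed
    // are compensated in reverse order and the method returns false.
    public static async Task<bool> RunAsync(IEnumerable<ISketchStep> steps, List<string> log)
    {
        var completed = new Stack<ISketchStep>();
        try
        {
            foreach (var step in steps)
            {
                await step.ExecuteAsync(log);
                completed.Push(step);
            }
            return true;
        }
        catch (Exception)
        {
            while (completed.Count > 0)
            {
                await completed.Pop().CompensateAsync(log);
            }
            return false;
        }
    }
}
```

In this sketch the failing step itself is not compensated, only the steps that finished before it. Whether the failing step also needs cleanup is a design choice that depends on how much partial work it can leave behind.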


Resilience: Beyond Rollback

Compensation is one side of the story. Resilience is the other.

Pipelines often interact with external systems (APIs, databases, services) that may:

  • Fail temporarily,
  • Timeout,
  • Or reject requests under load.

PipelineExecutor is designed so that steps can be extended with resilience strategies:

  • Retry policies with exponential backoff,
  • Timeouts for long-running operations,
  • Fallbacks if a step cannot succeed.

This part isn’t fully fleshed out in the demo yet, but the architecture allows for it.
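To make the three strategies concrete, here is a rough sketch of a helper that combines them. It is an assumption about how such a wrapper could look, not part of the repository: ResilienceSketch and its parameters are invented for illustration, and the timeout only takes effect if the operation observes the token it is given.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; the name and signature are not from PipelineExecutor.
public static class ResilienceSketch
{
    // Tries `operation` up to maxAttempts times, doubling the delay after each failure.
    // Each attempt gets its own timeout token. If every attempt fails,
    // `fallback` turns the last exception into a result.
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        int maxAttempts,
        TimeSpan initialDelay,
        TimeSpan perAttemptTimeout,
        Func<Exception, T> fallback,
        CancellationToken cancellation = default)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        var delay = initialDelay;
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            using var timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellation);
            timeout.CancelAfter(perAttemptTimeout);
            try
            {
                return await operation(timeout.Token);
            }
            catch (Exception ex) when (!cancellation.IsCancellationRequested)
            {
                // Caller cancellation is never swallowed; everything else is retried.
                if (attempt == maxAttempts) return fallback(ex);
            }

            await Task.Delay(delay, cancellation);
            delay = TimeSpan.FromTicks(delay.Ticks * 2); // exponential backoff
        }

        throw new InvalidOperationException("Unreachable: the loop always returns or throws.");
    }
}
```

A call might look like `await ResilienceSketch.ExecuteAsync(ct => SendAsync(ct), 3, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(2), ex => false)`, where SendAsync stands in for whatever the step does.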


Why Does This Matter?

  • Business consistency: Avoid ending up in a “half-done” state.
  • Error recovery: Don’t just fail fast — recover gracefully.
  • Composability: Think of pipelines as Lego blocks you can snap together, each with its own execution and rollback logic.

Demo examples:

Dynamic Pipelines with Compensation in C#

One of the strengths of PipelineExecutor is that it’s not a rigid, hard-coded pipeline. Instead, pipelines are configurable and can be built dynamically from code, configuration files, or even external sources (like a DB, API, or service registry).

This is shown in the DynamicConfigurationPipelineExample demo.


Defining a Pipeline Step

Each step in the pipeline implements IPipelineExecutorAction<T> with two methods:

  • ExecuteAsync → the main business logic.
  • CompensateAsync → the rollback logic if something downstream fails.

Example:

public class DynamicPipelineAnalyticsUserAction 
    : IPipelineExecutorAction<IPipelineExecutionCommandObject<DynamicPipelineCommand, DynamicPipelineResponse>>
{
    public async Task ExecuteAsync(
        IPipelineExecutionCommandObject<DynamicPipelineCommand, DynamicPipelineResponse> command, 
        CancellationToken cancellation)
    {
        command.Response.Analytics.Add(new DynamicPipelineAnalyticsUser()
        {
            AnalyticsId = Guid.NewGuid(),
            Event = "UserCreated",
            UserId = command.Response.UserId
        });
        await Task.CompletedTask;
    }

    public Task CompensateAsync(
        IPipelineExecutionCommandObject<DynamicPipelineCommand, DynamicPipelineResponse> command, 
        CancellationToken cancellation)
    {
        // undo analytics insert here if later steps fail
        throw new NotImplementedException();
    }
}

👉 This means every step can have forward logic (execute) and rollback logic (compensate).
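The demo step above leaves CompensateAsync unimplemented. As a sketch of what an undo could look like for that step, the version below removes the analytics entry that the forward logic added. The types here are simplified stand-ins rather than the repository's classes, and treating UserId as a Guid is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Simplified stand-ins for the demo's response types (assumptions, not the repo's classes).
public sealed class AnalyticsEntrySketch
{
    public Guid AnalyticsId { get; set; }
    public string Event { get; set; } = "";
    public Guid UserId { get; set; }
}

public sealed class ResponseSketch
{
    public Guid UserId { get; set; }
    public List<AnalyticsEntrySketch> Analytics { get; } = new List<AnalyticsEntrySketch>();
}

public sealed class AnalyticsStepSketch
{
    private const string EventName = "UserCreated";

    // Forward logic: record the analytics event for the current user.
    public Task ExecuteAsync(ResponseSketch response, CancellationToken cancellation)
    {
        response.Analytics.Add(new AnalyticsEntrySketch
        {
            AnalyticsId = Guid.NewGuid(),
            Event = EventName,
            UserId = response.UserId
        });
        return Task.CompletedTask;
    }

    // Rollback logic: remove what ExecuteAsync added.
    // Calling it twice, or without a prior execute, is harmless.
    public Task CompensateAsync(ResponseSketch response, CancellationToken cancellation)
    {
        response.Analytics.RemoveAll(a => a.Event == EventName && a.UserId == response.UserId);
        return Task.CompletedTask;
    }
}
```

Keeping the compensation idempotent matters once retries are involved, because the same handler may run several times for a single step.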


Configuring the Pipeline Dynamically

Instead of hardcoding the pipeline in Program.cs, the demo builds its configuration inside DynamicPipeline.

public async Task InitializeAsync(DynamicPipelineCommand command, CancellationToken cancellation = default)
{
    base.SetConfig(BuildConfiguration(command.Source), _serviceProvider);
    await Task.CompletedTask;
}

private IPipelineExecutorConfig BuildConfiguration(DynamicPipelineSource source)
{
    // Pick the step configuration that matches the request source.
    return source switch
    {
        DynamicPipelineSource.Website => CreateConfigForWebsite(),
        DynamicPipelineSource.MobileApp => CreateConfigForMobileApp(),
        _ => throw new ArgumentOutOfRangeException(nameof(source), source, null)
    };
}

This allows you to switch pipeline definitions depending on the request source (Website, MobileApp, etc.).

Example: Website Pipeline Configuration

Here’s the Website version of the pipeline:

private PipelineExecutorConfig CreateConfigForWebsite()
{
    var config = new PipelineExecutorConfig()
    {
        ActionConfigContexts = new List<PipelineExecutorActionConfigContext>()
        {
            new () { ActionName = typeof(Actions.DynamicPipelineValidateCommandAction).ToString(), Position = 1, HasCompensate = false },
            new () { ActionName = typeof(Actions.DynamicPipelineSetUser).ToString(), Position = 2, HasCompensate = false },
            new () { ActionName = typeof(Actions.DynamicPipelineSetUserDetails).ToString(), Position = 3, HasCompensate = false },
            new () { ActionName = typeof(Actions.DynamicPipelineEmailNotification).ToString(), Position = 3, HasCompensate = false },
            new () { ActionName = typeof(Actions.DynamicPipelineSmsNotification).ToString(), Position = 4, HasCompensate = false },
            new () { ActionName = typeof(Actions.DynamicPipelineAnalyticsUserAction).ToString(), Position = 5, HasCompensate = false }
        }
    };
    return config;
}

Each PipelineExecutorActionConfigContext defines:

  • ActionName → which step class to run.
  • Position → execution order.
  • HasCompensate → whether compensation logic exists for this step.
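Because each entry is just a name, a position, and a flag, the same information could come from a file or a database instead of code. The sketch below is one possible approach and is not taken from the repository: StepConfigSketch is a hypothetical DTO mirroring the three properties above, and the JSON is parsed with System.Text.Json.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// Hypothetical DTO mirroring the three properties described above.
public sealed class StepConfigSketch
{
    public string ActionName { get; set; } = "";
    public int Position { get; set; }
    public bool HasCompensate { get; set; }
}

public static class ConfigLoaderSketch
{
    // Parses a JSON array of step entries and returns them in execution order.
    public static IReadOnlyList<StepConfigSketch> Load(string json)
    {
        var steps = JsonSerializer.Deserialize<List<StepConfigSketch>>(json)
                    ?? new List<StepConfigSketch>();

        // OrderBy is a stable sort: entries with the same Position keep their listed order.
        return steps.OrderBy(s => s.Position).ToList();
    }
}
```

In this sketch, entries that share a Position run in the order they were listed. How PipelineExecutor itself treats duplicate positions is not confirmed here.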

Running the Pipeline

Once configured, execution is straightforward:

var executionModel = new DynamicPipelineModel()
{
    Command = new DynamicPipelineCommand()
    {
        Name = "John Doe",
        Email = "jHoidsandsaiDoe@gmail.com",
        Source = DynamicPipelineSource.Website
    }
};

await pipeline.InitializeAsync(executionModel.Command);
await pipeline.ProcessActionsAsync(executionModel, CancellationToken.None);

This flow:

  1. Builds the pipeline dynamically based on the Source.
  2. Registers all the configured steps.
  3. Runs them sequentially, applying compensation if something fails mid-pipeline.

Example: Website Pipeline With Resilience

Here’s how you can update the Website configuration to include resilience policies:

private PipelineExecutorConfig CreateConfigForWebsite()
{
    var config = new PipelineExecutorConfig()
    {
        ActionConfigContexts = new List<PipelineExecutorActionConfigContext>()
        {
            new ()
            {
                ActionName = typeof(Actions.DynamicPipelineValidateCommandAction).ToString(),
                Position = 1,
                HasCompensate = false
            },
            new()
            {
                ActionName = typeof(Actions.DynamicPipelineSetUser).ToString(),
                Position = 2,
                HasCompensate = false
            },
            new()
            {
                ActionName = typeof(Actions.DynamicPipelineSetUserDetails).ToString(),
                Position = 3,
                HasCompensate = false
            },
            new()
            {
                ActionName = typeof(Actions.DynamicPipelineEmailNotification).ToString(),
                Position = 3,
                HasCompensate = true,
                ResilientConfig = new PipelineExecutorActionConfigResilientContext()
                {
                    RetryCount = 3,
                    RetryIntervalInMs = 10,
                    CompensateBeforeRetry = true,
                    IsResilient = true
                }
            },
            new()
            {
                ActionName = typeof(Actions.DynamicPipelineSmsNotification).ToString(),
                Position = 4,
                HasCompensate = true,
                ResilientConfig = new PipelineExecutorActionConfigResilientContext()
                {
                    RetryCount = 3,
                    RetryIntervalInMs = 10,
                    CompensateBeforeRetry = true,
                    IsResilient = true
                }
            },
            new ()
            {
                ActionName = typeof(Actions.DynamicPipelineAnalyticsUserAction).ToString(),
                Position = 5,
                HasCompensate = false
            }
        }
    };
    return config;
}

What This Means

  • IsResilient = true
    Marks this step as “protected” by resilience policies.
  • RetryCount = 3, RetryIntervalInMs = 10
    If the step fails, the pipeline retries it up to 3 times, with a 10 ms delay between attempts.
  • CompensateBeforeRetry = true
    Before retrying the failing step, the pipeline calls that step’s compensation handler to undo partial work, so each retry starts from a clean slate.

Execution Flow With Resilience

Imagine the DynamicPipelineEmailNotification step fails due to a transient SMTP error:

  1. Step executes and fails.
  2. CompensateAsync for that step is triggered to clean up.
  3. Pipeline retries the step (up to 3 times, with 10 ms between tries).
  4. If a retry succeeds → the pipeline continues normally.
  5. If every retry fails → compensation cascades backward through all prior steps.

This makes the pipeline both resilient (it can recover from transient issues) and safe (partial work is cleaned up before retry).
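The flow described above can be written down as a small runner. This is a sketch under assumptions rather than the library's implementation: FlowStep and FlowRunner are hypothetical names, a step gets one initial attempt plus up to RetryCount retries, and the cleanup handler also runs after the final failed attempt.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical types for illustration; the names do not come from PipelineExecutor.
public sealed class FlowStep
{
    public FlowStep(string name, Func<Task> execute, Func<Task> compensate,
        int retryCount = 0, int retryIntervalInMs = 0, bool compensateBeforeRetry = false)
    {
        Name = name;
        Execute = execute;
        Compensate = compensate;
        RetryCount = retryCount;
        RetryIntervalInMs = retryIntervalInMs;
        CompensateBeforeRetry = compensateBeforeRetry;
    }

    public string Name { get; }
    public Func<Task> Execute { get; }
    public Func<Task> Compensate { get; }
    public int RetryCount { get; }
    public int RetryIntervalInMs { get; }
    public bool CompensateBeforeRetry { get; }
}

public static class FlowRunner
{
    public static async Task<bool> RunAsync(IReadOnlyList<FlowStep> steps, List<string> log)
    {
        var completed = new Stack<FlowStep>();
        foreach (var step in steps)
        {
            if (await TryRunWithRetriesAsync(step, log))
            {
                completed.Push(step);
                continue;
            }

            // Retries exhausted: undo every step that already completed, newest first.
            while (completed.Count > 0)
            {
                var done = completed.Pop();
                await done.Compensate();
                log.Add($"compensate:{done.Name}");
            }
            return false;
        }
        return true;
    }

    private static async Task<bool> TryRunWithRetriesAsync(FlowStep step, List<string> log)
    {
        // One initial attempt plus up to RetryCount retries.
        for (var attempt = 0; attempt <= step.RetryCount; attempt++)
        {
            try
            {
                await step.Execute();
                log.Add($"ok:{step.Name}");
                return true;
            }
            catch (Exception)
            {
                log.Add($"fail:{step.Name}");
                if (step.CompensateBeforeRetry)
                {
                    // Clean up the failed attempt's partial work.
                    await step.Compensate();
                    log.Add($"cleanup:{step.Name}");
                }
            }

            if (attempt < step.RetryCount)
            {
                await Task.Delay(step.RetryIntervalInMs);
            }
        }
        return false;
    }
}
```

The log entries make the order of events easy to inspect: a transient failure produces fail, cleanup, ok for the same step, while a permanent failure ends with compensate entries for the earlier steps in reverse order.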


Demo Summary

The DynamicConfigurationPipelineExample demo shows how PipelineExecutor can:

  • Dynamically build pipelines based on configuration (code, DB, API, or external source).
  • Support multiple application contexts — e.g., Website and MobileApp each run slightly different pipelines.
  • Register steps with compensation capability, so each action can define how to roll back its work if something downstream fails.
  • Provide resilience hooks — while not all are showcased in the demo, the design supports retries, fallbacks, and fault handling.

This makes PipelineExecutor more than a simple pipeline runner. It’s a foundation for building robust workflows in C#, adaptable to different business contexts, with compensation and resilience built in.


What’s Next?

Right now, PipelineExecutor is:

  • ✅ A GitHub repo with working code.
  • ✅ Able to define pipelines with forward and compensation steps.
  • 🚧 Still light on resilience demos.
  • 🚧 Not yet a NuGet package.

If this sparks interest, I’ll shape it into a polished library and publish it on NuGet.


Try It Out

You can explore the repository here:
👉 PipelineExecutor on GitHub

Clone it, play with the demo, and let me know:

  • Would you use a pipeline library with built-in compensation?
  • What resilience features would you need most (retry, timeout, etc.)?

Closing Thoughts

Building resilient systems is all about expecting failure and planning for recovery. PipelineExecutor is my take on making that easier in C#. It’s early days, but I believe this approach, pipelines with compensation and resilience, can help tame complexity in real-world workflows.
