Skip to content

[43991] add rollback to pipelines #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Oct 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
@@ -6,19 +6,20 @@ A type-safe toolkit to easily compose synchronous process chains in TypeScript/J

## Table of Contents

- [Table of Contents](#table-of-contents)
- [Overview](#overview)
- [Installation](#installation)
- [Types](#types)
- [Builder](#builder)
- [Initializer](#initializer)
- [Stages](#stages)
- [Stage Arguments](#stage-arguments)
- [Stage Results](#stage-results)
- [Results Validator](#results-validator)
- [Middleware](#middleware)
- [Error Handling](#error-handling)
- [Example Use Cases](#example-use-cases)
- [Pipeline](#pipeline)
- [Table of Contents](#table-of-contents)
- [Overview](#overview)
- [Installation](#installation)
- [Types](#types)
- [Builder](#builder)
- [Initializer](#initializer)
- [Stages](#stages)
- [Stage Arguments](#stage-arguments)
- [Stage Results](#stage-results)
- [Results Validator](#results-validator)
- [Middleware](#middleware)
- [Error Handling](#error-handling)
- [Example Use Cases](#example-use-cases)

## Overview

@@ -67,6 +68,15 @@ The **Initializer** is a method that takes in the pipeline's arguments and produ

**Stages** are the independent steps in the process chain. They are processed synchronously (one at a time, in order) until the end of the chain is reached.

As of version `0.1.0` stages can be one of two types

- PipelineStage
- PipelineStageConfiguration

`PipelineStageConfiguration` adds the ability for the user to define a `rollback` function, which should undo changes made by the `execute` function.

The pipeline can support processing a collection of stages of either type.

### Stage Arguments

The following arguments are provided to a stage when it is executed:
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@fieldguide/pipeline",
"description": "A toolkit to easily build synchronous process pipelines in TypeScript/JavaScript",
"version": "0.0.1",
"version": "0.1.0",
"main": "build/index.js",
"types": "build/index.d.ts",
"scripts": {
21 changes: 20 additions & 1 deletion src/__mocks__/TestPipeline.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { last } from "lodash";
import { last, noop } from "lodash";
import type {
PipelineInitializer,
PipelineMiddleware,
PipelineResultValidator,
PipelineStage,
PipelineStageConfiguration,
} from "../types";

export interface TestPipelineArguments {
@@ -25,6 +26,12 @@ export type TestStage = PipelineStage<
TestPipelineResults
>;

export type TestStageWithRollback = PipelineStageConfiguration<
TestPipelineArguments,
TestPipelineContext,
TestPipelineResults
>;

export type TestMiddleware = PipelineMiddleware<
TestPipelineArguments,
TestPipelineContext,
@@ -84,6 +91,18 @@ export const errorStage: TestStage = () => {
throw Error("This stage throws an error!");
};

/**
* A stage that specifies a rollback function to undo changes
*/
export function generateStageWithRollback(
rollbackFunction: () => Promise<void> | void,
): TestStageWithRollback {
return {
execute: noop,
rollback: rollbackFunction,
};
}

/**
* A results validator for the test pipeline
*/
91 changes: 89 additions & 2 deletions src/__tests__/buildPipeline.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { PipelineRollbackError } from "error/PipelineRollbackError";
import { logStageMiddlewareFactory } from "middleware/logStageMiddlewareFactory";
import {
TestMiddleware,
TestPipelineArguments,
TestPipelineContext,
TestPipelineResults,
TestStage,
TestStageWithRollback,
additionStage,
errorStage,
generateStageWithRollback,
initializer,
returnHistoryResult,
testPipelineResultValidator,
@@ -28,7 +31,21 @@ const partialResultsStages: TestStage[] = [additionStage, returnSumResult];

const errorStages: TestStage[] = [errorStage, returnHistoryResult];

const rollback1 = jest.fn();
const rollback2 = jest.fn();

const stagesWithRollback: (TestStage | TestStageWithRollback)[] = [
additionStage,
generateStageWithRollback(rollback1),
generateStageWithRollback(rollback2),
errorStage,
];

describe("buildPipeline", () => {
beforeEach(() => {
jest.clearAllMocks();
});

describe("when running a simple pipeline", () => {
it("should produce a result when successful", async () => {
const results = await runPipelineForStages(successfulStages);
@@ -59,7 +76,7 @@ describe("buildPipeline", () => {
let testMiddleware1: TestMiddlewareMock;
let testMiddleware2: TestMiddlewareMock;

beforeAll(async () => {
beforeEach(async () => {
middlewareCalls = [];

const createMiddlewareMock = (name: string): TestMiddlewareMock => {
@@ -98,10 +115,80 @@ describe("buildPipeline", () => {
]);
});
});

describe("when using a pipeline stage that can rollback", () => {
let error: unknown;

describe("and the rollback is successful", () => {
beforeEach(async () => {
error = undefined;

try {
await runPipelineForStages(stagesWithRollback);
} catch (e) {
error = e;
}
});

it("should call configured rollback functions", () => {
expect(rollback1).toHaveBeenCalledTimes(1);
expect(rollback2).toHaveBeenCalledTimes(1);
});

it("should call the rollbacks in the proper order", () => {
expect(rollback2.mock.invocationCallOrder[0]).toBeLessThan(
rollback1.mock.invocationCallOrder[0] ?? 0,
);
});

it("should still throw the error", () => {
expect(error).toBeInstanceOf(PipelineError);
});
});

describe("and the rollback fails", () => {
const errorThrownInRollback = new Error("This is a rollback error");

beforeEach(async () => {
rollback1.mockImplementation(() => {
throw errorThrownInRollback;
});

error = undefined;

try {
await runPipelineForStages(stagesWithRollback);
} catch (e) {
error = e;
}
});

it("should run the rollbacks from subsequent stages", () => {
expect(rollback2).toHaveBeenCalledTimes(1);
});

it("should throw a PipelineRollbackError", () => {
expect(error).toBeInstanceOf(PipelineRollbackError);
});

it("should capture the original pipeline error", () => {
expect(
(error as PipelineRollbackError<object, object, object>)
.originalPipelineError.message,
).toBe("[TestPipeline] Error: This stage throws an error!");
});

it("should capture the original cause that was thrown in the rollback", () => {
expect(
(error as PipelineRollbackError<object, object, object>).cause,
).toBe(errorThrownInRollback);
});
});
});
});

function runPipelineForStages(
stages: TestStage[],
stages: (TestStage | TestStageWithRollback)[],
middleware: TestMiddleware[] = [],
) {
const pipeline = buildPipeline<
79 changes: 71 additions & 8 deletions src/buildPipeline.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { PipelineRollbackError } from "error/PipelineRollbackError";
import { merge } from "lodash";
import { isPipelineStageConfiguration } from "utils";
import { PipelineError } from "./error/PipelineError";
import type {
Pipeline,
@@ -7,6 +9,7 @@ import type {
PipelineMiddleware,
PipelineResultValidator,
PipelineStage,
PipelineStageConfiguration,
} from "./types";

interface BuildPipelineInput<
@@ -16,7 +19,7 @@ interface BuildPipelineInput<
> {
name: string;
initializer: PipelineInitializer<C, A>;
stages: PipelineStage<A, C, R>[];
stages: (PipelineStage<A, C, R> | PipelineStageConfiguration<A, C, R>)[];
resultsValidator: PipelineResultValidator<R>;
middleware?: PipelineMiddleware<A, C, R>[];
}
@@ -46,10 +49,26 @@ export function buildPipeline<
arguments: args,
};

try {
const stageNames = stages.map((s) => s.name);
const context = await initializer(args);

/** All stages converted to configurations */
const stageConfigurations: PipelineStageConfiguration<A, C, R>[] =
stages.map((stage) => {
if (isPipelineStageConfiguration(stage)) {
return stage;
}

return {
execute: stage,
};
});

const potentiallyProcessedStages = [];

const context = await initializer(args);
try {
const stageNames: string[] = stageConfigurations.map(
(s) => s.execute.name,
);
maybeContext = context;

const reversedMiddleware = [...middlewares].reverse();
@@ -70,15 +89,19 @@ export function buildPipeline<
};
};

for (const stage of stages) {
for (const stage of stageConfigurations) {
// initialize next() with the stage itself
let next = () => stage(context, metadata) as Promise<Partial<R>>;
let next = () =>
stage.execute(context, metadata) as Promise<Partial<R>>;

// wrap stage with middleware such that the first middleware is the outermost function
for (const middleware of reversedMiddleware) {
next = wrapMiddleware(middleware, stage.name, next);
next = wrapMiddleware(middleware, stage.execute.name, next);
}

// Add stage to a stack that can be rolled back if necessary
potentiallyProcessedStages.push(stage);

// invoke middleware-wrapped stage
const stageResults = await next();

@@ -94,13 +117,53 @@ export function buildPipeline<

return results;
} catch (cause) {
throw new PipelineError(
const pipelineError = new PipelineError(
String(cause),
maybeContext,
results,
metadata,
cause,
);

await rollback(
potentiallyProcessedStages,
context,
metadata,
results,
pipelineError,
);

// Throw error after rolling back all stages
throw pipelineError;
}
};
}

/**
* Rollback changes made by stages in reverse order
*/
async function rollback<A extends object, C extends object, R extends object>(
stages: PipelineStageConfiguration<A, C, R>[],
context: C,
metadata: PipelineMetadata<A>,
results: R,
originalPipelineError: PipelineError<A, C, R>,
) {
let stage;
while ((stage = stages.pop()) !== undefined) {
try {
if (stage.rollback) {
await stage.rollback(context, metadata);
}
} catch (rollbackCause) {
throw new PipelineRollbackError(
String(`Rollback failed for stage: ${stage.execute.name}`),
context,
results,
metadata,
originalPipelineError,
rollbackCause,
);
}
}
}
24 changes: 24 additions & 0 deletions src/error/PipelineRollbackError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { PipelineMetadata } from "../types";
import { PipelineError } from "./PipelineError";

/**
* An error when executing a pipeline.
*/
export class PipelineRollbackError<
A extends object,
C extends object,
R extends object,
> extends PipelineError<A, C, R> {
constructor(
message: string,
protected override pipelineContext: C | undefined,
protected override pipelineResults: Partial<R>,
protected override pipelineMetadata: PipelineMetadata<A>,
/** The PipelineError that prompted the rollback */
public originalPipelineError: PipelineError<A, C, R>,
/** A throwable that caused this exception */
public override readonly cause?: unknown,
) {
super(message, pipelineContext, pipelineResults, pipelineMetadata, cause);
}
}
17 changes: 17 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -14,6 +14,23 @@ export type PipelineStage<
R extends object,
> = (context: C, metadata: PipelineMetadata<A>) => PipelineStageResult<R>;

/**
* A more explicit configuration for a pipeline stage
* * execute: the function that executes the stage (identical to `PipelineStage`)
* * rollback: the function that rolls back any changes made within `execute` should an error occur
*/
export interface PipelineStageConfiguration<
A extends object,
C extends object,
R extends object,
> {
execute: PipelineStage<A, C, R>;
rollback?: (
context: C,
metadata: PipelineMetadata<A>,
) => Promise<void> | void;
}

/**
* Optional partial result that gets merged with results from other stages
*/
11 changes: 11 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { PipelineStage, PipelineStageConfiguration } from "types";

export function isPipelineStageConfiguration<
A extends object,
C extends object,
R extends object,
>(
stage: PipelineStage<A, C, R> | PipelineStageConfiguration<A, C, R>,
): stage is PipelineStageConfiguration<A, C, R> {
return (stage as PipelineStageConfiguration<A, C, R>).execute !== undefined;
}