Skip to main content

Namespace: activity

This library provides tools for authoring activities.

Import this module from Activity code - must not be used in Workflows.

Any function can be used as an Activity as long as its parameters and return value are serializable using a DataConverter.

Cancellation

Activities may be cancelled only if they emit heartbeats.
There are 2 ways to handle Activity cancellation:

  1. await on Context.current().cancelled
  2. Pass the context's abort signal at Context.current().cancellationSignal to a library that supports it

Examples

An Activity that fakes progress and can be cancelled

import { Context } from '@temporalio/activity';
import { CancelledFailure } from '@temporalio/common';

export async function fakeProgress(sleepIntervalMs = 1000): Promise<void> {
try {
// allow for resuming from heartbeat
const startingPoint = Context.current().info.heartbeatDetails || 1;
console.log('Starting activity at progress:', startingPoint);
for (let progress = startingPoint; progress <= 100; ++progress) {
// simple utility to sleep in activity for given interval or throw if Activity is cancelled
// don't confuse with Workflow.sleep which is only used in Workflow functions!
console.log('Progress:', progress);
await Context.current().sleep(sleepIntervalMs);
Context.current().heartbeat(progress);
}
} catch (err) {
if (err instanceof CancelledFailure) {
console.log('Fake progress activity cancelled');
// Cleanup
}
throw err;
}
}

An Activity that makes a cancellable HTTP request

import fetch from 'node-fetch';
import { Context } from '@temporalio/activity';

export async function cancellableFetch(url: string): Promise<Uint8Array> {
const response = await fetch(url, { signal: Context.current().cancellationSignal });
const contentLengthHeader = response.headers.get('Content-Length');
if (contentLengthHeader === null) {
throw new Error('expected Content-Length header to be set');
}
const contentLength = parseInt(contentLengthHeader);
let bytesRead = 0;
const chunks: Buffer[] = [];

for await (const chunk of response.body) {
if (!(chunk instanceof Buffer)) {
throw new TypeError('Expected Buffer');
}
bytesRead += chunk.length;
chunks.push(chunk);
Context.current().heartbeat(bytesRead / contentLength);
}
return Buffer.concat(chunks);
}

Classes

Interfaces

References

ActivityFunction

Re-exports ActivityFunction


ActivityInterface

Re-exports ActivityInterface


CancelledFailure

Re-exports CancelledFailure

Functions

errorCode

errorCode(error): string | undefined

Get error code from an Error or return undefined

Parameters

NameType
errorunknown

Returns

string | undefined


errorMessage

errorMessage(err): string | undefined

Get error message from an Error or string or return undefined

Parameters

NameType
errunknown

Returns

string | undefined