ABatchNode
Overview #
ABatchNode is the base class for creating batch processing nodes. Batch nodes process multiple items at once, allowing you to fetch paginated data from APIs, split large datasets, or aggregate multiple records.
Purpose:
- Process data in batches/chunks
- Handle paginated API responses
- Split large datasets into multiple messages
- Aggregate multiple items
Location: orchesty-nodejs-sdk/lib/Batch/ABatchNode.ts
When to Use #
Use ABatchNode when you need to:
- Fetch paginated data (e.g., get all customers from an API with pagination)
- Split large datasets (e.g., process 1000 records as 10 batches of 100)
- Iterate through API results with cursors/pagination tokens
- Aggregate data from multiple sources into batches
Common use cases:
- "Fetch all contacts from CRM (paginated)"
- "Split order list into batches of 50"
- "Iterate through API pages with cursor"
Class Hierarchy #
ANode
↓
ACommonConnector
↓
ABatchNode (You extend this)
↓
YourBatchNode
Constructor #
constructor(protected resultAsBatch = false)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| resultAsBatch | boolean | false | If true, wraps all items in a single batch message |
Example:
export default class MyBatchNode extends ABatchNode {
    constructor() {
        super(false); // Each item becomes a separate message
    }
}
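To make the effect of resultAsBatch concrete, the sketch below models the two outcomes described in the table: one message per item, or one message holding all items. It is an illustration only. The toMessages function is hypothetical and is not part of the SDK, and the real message format produced by Orchesty may differ.

```typescript
// Illustration only: a hypothetical mapping from collected items to outgoing
// message bodies. This is NOT the SDK's internal serialization.
function toMessages(items: unknown[], resultAsBatch: boolean): string[] {
    return resultAsBatch
        ? [JSON.stringify(items)] // one message carrying every item
        : items.map((item) => JSON.stringify(item)); // one message per item
}

const contacts = [{ id: 1 }, { id: 2 }, { id: 3 }];
const separate = toMessages(contacts, false);
const single = toMessages(contacts, true);

console.log(separate.length, single.length);
```

With three items, the first call yields three message bodies and the second yields one.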
Abstract Methods You Must Implement #
processAction() #
public abstract processAction(dto: BatchProcessDto): BatchProcessDto | Promise<BatchProcessDto>
Main method where you implement batch processing logic.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| dto | BatchProcessDto | Batch data transfer object |
Returns: BatchProcessDto | Promise<BatchProcessDto> - Modified batch DTO with items
Pattern:
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
    // 1. Get batch cursor (for pagination)
    const cursor = dto.getBatchCursor();

    // 2. Fetch data from API (with cursor if continuing pagination).
    //    fetchDataFromAPI() stands for your own helper; it is not inherited.
    const response = await this.fetchDataFromAPI(cursor);

    // 3. Add each item to the batch
    response.items.forEach(item => {
        dto.addItem(item);
    });

    // 4. If more data available, set cursor for next iteration
    if (response.hasMore) {
        dto.setBatchCursor(response.nextCursor);
    } else {
        dto.removeBatchCursor(); // Done - no more pages
    }

    return dto;
}
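The pattern above relies on processAction being invoked again for as long as a cursor is set. The following self-contained sketch simulates that loop locally so the control flow can be followed step by step. Everything in it is a stand-in: FakeBatchDto only mimics the four DTO methods used above, PAGES is a hypothetical in-memory API, and runUntilDone plays the role of the platform. Unlike this simulation, the real platform is not assumed to keep all items in one DTO across iterations.

```typescript
// Hypothetical stand-in for BatchProcessDto with only the methods used above.
class FakeBatchDto {
    public items: unknown[] = [];
    private cursor = '';

    public addItem(item: unknown): this {
        this.items.push(item);
        return this;
    }

    public getBatchCursor(): string {
        return this.cursor;
    }

    public setBatchCursor(cursor: string): this {
        this.cursor = cursor;
        return this;
    }

    public removeBatchCursor(): this {
        this.cursor = '';
        return this;
    }
}

interface Page {
    items: number[];
    hasMore: boolean;
    nextCursor: string;
}

// Hypothetical in-memory "API": three pages keyed by the cursor that requests them.
const PAGES: Record<string, Page> = {
    '': { items: [1, 2], hasMore: true, nextCursor: 'p2' },
    p2: { items: [3, 4], hasMore: true, nextCursor: 'p3' },
    p3: { items: [5], hasMore: false, nextCursor: '' },
};

// The same four steps as the pattern above, run against the fake API.
function processAction(dto: FakeBatchDto): FakeBatchDto {
    const page = PAGES[dto.getBatchCursor()];
    page.items.forEach((item) => dto.addItem(item));
    if (page.hasMore) {
        dto.setBatchCursor(page.nextCursor);
    } else {
        dto.removeBatchCursor();
    }
    return dto;
}

// Stand-in for the platform: call processAction again while a cursor is set.
function runUntilDone(dto: FakeBatchDto): number {
    let calls = 0;
    do {
        processAction(dto);
        calls += 1;
    } while (dto.getBatchCursor() !== '');
    return calls;
}

const dto = new FakeBatchDto();
const calls = runUntilDone(dto);
console.log(calls, dto.items);
```

The run makes three calls and collects five items. Forgetting removeBatchCursor() on the last page would leave the cursor in place and keep the loop going.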
getName() #
public abstract getName(): string
Returns the unique identifier for your batch node.
Example:
public getName(): string {
    return 'hubspot-get-all-contacts';
}
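Because the name is used as an identifier, two nodes returning the same string would be ambiguous. The sketch below shows one way such a collision could be caught early. NodeRegistry is a hypothetical class written for this illustration; it is not the SDK's DIContainer and makes no claim about how the SDK handles duplicates.

```typescript
// Minimal shape needed for this sketch; the real ABatchNode has more members.
interface NamedNode {
    getName(): string;
}

// Hypothetical registry keyed by getName(); NOT the SDK's DIContainer.
class NodeRegistry {
    private readonly nodes = new Map<string, NamedNode>();

    public register(node: NamedNode): void {
        const key = node.getName();
        if (this.nodes.has(key)) {
            throw new Error(`Batch node name "${key}" is already registered`);
        }
        this.nodes.set(key, node);
    }

    public get(key: string): NamedNode | undefined {
        return this.nodes.get(key);
    }
}

const registry = new NodeRegistry();
registry.register({ getName: () => 'hubspot-get-all-contacts' });

// Registering a second node under the same name is rejected.
let duplicateRejected = false;
try {
    registry.register({ getName: () => 'hubspot-get-all-contacts' });
} catch {
    duplicateRejected = true;
}
console.log(duplicateRejected);
```

Prefixing names with the application key, as in 'hubspot-get-all-contacts', is a simple convention that keeps names unique across applications.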
Inherited Methods Available #
getSender() #
protected getSender(): CurlSender
Returns the HTTP client for making API requests.
getApplication() #
protected getApplication<T extends IApplication>(): T
Returns the associated application instance.
getApplicationInstallFromProcess() #
protected async getApplicationInstallFromProcess(dto: AProcessDto): Promise<ApplicationInstall>
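Loads the ApplicationInstall for the current process, which holds the stored settings and credentials of the installed application for the current user.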
Usage Examples #
Basic Batch Node #
From: orchesty-nodejs-sdk/test/Batch/TestBatch.ts
import ABatchNode from '../../lib/Batch/ABatchNode';
import BatchProcessDto from '../../lib/Utils/BatchProcessDto';

export default class TestBatch extends ABatchNode {

    public getName(): string {
        return 'testbatch';
    }

    public processAction(dto: BatchProcessDto): BatchProcessDto {
        // Add items to the batch
        dto.addItem({
            dataTest: 'testValue',
        });

        // Example: Check cursor for pagination
        if (dto.getBatchCursor() === 'testCursor') {
            dto.removeBatchCursor(); // Done iterating
            return dto;
        }

        // Set cursor to continue pagination
        dto.setBatchCursor('testCursor');
        return dto;
    }

}
Paginated API Fetching #
import ABatchNode from '../../lib/Batch/ABatchNode';
import BatchProcessDto from '../../lib/Utils/BatchProcessDto';
import RequestDto from '../../lib/Transport/Curl/RequestDto';
import { HttpMethods } from '../../lib/Transport/HttpMethods';

export default class GetAllContactsBatch extends ABatchNode {

    public getName(): string {
        return 'get-all-contacts';
    }

    public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
        // Get application credentials (form and field keys depend on your application)
        const appInstall = await this.getApplicationInstallFromProcess(dto);
        const apiKey = appInstall.getSettings()['authForm']['api_key'];

        // Get cursor from previous iteration (empty string on first call)
        const cursor = dto.getBatchCursor();

        // Build API URL; encode the cursor so characters such as "+" or "=" survive
        let url = 'https://api.example.com/contacts?limit=100';
        if (cursor) {
            url += `&cursor=${encodeURIComponent(cursor)}`;
        }

        // Make API request
        const requestDto = new RequestDto(url, HttpMethods.GET, dto);
        requestDto.setHeaders({ 'Authorization': `Bearer ${apiKey}` });
        const responseDto = await this.getSender().send(requestDto);
        const response = JSON.parse(responseDto.getBody());

        // Add each contact as a separate item
        response.contacts.forEach((contact: any) => {
            dto.addItem(contact);
        });

        // Check if more pages exist
        if (response.hasMore && response.nextCursor) {
            // Set cursor to continue on next iteration
            dto.setBatchCursor(response.nextCursor);
        } else {
            // No more pages - remove cursor to stop iteration
            dto.removeBatchCursor();
        }

        return dto;
    }

}
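Pagination cursors are often opaque tokens that contain characters with a special meaning in query strings, such as "+", "/" or "=". As a minimal sketch of building the request URL safely, the helper below uses the standard URLSearchParams class. The function name buildContactsUrl and the sample cursor value are assumptions made for this illustration; the endpoint is the placeholder URL from the example above.

```typescript
// Builds the request URL; the cursor parameter is added only when non-empty.
function buildContactsUrl(baseUrl: string, limit: number, cursor: string): string {
    const params = new URLSearchParams({ limit: String(limit) });
    if (cursor) {
        // URLSearchParams percent-encodes reserved characters on serialization.
        params.set('cursor', cursor);
    }
    return `${baseUrl}?${params.toString()}`;
}

const firstPage = buildContactsUrl('https://api.example.com/contacts', 100, '');
const nextPage = buildContactsUrl('https://api.example.com/contacts', 100, 'a+b/c=');

console.log(firstPage); // https://api.example.com/contacts?limit=100
console.log(nextPage);  // https://api.example.com/contacts?limit=100&cursor=a%2Bb%2Fc%3D
```

Keeping URL construction in a small pure function also makes it easy to unit test without sending a request.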
Splitting Large Dataset #
import ABatchNode from '../../lib/Batch/ABatchNode';
import BatchProcessDto from '../../lib/Utils/BatchProcessDto';

export default class SplitOrdersBatch extends ABatchNode {

    public getName(): string {
        return 'split-orders';
    }

    public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
        // Get input data
        const inputData = dto.getJsonData();
        const allOrders = inputData.orders; // e.g. an array of 1000 orders

        // Split into batches of 50
        const batchSize = 50;
        for (let i = 0; i < allOrders.length; i += batchSize) {
            const batch = allOrders.slice(i, i + batchSize);

            // Add each batch as a single item
            dto.addItem({
                orders: batch,
                batchNumber: Math.floor(i / batchSize) + 1,
                totalBatches: Math.ceil(allOrders.length / batchSize),
            });
        }

        // No cursor is set, so the node runs only once
        return dto;
    }

}
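The slicing logic in the example above can be isolated into a small pure function, which makes the edge cases (a final partial batch, an empty input, an invalid size) easy to check. This is a sketch only; the name chunk and the sample data are assumptions and not part of the SDK.

```typescript
// Splits `items` into consecutive chunks of at most `size` elements.
function chunk<T>(items: readonly T[], size: number): T[][] {
    if (!Number.isInteger(size) || size <= 0) {
        throw new RangeError('size must be a positive integer');
    }
    const chunks: T[][] = [];
    for (let i = 0; i < items.length; i += size) {
        chunks.push(items.slice(i, i + size));
    }
    return chunks;
}

// 120 hypothetical order ids split into batches of 50: sizes 50, 50 and 20.
const orderIds = Array.from({ length: 120 }, (_, i) => i + 1);
const batches = chunk(orderIds, 50);

console.log(batches.map((b) => b.length));
```

Inside processAction, each element of the returned array would then be passed to dto.addItem() together with its batch number.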
Cursor-Only Iteration #
import ABatchNode from '../../lib/Batch/ABatchNode';
import BatchProcessDto from '../../lib/Utils/BatchProcessDto';

export default class IteratePagesBatch extends ABatchNode {

    public getName(): string {
        return 'iterate-pages';
    }

    public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
        const cursor = dto.getBatchCursor();

        // Fetch page (fetchPage() stands for your own helper; start at page 1)
        const page = await this.fetchPage(cursor || '1');

        // Add items
        page.items.forEach(item => {
            dto.addItem(item);
        });

        // Set cursor for iteration only (don't call followers on each iteration)
        if (page.hasNextPage) {
            dto.setBatchCursor(page.nextPage.toString(), true); // iterateOnly = true
        } else {
            dto.removeBatchCursor();
        }

        return dto;
    }

}
Common Patterns #
Pattern 1: API Pagination with Cursor #
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
    const cursor = dto.getBatchCursor();
    const response = await this.callAPI(cursor);

    response.data.forEach(item => dto.addItem(item));

    if (response.nextCursor) {
        dto.setBatchCursor(response.nextCursor);
    } else {
        dto.removeBatchCursor();
    }

    return dto;
}
Pattern 2: Page-Based Pagination #
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
    const page = parseInt(dto.getBatchCursor() || '1', 10);
    const response = await this.callAPI(page);

    response.items.forEach(item => dto.addItem(item));

    if (page < response.totalPages) {
        dto.setBatchCursor((page + 1).toString());
    } else {
        dto.removeBatchCursor();
    }

    return dto;
}
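In page-based pagination the cursor is simply the page number stored as a string. The sketch below separates the two decisions involved: which page the current cursor refers to, and whether another page follows. Both helper names are hypothetical, and treating an empty or unparsable cursor as page 1 is an assumption made for this illustration.

```typescript
// Parses the cursor as a 1-based page number; empty or invalid means page 1.
function currentPage(cursor: string): number {
    const parsed = Number.parseInt(cursor, 10);
    return Number.isNaN(parsed) || parsed < 1 ? 1 : parsed;
}

// Returns the cursor for the following page, or null when `page` is the last one.
function nextPageCursor(page: number, totalPages: number): string | null {
    return page < totalPages ? String(page + 1) : null;
}

// Walk a hypothetical 3-page result set, starting with the empty cursor.
const visited: number[] = [];
let pageCursor: string | null = '';
while (pageCursor !== null) {
    const page = currentPage(pageCursor);
    visited.push(page);
    pageCursor = nextPageCursor(page, 3);
}

console.log(visited);
```

In a real node, a null result corresponds to calling dto.removeBatchCursor(), and any other value is passed to dto.setBatchCursor().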
Pattern 3: Split and Process #
public processAction(dto: BatchProcessDto): BatchProcessDto {
    const data = dto.getJsonData();
    const items = data.items;

    // Process and add items
    const processed = this.processItems(items);
    processed.forEach(item => dto.addItem(item));

    return dto;
}
Next Steps #
- BatchProcessDto - Work with batch data
- AConnector - Create regular connectors
- ProcessDto - Standard data handling
- DIContainer - Register batch nodes