Pagination
What is Pagination? #
Pagination in Orchesty refers to fetching and processing data in chunks rather than all at once. This is essential when working with APIs that return large datasets (hundreds or thousands of records) that cannot or should not be retrieved in a single request.
Key concepts:
- Batch nodes: Special nodes that split one message into many
- Cursor-based pagination: Using tokens/IDs to fetch the next page
- Page-based pagination: Using numeric page numbers
- Iterate-only mode: Collect all pages before sending to followers
- Item distribution: Each item becomes a separate message
Why Use Pagination? #
API Limitations #
Most APIs limit response sizes:
- HubSpot: 100 contacts per request
- Salesforce: 2000 records per query
- Shopify: 250 products per page
- Google Drive: 1000 files per request
Performance #
Large datasets cause problems:
- Memory issues: Loading 10,000 records at once can exhaust available memory
- Timeouts: Long-running requests are more likely to time out
- Network limits: Large payloads may fail to transmit
- Rate limits: Many small requests are easier to throttle than one huge request
Processing Control #
Pagination enables:
- Parallel processing: Process items concurrently
- Progress tracking: Monitor page-by-page progress
- Error isolation: One failed item doesn't break everything
- Resource management: Control system load
How Pagination Works #
Regular Node vs Batch Node #
graph TB
subgraph Regular Node
RN[Node] -->|1 message in| RN2[Process]
RN2 -->|1 message out| RN3[Follower]
end
subgraph Batch Node
BN[Batch Node] -->|1 message in| BN2[Process]
BN2 -->|3 items added| BN3[Split]
BN3 -->|3 messages out| F1[Follower]
BN3 -->|"(one per item)"| F2[Follower]
BN3 --> F3[Follower]
end
Batch Node Execution Flow #
sequenceDiagram
participant T as Topology
participant B as Batch Node
participant Q as Queue
participant F as Followers
T->>B: Send message
B->>B: Fetch page 1
B->>B: addItem() for each result
alt Has more pages
B->>B: setBatchCursor('page2')
B->>Q: Return to queue
Q->>B: Execute again
B->>B: Fetch page 2
B->>B: addItem() for each result
else No more pages
B->>B: removeBatchCursor()
B->>F: Send all items as separate messages
end
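The flow in the diagram can be modeled without the SDK. The sketch below is a simplified stand-in, not Orchesty's implementation: `FakeDto`, `runBatch`, and the in-memory `PAGES` data are hypothetical names used only to show how a node is executed again until it removes its cursor. It does not model when items are forwarded to followers.

```typescript
// Hypothetical stand-in for BatchProcessDto; it mirrors only the calls used in this guide.
class FakeDto {
  private cursor: string | null = null;
  public readonly items: unknown[] = [];

  public getBatchCursor(): string | null {
    return this.cursor;
  }

  public setBatchCursor(cursor: string): void {
    this.cursor = cursor;
  }

  public removeBatchCursor(): void {
    this.cursor = null;
  }

  public addItem(item: unknown): void {
    this.items.push(item);
  }
}

// Three in-memory "pages" standing in for a remote API.
const PAGES: number[][] = [[1, 2], [3, 4], [5]];

// One node execution: read the cursor, add items, then set or remove the cursor.
function processAction(dto: FakeDto): FakeDto {
  const page = parseInt(dto.getBatchCursor() ?? '0', 10);
  const current = PAGES[page] ?? [];
  current.forEach((item) => dto.addItem(item));
  if (page + 1 < PAGES.length) {
    dto.setBatchCursor(String(page + 1));
  } else {
    dto.removeBatchCursor();
  }
  return dto;
}

// Simplified driver: run the node again for as long as a cursor is set.
function runBatch(): { executions: number; items: unknown[] } {
  const dto = new FakeDto();
  let executions = 0;
  do {
    processAction(dto);
    executions += 1;
  } while (dto.getBatchCursor() !== null);
  return { executions, items: dto.items };
}
```

With the sample data, `runBatch()` executes the node three times and collects five items.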
Implementing Batch Nodes #
Basic Batch Node Structure #
import ABatchNode from '@orchesty/nodejs-sdk/lib/Batch/ABatchNode';
import BatchProcessDto from '@orchesty/nodejs-sdk/lib/Utils/BatchProcessDto';
export default class GetAllContactsBatch extends ABatchNode {
public getName(): string {
return 'get-all-contacts';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
// 1. Get current cursor (page number, token, etc.)
const cursor = dto.getBatchCursor();
// 2. Fetch data for current page
const response = await this.fetchPage(cursor);
// 3. Add each item to batch
response.items.forEach(item => {
dto.addItem(item);
});
// 4. Set cursor for next page, or remove if done
if (response.hasMore) {
dto.setBatchCursor(response.nextCursor);
} else {
dto.removeBatchCursor();
}
return dto;
}
private async fetchPage(cursor: string | null): Promise<any> {
// Placeholder: call the remote API here and map its response to this shape
return { items: [], hasMore: false, nextCursor: null };
}
}
Page-Based Pagination #
Using numeric page numbers:
import ABatchNode from '@orchesty/nodejs-sdk/lib/Batch/ABatchNode';
import BatchProcessDto from '@orchesty/nodejs-sdk/lib/Utils/BatchProcessDto';
import RequestDto from '@orchesty/nodejs-sdk/lib/Transport/Curl/RequestDto';
import { HttpMethods } from '@orchesty/nodejs-sdk/lib/Transport/HttpMethods';
export default class FetchAllOrdersBatch extends ABatchNode {
public getName(): string {
return 'fetch-all-orders';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
// Get current page (default to 1)
const page = parseInt(dto.getBatchCursor() || '1', 10);
const pageSize = 100;
// Fetch current page
const requestDto = new RequestDto(
`https://api.shop.com/orders?page=${page}&limit=${pageSize}`,
HttpMethods.GET,
dto
);
const response = await this.getSender().send(requestDto);
const data = JSON.parse(response.getBody());
// Add each order as separate item
data.orders.forEach(order => {
dto.addItem(order);
});
// Check if more pages exist
if (data.orders.length === pageSize && page < data.totalPages) {
// More pages available
dto.setBatchCursor((page + 1).toString());
} else {
// No more pages
dto.removeBatchCursor();
}
return dto;
}
}
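The stop condition in this example combines two signals: whether the page was full and whether the API reports further pages. A minimal sketch of that decision as a pure function, assuming the API returns a `totalPages` field (the name `nextPageCursor` is hypothetical):

```typescript
// Returns the next cursor value, or null when pagination should stop.
function nextPageCursor(
  page: number,
  received: number,
  pageSize: number,
  totalPages: number,
): string | null {
  const pageWasFull = received === pageSize;
  const morePagesReported = page < totalPages;
  return pageWasFull && morePagesReported ? String(page + 1) : null;
}
```

Keeping the decision in one small function makes the edge cases (a short page, or the last reported page) easy to check in isolation.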
Cursor-Based Pagination #
Using tokens/IDs for pagination:
export default class FetchContactsCursorBatch extends ABatchNode {
public getName(): string {
return 'fetch-contacts-cursor';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const cursor = dto.getBatchCursor(); // null on first run
// Build URL with cursor
const url = cursor
? `https://api.crm.com/contacts?cursor=${encodeURIComponent(cursor)}`
: 'https://api.crm.com/contacts';
const requestDto = new RequestDto(url, HttpMethods.GET, dto);
const response = await this.getSender().send(requestDto);
const data = JSON.parse(response.getBody());
// Add contacts
data.contacts.forEach(contact => {
dto.addItem(contact);
});
// Use next cursor from API response
if (data.nextCursor) {
dto.setBatchCursor(data.nextCursor);
} else {
dto.removeBatchCursor();
}
return dto;
}
}
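Cursor tokens are usually opaque strings and may contain characters such as `+`, `/`, or `=` that are not safe in a query string. A small sketch of building the request URL with the cursor encoded; the helper name `buildContactsUrl` is hypothetical:

```typescript
// Builds the request URL; the cursor is treated as an opaque string and URL-encoded.
function buildContactsUrl(baseUrl: string, cursor: string | null): string {
  return cursor === null || cursor === ''
    ? baseUrl
    : `${baseUrl}?cursor=${encodeURIComponent(cursor)}`;
}
```

For example, the cursor `a+b/c=` is sent as `a%2Bb%2Fc%3D`, so the API receives it unchanged after decoding.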
Offset-Based Pagination #
Using offset/limit:
export default class FetchProductsOffsetBatch extends ABatchNode {
public getName(): string {
return 'fetch-products-offset';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const limit = 250;
const offset = parseInt(dto.getBatchCursor() || '0', 10);
const requestDto = new RequestDto(
`https://api.shop.com/products?limit=${limit}&offset=${offset}`,
HttpMethods.GET,
dto
);
const response = await this.getSender().send(requestDto);
const data = JSON.parse(response.getBody());
data.products.forEach(product => {
dto.addItem(product);
});
// Check if more results exist
if (data.products.length === limit) {
// Likely more pages
dto.setBatchCursor((offset + limit).toString());
} else {
// Last page
dto.removeBatchCursor();
}
return dto;
}
}
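The "full page means more data" rule has one visible cost: when the total is an exact multiple of the limit, one extra request returns an empty page. The sketch below walks through the offsets that would be requested; `nextOffsetCursor` and `requestedOffsets` are hypothetical helpers, and the remote API is replaced by simple arithmetic.

```typescript
// Computes the next offset cursor, or null when the last page was short.
function nextOffsetCursor(offset: number, limit: number, received: number): string | null {
  return received === limit ? String(offset + limit) : null;
}

// Walks a dataset of `total` records and returns every offset that gets requested.
function requestedOffsets(total: number, limit: number): number[] {
  const offsets: number[] = [];
  let cursor: string | null = '0';
  while (cursor !== null) {
    const offset = parseInt(cursor, 10);
    offsets.push(offset);
    // Stand-in for the API: how many records this page would contain.
    const received = Math.max(0, Math.min(limit, total - offset));
    cursor = nextOffsetCursor(offset, limit, received);
  }
  return offsets;
}
```

Both 600 and 500 records with a limit of 250 produce the offsets 0, 250, and 500; in the second case the request at offset 500 returns no records and only serves to end the loop.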
Iterate-Only Mode #
What is Iterate-Only? #
By default, batch nodes send items to followers after each iteration. With iterate-only mode, items are sent only after all pages are fetched.
When to Use Iterate-Only #
Use iterate-only when:
- You need all data before processing (e.g., sorting, deduplication)
- Order matters and the API doesn't guarantee it
- You want to show total count before processing
- Processing requires complete dataset
Don't use iterate-only when:
- Dataset is very large (thousands of items)
- You want to process items as soon as possible
- Memory constraints exist
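Deduplication and sorting, mentioned above as reasons to use iterate-only mode, both need every page before any result is final. A minimal sketch of such a step, assuming contacts carry a numeric `id` and a `name` (the `ContactRecord` shape and `dedupeAndSort` are hypothetical):

```typescript
interface ContactRecord {
  id: number;
  name: string;
}

// Removes duplicates by id (first occurrence wins), then sorts by name.
function dedupeAndSort(pages: ContactRecord[][]): ContactRecord[] {
  const seenIds: Record<number, boolean> = {};
  const unique: ContactRecord[] = [];
  for (const page of pages) {
    for (const contact of page) {
      if (!seenIds[contact.id]) {
        seenIds[contact.id] = true;
        unique.push(contact);
      }
    }
  }
  return unique.sort((a, b) => a.name.localeCompare(b.name));
}
```

A duplicate that appears on page 2 can only be detected if page 1 is still available, which is why this kind of processing does not fit page-by-page forwarding.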
Implementing Iterate-Only #
export default class FetchAllContactsIterateOnly extends ABatchNode {
public getName(): string {
return 'fetch-all-contacts-iterate';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const page = parseInt(dto.getBatchCursor() || '1', 10);
const response = await this.fetchPage(page);
// Add items
response.contacts.forEach(contact => {
dto.addItem(contact);
});
if (page < response.totalPages) {
// Set cursor with iterate-only flag = true
dto.setBatchCursor((page + 1).toString(), true);
} else {
// All pages fetched - now send to followers
dto.removeBatchCursor();
}
return dto;
}
}
Iterate-Only vs Regular #
graph TB
subgraph Regular Mode
R1[Fetch Page 1] -->|Send 100 items| RF1[Followers]
R2[Fetch Page 2] -->|Send 100 items| RF2[Followers]
R3[Fetch Page 3] -->|Send 100 items| RF3[Followers]
end
subgraph Iterate-Only Mode
I1[Fetch Page 1] --> I2[Store items]
I3[Fetch Page 2] --> I4[Store items]
I5[Fetch Page 3] --> I6[Store items]
I6 -->|Send all 300 items| IF[Followers]
end
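The difference between the two modes comes down to how many groups of items followers receive and when. A toy model of the diagram, with the hypothetical helper `emittedGroups` returning the size of each group handed to followers:

```typescript
// Returns the size of each group of items handed to followers.
function emittedGroups(pageSizes: number[], iterateOnly: boolean): number[] {
  if (!iterateOnly) {
    // Regular mode: one group per fetched page.
    return pageSizes.slice();
  }
  // Iterate-only mode: a single group after the last page.
  const total = pageSizes.reduce((sum, size) => sum + size, 0);
  return [total];
}
```

For three pages of 100 items, regular mode yields three groups of 100 and iterate-only mode yields one group of 300.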
Common Pagination Patterns #
Pattern 1: Simple Page-Based #
export default class SimplePaginationBatch extends ABatchNode {
public getName(): string {
return 'simple-pagination';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const page = parseInt(dto.getBatchCursor() || '1', 10);
const perPage = 100;
const url = `https://api.example.com/items?page=${page}&per_page=${perPage}`;
const response = await this.fetchData(url);
response.forEach(item => dto.addItem(item));
// Continue if we got a full page
if (response.length === perPage) {
dto.setBatchCursor((page + 1).toString());
} else {
dto.removeBatchCursor();
}
return dto;
}
}
Pattern 2: API with Total Pages #
export default class TotalPagesPaginationBatch extends ABatchNode {
public getName(): string {
return 'total-pages-pagination';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const page = parseInt(dto.getBatchCursor() || '1', 10);
const requestDto = new RequestDto(
`https://api.example.com/data?page=${page}`,
HttpMethods.GET,
dto
);
const response = await this.getSender().send(requestDto);
const data = JSON.parse(response.getBody());
// Add items
data.results.forEach(item => dto.addItem(item));
// Use totalPages from API
if (page < data.totalPages) {
dto.setBatchCursor((page + 1).toString());
} else {
dto.removeBatchCursor();
}
return dto;
}
}
Pattern 3: Link Header Pagination #
export default class LinkHeaderPaginationBatch extends ABatchNode {
public getName(): string {
return 'link-header-pagination';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const url = dto.getBatchCursor() || 'https://api.github.com/repos/owner/repo/issues';
const requestDto = new RequestDto(url, HttpMethods.GET, dto);
const response = await this.getSender().send(requestDto);
const items = JSON.parse(response.getBody());
items.forEach(item => dto.addItem(item));
// Parse Link header for next page
const linkHeader = response.getResponseHeaders()['link'];
const nextUrl = this.parseNextLink(linkHeader);
if (nextUrl) {
dto.setBatchCursor(nextUrl);
} else {
dto.removeBatchCursor();
}
return dto;
}
private parseNextLink(linkHeader: string | undefined): string | null {
if (!linkHeader) return null;
const nextMatch = linkHeader.match(/<([^>]+)>;\s*rel="next"/);
return nextMatch ? nextMatch[1] : null;
}
}
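When other relations such as `prev` or `last` are also needed, the header can be parsed into a map instead of matching only `next`. The following is a simplified sketch in the spirit of RFC 8288, not a complete parser: it assumes URLs contain no commas and each link has a single `rel` value. The name `parseLinkHeader` is hypothetical.

```typescript
// Parses a Link header into a rel -> URL map (simplified; see assumptions above).
function parseLinkHeader(header: string | undefined): Record<string, string> {
  const links: Record<string, string> = {};
  if (!header) {
    return links;
  }
  for (const part of header.split(',')) {
    const match = part.match(/<([^>]+)>\s*;\s*rel="?([^";]+)"?/);
    const url = match?.[1];
    const rel = match?.[2];
    if (url && rel) {
      links[rel.trim()] = url;
    }
  }
  return links;
}
```

The `next` entry can then be used as the batch cursor, and a missing `next` entry signals the last page.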
Pattern 4: Date-Based Pagination #
export default class DatePaginationBatch extends ABatchNode {
public getName(): string {
return 'date-pagination';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const input = dto.getJsonData();
const cursor = dto.getBatchCursor();
// Use cursor as lastDate, or start from input date
const fromDate = cursor || input.fromDate || '2024-01-01';
const toDate = input.toDate || new Date().toISOString().split('T')[0];
const requestDto = new RequestDto(
`https://api.example.com/events?from=${fromDate}&to=${toDate}&limit=1000`,
HttpMethods.GET,
dto
);
const response = await this.getSender().send(requestDto);
const data = JSON.parse(response.getBody());
data.events.forEach(event => dto.addItem(event));
// Use last event's date as cursor (if `from` is inclusive, the boundary event may be returned again)
if (data.events.length > 0 && data.hasMore) {
const lastDate = data.events[data.events.length - 1].date;
dto.setBatchCursor(lastDate);
} else {
dto.removeBatchCursor();
}
return dto;
}
}
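A date cursor only works if it moves forward. If a whole page shares one timestamp, the next request would start from the same date and fetch the same page again. The sketch below guards against that; it assumes dates are ISO 8601 strings in one consistent format (so string comparison matches chronological order), and `DatedEvent` and `nextDateCursor` are hypothetical names.

```typescript
interface DatedEvent {
  id: string;
  date: string; // ISO 8601, e.g. '2024-03-01T10:15:00Z'
}

// Returns the next `from` value, or null when pagination should stop.
function nextDateCursor(
  events: DatedEvent[],
  currentFrom: string,
  hasMore: boolean,
): string | null {
  const last = events[events.length - 1];
  if (!hasMore || !last) {
    return null;
  }
  // If the cursor would not advance, the same page would be fetched repeatedly.
  return last.date > currentFrom ? last.date : null;
}
```

Stopping silently is only one option; depending on the integration, raising an error or switching to a smaller time window may be more appropriate when the cursor stalls.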
Pattern 5: Splitting Large Arrays #
Sometimes you receive all data at once but need to split it:
export default class SplitArrayBatch extends ABatchNode {
public getName(): string {
return 'split-array';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const input = dto.getJsonData();
const allOrders = input.orders; // Array of 1000 orders
const batchSize = 50;
const startIndex = parseInt(dto.getBatchCursor() || '0', 10);
// Get current batch
const batch = allOrders.slice(startIndex, startIndex + batchSize);
// Add items
batch.forEach(order => dto.addItem(order));
// Check if more batches remain
if (startIndex + batchSize < allOrders.length) {
dto.setBatchCursor((startIndex + batchSize).toString());
} else {
dto.removeBatchCursor();
}
return dto;
}
}
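The index arithmetic in this pattern can be isolated into a small generic helper. This is a sketch only, and `takeBatch` is a hypothetical name:

```typescript
// Returns the slice for this execution and the cursor for the next one (null when done).
function takeBatch<T>(
  all: T[],
  startIndex: number,
  batchSize: number,
): { batch: T[]; nextCursor: string | null } {
  const end = startIndex + batchSize;
  return {
    batch: all.slice(startIndex, end),
    nextCursor: end < all.length ? String(end) : null,
  };
}
```

With five items and a batch size of two, the cursors are `'2'`, then `'4'`, then `null`, and the final batch holds the single remaining item.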
Pattern 6: Authenticated Pagination #
import CoreFormsEnum from '@orchesty/nodejs-sdk/lib/Application/Base/CoreFormsEnum';
export default class AuthenticatedPaginationBatch extends ABatchNode {
public getName(): string {
return 'authenticated-pagination';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const page = parseInt(dto.getBatchCursor() || '1', 10);
// Get credentials
const appInstall = await this.getApplicationInstallFromProcess(dto);
const apiKey = appInstall.getSettings()[CoreFormsEnum.AUTHORIZATION_FORM]['api_key'];
// Create authenticated request
const requestDto = new RequestDto(
`https://api.example.com/data?page=${page}`,
HttpMethods.GET,
dto
);
requestDto.setHeaders({
'Authorization': `Bearer ${apiKey}`
});
const response = await this.getSender().send(requestDto);
const data = JSON.parse(response.getBody());
data.items.forEach(item => dto.addItem(item));
if (data.hasNextPage) {
dto.setBatchCursor((page + 1).toString());
} else {
dto.removeBatchCursor();
}
return dto;
}
}
Batch Node Configuration #
Constructor Options #
export default class MyBatch extends ABatchNode {
constructor() {
super(
false // resultAsBatch: false = each item is separate message
// true = all items in one batch message
);
}
}
ResultAsBatch: False (Default) #
Each item becomes a separate message:
// Input: 3 items added
// Output: 3 separate messages to followers
dto.addItem({ id: 1, name: 'Item 1' });
dto.addItem({ id: 2, name: 'Item 2' });
dto.addItem({ id: 3, name: 'Item 3' });
// Followers receive:
// Message 1: { id: 1, name: 'Item 1' }
// Message 2: { id: 2, name: 'Item 2' }
// Message 3: { id: 3, name: 'Item 3' }
ResultAsBatch: True #
All items in one batch message:
constructor() {
super(true); // resultAsBatch = true
}
// Input: 3 items added
// Output: 1 message with array of items
dto.addItem({ id: 1, name: 'Item 1' });
dto.addItem({ id: 2, name: 'Item 2' });
dto.addItem({ id: 3, name: 'Item 3' });
// Followers receive:
// One message: [
// { id: 1, name: 'Item 1' },
// { id: 2, name: 'Item 2' },
// { id: 3, name: 'Item 3' }
// ]
Rate Limiting with Pagination #
Respecting API Rate Limits #
export default class RateLimitedPaginationBatch extends ABatchNode {
public getName(): string {
return 'rate-limited-pagination';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const page = parseInt(dto.getBatchCursor() || '1', 10);
// Set rate limiter before each API call
dto.setLimiter(
`api-pagination-${dto.getUser()}`,
60, // time window: 60 seconds
100 // max requests: 100
);
const response = await this.fetchPage(page);
response.items.forEach(item => dto.addItem(item));
if (response.hasMore) {
dto.setBatchCursor((page + 1).toString());
} else {
dto.removeBatchCursor();
}
return dto;
}
}
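To make "100 requests per 60 seconds" concrete, here is a toy fixed-window counter. It only illustrates the arithmetic of such a limit; how Orchesty enforces limits internally is not described here and may differ. The class name `FixedWindowLimiter` is hypothetical.

```typescript
// Fixed-window counter: at most `maxRequests` per `windowSeconds`. Illustrative only.
class FixedWindowLimiter {
  private readonly windowSeconds: number;
  private readonly maxRequests: number;
  private windowStart = 0;
  private used = 0;

  public constructor(windowSeconds: number, maxRequests: number) {
    this.windowSeconds = windowSeconds;
    this.maxRequests = maxRequests;
  }

  // Returns true if a request at time `nowSeconds` fits in the current window.
  public tryAcquire(nowSeconds: number): boolean {
    if (nowSeconds - this.windowStart >= this.windowSeconds) {
      // A new window begins: reset the counter.
      this.windowStart = nowSeconds;
      this.used = 0;
    }
    if (this.used < this.maxRequests) {
      this.used += 1;
      return true;
    }
    return false;
  }
}
```

With a limit of 2 requests per 60 seconds, requests at seconds 0 and 1 are allowed, a request at second 2 is rejected, and a request at second 60 is allowed again.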
Error Handling in Pagination #
Retry Failed Pages #
import OnRepeatException from '@orchesty/nodejs-sdk/lib/Exception/OnRepeatException';
export default class RetryablePaginationBatch extends ABatchNode {
public getName(): string {
return 'retryable-pagination';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const page = parseInt(dto.getBatchCursor() || '1', 10);
try {
const response = await this.fetchPage(page);
response.items.forEach(item => dto.addItem(item));
if (response.hasMore) {
dto.setBatchCursor((page + 1).toString());
} else {
dto.removeBatchCursor();
}
} catch (error) {
if (error.response?.status >= 500) {
// Server error - retry this page
throw new OnRepeatException(
30, // retry interval in seconds
5, // maximum number of retries
`Failed to fetch page ${page}: ${error.message}`
);
}
// Other errors - stop processing
throw error;
}
return dto;
}
}
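The retry decision can be kept separate from the fetch logic. The sketch below classifies an HTTP status code; treating 429 (Too Many Requests) as retryable is an assumption that goes beyond the example above, and `classifyStatus` is a hypothetical helper.

```typescript
type FailureAction = 'retry' | 'fail';

// Decides how to react to an HTTP status returned while fetching a page.
function classifyStatus(statusCode: number): FailureAction {
  // 5xx responses are usually transient, so the same page is worth retrying.
  if (statusCode >= 500 && statusCode <= 599) {
    return 'retry';
  }
  // Assumption: 429 means the API is throttling and a later retry may succeed.
  if (statusCode === 429) {
    return 'retry';
  }
  // Other errors (for example 400 or 404) will not improve on retry.
  return 'fail';
}
```

Because the cursor is only advanced after a successful fetch, a retry repeats the same page rather than skipping it.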
Skip Invalid Items #
export default class SkipInvalidItemsBatch extends ABatchNode {
public getName(): string {
return 'skip-invalid-items';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const page = parseInt(dto.getBatchCursor() || '1', 10);
const response = await this.fetchPage(page);
// Filter and add only valid items
response.items.forEach(item => {
if (this.isValid(item)) {
dto.addItem(item);
} else {
console.warn(`Skipping invalid item: ${item.id}`);
}
});
if (response.hasMore) {
dto.setBatchCursor((page + 1).toString());
} else {
dto.removeBatchCursor();
}
return dto;
}
private isValid(item: any): boolean {
return Boolean(item.id && item.name && item.email?.includes('@'));
}
}
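A typed variant of this idea separates valid and invalid items in one pass, so the skipped ones can be counted or reported. This is a sketch under assumed field types (`id` as a number, `name` and `email` as strings); `ValidContact`, `isValidContact`, and `partitionContacts` are hypothetical names.

```typescript
interface ValidContact {
  id: number;
  name: string;
  email: string;
}

// Type guard with checks similar to the example above; always returns a real boolean.
function isValidContact(item: Partial<ValidContact>): item is ValidContact {
  return (
    typeof item.id === 'number' &&
    typeof item.name === 'string' &&
    item.name.length > 0 &&
    typeof item.email === 'string' &&
    item.email.includes('@')
  );
}

// Splits a page into items to forward and items to report.
function partitionContacts(items: Array<Partial<ValidContact>>): {
  valid: ValidContact[];
  invalid: Array<Partial<ValidContact>>;
} {
  const valid: ValidContact[] = [];
  const invalid: Array<Partial<ValidContact>> = [];
  for (const item of items) {
    if (isValidContact(item)) {
      valid.push(item);
    } else {
      invalid.push(item);
    }
  }
  return { valid, invalid };
}
```

Only the `valid` list would be passed to `dto.addItem()`; the `invalid` list can be logged in a single summary line instead of one warning per item.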
Best Practices #
1. Choose Appropriate Batch Sizes #
// Too small - many API calls
const tooSmallPageSize = 10; // Not recommended
// Good balance
const pageSize = 100; // Recommended
// Too large - timeouts, memory issues
const tooLargePageSize = 10000; // Not recommended
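The trade-off is easy to quantify: the number of requests is the record count divided by the page size, rounded up. A small sketch (`requestCount` is a hypothetical helper):

```typescript
// Number of page requests needed to read `totalRecords` at a given page size.
function requestCount(totalRecords: number, pageSize: number): number {
  if (pageSize <= 0) {
    throw new RangeError('pageSize must be positive');
  }
  return Math.ceil(totalRecords / pageSize);
}
```

For 10,000 records, a page size of 10 needs 1,000 requests, a page size of 100 needs 100, and a page size of 10,000 needs a single but very large request.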
2. Always Check for More Pages #
// Good - explicit check with an else branch
if (data.hasMore) {
dto.setBatchCursor(data.nextCursor);
} else {
dto.removeBatchCursor();
}
// Risky - no else branch, so the cursor is never explicitly removed
// and the node might loop indefinitely
if (data.nextCursor) {
dto.setBatchCursor(data.nextCursor);
}
3. Set Rate Limits #
// Always set rate limits when paginating
dto.setLimiter(
`${this.getName()}-${dto.getUser()}`,
60,
100
);
4. Log Progress #
import logger from '@orchesty/nodejs-sdk/lib/Logger/Logger';
const page = parseInt(dto.getBatchCursor() || '1', 10);
logger.info(`Fetching page ${page}, added ${response.items.length} items`, dto);
5. Validate Cursor Values #
const cursor = dto.getBatchCursor();
// Validate cursor is expected format
if (cursor && !/^\d+$/.test(cursor)) {
throw new Error(`Invalid cursor format: ${cursor}`);
}
const page = cursor ? parseInt(cursor, 10) : 1;
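The same validation can be wrapped in a reusable function. In this sketch, `parsePageCursor` is a hypothetical helper; unlike the snippet above, it also rejects page `0`, on the assumption that pages are numbered from 1.

```typescript
// Parses a page cursor: null or empty means the first page, digits mean that page.
function parsePageCursor(cursor: string | null): number {
  if (cursor === null || cursor === '') {
    return 1;
  }
  if (!/^\d+$/.test(cursor)) {
    throw new Error(`Invalid cursor format: ${cursor}`);
  }
  const page = parseInt(cursor, 10);
  if (page < 1) {
    throw new Error(`Page must be at least 1: ${cursor}`);
  }
  return page;
}
```

A node would then start with `const page = parsePageCursor(dto.getBatchCursor());` and fail fast on a malformed cursor instead of requesting an unintended page.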
6. Handle Empty Pages #
const response = await this.fetchPage(page);
if (response.items.length === 0) {
// No items on this page - stop
dto.removeBatchCursor();
return dto;
}
Monitoring Pagination #
Track Progress #
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const page = parseInt(dto.getBatchCursor() || '1', 10);
const response = await this.fetchPage(page);
response.items.forEach(item => dto.addItem(item));
// Log progress
console.log(`Page ${page}/${response.totalPages}: ${response.items.length} items`);
if (page < response.totalPages) {
dto.setBatchCursor((page + 1).toString());
} else {
dto.removeBatchCursor();
console.log(`Pagination complete: ${page} pages total`);
}
return dto;
}
Related Concepts #
- Data Flow - Understanding BatchProcessDto
- Rate Limiting - Preventing API throttling
- Retry Policy - Handling pagination failures
- Connector - Regular connectors vs batch nodes
- Error Handling - Managing pagination errors
API References #
- ABatchNode - Batch node base class
- BatchProcessDto - Batch data transfer object
- ProcessDto - Regular data transfer object
- AConnector - Regular connector class
Next Steps #
- Read ABatchNode documentation for complete API reference
- Learn about Data Flow to understand message routing
- Explore Rate Limiting for API throttling
- Check BatchProcessDto reference for batch-specific methods