Topology
What is a Topology? #
A topology in Orchesty is a workflow that defines how data moves through your integration. It's a graph of nodes connected by queues, where each node processes data and passes it to the next. Topologies are the visual representation of your business processes, showing the path data takes from source to destination.
Key concepts:
- Nodes: Processing units (connectors, custom nodes, batch nodes)
- Queues: RabbitMQ queues connecting nodes
- ProcessDto: Data container flowing through the topology
- Headers: Tracking information (correlation IDs, node context)
- Message routing: How messages flow between nodes
Why Understanding Topologies Matters #
Understanding topologies helps you:
- Design workflows: Plan how data moves through your integration
- Debug issues: Trace data through nodes using correlation IDs
- Transform data: Know when and where to modify data
- Control routing: Direct messages to specific paths
- Handle errors: Understand what happens when processing fails
- Optimize performance: Design efficient processing patterns
Topology Architecture #
Visual Representation #
graph LR
Start[Starting Point] --> N1[Connector: Get Customer]
N1 --> Q1[Queue]
Q1 --> N2[Custom Node: Transform]
N2 --> Q2[Queue]
Q2 --> N3[Connector: Save to ERP]
N3 --> End[Complete]
style N1 fill:#e1f5ff
style N2 fill:#e8f5e8
style N3 fill:#e1f5ff
style Q1 fill:#fff4e1
style Q2 fill:#fff4e1
Topology Components #
graph TB
subgraph Topology
SP[Starting Point]
N1[Node 1]
N2[Node 2]
N3[Node 3]
Q1[Queue 1]
Q2[Queue 2]
Q3[Queue 3]
end
Client[External System] -->|HTTP Request| SP
SP --> N1
N1 --> Q1
Q1 --> N2
N2 --> Q2
Q2 --> N3
N3 --> Q3
Q3 -->|Response| Client
ProcessDto: Data Container in Topology #
At the center of every topology is ProcessDto, which carries data and metadata as it flows from node to node.
ProcessDto Lifecycle #
sequenceDiagram
participant Client as External System
participant Entry as Starting Point
participant N1 as Node 1
participant Queue as RabbitMQ
participant N2 as Node 2
participant DB as Database
Client->>Entry: HTTP Request with data
Entry->>Entry: Create ProcessDto
Entry->>Entry: Set headers (correlation ID, topology ID)
Entry->>N1: Process data
N1->>N1: Read dto.getJsonData()
N1->>N1: Transform/process
N1->>N1: Set dto.setJsonData(result)
N1->>Queue: Send to queue
Queue->>N2: Deliver message
N2->>N2: Process data
N2->>DB: Store result
N2-->>Client: Return response (async)
ProcessDto Structure #
{
// The actual data payload
data: {
customerId: "123",
orderTotal: 1500,
items: [...]
},
// HTTP headers for tracking and context
headers: {
"node-id": "node-abc-123",
"node-name": "get-customer",
"topology-id": "topo-xyz-456",
"topology-name": "order-processing",
"correlation-id": "corr-789-def",
"previous-correlation-id": "corr-456-abc",
"user": "user@example.com",
"application": "shopify"
},
// Metadata for flow control
metadata: {
resultCode: "success",
resultMessage: "Customer fetched successfully",
limiter: {...},
repeater: {...}
}
}
Key Headers in Topology #
| Header | Purpose | Example |
|---|---|---|
correlation-id | Unique ID for tracking this message through topology | corr-789-def |
topology-id | Which topology is running | topo-xyz-456 |
topology-name | Human-readable topology name | order-processing |
node-id | Current node identifier | node-abc-123 |
node-name | Human-readable node name | get-customer |
user | User context | user@example.com |
application | Associated application | shopify |
process-id | Process execution ID | proc-123-456 |
Working with Data in Topology #
Reading Data #
import ProcessDto from '@orchesty/nodejs-sdk/lib/Utils/ProcessDto';
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
// Read as JSON object
const data = dto.getJsonData();
console.log(data.customerId); // "123"
// Read as raw string
const rawData = dto.getData();
console.log(rawData); // '{"customerId":"123"}'
// Read specific header
const topologyName = dto.getHeader('topology-name');
const correlationId = dto.getHeader('correlation-id');
return dto;
}
Writing Data #
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
// Set JSON data (automatically stringified)
dto.setJsonData({
customerId: "123",
customerName: "John Doe",
processed: true
});
// Or set raw string data
dto.setData('{"customerId":"123"}');
// Or set binary data
const buffer = Buffer.from('binary data');
dto.setData(buffer.toString('base64'));
return dto;
}
Modifying Data #
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
// Get current data
const input = dto.getJsonData();
// Transform it
const output = {
...input,
processedAt: new Date().toISOString(),
total: input.items.reduce((sum, item) => sum + item.price, 0),
itemCount: input.items.length
};
// Set transformed data
dto.setJsonData(output);
return dto;
}
Message Routing in Topology #
Default Routing #
By default, messages flow to all connected followers:
graph LR
Node1[Node 1] --> Node2[Node 2]
Node1 --> Node3[Node 3]
Node1 --> Node4[Node 4]
Conditional Routing #
Use setForceFollowers() to route to specific nodes:
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const order = dto.getJsonData();
// Route based on order total
if (order.total > 1000) {
dto.setForceFollowers('high-value-handler');
} else if (order.total > 100) {
dto.setForceFollowers('standard-handler');
} else {
dto.setForceFollowers('low-value-handler');
}
return dto;
}
Multiple Followers #
// Send to multiple specific nodes
dto.setForceFollowers('validator', 'logger', 'notifier');
Stop Processing #
import ResultCode from '@orchesty/nodejs-sdk/lib/Utils/ResultCode';
// Stop with failure
dto.setStopProcess(
ResultCode.STOP_AND_FAILED,
'Customer not found'
);
// Stop without failure (filter out)
dto.setStopProcess(
ResultCode.DO_NOT_CONTINUE,
'Message filtered out'
);
Data Formats in Topology #
JSON Data (Most Common) #
// Input: {"name": "John", "age": 30}
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const data = dto.getJsonData();
// data is automatically parsed object
console.log(data.name); // "John"
console.log(data.age); // 30
// Modify and return
dto.setJsonData({
...data,
processed: true
});
return dto;
}
Raw String Data #
// Input: "plain text data"
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const rawData = dto.getData();
// Process as string
const processed = rawData.toUpperCase();
dto.setData(processed);
return dto;
}
XML Data #
import xml2js from 'xml2js';
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const xmlString = dto.getData();
// Parse XML
const parser = new xml2js.Parser();
const jsonData = await parser.parseStringPromise(xmlString);
// Return as JSON
dto.setJsonData(jsonData);
return dto;
}
Binary Data #
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
// Read binary data
const base64Data = dto.getData();
const buffer = Buffer.from(base64Data, 'base64');
// Process binary data
const processed = this.processBinary(buffer);
// Return as base64
dto.setData(processed.toString('base64'));
return dto;
}
Topology Execution Modes #
Single Message Processing #
Default mode - one message in, one message out:
// Input: {"orderId": "123"}
// Output: {"orderId": "123", "status": "processed"}
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const order = dto.getJsonData();
// Process single order
const result = await this.processOrder(order);
dto.setJsonData(result);
return dto;
}
Batch Processing #
Split one message into many:
import BatchProcessDto from '@orchesty/nodejs-sdk/lib/Utils/BatchProcessDto';
import ABatchNode from '@orchesty/nodejs-sdk/lib/Batch/ABatchNode';
// Input: {"orders": [order1, order2, order3]}
// Output: 3 separate messages, one per order
export default class SplitOrdersBatch extends ABatchNode {
public getName(): string {
return 'split-orders';
}
public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
const { orders } = dto.getJsonData();
// Each addItem creates a separate message
orders.forEach(order => {
dto.addItem(order);
});
return dto;
}
}
Topology Context and Tracking #
Correlation IDs #
Correlation IDs track messages through the topology:
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const correlationId = dto.getHeader('correlation-id');
console.log(`Processing message: ${correlationId}`);
// All logs will include this correlation ID
logger.info('Fetching customer data', dto);
return dto;
}
User Context #
Access user information:
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const user = dto.getUser();
// Use user context for credentials
const appInstall = await this.getApplicationInstallFromProcess(dto);
// Or for user-specific logic
if (user === 'admin@company.com') {
// Admin-specific processing
}
return dto;
}
Topology Context #
Know which topology is running:
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const topologyName = dto.getHeader('topology-name');
const topologyId = dto.getHeader('topology-id');
const nodeName = dto.getHeader('node-name');
console.log(`Running in topology: ${topologyName}, node: ${nodeName}`);
return dto;
}
Flow Control in Topology #
Success Processing #
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
// Process data
const result = await this.fetchData();
dto.setJsonData(result);
dto.setSuccessProcess('Data fetched successfully');
return dto;
}
Stopping Flow #
import ResultCode from '@orchesty/nodejs-sdk/lib/Utils/ResultCode';
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const data = dto.getJsonData();
// Validate
if (!data.email || !data.email.includes('@')) {
dto.setStopProcess(
ResultCode.STOP_AND_FAILED,
'Invalid email address'
);
return dto;
}
// Continue processing...
return dto;
}
Retry Processing #
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const result = await this.checkJobStatus();
if (result.status === 'pending') {
// Retry every 30 seconds, up to 10 times
dto.setRepeater(30, 10, 'Job still processing');
return dto;
}
if (result.status === 'completed') {
dto.removeRepeater();
dto.setJsonData(result.data);
return dto;
}
return dto;
}
Queue-Based Architecture #
How Queues Work in Topology #
graph LR
N1[Node 1: Fetch Data] -->|Publish| Q1[Queue 1]
Q1 -->|Consume| N2[Node 2: Transform]
N2 -->|Publish| Q2[Queue 2]
Q2 -->|Consume| N3[Node 3: Store]
style Q1 fill:#fff4e1
style Q2 fill:#fff4e1
Benefits of Queue-Based Topology #
- Asynchronous processing: Nodes don't wait for each other
- Reliability: Messages persist in queues if nodes fail
- Scalability: Multiple workers can consume from same queue
- Decoupling: Nodes don't need to know about each other
Message Persistence #
Messages are stored in RabbitMQ until:
- Successfully processed by all followers
- Explicitly stopped with
setStopProcess() - Maximum retry attempts exceeded
Common Topology Patterns #
Pattern 1: Linear Processing #
graph LR
Start --> Fetch[Fetch Data]
Fetch --> Transform[Transform]
Transform --> Validate[Validate]
Validate --> Save[Save]
Save --> Notify[Notify]
// Each node processes and passes to next
// Order: Fetch → Transform → Validate → Save → Notify
Pattern 2: Conditional Branching #
graph TB
Start --> Router{Router}
Router -->|High Value| HighPath[High Value Path]
Router -->|Standard| StandardPath[Standard Path]
Router -->|Low Value| LowPath[Low Value Path]
// Router node decides path based on data
if (order.total > 1000) {
dto.setForceFollowers('high-value-handler');
} else {
dto.setForceFollowers('standard-handler');
}
Pattern 3: Parallel Processing #
graph TB
Start --> Splitter[Splitter]
Splitter --> P1[Processor 1]
Splitter --> P2[Processor 2]
Splitter --> P3[Processor 3]
P1 --> Merge[Merge]
P2 --> Merge
P3 --> Merge
// Batch node splits into multiple parallel paths
// Each item processed independently
// Optional: Merge results at end
Pattern 4: Fan-Out #
graph LR
Source[Source] --> N1[Notification]
Source --> N2[Logging]
Source --> N3[Analytics]
Source --> N4[Archive]
// One message triggers multiple independent processes
// All followers receive the same data
Pattern 5: Enrichment Pipeline #
graph LR
Start --> Base[Base Data]
Base --> Enrich1[+ Customer Info]
Enrich1 --> Enrich2[+ Order History]
Enrich2 --> Enrich3[+ Preferences]
Enrich3 --> Complete[Complete Data]
// Each node adds more information
// Data grows as it flows through topology
Debugging Topologies #
View Message Contents #
Use logs to inspect data:
import logger from '@orchesty/nodejs-sdk/lib/Logger/Logger';
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
// Log input data
logger.info('Input data', dto);
const input = dto.getJsonData();
logger.debug(`Processing customer: ${input.customerId}`, dto);
// Process...
logger.info('Output data', dto);
return dto;
}
Trace with Correlation IDs #
const correlationId = dto.getHeader('correlation-id');
console.log(`[${correlationId}] Starting processing`);
// Later in logs, search for this correlation ID to see full message journey
Check Headers #
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const headers = dto.getHeaders();
console.log('All headers:', headers);
console.log('Topology:', dto.getHeader('topology-name'));
console.log('Node:', dto.getHeader('node-name'));
console.log('User:', dto.getUser());
return dto;
}
Topology Monitoring in Orchesty Admin #
In Orchesty Admin you can:
- View topology graph: See all nodes and connections
- Monitor execution: Watch messages flow through
- Check node status: See which nodes are processing
- View logs: See logs for each node execution
- Inspect data: See message contents at each step
- Trace failures: Follow correlation IDs through topology
Best Practices for Topologies #
1. Always Return ProcessDto #
// Good
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
dto.setJsonData(result);
return dto;
}
// Bad
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
dto.setJsonData(result);
// Forgot to return!
}
2. Don't Mutate Input Objects #
// Good
const input = dto.getJsonData();
const output = { ...input, processed: true };
dto.setJsonData(output);
// Bad
const input = dto.getJsonData();
input.processed = true; // Mutating input
dto.setJsonData(input);
3. Set Meaningful Success Messages #
// Good
dto.setSuccessProcess(`Processed ${itemCount} items successfully`);
// Bad
dto.setSuccessProcess('Done');
4. Validate Early in Topology #
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
const data = dto.getJsonData();
// Validate first
if (!data.email) {
dto.setStopProcess(ResultCode.STOP_AND_FAILED, 'Email required');
return dto;
}
// Then process
const result = await this.processEmail(data.email);
dto.setJsonData(result);
return dto;
}
5. Use Structured Data #
// Good - structured
dto.setJsonData({
status: 'success',
data: result,
timestamp: new Date().toISOString(),
recordCount: result.length
});
// Bad - unstructured
dto.setData('Success! Got some data...');
6. Design for Failure #
// Always handle errors gracefully
try {
const result = await this.process();
dto.setJsonData(result);
} catch (error) {
logger.error('Processing failed', dto, error);
dto.setStopProcess(
ResultCode.STOP_AND_FAILED,
`Failed: ${error.message}`
);
}
return dto;
Related Concepts #
- Connector - Nodes that call external APIs
- Custom Node - Nodes for data transformation
- Pagination - Batch processing in topologies
- Error Handling - Handling failures in topologies
- Retry Policy - Retrying failed nodes
- Rate Limiting - Controlling flow in topologies
API References #
- ProcessDto - Complete ProcessDto documentation
- BatchProcessDto - Batch processing DTO
- AConnector - Connector base class
- ACommonNode - Custom node base class
- ABatchNode - Batch node base class
Next Steps #
- Understand Connectors to see how nodes call APIs
- Learn about Custom Nodes for data transformation
- Explore Pagination for batch processing patterns
- Read ProcessDto API docs for complete method reference
- Check Orchesty Admin documentation for topology design