Topology

What is a Topology? #

A topology in Orchesty is a workflow that defines how data moves through your integration. It's a graph of nodes connected by queues, where each node processes data and passes it to the next. Topologies are the visual representation of your business processes, showing the path data takes from source to destination.

Key concepts:

  • Nodes: Processing units (connectors, custom nodes, batch nodes)
  • Queues: RabbitMQ queues connecting nodes
  • ProcessDto: Data container flowing through the topology
  • Headers: Tracking information (correlation IDs, node context)
  • Message routing: How messages flow between nodes

Why Understanding Topologies Matters #

Understanding topologies helps you:

  • Design workflows: Plan how data moves through your integration
  • Debug issues: Trace data through nodes using correlation IDs
  • Transform data: Know when and where to modify data
  • Control routing: Direct messages to specific paths
  • Handle errors: Understand what happens when processing fails
  • Optimize performance: Design efficient processing patterns

Topology Architecture #

Visual Representation #

graph LR
    Start[Starting Point] --> N1[Connector: Get Customer]
    N1 --> Q1[Queue]
    Q1 --> N2[Custom Node: Transform]
    N2 --> Q2[Queue]
    Q2 --> N3[Connector: Save to ERP]
    N3 --> End[Complete]
    
    style N1 fill:#e1f5ff
    style N2 fill:#e8f5e8
    style N3 fill:#e1f5ff
    style Q1 fill:#fff4e1
    style Q2 fill:#fff4e1

Topology Components #

graph TB
    subgraph Topology
        SP[Starting Point]
        N1[Node 1]
        N2[Node 2]
        N3[Node 3]
        Q1[Queue 1]
        Q2[Queue 2]
        Q3[Queue 3]
    end
    
    Client[External System] -->|HTTP Request| SP
    SP --> N1
    N1 --> Q1
    Q1 --> N2
    N2 --> Q2
    Q2 --> N3
    N3 --> Q3
    Q3 -->|Response| Client

ProcessDto: Data Container in Topology #

At the center of every topology is ProcessDto, which carries data and metadata as it flows from node to node.

ProcessDto Lifecycle #

sequenceDiagram
    participant Client as External System
    participant Entry as Starting Point
    participant N1 as Node 1
    participant Queue as RabbitMQ
    participant N2 as Node 2
    participant DB as Database
    
    Client->>Entry: HTTP Request with data
    Entry->>Entry: Create ProcessDto
    Entry->>Entry: Set headers (correlation ID, topology ID)
    Entry->>N1: Process data
    N1->>N1: Read dto.getJsonData()
    N1->>N1: Transform/process
    N1->>N1: Set dto.setJsonData(result)
    N1->>Queue: Send to queue
    Queue->>N2: Deliver message
    N2->>N2: Process data
    N2->>DB: Store result
    N2-->>Client: Return response (async)

ProcessDto Structure #

{
  // The actual data payload
  data: {
    customerId: "123",
    orderTotal: 1500,
    items: [...]
  },
  
  // HTTP headers for tracking and context
  headers: {
    "node-id": "node-abc-123",
    "node-name": "get-customer",
    "topology-id": "topo-xyz-456",
    "topology-name": "order-processing",
    "correlation-id": "corr-789-def",
    "previous-correlation-id": "corr-456-abc",
    "user": "user@example.com",
    "application": "shopify"
  },
  
  // Metadata for flow control
  metadata: {
    resultCode: "success",
    resultMessage: "Customer fetched successfully",
    limiter: {...},
    repeater: {...}
  }
}

Key Headers in Topology #

| Header | Purpose | Example |
|---|---|---|
| correlation-id | Unique ID for tracking this message through the topology | corr-789-def |
| topology-id | Which topology is running | topo-xyz-456 |
| topology-name | Human-readable topology name | order-processing |
| node-id | Current node identifier | node-abc-123 |
| node-name | Human-readable node name | get-customer |
| user | User context | user@example.com |
| application | Associated application | shopify |
| process-id | Process execution ID | proc-123-456 |

Working with Data in Topology #

Reading Data #

import ProcessDto from '@orchesty/nodejs-sdk/lib/Utils/ProcessDto';

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    // Read as JSON object
    const data = dto.getJsonData();
    console.log(data.customerId);  // "123"
    
    // Read as raw string
    const rawData = dto.getData();
    console.log(rawData);  // '{"customerId":"123"}'
    
    // Read specific header
    const topologyName = dto.getHeader('topology-name');
    const correlationId = dto.getHeader('correlation-id');
    
    return dto;
}

Writing Data #

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    // Set JSON data (automatically stringified)
    dto.setJsonData({
        customerId: "123",
        customerName: "John Doe",
        processed: true
    });
    
    // Or set raw string data
    dto.setData('{"customerId":"123"}');
    
    // Or set binary data
    const buffer = Buffer.from('binary data');
    dto.setData(buffer.toString('base64'));
    
    return dto;
}

Modifying Data #

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    // Get current data
    const input = dto.getJsonData();
    
    // Transform it
    const output = {
        ...input,
        processedAt: new Date().toISOString(),
        total: input.items.reduce((sum, item) => sum + item.price, 0),
        itemCount: input.items.length
    };
    
    // Set transformed data
    dto.setJsonData(output);
    
    return dto;
}

Message Routing in Topology #

Default Routing #

By default, messages flow to all connected followers:

graph LR
    Node1[Node 1] --> Node2[Node 2]
    Node1 --> Node3[Node 3]
    Node1 --> Node4[Node 4]

Conditional Routing #

Use setForceFollowers() to route to specific nodes:

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const order = dto.getJsonData();
    
    // Route based on order total
    if (order.total > 1000) {
        dto.setForceFollowers('high-value-handler');
    } else if (order.total > 100) {
        dto.setForceFollowers('standard-handler');
    } else {
        dto.setForceFollowers('low-value-handler');
    }
    
    return dto;
}

Multiple Followers #

// Send to multiple specific nodes
dto.setForceFollowers('validator', 'logger', 'notifier');

Stop Processing #

import ResultCode from '@orchesty/nodejs-sdk/lib/Utils/ResultCode';

// Stop with failure
dto.setStopProcess(
    ResultCode.STOP_AND_FAILED,
    'Customer not found'
);

// Stop without failure (filter out)
dto.setStopProcess(
    ResultCode.DO_NOT_CONTINUE,
    'Message filtered out'
);

Data Formats in Topology #

JSON Data (Most Common) #

// Input: {"name": "John", "age": 30}

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const data = dto.getJsonData();
    
    // data is automatically parsed object
    console.log(data.name);  // "John"
    console.log(data.age);   // 30
    
    // Modify and return
    dto.setJsonData({
        ...data,
        processed: true
    });
    
    return dto;
}

Raw String Data #

// Input: "plain text data"

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const rawData = dto.getData();
    
    // Process as string
    const processed = rawData.toUpperCase();
    
    dto.setData(processed);
    return dto;
}

XML Data #

import xml2js from 'xml2js';

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const xmlString = dto.getData();
    
    // Parse XML
    const parser = new xml2js.Parser();
    const jsonData = await parser.parseStringPromise(xmlString);
    
    // Return as JSON
    dto.setJsonData(jsonData);
    return dto;
}

Binary Data #

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    // Read binary data
    const base64Data = dto.getData();
    const buffer = Buffer.from(base64Data, 'base64');
    
    // Process binary data
    const processed = this.processBinary(buffer);
    
    // Return as base64
    dto.setData(processed.toString('base64'));
    return dto;
}

Topology Execution Modes #

Single Message Processing #

The default mode: one message in, one message out:

// Input: {"orderId": "123"}
// Output: {"orderId": "123", "status": "processed"}

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const order = dto.getJsonData();
    
    // Process single order
    const result = await this.processOrder(order);
    
    dto.setJsonData(result);
    return dto;
}

Batch Processing #

Split one message into many:

import BatchProcessDto from '@orchesty/nodejs-sdk/lib/Utils/BatchProcessDto';
import ABatchNode from '@orchesty/nodejs-sdk/lib/Batch/ABatchNode';

// Input: {"orders": [order1, order2, order3]}
// Output: 3 separate messages, one per order

export default class SplitOrdersBatch extends ABatchNode {
    
    public getName(): string {
        return 'split-orders';
    }
    
    public async processAction(dto: BatchProcessDto): Promise<BatchProcessDto> {
        const { orders } = dto.getJsonData();
        
        // Each addItem creates a separate message
        orders.forEach(order => {
            dto.addItem(order);
        });
        
        return dto;
    }
}

Topology Context and Tracking #

Correlation IDs #

Correlation IDs track messages through the topology:

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const correlationId = dto.getHeader('correlation-id');
    
    console.log(`Processing message: ${correlationId}`);
    
    // All logs will include this correlation ID
    logger.info('Fetching customer data', dto);
    
    return dto;
}

User Context #

Access user information:

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const user = dto.getUser();
    
    // Use user context for credentials
    const appInstall = await this.getApplicationInstallFromProcess(dto);
    
    // Or for user-specific logic
    if (user === 'admin@company.com') {
        // Admin-specific processing
    }
    
    return dto;
}

Topology Context #

Know which topology is running:

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const topologyName = dto.getHeader('topology-name');
    const topologyId = dto.getHeader('topology-id');
    const nodeName = dto.getHeader('node-name');
    
    console.log(`Running in topology: ${topologyName}, node: ${nodeName}`);
    
    return dto;
}

Flow Control in Topology #

Success Processing #

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    // Process data
    const result = await this.fetchData();
    
    dto.setJsonData(result);
    dto.setSuccessProcess('Data fetched successfully');
    
    return dto;
}

Stopping Flow #

import ResultCode from '@orchesty/nodejs-sdk/lib/Utils/ResultCode';

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const data = dto.getJsonData();
    
    // Validate
    if (!data.email || !data.email.includes('@')) {
        dto.setStopProcess(
            ResultCode.STOP_AND_FAILED,
            'Invalid email address'
        );
        return dto;
    }
    
    // Continue processing...
    return dto;
}

Retry Processing #

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const result = await this.checkJobStatus();
    
    if (result.status === 'pending') {
        // Retry every 30 seconds, up to 10 times
        dto.setRepeater(30, 10, 'Job still processing');
        return dto;
    }
    
    if (result.status === 'completed') {
        dto.removeRepeater();
        dto.setJsonData(result.data);
        return dto;
    }
    
    return dto;
}

Queue-Based Architecture #

How Queues Work in Topology #

graph LR
    N1[Node 1: Fetch Data] -->|Publish| Q1[Queue 1]
    Q1 -->|Consume| N2[Node 2: Transform]
    N2 -->|Publish| Q2[Queue 2]
    Q2 -->|Consume| N3[Node 3: Store]
    
    style Q1 fill:#fff4e1
    style Q2 fill:#fff4e1

Benefits of Queue-Based Topology #

  1. Asynchronous processing: Nodes don't wait for each other
  2. Reliability: Messages persist in queues if nodes fail
  3. Scalability: Multiple workers can consume from same queue
  4. Decoupling: Nodes don't need to know about each other
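The decoupling benefit can be sketched with a toy in-memory queue. This is a conceptual illustration only — in Orchesty the broker is RabbitMQ and the SDK handles publishing and consuming for you:

```typescript
// Toy in-memory queue showing why queue-based topologies decouple nodes:
// the producer never waits for the consumer, and messages persist until
// a consumer is available. (RabbitMQ does this for real in Orchesty.)
type Message = { body: string };
type Consumer = (msg: Message) => void;

class InMemoryQueue {
    private messages: Message[] = [];
    private consumer?: Consumer;

    // Producer side: publishing returns immediately.
    publish(msg: Message): void {
        if (this.consumer) {
            this.consumer(msg);       // deliver right away if someone listens
        } else {
            this.messages.push(msg);  // otherwise hold until a consumer appears
        }
    }

    // Consumer side: drains any backlog, then receives new messages.
    subscribe(consumer: Consumer): void {
        this.consumer = consumer;
        this.messages.splice(0).forEach(consumer);
    }
}

const queue = new InMemoryQueue();
queue.publish({ body: 'order-1' });  // node 1 publishes before node 2 exists
queue.publish({ body: 'order-2' });

const received: string[] = [];
queue.subscribe((msg) => received.push(msg.body));  // node 2 comes online later
```

Note how the producer succeeded even though no consumer was subscribed yet — this is the same property that lets an Orchesty node keep publishing while a downstream node is restarting.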

Message Persistence #

Messages are stored in RabbitMQ until:

  • Successfully processed by all followers
  • Explicitly stopped with setStopProcess()
  • Maximum retry attempts exceeded

Common Topology Patterns #

Pattern 1: Linear Processing #

graph LR
    Start --> Fetch[Fetch Data]
    Fetch --> Transform[Transform]
    Transform --> Validate[Validate]
    Validate --> Save[Save]
    Save --> Notify[Notify]

// Each node processes and passes to next
// Order: Fetch → Transform → Validate → Save → Notify
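Outside the SDK, the linear pattern is just function composition. A minimal, SDK-free sketch (all stage names here are hypothetical stand-ins for real nodes):

```typescript
// SDK-free sketch of a linear topology: each stage takes the payload,
// transforms it, and hands it to the next. In Orchesty each stage would be
// a node and each hand-off would pass through a RabbitMQ queue.
type Payload = Record<string, unknown>;
type Stage = (p: Payload) => Promise<Payload>;

const fetchData: Stage = async (p) => ({ ...p, raw: 'customer-record' });
const transform: Stage = async (p) => ({ ...p, normalized: true });
const validate: Stage = async (p) => {
    if (!p.normalized) throw new Error('Validation failed: not normalized');
    return p;
};
const save: Stage = async (p) => ({ ...p, saved: true });

// Run the stages in order, mirroring a message moving node to node.
async function runPipeline(input: Payload, stages: Stage[]): Promise<Payload> {
    let current = input;
    for (const stage of stages) {
        current = await stage(current);
    }
    return current;
}

runPipeline({ orderId: '123' }, [fetchData, transform, validate, save])
    .then((result) => console.log(result));
```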

Pattern 2: Conditional Branching #

graph TB
    Start --> Router{Router}
    Router -->|High Value| HighPath[High Value Path]
    Router -->|Standard| StandardPath[Standard Path]
    Router -->|Low Value| LowPath[Low Value Path]

// Router node decides path based on data
if (order.total > 1000) {
    dto.setForceFollowers('high-value-handler');
} else {
    dto.setForceFollowers('standard-handler');
}

Pattern 3: Parallel Processing #

graph TB
    Start --> Splitter[Splitter]
    Splitter --> P1[Processor 1]
    Splitter --> P2[Processor 2]
    Splitter --> P3[Processor 3]
    P1 --> Merge[Merge]
    P2 --> Merge
    P3 --> Merge

// Batch node splits into multiple parallel paths
// Each item processed independently
// Optional: Merge results at end
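A hedged, SDK-free sketch of the split/process/merge idea, using `Promise.all` to stand in for the independent parallel paths (in Orchesty the split would be a batch node and each item its own message):

```typescript
// Split/parallel/merge pattern: items are processed independently and the
// results merged at the end. `processItem` is a hypothetical stand-in for
// whatever work each parallel branch would do.
type Item = { id: number; price: number };

async function processItem(item: Item): Promise<Item & { taxed: number }> {
    // each item is independent of the others, so they can run concurrently
    return { ...item, taxed: item.price * 1.2 };
}

async function splitProcessMerge(items: Item[]) {
    const results = await Promise.all(items.map(processItem)); // parallel paths
    const total = results.reduce((sum, r) => sum + r.taxed, 0); // merge step
    return { results, total };
}

splitProcessMerge([
    { id: 1, price: 100 },
    { id: 2, price: 200 },
]).then(({ total }) => console.log(total)); // 360
```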

Pattern 4: Fan-Out #

graph LR
    Source[Source] --> N1[Notification]
    Source --> N2[Logging]
    Source --> N3[Analytics]
    Source --> N4[Archive]

// One message triggers multiple independent processes
// All followers receive the same data

Pattern 5: Enrichment Pipeline #

graph LR
    Start --> Base[Base Data]
    Base --> Enrich1[+ Customer Info]
    Enrich1 --> Enrich2[+ Order History]
    Enrich2 --> Enrich3[+ Preferences]
    Enrich3 --> Complete[Complete Data]

// Each node adds more information
// Data grows as it flows through topology
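A minimal, SDK-free sketch of enrichment: each step spreads extra fields onto the payload, so the data only grows. The lookup values below are hypothetical stand-ins for real connector calls:

```typescript
// Enrichment pipeline sketch: each step adds fields, never removes them.
// In Orchesty each step would be a node calling an external system.
type Order = Record<string, unknown> & { customerId: string };

const addCustomerInfo = async (o: Order) => ({
    ...o,
    customerName: 'John Doe', // would come from a CRM connector
});

const addOrderHistory = async (o: Order) => ({
    ...o,
    previousOrders: 4, // would come from an order-history connector
});

async function enrich(order: Order) {
    let enriched: Order = order;
    for (const step of [addCustomerInfo, addOrderHistory]) {
        enriched = await step(enriched); // each node adds, never removes
    }
    return enriched;
}

enrich({ customerId: '123' }).then((result) => console.log(result));
// the result keeps customerId and gains customerName and previousOrders
```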

Debugging Topologies #

View Message Contents #

Use logs to inspect data:

import logger from '@orchesty/nodejs-sdk/lib/Logger/Logger';

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    // Log input data
    logger.info('Input data', dto);
    
    const input = dto.getJsonData();
    logger.debug(`Processing customer: ${input.customerId}`, dto);
    
    // Process...
    
    logger.info('Output data', dto);
    return dto;
}

Trace with Correlation IDs #

const correlationId = dto.getHeader('correlation-id');
console.log(`[${correlationId}] Starting processing`);

// Later in logs, search for this correlation ID to see full message journey

Check Headers #

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const headers = dto.getHeaders();
    console.log('All headers:', headers);
    
    console.log('Topology:', dto.getHeader('topology-name'));
    console.log('Node:', dto.getHeader('node-name'));
    console.log('User:', dto.getUser());
    
    return dto;
}

Topology Monitoring in Orchesty Admin #

In Orchesty Admin you can:

  • View topology graph: See all nodes and connections
  • Monitor execution: Watch messages flow through
  • Check node status: See which nodes are processing
  • View logs: See logs for each node execution
  • Inspect data: See message contents at each step
  • Trace failures: Follow correlation IDs through topology

Best Practices for Topologies #

1. Always Return ProcessDto #

// Good
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    dto.setJsonData(result);
    return dto;
}

// Bad
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    dto.setJsonData(result);
    // Forgot to return!
}

2. Don't Mutate Input Objects #

// Good
const input = dto.getJsonData();
const output = { ...input, processed: true };
dto.setJsonData(output);

// Bad
const input = dto.getJsonData();
input.processed = true;  // Mutating input
dto.setJsonData(input);

3. Set Meaningful Success Messages #

// Good
dto.setSuccessProcess(`Processed ${itemCount} items successfully`);

// Bad
dto.setSuccessProcess('Done');

4. Validate Early in Topology #

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const data = dto.getJsonData();
    
    // Validate first
    if (!data.email) {
        dto.setStopProcess(ResultCode.STOP_AND_FAILED, 'Email required');
        return dto;
    }
    
    // Then process
    const result = await this.processEmail(data.email);
    dto.setJsonData(result);
    return dto;
}

5. Use Structured Data #

// Good - structured
dto.setJsonData({
    status: 'success',
    data: result,
    timestamp: new Date().toISOString(),
    recordCount: result.length
});

// Bad - unstructured
dto.setData('Success! Got some data...');

6. Design for Failure #

// Always handle errors gracefully
try {
    const result = await this.process();
    dto.setJsonData(result);
} catch (error) {
    logger.error('Processing failed', dto, error);
    // In strict TypeScript the caught value is `unknown`, so narrow it first
    const message = error instanceof Error ? error.message : String(error);
    dto.setStopProcess(
        ResultCode.STOP_AND_FAILED,
        `Failed: ${message}`
    );
}
return dto;

Next Steps #

  1. Understand Connectors to see how nodes call APIs
  2. Learn about Custom Nodes for data transformation
  3. Explore Pagination for batch processing patterns
  4. Read ProcessDto API docs for complete method reference
  5. Check Orchesty Admin documentation for topology design
© 2025 Orchesty Solutions. All rights reserved.