Custom Node

What is a Custom Node? #

A custom node is a node for data transformation, filtering, routing, and business logic that does not call external APIs. Custom nodes process data inside your workflows: they convert it from one format to another, apply business rules, or route messages to different paths.

Key characteristics:

  • Data-focused: Built for transforming and processing data
  • No HTTP abstractions: Pure data manipulation
  • No authentication: Doesn't need external credentials
  • Business logic: Implement your domain-specific rules
  • Reusable: Register once, use across workflows
  • Lightweight: No overhead of API call infrastructure

When to Use Custom Nodes #

Use custom nodes when you need to process data internally:

  • Transform data between formats (e.g., "Convert order format from System A to System B")
  • Filter data based on conditions (e.g., "Remove orders under $10")
  • Route messages conditionally (e.g., "Send high-value orders to a different path")
  • Map fields between schemas (e.g., "Map CRM fields to ERP fields")
  • Validate data (e.g., "Check if email format is valid")
  • Aggregate data (e.g., "Calculate total from line items")
  • Split/merge data (e.g., "Extract items from order into separate messages")
  • Apply business rules (e.g., "Calculate discount based on customer tier")

Custom Nodes vs Connectors #

Both are nodes built on the same foundation, registered the same way, and reusable across workflows. The difference is their purpose:

Aspect         | Custom Node                         | Connector
Purpose        | Transform, filter, route data       | Call external APIs/services
HTTP support   | None (pure data processing)         | Built-in via getSender()
Authentication | Not applicable                      | Integrated with Applications
Rate limiting  | Not applicable                      | Automatic via Limiter
Retry policy   | Manual if needed                    | Built-in for API failures
Base class     | ACommonNode                         | AConnector
Use for        | Data transformation, business logic | API calls, external services

Use custom nodes for internal data processing. Use connectors for external communication.

See Connector for calling external APIs.

Architecture #

Custom Node in the Workflow #

graph LR
    API[API Connector] -->|Raw data| CN[Custom Node]
    CN -->|Transformed data| DB[Database Connector]
    
    style CN fill:#e8f5e8
    style API fill:#e1f5ff
    style DB fill:#e1f5ff

Class Hierarchy #

ANode
  ↓
ACommonNode (You extend this)
  ↓
YourCustomNode

Both AConnector and ACommonNode extend the same base, so they share core functionality, but AConnector adds HTTP-specific features.
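
To make the hierarchy concrete, here is a minimal, self-contained sketch. The classes `SketchNode`, `SketchCommonNode`, and `SketchConnector` are simplified stand-ins invented for illustration, not the SDK's real `ANode`, `ACommonNode`, or `AConnector`. The real classes carry far more functionality, and their exact members should be checked against the SDK source.

```typescript
// Hypothetical stand-ins for the SDK classes; names and members are illustrative only.
abstract class SketchNode {
    // Every node exposes a unique name.
    public abstract getName(): string;
}

abstract class SketchCommonNode extends SketchNode {
    // A custom node only works on the data it is given.
    public abstract processAction(data: Record<string, unknown>): Record<string, unknown>;
}

abstract class SketchConnector extends SketchNode {
    // Stand-in for the HTTP-specific features a connector adds on top of the shared base.
    protected describeTransport(): string {
        return 'http';
    }
}

class UppercaseNameNode extends SketchCommonNode {
    public getName(): string {
        return 'uppercase-name';
    }

    public processAction(data: Record<string, unknown>): Record<string, unknown> {
        // Copy the input and upper-case the "name" field (empty string if missing).
        return { ...data, name: String(data['name'] ?? '').toUpperCase() };
    }
}

function describeNode(node: SketchNode): string {
    const kind = node instanceof SketchConnector ? 'connector' : 'custom node';
    return `${node.getName()} (${kind})`;
}

// describeNode(new UppercaseNameNode()) returns 'uppercase-name (custom node)'
```

Because both branches share the base type, code that only needs a node's name can accept either kind.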

Implementation #

Basic Custom Node Structure #

import ACommonNode from '@orchesty/nodejs-sdk/lib/Commons/ACommonNode';
import ProcessDto from '@orchesty/nodejs-sdk/lib/Utils/ProcessDto';

export default class DataTransformerNode extends ACommonNode {
    
    public getName(): string {
        return 'data-transformer';
    }
    
    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        // 1. Get input data
        const input = dto.getJsonData();
        
        // 2. Transform it
        const output = this.transform(input);
        
        // 3. Set output data
        dto.setJsonData(output);
        
        // 4. Return the modified DTO
        return dto;
    }
    
    private transform(data: any): any {
        return {
            id: data.customerId,
            name: `${data.firstName} ${data.lastName}`,
            email: data.emailAddress,
            phone: data.phoneNumber
        };
    }
}

Data Transformation #

Transform data from one format to another:

export default class OrderTransformerNode extends ACommonNode {
    
    public getName(): string {
        return 'order-transformer';
    }
    
    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        const shopifyOrder = dto.getJsonData();
        
        // Transform Shopify order to internal format
        const internalOrder = {
            orderId: shopifyOrder.id.toString(),
            orderNumber: shopifyOrder.order_number,
            customerEmail: shopifyOrder.email,
            total: parseFloat(shopifyOrder.total_price),
            currency: shopifyOrder.currency,
            items: shopifyOrder.line_items.map(item => ({
                productId: item.product_id.toString(),
                variantId: item.variant_id.toString(),
                quantity: item.quantity,
                price: parseFloat(item.price),
                title: item.title
            })),
            shippingAddress: {
                firstName: shopifyOrder.shipping_address.first_name,
                lastName: shopifyOrder.shipping_address.last_name,
                address1: shopifyOrder.shipping_address.address1,
                address2: shopifyOrder.shipping_address.address2,
                city: shopifyOrder.shipping_address.city,
                zip: shopifyOrder.shipping_address.zip,
                country: shopifyOrder.shipping_address.country_code
            },
            createdAt: new Date(shopifyOrder.created_at).toISOString()
        };
        
        dto.setJsonData(internalOrder);
        return dto;
    }
}

Data Filtering #

Filter out messages based on conditions:

import ResultCode from '@orchesty/nodejs-sdk/lib/Utils/ResultCode';

export default class OrderFilterNode extends ACommonNode {
    
    public getName(): string {
        return 'order-filter';
    }
    
    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        const order = dto.getJsonData();
        
        // Filter out low-value orders
        if (order.total < 100) {
            dto.setStopProcess(
                ResultCode.DO_NOT_CONTINUE,
                `Order ${order.id} filtered out: total ($${order.total}) below threshold ($100)`
            );
            return dto;
        }
        
        // Filter out test orders
        if (order.customerEmail?.includes('test@')) {
            dto.setStopProcess(
                ResultCode.DO_NOT_CONTINUE,
                `Order ${order.id} filtered out: test order`
            );
            return dto;
        }
        
        // Order passed filters
        dto.setSuccessProcess(`Order ${order.id} passed filters`);
        return dto;
    }
}

Data Routing #

Route messages to different paths based on conditions:

export default class OrderRouterNode extends ACommonNode {
    
    public getName(): string {
        return 'order-router';
    }
    
    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        const order = dto.getJsonData();
        
        // Collect every matching follower first and call setForceFollowers() once.
        // Assumption: it accepts several follower names, and each call replaces
        // the previously set followers rather than adding to them.
        const followers: string[] = [];
        
        // Route based on order value
        if (order.total > 1000) {
            followers.push('high-value-handler');
        } else if (order.total > 100) {
            followers.push('standard-handler');
        } else {
            followers.push('low-value-handler');
        }
        
        // Route based on customer type
        if (order.customer?.type === 'wholesale') {
            followers.push('wholesale-processor');
        }
        
        // Route based on product category
        const hasDigitalProducts = order.items.some(item => 
            item.category === 'digital'
        );
        
        followers.push(hasDigitalProducts ? 'digital-fulfillment' : 'physical-fulfillment');
        
        dto.setForceFollowers(...followers);
        
        return dto;
    }
}

Field Mapping #

Map fields from one schema to another:

export default class FieldMapperNode extends ACommonNode {
    
    public getName(): string {
        return 'field-mapper';
    }
    
    private readonly fieldMapping = {
        // CRM field -> ERP field
        'customer_id': 'client_id',
        'first_name': 'given_name',
        'last_name': 'family_name',
        'email_address': 'email',
        'phone_number': 'phone',
        'company_name': 'organization',
        'street_address': 'address_line1',
        'city': 'city',
        'state': 'state_province',
        'zip_code': 'postal_code',
        'country': 'country_code'
    };
    
    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        const crmData = dto.getJsonData();
        const erpData: any = {};
        
        // Map fields
        for (const [crmField, erpField] of Object.entries(this.fieldMapping)) {
            if (crmData[crmField] !== undefined) {
                erpData[erpField] = crmData[crmField];
            }
        }
        
        // Add computed fields (skip missing name parts instead of emitting "undefined")
        erpData.full_name = [crmData.first_name, crmData.last_name].filter(Boolean).join(' ');
        erpData.created_at = new Date().toISOString();
        
        dto.setJsonData(erpData);
        return dto;
    }
}

Data Validation #

Validate data and stop processing if invalid:

import ResultCode from '@orchesty/nodejs-sdk/lib/Utils/ResultCode';

export default class DataValidatorNode extends ACommonNode {
    
    public getName(): string {
        return 'data-validator';
    }
    
    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        const data = dto.getJsonData();
        const errors: string[] = [];
        
        // Validate required fields
        if (!data.email) {
            errors.push('Email is required');
        } else if (!this.isValidEmail(data.email)) {
            errors.push('Email format is invalid');
        }
        
        if (!data.name || data.name.length < 2) {
            errors.push('Name is required (min 2 characters)');
        }
        
        if (!data.phone) {
            errors.push('Phone is required');
        } else if (!this.isValidPhone(data.phone)) {
            errors.push('Phone format is invalid');
        }
        
        // Validate business rules
        // Compare against null/undefined explicitly so that an age of 0 is still checked
        if (data.age != null && data.age < 18) {
            errors.push('Customer must be 18 or older');
        }
        
        if (data.country === 'US' && !data.state) {
            errors.push('State is required for US customers');
        }
        
        // Stop if validation fails
        if (errors.length > 0) {
            dto.setStopProcess(
                ResultCode.STOP_AND_FAILED,
                `Validation failed: ${errors.join(', ')}`
            );
            return dto;
        }
        
        // Validation passed
        dto.setSuccessProcess('Data validation passed');
        return dto;
    }
    
    private isValidEmail(email: string): boolean {
        return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
    }
    
    private isValidPhone(phone: string): boolean {
        return /^\+?[\d\s\-()]+$/.test(phone);
    }
}
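
To see what these two patterns accept, the sketch below copies them into standalone functions and runs a few sample values through them. The sample inputs are made up for illustration. Note that the phone pattern only restricts the character set, so a string of punctuation alone also passes.

```typescript
// Same patterns as in DataValidatorNode, extracted as standalone functions.
function isValidEmail(email: string): boolean {
    return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
}

function isValidPhone(phone: string): boolean {
    return /^\+?[\d\s\-()]+$/.test(phone);
}

const emailResults = ['john@example.com', 'john@example', 'john doe@example.com']
    .map(sample => [sample, isValidEmail(sample)] as const);
// john@example.com     -> true
// john@example         -> false (no dot after the @)
// john doe@example.com -> false (whitespace is not allowed)

const phoneResults = ['+1 (555) 123-4567', '555-CALL', '---']
    .map(sample => [sample, isValidPhone(sample)] as const);
// +1 (555) 123-4567 -> true
// 555-CALL          -> false (letters are not allowed)
// ---               -> true  (only the character set is checked, not the digit count)
```

If stricter phone validation matters, an additional check on the number of digits is one option.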

Data Enrichment #

Add calculated or derived fields:

export default class DataEnricherNode extends ACommonNode {
    
    public getName(): string {
        return 'data-enricher';
    }
    
    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        const order = dto.getJsonData();
        
        // Calculate totals
        const subtotal = order.items.reduce((sum, item) => 
            sum + (item.price * item.quantity), 0
        );
        
        const tax = subtotal * 0.21; // 21% VAT
        const shipping = this.calculateShipping(order);
        const total = subtotal + tax + shipping;
        
        // Add discount if applicable
        let discount = 0;
        if (order.customer?.tier === 'premium') {
            discount = total * 0.10; // 10% discount
        }
        
        // Enrich order with calculations
        const enrichedOrder = {
            ...order,
            subtotal,
            tax,
            shipping,
            discount,
            total: total - discount,
            itemCount: order.items.reduce((sum, item) => 
                sum + item.quantity, 0
            ),
            // Guard against division by zero for orders without items
            averageItemPrice: order.items.length > 0 ? subtotal / order.items.length : 0,
            processedAt: new Date().toISOString(),
            processingDuration: Date.now() - new Date(order.createdAt).getTime()
        };
        
        dto.setJsonData(enrichedOrder);
        return dto;
    }
    
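    private calculateShipping(order: any): number {
        // Simple shipping calculation
        const itemCount = order.items.reduce((sum, item) => 
            sum + item.quantity, 0
        );
        
        // Use the item subtotal for the free-shipping check: order.total may be
        // missing or stale on the input, since this node is what computes it
        const subtotal = order.items.reduce((sum, item) => 
            sum + (item.price * item.quantity), 0
        );
        
        if (subtotal > 100) return 0; // Free shipping
        if (itemCount <= 3) return 5;
        return 5 + (itemCount - 3) * 2;
    }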
}
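
The arithmetic can also be followed on a small worked example. The sketch below is a standalone variant of the calculation, not the node itself. `LineItem`, `Totals`, `roundMoney`, and `computeTotals` are hypothetical names. Rounding to two decimals and checking free shipping against the subtotal are assumptions made for this sketch.

```typescript
interface LineItem {
    price: number;
    quantity: number;
}

interface Totals {
    subtotal: number;
    tax: number;
    shipping: number;
    discount: number;
    total: number;
}

// Round to two decimals so floating-point noise does not leak into amounts.
function roundMoney(value: number): number {
    return Math.round(value * 100) / 100;
}

function computeTotals(items: LineItem[], isPremium: boolean): Totals {
    const subtotal = items.reduce((sum, item) => sum + item.price * item.quantity, 0);
    const itemCount = items.reduce((sum, item) => sum + item.quantity, 0);

    const tax = roundMoney(subtotal * 0.21); // 21% VAT
    // Assumption: free shipping is decided on the subtotal.
    const shipping = subtotal > 100 ? 0 : itemCount <= 3 ? 5 : 5 + (itemCount - 3) * 2;

    const gross = subtotal + tax + shipping;
    const discount = isPremium ? roundMoney(gross * 0.10) : 0; // 10% for premium customers

    return { subtotal, tax, shipping, discount, total: roundMoney(gross - discount) };
}

// Two units at 20 plus one unit at 10, premium customer:
// subtotal 50, tax 10.5, shipping 5, discount 6.55, total 58.95
const exampleTotals = computeTotals(
    [{ price: 20, quantity: 2 }, { price: 10, quantity: 1 }],
    true
);
```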

Common Patterns #

Pattern 1: Format Converter #

import { Parser } from 'xml2js';

export default class XmlToJsonNode extends ACommonNode {
    
    public getName(): string {
        return 'xml-to-json';
    }
    
    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        // Read the raw message body as a string (it is XML, not JSON)
        const xmlString = dto.getData();
        
        // Parse XML into a plain object (requires the xml2js package)
        const parser = new Parser();
        const jsonData = await parser.parseStringPromise(xmlString);
        
        dto.setJsonData(jsonData);
        return dto;
    }
}

Pattern 2: Data Aggregator #

export default class OrderAggregatorNode extends ACommonNode {
    
    public getName(): string {
        return 'order-aggregator';
    }
    
    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        const orders = dto.getJsonData().orders;
        
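        const summary = {
            totalOrders: orders.length,
            totalRevenue: orders.reduce((sum, o) => sum + o.total, 0),
            averageOrderValue: 0,
            byStatus: {} as Record<string, number>,
            byCountry: {} as Record<string, number>,
            topProducts: this.getTopProducts(orders)
        };
        
        // Guard against division by zero when there are no orders
        if (summary.totalOrders > 0) {
            summary.averageOrderValue = summary.totalRevenue / summary.totalOrders;
        }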
        
        // Group by status
        orders.forEach(order => {
            summary.byStatus[order.status] = (summary.byStatus[order.status] || 0) + 1;
        });
        
        // Group by country
        orders.forEach(order => {
            const country = order.shippingAddress?.country || 'Unknown';
            summary.byCountry[country] = (summary.byCountry[country] || 0) + 1;
        });
        
        dto.setJsonData(summary);
        return dto;
    }
    
    private getTopProducts(orders: any[]): any[] {
        const productCounts: Record<string, number> = {};
        
        // Sum ordered quantities per product across all orders
        orders.forEach(order => {
            order.items?.forEach(item => {
                const key = `${item.productId}:${item.title}`;
                productCounts[key] = (productCounts[key] || 0) + item.quantity;
            });
        });
        
        return Object.entries(productCounts)
            .sort((a, b) => b[1] - a[1])
            .slice(0, 10)
            .map(([key, count]) => ({
                // Strip only the "productId:" prefix so titles containing ':' stay intact
                product: key.slice(key.indexOf(':') + 1),
                quantity: count
            }));
    }
}

Pattern 3: Conditional Transformer #

import ResultCode from '@orchesty/nodejs-sdk/lib/Utils/ResultCode';

export default class ConditionalTransformerNode extends ACommonNode {
    
    public getName(): string {
        return 'conditional-transformer';
    }
    
    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        const data = dto.getJsonData();
        
        // Apply different transformations based on data type
        if (data.type === 'order') {
            dto.setJsonData(this.transformOrder(data));
        } else if (data.type === 'customer') {
            dto.setJsonData(this.transformCustomer(data));
        } else if (data.type === 'product') {
            dto.setJsonData(this.transformProduct(data));
        } else {
            dto.setStopProcess(
                ResultCode.STOP_AND_FAILED,
                `Unknown data type: ${data.type}`
            );
        }
        
        return dto;
    }
    
    private transformOrder(order: any): any {
        return {
            id: order.id,
            total: order.total,
            customer: order.customer_id,
            items: order.line_items
        };
    }
    
    private transformCustomer(customer: any): any {
        return {
            id: customer.id,
            name: customer.name,
            email: customer.email
        };
    }
    
    private transformProduct(product: any): any {
        return {
            id: product.id,
            title: product.title,
            price: product.price
        };
    }
}

Pattern 4: Data Splitter #

export default class DataSplitterNode extends ACommonNode {
    
    public getName(): string {
        return 'data-splitter';
    }
    
    public async processAction(dto: ProcessDto): Promise<ProcessDto> {
        const order = dto.getJsonData();
        
        // Split order into separate concerns
        const orderData = {
            id: order.id,
            total: order.total,
            status: order.status,
            createdAt: order.createdAt
        };
        
        const customerData = {
            id: order.customer.id,
            name: order.customer.name,
            email: order.customer.email
        };
        
        const shippingData = {
            orderId: order.id,
            address: order.shippingAddress,
            method: order.shippingMethod,
            trackingNumber: order.trackingNumber
        };
        
        // Forward the three parts together as one structured payload
        dto.setJsonData({
            order: orderData,
            customer: customerData,
            shipping: shippingData
        });
        
        return dto;
    }
}
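
The use case "Extract items from order into separate messages" needs one payload per line item rather than one restructured order. The sketch below covers only that data step as a pure function. `SketchOrder`, `SketchOrderItem`, `ItemPayload`, and `splitOrderIntoItems` are hypothetical names, and how each payload is then emitted as its own message depends on the SDK and is not shown here.

```typescript
interface SketchOrderItem {
    productId: string;
    quantity: number;
    price: number;
}

interface SketchOrder {
    id: string;
    items: SketchOrderItem[];
}

interface ItemPayload {
    orderId: string;
    position: number;  // 1-based position of the item within the order
    itemCount: number; // total number of line items, so consumers can detect the last one
    item: SketchOrderItem;
}

// Build one self-describing payload per line item.
function splitOrderIntoItems(order: SketchOrder): ItemPayload[] {
    return order.items.map((item, index) => ({
        orderId: order.id,
        position: index + 1,
        itemCount: order.items.length,
        item,
    }));
}

const itemPayloads = splitOrderIntoItems({
    id: 'A-1',
    items: [
        { productId: 'p1', quantity: 2, price: 20 },
        { productId: 'p2', quantity: 1, price: 10 },
    ],
});
// itemPayloads has two entries, each carrying orderId 'A-1' and its own item
```

Carrying `orderId`, `position`, and `itemCount` on every payload lets a later node regroup or merge the items without extra lookups.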

Registration #

Custom nodes are registered the same way as connectors:

import DIContainer from '@orchesty/nodejs-sdk/lib/DIContainer/Container';
import DataTransformerNode from './nodes/DataTransformerNode';
import OrderFilterNode from './nodes/OrderFilterNode';
import OrderRouterNode from './nodes/OrderRouterNode';

const container = new DIContainer();

// Register custom nodes
container.setCustomNode(new DataTransformerNode());
container.setCustomNode(new OrderFilterNode());
container.setCustomNode(new OrderRouterNode());

// Start the SDK server
container.listen();

Testing Custom Nodes #

Unit Testing #

import DataTransformerNode from '../DataTransformerNode';
import ProcessDto from '@orchesty/nodejs-sdk/lib/Utils/ProcessDto';

describe('DataTransformerNode', () => {
    it('should transform data correctly', async () => {
        const node = new DataTransformerNode();
        const dto = new ProcessDto();
        
        dto.setJsonData({
            customerId: '123',
            firstName: 'John',
            lastName: 'Doe',
            emailAddress: 'john@example.com'
        });
        
        const result = await node.processAction(dto);
        const output = result.getJsonData();
        
        expect(output.id).toBe('123');
        expect(output.name).toBe('John Doe');
        expect(output.email).toBe('john@example.com');
    });
});

Testing via HTTP #

# List custom nodes
curl http://localhost:8080/custom-node/list

# Execute custom node
curl -X POST http://localhost:8080/custom-node/data-transformer/action \
  -H "Content-Type: application/json" \
  -d '{
    "customerId": "123",
    "firstName": "John",
    "lastName": "Doe",
    "emailAddress": "john@example.com"
  }'
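
The same request can be prepared from TypeScript, for example inside an integration test. This is a minimal sketch: `ActionRequest` and `buildActionRequest` are hypothetical helper names, and the host, port, and path simply mirror the curl example. Sending the request and interpreting the response are left out, since the response shape is not described here.

```typescript
interface ActionRequest {
    url: string;
    method: 'POST';
    headers: Record<string, string>;
    body: string;
}

// Build the request for executing a custom node by name.
function buildActionRequest(baseUrl: string, nodeName: string, payload: unknown): ActionRequest {
    // Drop trailing slashes so the path is not doubled up.
    const trimmedBase = baseUrl.replace(/\/+$/, '');
    return {
        url: `${trimmedBase}/custom-node/${encodeURIComponent(nodeName)}/action`,
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(payload),
    };
}

const transformerRequest = buildActionRequest('http://localhost:8080', 'data-transformer', {
    customerId: '123',
    firstName: 'John',
    lastName: 'Doe',
    emailAddress: 'john@example.com',
});
// transformerRequest.url is 'http://localhost:8080/custom-node/data-transformer/action'
```

In a runtime that provides `fetch`, the result can be sent with `fetch(transformerRequest.url, transformerRequest)`.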

Best Practices #

1. Keep Nodes Focused #

// Good - single responsibility
class EmailValidatorNode extends ACommonNode { }
class PhoneValidatorNode extends ACommonNode { }

// Bad - too much responsibility
class DataValidatorNode extends ACommonNode {
    // validates everything
}

2. Make Transformations Explicit #

// Good - clear what happens
const output = {
    clientId: input.customerId,
    clientName: `${input.firstName} ${input.lastName}`,
    contactEmail: input.email
};

// Bad - implicit transformation
const output = this.transform(input);

3. Handle Missing Data #

const output = {
    id: input.id,
    name: input.name || 'Unknown',
    email: input.email || null,
    phone: input.phone || input.mobile || null
};
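
One caveat with `||` fallbacks: `||` replaces every falsy value, including `0`, `''`, and `false`, not only missing ones. Where such values are legitimate, the nullish coalescing operator `??` falls back only on `null` and `undefined`. A small sketch with a hypothetical `RawProduct` shape:

```typescript
interface RawProduct {
    id: string;
    stock?: number | null;
    label?: string | null;
}

// Fallbacks with ||: any falsy value is replaced.
function withOrFallback(product: RawProduct) {
    return {
        id: product.id,
        stock: product.stock || 10,
        label: product.label || 'Unknown',
    };
}

// Fallbacks with ??: only null and undefined are replaced.
function withNullishFallback(product: RawProduct) {
    return {
        id: product.id,
        stock: product.stock ?? 10,
        label: product.label ?? 'Unknown',
    };
}

const soldOut: RawProduct = { id: 'p1', stock: 0, label: '' };

const viaOr = withOrFallback(soldOut);
// { id: 'p1', stock: 10, label: 'Unknown' }  (the real stock of 0 is lost)

const viaNullish = withNullishFallback(soldOut);
// { id: 'p1', stock: 0, label: '' }          (legitimate values are kept)
```

Which operator fits depends on whether an empty string or zero should count as "missing" for the field in question.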

4. Validate Before Transforming #

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const input = dto.getJsonData();
    
    // Validate first
    if (!input.id || !input.name) {
        dto.setStopProcess(ResultCode.STOP_AND_FAILED, 'Missing required fields');
        return dto;
    }
    
    // Then transform
    const output = this.transform(input);
    dto.setJsonData(output);
    return dto;
}

5. Use Type Safety #

interface InputData {
    customerId: string;
    firstName: string;
    lastName: string;
    emailAddress: string;
}

interface OutputData {
    id: string;
    name: string;
    email: string;
}

private transform(input: InputData): OutputData {
    return {
        id: input.customerId,
        name: `${input.firstName} ${input.lastName}`,
        email: input.emailAddress
    };
}
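
Interfaces exist only at compile time, so incoming JSON is not checked against them when the node runs. A type guard can bridge that gap. The sketch below repeats the two interfaces to stay self-contained. `isInputData` and `transformChecked` are hypothetical names, and returning `null` for invalid input is just one possible convention.

```typescript
interface InputData {
    customerId: string;
    firstName: string;
    lastName: string;
    emailAddress: string;
}

interface OutputData {
    id: string;
    name: string;
    email: string;
}

// Runtime check that narrows an unknown value to InputData.
function isInputData(value: unknown): value is InputData {
    if (typeof value !== 'object' || value === null) {
        return false;
    }
    const record = value as Record<string, unknown>;
    return ['customerId', 'firstName', 'lastName', 'emailAddress']
        .every(key => typeof record[key] === 'string');
}

// Transform only when the input really has the expected shape.
function transformChecked(value: unknown): OutputData | null {
    if (!isInputData(value)) {
        return null;
    }
    return {
        id: value.customerId,
        name: `${value.firstName} ${value.lastName}`,
        email: value.emailAddress,
    };
}
```

Inside a node, a `null` result would be the natural place to stop processing with a validation error instead of forwarding malformed data.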

6. Log Transformations #

import logger from '@orchesty/nodejs-sdk/lib/Logger/Logger';

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
    const input = dto.getJsonData();
    
    logger.debug(`Transforming data: ${input.id}`, dto);
    
    const output = this.transform(input);
    
    logger.info(`Data transformed: ${output.id}`, dto);
    
    dto.setJsonData(output);
    return dto;
}

When NOT to Use Custom Nodes #

Don't use custom nodes when you need to:

  • Call external APIs → Use Connector
  • Handle authentication → Use Connector with Application
  • Respect rate limits → Use Connector with Limiter
  • Retry failed API calls → Use Connector with retry policy
  • Process paginated data → Use Batch Node

API References #

Next Steps #

  1. Learn about Connectors for calling external APIs
  2. Understand Data Flow to master data transformation
  3. Read about Error Handling for validation patterns
  4. Explore Routing for conditional message routing
© 2025 Orchesty Solutions. All rights reserved.