Data comparator
A data comparator is the node that asks "does this incoming record actually differ from the one we already have?" and stops the message early when it doesn't. It is the single biggest lever for cutting cost and noise in sync topologies that re-fetch full snapshots.
For the conceptual walkthrough see Comparator for data filtering (guide). This page is the reference.
When you need it #
Use a comparator whenever:
- The source system does not give you "changed since" semantics, so you re-fetch everything every run.
- The downstream side has rate limits, write costs, or sends notifications you don't want to fire spuriously.
- You want a quiet audit log: only real changes appear, not "we re-confirmed the same fields again".
You don't need it when the upstream already gives you a delta (a webhook, a changes-since cursor): in that case the source is already filtered.
The Comparator worker #
Orchesty ships orchesty/comparator-worker as a registerable Worker, available in the Components catalog from the launch of the new portal. It backs its snapshots with Redis and exposes two custom nodes — Filter (the diff engine) and Invalidate (cache control).
Source and releases: Orchesty/orchesty-comparator-worker on GitHub.
Deployment #
In a Docker Compose deployment, register the worker alongside Redis:
```yaml
services:
  orchesty-comparator-worker:
    image: orchesty/comparator-worker:latest
    environment:
      CRYPT_SECRET: ${CRYPT_SECRET}
      BACKEND_URL: ${BACKEND_URL}
      STARTING_POINT_DSN: ${STARTING_POINT_DSN}
      WORKER_API_HOST: ${WORKER_API_HOST}
      ORCHESTY_API_KEY: ${ORCHESTY_API_KEY}

  redis:
    image: redis
```
Then go to Workers → Create in the Admin UI and register the worker. The Filter and Invalidate nodes appear in the topology designer.
Comparator Filter — input #
```json
{
  "items": [ /* array of items to compare, e.g. {id: 1, name: "test", timestamp: 123456789} */ ],
  "configuration": {
    "idField": "path.to.id.field",
    "masterKey": "products",
    "excludedFields": ["timestamp"],
    "stopOnEmptyArray": null,
    "ttl": null,
    "deleted": true,
    "isLast": true,
    "totalCount": 200,
    "passAsListOfExistingItems": false,
    "skipComparison": false,
    "lock": false
  }
}
```
Configuration #
| Key | Type | Required | Description |
|---|---|---|---|
| `idField` | string (path) | yes | Path to the unique id inside each item (`"id"`, `"sku"`, `"externalId.value"`). |
| `masterKey` | string | yes | Collection identifier in Redis. May be shared across processes (`"products"`), per-tenant (`"warehouse-1"`), or per-batch. |
| `excludedFields` | string[] (paths) | no | Paths to ignore during comparison (volatile fields like `timestamp`, `lastViewedAt`). |
| `stopOnEmptyArray` | bool \| null | no | If `true`, the process ends successfully when the comparison result is empty. |
| `ttl` | int \| null | no | Redis TTL of the snapshot in seconds. `null` means store until manually invalidated. |
| `deleted` | bool | no | Enable deletion detection. Requires `isLast` and `totalCount`. Default `false`. |
| `isLast` | bool | conditional | Set to `true` on the final batch of the run when `deleted` is enabled. |
| `totalCount` | int | conditional | Total number of items across all batches in this run when `deleted` is enabled. |
| `passAsListOfExistingItems` | bool | no | If `true`, output is a single merged array of new and changed items. Mutually exclusive with `deleted`. |
| `skipComparison` | bool | no | If `true`, skips comparison and returns all items from the input. |
| `lock` | bool | no | Locks the `masterKey` against concurrent processes. Recommended during bootstrap. |
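The dot-path semantics of `idField` and `excludedFields` can be illustrated with a short sketch. The helpers below are ours, not the worker's API: one resolves a path like `"externalId.value"`, the other strips excluded paths to produce the payload that actually takes part in the comparison.

```typescript
type Item = Record<string, unknown>;

// Resolve a dot-separated path like "externalId.value" against an item.
function getByPath(item: Item, path: string): unknown {
  return path.split('.').reduce<unknown>(
    (node, key) => (node as Item | undefined)?.[key],
    item,
  );
}

// Return a copy of the item with the excluded paths removed, i.e. the
// payload that actually takes part in the comparison.
function stripExcluded(item: Item, excludedFields: string[]): Item {
  const copy: Item = JSON.parse(JSON.stringify(item));
  for (const path of excludedFields) {
    const keys = path.split('.');
    const last = keys.pop() as string;
    const parent = keys.reduce<unknown>(
      (node, key) => (node as Item | undefined)?.[key],
      copy,
    );
    if (parent && typeof parent === 'object') {
      delete (parent as Item)[last];
    }
  }
  return copy;
}

const item = { id: 1, name: 'test', timestamp: 123456789 };
console.log(getByPath(item, 'id'));              // 1
console.log(stripExcluded(item, ['timestamp'])); // { id: 1, name: 'test' }
```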
Comparator Filter — output #
Standard output:
```json
{
  "created": [ /* items new to the snapshot */ ],
  "updated": [ /* items whose payload differs from the snapshot */ ],
  "deleted": [ "id-of-removed-item" ]
}
```
With `passAsListOfExistingItems: true`:
```json
[ /* merged array of new + changed items */ ]
```
The standard output produces three independent message streams in the topology, which can be routed to different downstream nodes (create / patch / delete endpoints).
Comparator Invalidate — input #
```json
{
  "masterKey": "products",
  "externalId": "id-of-one-record"
}
```
`externalId` is optional. Omit it to invalidate the entire collection; include it to invalidate a single record.
Output: `{}`.
Detecting deletions #
Deletion detection requires the worker to know the full universe of items in this run, hence the isLast + totalCount pair:
- `deleted: true` — opt in.
- `totalCount: N` — total items across all batches in this run.
- `isLast: true` — set on the last batch only.
Example: 200 products paginated 20 per page yields 10 process messages — set `totalCount: 200` on every one and `isLast: true` only on the tenth. The worker then returns the ids that were in the previous snapshot but absent from the current run.
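The batching contract above can be sketched as follows; `buildFilterMessages` is a hypothetical helper (not part of the worker) that prepares one Filter input per page:

```typescript
// Paging contract for deletion detection: 200 products, 20 per page.
type Product = { id: number; name: string };

function buildFilterMessages(products: Product[], pageSize: number) {
  return Array.from({ length: Math.ceil(products.length / pageSize) }, (_, page) => ({
    items: products.slice(page * pageSize, (page + 1) * pageSize),
    configuration: {
      idField: 'id',
      masterKey: 'products',
      deleted: true,
      totalCount: products.length,                      // same value on every batch
      isLast: (page + 1) * pageSize >= products.length, // true on the last batch only
    },
  }));
}

const products = Array.from({ length: 200 }, (_, i) => ({ id: i, name: `p${i}` }));
const messages = buildFilterMessages(products, 20);
console.log(messages.length);                  // 10
console.log(messages[9].configuration.isLast); // true
```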
Where in the topology #
The comparator usually sits right after the source connector and before any downstream side effect. Its job is to be the cheapest possible early exit.
For batch sources (a paginated full-snapshot pull), put the Comparator Filter after the batch fan-out so each item is compared on its own; otherwise an unchanged item drags an entire page through downstream nodes.
Force-resync #
Two built-in mechanisms, no code changes:
- `Comparator Invalidate` node — wipes a `masterKey` (whole collection) or one `externalId`. Drop it on a manual-trigger topology to give support engineers a button.
- `skipComparison: true` — flip it on the Filter node to push every item through for one run, regardless of cache state.
Operational notes #
- `excludedFields` matters. Excluding fields that tick on read (`updatedAt`, `lastViewedAt`, server-side counters) is what makes the skip rate meaningful.
- Normalize before the worker. Trim, lowercase, sort arrays — the worker compares the JSON shape it receives.
- Bootstrap. The first run has no snapshot, so everything counts as `created`. Set `lock: true` to prevent a second process from racing the bootstrap.
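A minimal sketch of such a normalization pass, run before the Filter node. Which fields to lowercase or which arrays to sort is application-specific; `normalize` here is our illustration, not part of the worker:

```typescript
type Rec = Record<string, unknown>;

// Trim strings, lowercase email-like fields, and sort arrays so that
// ordering or whitespace differences don't register as changes.
function normalize(record: Rec): Rec {
  const out: Rec = {};
  for (const [key, value] of Object.entries(record)) {
    if (typeof value === 'string') {
      out[key] = key === 'email' ? value.trim().toLowerCase() : value.trim();
    } else if (Array.isArray(value)) {
      out[key] = [...value].sort(); // stable order regardless of source order
    } else {
      out[key] = value;
    }
  }
  return out;
}

console.log(normalize({ email: ' Jane@Example.COM ', tags: ['b', 'a'] }));
// { email: 'jane@example.com', tags: [ 'a', 'b' ] }
```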
When to roll your own #
The worker fits the vast majority of cases. Roll your own only when:
- You need per-field diff semantics the worker doesn't expose.
- You need to compare against storage other than Redis.
- You have an exotic key model that doesn't reduce to `(masterKey, idField)`.
A note on language choice: the worker is Node.js because at high volume the JSON normalization and hashing cost adds up and PHP becomes the bottleneck. For lower-volume or PHP-only deployments, the hand-rolled pattern below is fine in either language.
Hash compare (fallback) #
TypeScript:

```typescript
import { createHash } from 'crypto';

public async processAction(dto: ProcessDto): Promise<ProcessDto> {
  const record = dto.getJsonData() as { externalId: string; [k: string]: unknown };

  // Fingerprint only the fields that matter for "did it change?".
  const fingerprint = createHash('sha256')
    .update(JSON.stringify(this.relevantFields(record)))
    .digest('hex');

  const stored = await this.fingerprintRepo.find('contact', record.externalId);
  if (stored?.hash === fingerprint) {
    // Unchanged: stop the message here, nothing flows downstream.
    dto.setStopProcess(ResultCode.DO_NOT_CONTINUE, 'unchanged');
    return dto;
  }

  await this.fingerprintRepo.upsert('contact', record.externalId, fingerprint);
  return dto;
}
```
PHP:

```php
public function processAction(ProcessDto $dto): ProcessDto
{
    $record = $dto->getJsonData();

    // Fingerprint only the fields that matter for "did it change?".
    $fingerprint = hash('sha256', json_encode($this->relevantFields($record)));

    $stored = $this->fingerprintRepo->find('contact', $record['externalId']);
    if ($stored !== null && $stored['hash'] === $fingerprint) {
        // Unchanged: stop the message here, nothing flows downstream.
        $dto->setStopProcess(ProcessDtoAbstract::DO_NOT_CONTINUE, 'unchanged');

        return $dto;
    }

    $this->fingerprintRepo->upsert('contact', $record['externalId'], $fingerprint);

    return $dto;
}
```
A hash tells you that something changed, not what. For per-field diffs, store a normalized projection of the previous payload alongside the hash and compute the per-field delta. Higher storage cost and slower compare — only when downstream genuinely needs the granularity.
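When downstream does need that granularity, the per-field delta over a stored projection might look like the following sketch (`fieldDelta` is our name, not an existing API; top-level fields only — nested diffs would need recursion):

```typescript
type Rec = Record<string, unknown>;

// Compare the previously stored normalized projection against the current
// payload and return the names of fields that were added, removed, or changed.
function fieldDelta(previous: Rec, current: Rec): string[] {
  const keys = new Set([...Object.keys(previous), ...Object.keys(current)]);
  return Array.from(keys).filter(
    (key) => JSON.stringify(previous[key]) !== JSON.stringify(current[key]),
  );
}

const prev = { name: 'Jane', city: 'Brno', tier: 'gold' };
const curr = { name: 'Jane', city: 'Prague', tier: 'gold', vip: true };
console.log(fieldDelta(prev, curr)); // [ 'city', 'vip' ]
```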
See also #
- Patterns: ID mapping
- Patterns: Scheduled processes
- Patterns: Pagination and batch
- `orchesty/comparator-worker` on GitHub — source, releases, full configuration reference.