Comparator for data filtering
Skip the work when nothing changed. The pattern that makes nightly snapshots cheap and incremental syncs safe — and the ready-made worker that ships with Orchesty to do it for you.
A data comparator is the small node that asks "does this incoming record actually differ from the one we already have?" and stops the message early when it doesn't. It is the single biggest lever for cutting cost, noise, and load in any sync topology that re-fetches full snapshots from the source.
If your nightly job touches 200,000 records but only 1,500 of them actually changed, every downstream write, webhook, and notification on the other 198,500 is pure waste. Worse than waste, in fact: it pollutes your audit log, burns rate-limit budget on the destination API, and makes it impossible to tell real business events from noise. A comparator turns that 200,000 message wave into 1,500 meaningful messages, with everything else exiting cleanly at the second node.
When you need it #
Use a comparator whenever:
- The source system does not give you "changed since" semantics, so you re-fetch everything every run.
- The downstream side has rate limits, write costs, or sends notifications you don't want to fire spuriously.
- You want a quiet audit log where only real changes appear, not "we re-confirmed the same fields again".
You don't need a comparator when the upstream already gives you a delta — a webhook payload, a changes-since cursor, a CDC stream. In that case the source has already filtered for you and adding a comparator on top is overhead without benefit.
The Comparator worker (the recommended path) #
You don't have to build a comparator yourself. Orchesty ships orchesty/comparator-worker — a production-grade implementation that handles tens of millions of records per run efficiently. It has been available in the Components catalog since the launch of the new portal: register it from the Workers screen in the Admin UI and the comparator nodes appear in the topology designer alongside your own.
Under the hood the worker keeps per-collection snapshots in Redis (with a configurable TTL) and compares incoming items against them. There is nothing to write — you drop a node into the topology, point it at the right collection, and it returns a clean delta.
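To make the behavior concrete, here is a minimal sketch of the core diff the worker performs — compare incoming items against a snapshot keyed by id, and emit created/updated/deleted sets. This is illustrative only, not the worker's actual implementation; the real worker also normalizes payloads and stores the snapshot in Redis.

```typescript
// Illustrative diff: incoming items vs. a stored snapshot (id -> serialized payload).
type Item = { id: string; [k: string]: unknown };

function diff(
  snapshot: Map<string, string>, // previous run: id -> JSON.stringify(payload)
  incoming: Item[],
): { created: Item[]; updated: Item[]; deleted: string[] } {
  const created: Item[] = [];
  const updated: Item[] = [];
  const seen = new Set<string>();

  for (const item of incoming) {
    seen.add(item.id);
    const previous = snapshot.get(item.id);
    const current = JSON.stringify(item);
    if (previous === undefined) created.push(item);    // never seen before
    else if (previous !== current) updated.push(item); // payload differs
    // identical payload: item is dropped here, nothing goes downstream
  }

  // Anything in the snapshot that did not arrive this run is gone from the source.
  const deleted = [...snapshot.keys()].filter((id) => !seen.has(id));
  return { created, updated, deleted };
}
```

The unchanged items simply never leave the function — that early exit is the whole point of the pattern.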
Two nodes, two responsibilities #
The worker exposes two custom nodes:
Comparator Filter #
The diff engine. You hand it a batch of items, it returns what is genuinely new, what changed, and (optionally) what disappeared since the last run.
Input:

```json
{
  "items": [ { "id": 1, "name": "Widget", "stock": 12 } ],
  "configuration": {
    "idField": "id",
    "masterKey": "products",
    "excludedFields": ["lastViewedAt"]
  }
}
```

Output:

```json
{
  "created": [ { "id": 1, "name": "Widget", "stock": 12 } ],
  "updated": [ /* items whose payload differs from the cached snapshot */ ],
  "deleted": [ "id-of-removed-item" ]
}
```
The output goes downstream as three independent message streams in the topology, so you can route created to the create endpoint, updated to the patch/put endpoint, and deleted to the delete endpoint without writing any branching logic of your own.
Comparator Invalidate #
Cache control. Wipes the snapshot so the next run treats everything as new — useful for forced full resyncs, schema changes in the source, or recovering from a downstream that has lost data.
```json
{
  "masterKey": "products",
  "externalId": "id-of-one-record"
}
```
`externalId` is optional. Omit it to invalidate the entire collection; include it to invalidate only one record (useful when a single record is known to be out of sync but you don't want to re-push the rest).
Configuration cheatsheet #
The most useful keys you'll set on a Comparator Filter node:
| Key | What it does | When to set it |
|---|---|---|
| `masterKey` | Collection identifier in Redis. Can be shared across processes (`"products"`), per-tenant (`"warehouse-1"`), or per-batch. | Always. |
| `idField` | Path to the unique id inside each item (e.g. `"id"`, `"sku"`, `"externalId.value"`). | Always. |
| `excludedFields` | Paths to ignore during comparison. | Whenever the payload contains volatile fields (`updatedAt`, `lastViewedAt`, server-side counters) that would otherwise mark every record as changed. |
| `deleted` + `isLast` + `totalCount` | Enables deletion detection. See the callout below. | When you need to mirror deletions from source to target. |
| `ttl` | Redis TTL of the snapshot in seconds. `null` = keep until manually invalidated. | Set when a stale snapshot is worse than rebuilding from scratch. |
| `lock` | Locks the `masterKey` against concurrent processes. | During a bootstrap, or when two processes might run for the same collection at the same time. |
| `skipComparison` | Returns every item as-is, ignoring the cache. | Forced full resyncs without invalidating the snapshot. |
| `stopOnEmptyArray` | Ends the process successfully if the result is empty. | When zero changes means "nothing to do", not "something is wrong". |
| `passAsListOfExistingItems` | Merges `created` + `updated` into a single output array. | Downstreams that don't differentiate create vs. update. Mutually exclusive with `deleted`. |
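Pulling several of these together, a Filter node configuration for a product sync might look like the following. The values are illustrative; check the worker's configuration reference for the exact schema it accepts.

```json
{
  "masterKey": "products",
  "idField": "sku",
  "excludedFields": ["updatedAt", "lastViewedAt"],
  "deleted": true,
  "ttl": null,
  "stopOnEmptyArray": true
}
```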
Detecting deletions needs to know the universe
A comparator cannot tell that something has been removed from the source unless it knows what the source currently contains in full. The worker handles this by giving you three coordinated fields:
- `deleted: true` — opt in to deletion detection.
- `totalCount: N` — the total number of items the source has across all batches in this run.
- `isLast: true` — set on the final batch only; signals the worker that it now has the complete universe and can compute what is missing.
For example: 200 products, paginated 20 per page, makes 10 process messages — set totalCount: 200 on every one, and isLast: true only on the tenth. The worker can then return the ids that were in the previous snapshot but not in this run.
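The batch-side bookkeeping for that example can be sketched like this — every page carries the run's total, and only the final page sets the "universe complete" flag. The `Page` shape is illustrative; the exact message format depends on your batch connector.

```typescript
// Paginate a full snapshot into process messages for deletion detection:
// totalCount is the same on every page, isLast only on the final one.
type Page<T> = { items: T[]; totalCount: number; isLast: boolean };

function paginate<T>(all: T[], pageSize: number): Page<T>[] {
  const pages: Page<T>[] = [];
  for (let i = 0; i < all.length; i += pageSize) {
    pages.push({
      items: all.slice(i, i + pageSize),
      totalCount: all.length,             // the full universe size, every page
      isLast: i + pageSize >= all.length, // true on the final page only
    });
  }
  return pages;
}
```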
This is genuinely hard to implement correctly by hand. If you need it, use the worker.
Where in the topology #
The comparator usually sits right after the source connector and before any downstream side effect. Its job is to be the cheapest possible early exit, so anything you do upstream of it — fetching, parsing, normalizing — is the only work you pay for on unchanged records.
For batch sources that pull a paginated full snapshot, place the Comparator Filter after the Batch fan-out so each item is compared on its own. If you compare at the page level, a single changed item drags an entire page through downstream nodes.
Comparator + ID mapping is the canonical sync pair
Most production sync topologies use both patterns side by side: the ID mapping resolver runs first to enrich the message with the destination identifier, then the comparator decides whether anything has actually changed since the last run. Together they turn a brute-force snapshot pull into a clean delta sync without requiring the source system to support deltas natively.
Force-resync, operationally #
The day a downstream system loses data and asks you to "send everything again" — or the schema changes and last week's snapshot becomes meaningless — you have two built-in escape hatches, no code changes required:
- `Comparator Invalidate` node — drop it on a one-off topology (or a manual trigger node) to wipe a `masterKey` (whole collection) or a single `externalId` (one record). The next run rebuilds the snapshot.
- `skipComparison: true` — flip the flag on the Filter node temporarily to push every item through, regardless of cache state. Useful when you want to force one run without touching the snapshot.
This is the same operational pattern you would have built yourself; the worker just ships it.
Operational notes #
- Pick `excludedFields` deliberately. If the source returns timestamps that tick on read (`lastViewedAt`, `lastSyncedAt`, server-side counters), exclude them — otherwise every record looks "changed" on every run and the savings disappear.
- Normalize what the source gives you. Trim strings, lowercase emails, sort arrays of tags, round timestamps to the second before they reach the comparator. The worker compares the JSON shape it sees, so consistent input is what makes the diff meaningful.
- Initial bootstrap. The first run after enabling the comparator has no stored snapshot, so everything counts as new. Plan for one expensive run in a low-traffic window, and consider `lock: true` so a second invocation can't race the bootstrap.
- Observability. Track the ratio of created + updated + deleted vs. total items. A sudden drop in the skip rate often means the source system started touching unrelated fields, or that someone added a volatile field that should be on the `excludedFields` list.
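The normalization advice above can be as simple as a small pure function run before the comparator node. The `Contact` shape and field names here are illustrative, not part of the worker's API.

```typescript
// Normalize a record before comparison: trim strings, lowercase emails,
// sort tag arrays, and round ISO timestamps to the second.
interface Contact {
  email: string;
  name: string;
  tags: string[];
  updatedAt: string; // ISO-8601 timestamp
}

function normalize(c: Contact): Contact {
  return {
    email: c.email.trim().toLowerCase(),
    name: c.name.trim(),
    tags: [...c.tags].sort(),                       // tag order no longer matters
    updatedAt: c.updatedAt.replace(/\.\d+Z$/, "Z"), // drop sub-second noise
  };
}
```

Run every record through this before it reaches the Filter node and two payloads that differ only in whitespace, casing, tag order, or millisecond precision compare as equal.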
When to roll your own (rare) #
The worker fits the vast majority of cases. There are still a few situations where a hand-rolled comparator inside a custom node makes sense:
- You need per-field diff semantics that the worker doesn't expose — for example a topology that fires a different webhook per attribute that changed, or that writes a partial PATCH naming the changed columns.
- You need to compare against storage other than Redis — an existing internal database, an object store, or a backend the worker doesn't talk to.
- You have an exotic key model — composite keys assembled at runtime, hierarchical entities, or a comparison that depends on cross-record context.
For those cases, the same pattern in your own node is straightforward — typically a stable hash of the relevant fields, compared to the hash you stored last time. A note on language choice: the Comparator worker is Node.js for a reason — at high volume the JSON normalization and hashing cost adds up, and PHP suffers under load. For less demanding cases a PHP node is perfectly fine; for anything bulk, prefer the worker (or, if you must roll your own, keep it on Node.js).
```typescript
import { createHash } from 'crypto';

// Inside your custom node class: fingerprint the relevant fields and
// stop the process early when the stored hash matches.
public async processAction(dto: ProcessDto): Promise<ProcessDto> {
  const record = dto.getJsonData() as { externalId: string; [k: string]: unknown };

  // Stable hash of the relevant subset of the payload.
  const fingerprint = createHash('sha256')
    .update(JSON.stringify(this.relevantFields(record)))
    .digest('hex');

  const stored = await this.fingerprintRepo.find('contact', record.externalId);
  if (stored?.hash === fingerprint) {
    // Unchanged since last run: exit cleanly, nothing goes downstream.
    dto.setStopProcess(ResultCode.DO_NOT_CONTINUE, 'unchanged');
    return dto;
  }

  // Changed (or new): remember the new fingerprint and continue.
  await this.fingerprintRepo.upsert('contact', record.externalId, fingerprint);
  return dto;
}
```
This is the "that-vs-what" trade-off — a hash tells you that something changed, not which fields. If the answer to that question matters downstream, store a normalized projection of the previous payload instead of (or alongside) the hash and compute the per-field delta. Expect higher storage cost and slower compare; only reach for it when downstream genuinely needs the granularity.
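If downstream does need the granularity, the stored-projection option mentioned above amounts to keeping the previous normalized payload and diffing it field by field. A minimal sketch, assuming flat projections (the `Projection` type and `fieldDelta` helper are hypothetical names, not part of the worker):

```typescript
// Per-field delta between the stored projection of the previous payload
// and the current one: returns the names of the fields that changed
// (including fields that appeared or disappeared).
type Projection = Record<string, unknown>;

function fieldDelta(previous: Projection, current: Projection): string[] {
  const keys = new Set([...Object.keys(previous), ...Object.keys(current)]);
  return [...keys].filter(
    (k) => JSON.stringify(previous[k]) !== JSON.stringify(current[k]),
  );
}
```

The returned field names can drive a partial PATCH or a per-attribute webhook — exactly the cases where a bare hash falls short.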
Related #
- Patterns: Data comparator (docs) — the reference page that backs this guide.
- High-volume catalog reconciliation use case — the Comparator worker on a real ten-million-row catalog with deletion detection and operational escape hatches.
- Eshop synchronization use case — the Comparator placed after the Mapper in a multi-stream e-commerce sync.
- ID mapping guide — the pattern that resolves the destination id before the comparator decides whether to act.
- Handling large datasets with Orchesty — where to place the comparator inside a batched, streamed pull.
- orchesty/comparator-worker on GitHub — source, releases, and the full configuration reference.