MongoDB¶
The Nilus connector for MongoDB supports Change Data Capture (CDC), enabling near real-time replication of data changes from MongoDB to Supported Destinations, such as the Lakehouse. CDC captures change events from MongoDB’s oplog.rs and streams them continuously.
Info
Batch data movement is not supported for MongoDB.
Prerequisites¶
Before enabling CDC, ensure the following configurations are in place for your hosting environment:
MongoDB Replica Set¶
- MongoDB must run as a replica set, even for single-node deployments.
- Nilus CDC for MongoDB relies on the `oplog.rs` collection, which is only available in replica sets.
Info
Contact the Database Administrator (DBA) to set up and enable Change Data Capture (CDC) in MongoDB.
Enable oplog Access¶
- Nilus uses MongoDB's `oplog.rs` to capture changes.
- Nilus requires a user with `read` access to business data and internal system databases to access the oplog. If such a user does not exist, create one in MongoDB as follows:

```javascript
db.createUser({
  user: "debezium",
  pwd: "dbz",
  roles: [
    { role: "read", db: "your_app_db" },       // Read target database
    { role: "read", db: "local" },             // Read oplog
    { role: "read", db: "config" },            // Read cluster configuration
    { role: "readAnyDatabase", db: "admin" },  // Optional: discovery
    { role: "clusterMonitor", db: "admin" }    // Recommended: monitoring
  ]
})
```
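To confirm that the user and roles exist, you can look the user up from the database in which it was created (a quick optional check using a standard MongoDB shell helper):

```javascript
// Run in the database where the user was created (often admin)
db.getUser("debezium")
```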
Info
Grant only the roles required for your environment to follow the principle of least privilege.
Pre-created MongoDB Depot¶
A Depot must exist in DataOS with read-write access. To check the Depot, go to the Metis UI of DataOS or use the following command:
```bash
dataos-ctl resource get -t depot -a

# Expected Output
INFO[0000] 🔍 get...
INFO[0000] 🔍 get...complete

| NAME         | VERSION | TYPE  | STATUS | OWNER    |
| ------------ | ------- | ----- | ------ | -------- |
| mongodbdepot | v2alpha | depot | active | usertest |
```
If the Depot is not created, use the following manifest configuration template to create the MongoDB Depot:
MongoDB Depot Manifest
```yaml
name: ${{depot-name}}
version: v2alpha
type: depot
tags:
  - ${{tag1}}
  - ${{tag2}}
layer: user
depot:
  type: mongodb
  description: ${{description}}
  compute: ${{runnable-default}}
  mongodb:
    subprotocol: ${{"mongodb+srv"}}
    nodes: ${{["clusterabc.ezlggfy.mongodb.net"]}}
    external: ${{true}}
  secrets:
    - name: ${{instance-secret-name}}-r
      allkeys: ${{true}}
    - name: ${{instance-secret-name}}-rw
      allkeys: ${{true}}
```
Info
Update variables such as name, owner, compute, layer, etc., and contact the DataOS Administrator or Operator to obtain the appropriate secret name.
Sample Service Config¶
The following manifest configuration template can be used to apply CDC for MongoDB:
```yaml
name: ${{service-name}}                              # Service identifier
version: v1                                          # Version of the service
type: service                                        # Defines the resource type
tags:                                                # Classification tags
  - ${{tag}}
  - ${{tag}}
description: Nilus CDC Service for MongoDB           # Description of the service
workspace: public                                    # Workspace where the service is deployed
service:                                             # Service specification block
  servicePort: 9010                                  # Service port
  replicas: 1                                        # Number of replicas
  logLevel: INFO                                     # Logging level
  compute: ${{query-default}}                        # Compute type
  stack: ${{nilus:3.0}}                              # Nilus stack version
  stackSpec:                                         # Stack specification
    source:                                          # Source configuration block
      address: dataos://mongodbdepot                 # Source depot address/UDL
      options:                                       # Source-specific options
        engine: debezium                             # Required CDC engine; used for streaming changes
        collection.include.list: "retail.products"   # MongoDB collections to include
        topic.prefix: "cdc_changelog"                # Required topic prefix for the CDC stream
        max-table-nesting: "0"                       # Optional; prevents unnesting of nested documents
        transforms.unwrap.array.encoding: array      # Optional; preserves arrays in the sink as-is
    sink:                                            # Sink configuration for CDC output
      address: dataos://testinglh                    # Sink depot address
      options:                                       # Sink-specific options
        dest-table: mdb_test_001                     # Destination table name in the sink depot
        incremental-strategy: append                 # Append-only strategy for streaming writes
```
Info
Ensure that all placeholder values and required fields (e.g., connection addresses and access credentials) are properly updated before applying the configuration to a DataOS workspace.
Deploy the manifest file using the following command:
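(The file path and workspace below are placeholders for your own values.)

```bash
dataos-ctl apply -f nilus-mongodb-cdc-service.yaml -w public
```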
Info
The MongoDB host used in the CDC service YAML must match exactly the host defined during replica set initialization.
Source Options¶
Nilus supports the following source options for MongoDB CDC:
| Option | Default | Description |
|---|---|---|
| `database.include.list` | No Default | An optional comma-separated list of regular expressions or literals that match the names of databases to be monitored. By default, the connector monitors all databases. |
| `collection.include.list` | No Default | An optional comma-separated list of regular expressions or literals that match fully-qualified namespaces of MongoDB collections to be monitored. By default, the connector monitors all collections except those in the `local` and `admin` databases. When `collection.include.list` is set, the connector monitors only the collections that the property specifies; other collections are excluded from monitoring. Collection identifiers are of the form `databaseName.collectionName`. |
| `collection.exclude.list` | No Default | An optional comma-separated list of regular expressions or literals that match fully-qualified namespaces of MongoDB collections to be excluded from monitoring. When `collection.exclude.list` is set, the connector monitors every collection except the ones that the property specifies. Collection identifiers are of the form `databaseName.collectionName`. |
| `snapshot.mode` | `initial` | Specifies the behavior for snapshots when the connector starts. |
| `field.exclude.list` | No Default | An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified field names are of the form `databaseName.collectionName.fieldName.nestedFieldName`, where `databaseName` and `collectionName` may contain the wildcard (`*`), which matches any characters. |
| `topic.prefix` | No Default | Topic prefix that provides a namespace for the particular MongoDB instance or cluster in which Nilus is capturing changes. The prefix should be unique across all other connectors. Only alphanumeric characters, hyphens, dots, and underscores may be used in the database server logical name. This option is mandatory, and the prefix is also appended to the sink table. |
| `transforms.unwrap.array.encoding` | No Default | Controls how array values are encoded when unwrapped by a Kafka Connect transform. Common options include `"none"` (default), `"array"`, `"json"`, or `"string"`, which define how array elements are serialized into Kafka messages. |
| `max-table-nesting` | No Default | Specifies the maximum allowed depth for nested tables or objects (commonly in JSON or relational mapping). It helps prevent excessively deep or complex structures that can impact performance or compatibility. |
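For illustration, several of these options can be combined in the service's `source.options` block. The collection and field names below are placeholders:

```yaml
options:
  engine: debezium                                            # required CDC engine
  collection.include.list: "retail.products,retail.orders"    # placeholder collections
  field.exclude.list: "retail.products.internal_notes"        # placeholder field to drop from events
  topic.prefix: "cdc_changelog"                               # required; unique per MongoDB cluster
  snapshot.mode: "initial"
  max-table-nesting: "0"
  transforms.unwrap.array.encoding: array
```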
Sink Options¶
Nilus supports the following sink options for MongoDB CDC:
| Field | Description | Default |
|---|---|---|
| `dest-table` | Target table in the sink. | — |
| `incremental-strategy` | Write mode (`append` recommended for CDC). | `append` |
Core Concepts¶
Nilus captures row-level changes from MongoDB using the replica set oplog. Below are the essential concepts for understanding how Nilus integrates with MongoDB.
- Replica Set
    - MongoDB must run as a replica set, even in single-node deployments.
    - Nilus connects to the primary replica and tails the oplog (`local.oplog.rs`).
    - Standalone MongoDB servers are not supported for CDC.

- The MongoDB Oplog
    - The oplog (`oplog.rs`) is a capped collection in the `local` database.
    - It records every `insert`, `update`, and `delete` applied to the primary.
    - Nilus reads this log to generate CDC events.
    - `oplog` entries roll off in FIFO (first-in-first-out) order once the allocated size is exhausted.

- Schema-Less Nature of MongoDB
    - MongoDB is schema-less, but Nilus dynamically infers schemas.
    - The sink table is created from the first document observed.
    - Schema evolution is tracked using a Schema Registry with Avro.

- `oplog` Retention & Disk Pressure
    - Nilus maintains a cursor within the MongoDB oplog to track change events. When the connector is paused or lags behind, MongoDB retains older oplog entries to accommodate delayed consumption.
    - If the connector's lag exceeds the oplog retention threshold, expired entries may lead to data loss. In such cases, a full snapshot must be reinitiated to resume consistent processing.
    - The term Disk Pressure refers to the stress placed on disk resources due to the continuous growth of `oplog.rs`. MongoDB retains these entries until Nilus acknowledges their processing.
Implications of High Disk Pressure
- The oplog continuously expands as new changes are recorded.
- Disk utilization increases on the volume where MongoDB stores its data.
- Excessive disk usage can lead to:
- Write operation slowdowns
- Replication failures
- Potential node instability or crashes
Although rare, this condition can result in a fatal error, placing the service in a pending state. Recovery requires redeployment of the CDC service. Upon redeployment, the system performs a full snapshot followed by real-time change data capture.
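To spot disk pressure early, the standard MongoDB shell helpers below report the oplog's allocated size, used size, and the time span of history it currently holds (shown as a general reference, not a Nilus-specific command):

```javascript
// Human-readable summary: configured size, used size, and "log length start to end"
rs.printReplicationInfo();

// The same information as a document (logSizeMB, usedMB, timeDiffHours, tFirst, tLast, ...)
db.getReplicationInfo();
```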
Error 286:
Handling MongoDB Error 286 in CDC Pipelines
TL;DR
- Error 286 is always a symptom of history loss - the resume token is gone.
- The root cause is an undersized or force‑truncated oplog relative to the maximum connector lag.
- Prevention = correct oplog sizing + connector throughput + monitoring.
- Recovery is straightforward (new snapshot or larger oplog) but can be time‑consuming—plan ahead.
Overview
Error 286 indicates that a MongoDB change stream (which Nilus relies on for CDC) attempted to resume from a point that is no longer present in the replica-set oplog (`local.oplog.rs`). When this happens, Nilus logs the following exception:

```text
Command failed with error 286 (ChangeStreamHistoryLost): Resume of change stream was not possible, as the resume point may no longer be in the oplog
```

Understanding why the oplog entry disappeared, and how to size and monitor the oplog, is therefore critical for reliable CDC.
The MongoDB Oplog in a Nutshell
- A capped collection (`local.oplog.rs`) that stores every write against the primary so that secondaries (and tools such as Nilus) can replicate those changes.
- The oplog is truncated on a first-in-first-out basis once it reaches its allocated size.
- Default size – When you initiate a replica set, MongoDB chooses the oplog size automatically: 5% of free disk space (minimum ≈ 990 MB).

Because the collection is capped, what really matters is the time window: how many hours of history does this size translate to under your peak write load?
Why Error 286 Happens
- The connector is paused or slowed down; older entries roll off before it resumes.
- Manually shrinking the oplog or re-initializing the replica set discards old tokens.
- MongoDB may truncate more aggressively if the filesystem is full.
- Very high write bursts (e.g., a sudden surge from a bulk load) shrink the effective time window.
When the connector restarts it looks up its resume token in the oplog; if that token has vanished, MongoDB throws error 286 and Nilus refuses to start.
Recovery Options
- Delete and reapply the failing service with a new PV directory or PVC. (Alternatively, keep the connector running but delete the offset from the PVC directory so that Nilus treats itself as new and snapshots again.)

    OR

- Change the name of the connector service (supported from `nilus:0.0.13` onwards) while keeping the same configuration. Nilus will take a snapshot (as specified by `snapshot.mode`) and then continue to stream changes.

Note: Error 286 cannot be resolved by simply restarting the connector. Manual intervention is required to restore the service once this error occurs.
Preventing Error 286
- Size the Oplog for Worst-Case Lag

    MongoDB supports `replSetResizeOplog` for resizing the oplog online, and 4.4+ adds `minRetentionHours` for time-based retention guarantees. The objective is to ensure the oplog retains at least as much history as the connector could ever fall behind, with headroom for bursts.

    Formula:

        RequiredSize ≈ PeakWrites/sec × MaxLag(sec) × avgEntrySize(bytes) × safetyFactor

    This yields a size in bytes; convert to MB/GB as needed.

    - Peak Writes/sec – Insert + Update + Delete ops during the busiest interval (consult `serverStatus().opcounters` or monitoring).
    - Max Lag – Longest plausible outage/back-pressure window (connector maintenance + downstream outage + buffer).
    - avgEntrySize – In bytes; rule of thumb ≈ 1 kB if most documents are small.
    - safetyFactor – 1.3–2.0 depending on risk appetite.

    Example

    | Parameter | Value | Notes |
    |---|---|---|
    | Peak writes/sec | 15,000 ops | Observed from Grafana at the 95th percentile |
    | Max lag | 30 min = 1,800 s | Upgrade window + 10 min contingency |
    | Avg entry size | 1 kB | Typical BSON size of collection docs |
    | Safety factor | 1.5 | Gives headroom for burst writes |

    Plugging in: 15,000 × 1,800 × 1 kB × 1.5 ≈ 40.5 GB.

    - Recommendation: Round up to 48 GB when running `replSetResizeOplog` or using the `--oplogSize` init option.
    - Even at double the recorded peak, a 48 GB oplog still holds roughly 27 minutes of history, so the window degrades gracefully during black-swan spikes.
- Monitor Key Metrics

    - Use `rs.printReplicationInfo()` to retrieve information on the oplog status, including the size of the oplog and the time range of operations.

- Avoid Long Pauses

    - Schedule connector downtime within the calculated oplog window.
- Recommended Nilus Source Options

    Sample Configuration

    ```yaml
    source:
      address: dataos://mongodept
      options:
        engine: debezium                         # mandatory for CDC
        collection.include.list: "spam.product"
        topic.prefix: "cdc_changelog"            # mandatory; can be custom
        snapshot.mode: "when_needed"
        max.batch.size: 250
        max.queue.size: 2000
        max.queue.size.in.bytes: "134217728"
        heartbeat.interval.ms: 6000
        offset.flush.interval.ms: 15000
    sink:
      address: dataos://testawslh
      options:
        dest-table: mongodb_test
        incremental-strategy: append
        aws_region: us-west-2
    ```

    | Property | Why it helps | Suggested value |
    |---|---|---|
    | `snapshot.mode` | Controls what Nilus does when offsets are missing. With `when_needed`, the connector performs a snapshot after startup only if it cannot detect any topic offsets, or if a previously recorded offset points to a log position that is no longer available on the server. | `initial` (default) or `always` if you anticipate long downtimes |
    | `offset.flush.interval.ms` | How often offsets are committed. Shorter intervals reduce duplicate events after crashes. | 15000 ms |
    | `heartbeat.interval.ms` | Emits heartbeat records to keep offsets moving even when no data changes. Helps detect lag early. | 5000–10000 ms |
    | `max.batch.size`, `max.queue.size` & `max.queue.size.in.bytes` | Tune these to keep connector processing speed above the peak write rate and avoid backlog. | Start small (e.g., 250 / 2000 / 128 MB) and adjust according to data volume and change frequency |
Operational Playbook
| Phase | Checklist |
|---|---|
| Daily | Monitor the oplog window & connector lag; alert if lag exceeds 80% of the window |
| Before maintenance | Calculate the expected pause; if it exceeds the window, increase the oplog temporarily |
| After unplanned outage | If the connector fails with error 286, decide between a re-snapshot or cleaning the PV directory |
| After success | Review sizing assumptions; adjust `oplogSizeMB` or Nilus throughput limits |

Useful MongoDB Shell Commands

```javascript
// Check how many hours of history are currently in the oplog
rs.printReplicationInfo();

// Show the newest record in the oplog
use local;
db.oplog.rs.find().sort({ $natural: -1 }).limit(1).pretty();

// Resize the oplog (run against the primary)
use admin;
db.adminCommand({ replSetResizeOplog: 1, size: <MB>, minRetentionHours: <hours> });
```

Warning Reference "Buffer Lock Warning":
```text
BufferingChangeStreamCursor: Unable to acquire buffer lock, buffer queue is likely full
```

Handling MongoDB Warning BufferingChangeStreamCursor in CDC Pipelines
TL;DR
- This warning indicates that the MongoDB CDC pipeline temporarily stopped reading new changes because its internal buffer filled up.
- It is not an error by itself, but if it continues for long periods, it can cause CDC lag, resume-token failures, or change stream interruptions.
To prevent the issue:
- Reduce Debezium batch size
- Increase service memory & set memory limits (if not set already)
- Ensure sink writes are healthy (check the source `ts_ms`)
- Avoid long-running Iceberg commits or high write spikes
If it occurs:
- Verify whether CDC is still progressing
- Check sink performance
- Restart the CDC service only if offsets stop advancing
- Take action before the MongoDB oplog window is exceeded
Overview
This warning comes from the Debezium MongoDB connector’s internal Change Stream Cursor reader. It means that Debezium’s internal in-memory buffer is full and cannot accept more events until the downstream consumer (sink writer) catches up.
Key points:¶
- The warning does not mean data loss.
- It can indicate backpressure in the pipeline.
- If backpressure persists long enough, it can eventually lead to MongoDB change stream errors, such as:
    - `ChangeStreamHistoryLost`
    - `InvalidResumeToken`
This document explains why it happens, how to mitigate it proactively, and what to do if it occurs.
Root Cause
The warning appears when the event-production rate (Mongo writes) temporarily exceeds the event-consumption rate (Nilus processing + Iceberg writes).
Below are the typical contributing factors:
- Downstream Sink Is Slow (Most Common)
Nilus writes the CDC data to a destination. If these operations slow down, for example, due to:
- Large Iceberg commits
- Compaction or manifest rewrites
- S3 throttling or retry storms
- High latency writes
Then Debezium cannot drain its queue fast enough. Result → buffer fills → warning appears.
- Large Debezium Batch Size

    - `max.batch.size: 2048` (default) may cause Debezium to process very large chunks at once.
    - Large batches increase processing time and memory consumption, slowing down queue draining.
- Insufficient Memory
This can lead to:
- Very small heap → too many minor GCs
- Very large heap → long GC pauses
- GC pauses → slow consumer thread → queue full
- No Kubernetes Memory Limits
If container memory limits are not set:
- The JVM may assume it has access to full node memory.
- It may pick an inappropriate heap size automatically.
- During heavy load, this causes GC pressure and stalls.
- CPU Contention
Heavy pipeline activity + large batches may saturate CPU in a 1-replica setup.
- High Write Bursts from MongoDB
During traffic spikes, the change stream volume can exceed regular processing capacity.
Prevention
The following configuration adjustments significantly reduce the likelihood of buffer-full conditions.
- Reduce Debezium Batch Size

    Smaller batches = faster downstream commits = steady buffer drain. (See the configuration sketch after this list.)

- Allocate Sufficient (or more) Memory to the Service

    This prevents the service from operating at the edge of its memory budget.

- Define Resource Memory Limits

    Why this matters:

    - Predictable heap sizing
    - Reduced GC stalls
    - Improved Debezium throughput
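The sketch below shows how these adjustments could look in a trimmed Service manifest. The `resources` block and the specific values are assumptions to adapt to your platform and workload; the source options mirror the sample configuration shown earlier:

```yaml
service:
  replicas: 1
  compute: ${{query-default}}
  resources:                                  # assumed Kubernetes-style requests/limits block
    requests:
      cpu: 1000m
      memory: 2Gi
    limits:
      cpu: 2000m
      memory: 4Gi                             # an explicit limit keeps JVM heap sizing predictable
  stack: ${{nilus:3.0}}
  stackSpec:
    source:
      address: dataos://mongodbdepot          # placeholder depot
      options:
        engine: debezium
        max.batch.size: 250                   # smaller batches drain the internal buffer faster
        max.queue.size: 2000
        max.queue.size.in.bytes: "134217728"  # ~128 MB
```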
Resolution
Use the following checklist to assess whether the warning is transient or serious.

Steps
- Verify Whether CDC is Still Progressing
    - Check the sink dataset:
        - Are new rows appearing?
        - Is the CDC timestamp (`_ts` or equivalent) moving forward?
    - Check offset logs:
        - If offsets are updating → the pipeline is healthy and the warning was transient.
    - Check whether heartbeats are processing:
        - Are new heartbeats committed to the heartbeat dataset?
        - What is the latest heartbeat commit timestamp, and how far behind the current time is it?
- Measure CDC Lag
    - Compare the last ingested timestamp with the MongoDB server time.
- How to check MongoDB Server Time
Replica set members rely on synchronized clocks for:
- Oplog timestamp ordering
- Heartbeat timeouts and election timing
- Write concern “majority” acknowledgment
Run this in your MongoDB shell:
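A minimal check on the member you are connected to; the `hello` command's `localTime` field reports the server's current time, which you can compare against the timestamps reaching your sink:

```javascript
// Server time reported by the member you are connected to
db.adminCommand({ hello: 1 }).localTime
```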
Check the time on all Replica Set Members
```javascript
// Consolidated check
rs.status().members.forEach(m => {
  print(m.name);
  printjson(db.getSiblingDB("admin").getMongo().getDB("admin").runCommand({ hello: 1 }));
});
```

Or check each member individually:

```bash
mongosh --host mongo1:27017 --eval 'db.adminCommand({ hello: 1 })'
mongosh --host mongo2:27017 --eval 'db.adminCommand({ hello: 1 })'
mongosh --host mongo3:27017 --eval 'db.adminCommand({ hello: 1 })'
```
- Check Sink Performance

    - Query the Nilus metadata tables (stored in PostgreSQL) and check the throughput, for example with the query below.

    Query

    ```sql
    SELECT
        li.*,
        ri.dataos_resource_id,
        ri.total_records,
        -- Extract the tag from dataos_resource_id
        regexp_extract(ri.dataos_resource_id, 'workflow:v1:wf-([^-]+)-', 1) AS tag,
        -- Calculate MB/sec
        CASE WHEN ri.duration_sec > 0 THEN li.files_size_mb / li.duration_sec ELSE NULL END AS mb_per_sec,
        -- Calculate records/sec
        CASE WHEN ri.duration_sec > 0 THEN ri.total_records / li.duration_sec ELSE NULL END AS events_per_sec
    FROM "nilusdb"."public".load_info li
    JOIN (
        SELECT
            id, run_id, load_id, started_at, finished_at, duration_sec,
            files_size_mb, memory_mb, cpu_percent, dataos_resource_id,
            reduce(
                map_values(CAST(records_count AS map(varchar, integer))),
                0,
                (s, x) -> s + x,
                s -> s
            ) AS total_records
        FROM "nilusdb"."public".runs_info
        WHERE run_as_user = 'dataos-manager'              -- define the username
          AND dataos_resource_id LIKE 'workflow:v1:wf-%'  -- define your service name here
          -- AND finished_at > TIMESTAMP '2025-09-17 09:38:00.000 UTC'
    ) ri
      ON li.load_id = ri.load_id
     AND li.run_id = ri.run_id
    -- WHERE ri.total_records > 1000
    ORDER BY ri.started_at DESC;
    ```
- Assess the Stability of the Warning
    - If the warning lasts < 5 seconds
        - Normal temporary backpressure.
        - No action needed.
    - If the warning lasts ~5–30 seconds
        - Monitor closely; CDC lag may grow.
        - Check sink and memory usage.
    - If the warning persists > 30 seconds
        - Risk zone for `ChangeStreamHistoryLost`.
        - Take the following action: restart the service. This resets Debezium's internal threads while preserving its position in the stream.
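In DataOS, restarting the CDC service is typically done by deleting and re-applying its manifest; the commands below are a sketch (file name and workspace are placeholders), and the PV directory must be left intact so the connector resumes from its stored offsets:

```bash
# Delete the running CDC service, then re-apply the same manifest
dataos-ctl delete -f nilus-mongodb-cdc-service.yaml -w public
dataos-ctl apply -f nilus-mongodb-cdc-service.yaml -w public
```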
Useful MongoDB Shell Commands
- Check oplog size & window – This helps determine the available oplog window (how far Debezium can fall behind).
- Estimate Oplog Window Duration – This is crucial: if Debezium's lag exceeds the oplog window, the change stream will fail.
- Check Recent Write Rate
- Check Collection-level Throughput
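The following consolidates these checks using standard MongoDB shell commands; the database and collection names at the end are placeholders:

```javascript
// Check oplog size & window: configured size, used size, and time span covered
rs.printReplicationInfo();

// Estimate the oplog window from the oldest and newest entries
use local
db.oplog.rs.find().sort({ $natural: 1 }).limit(1).pretty();    // oldest entry
db.oplog.rs.find().sort({ $natural: -1 }).limit(1).pretty();   // newest entry

// Check recent write rate: insert/update/delete counters since server start
// (sample twice and diff to estimate ops/sec)
db.serverStatus().opcounters;

// Check collection-level throughput and storage stats (placeholder namespace)
db.getSiblingDB("retail").products.stats();
```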
- `oplog` Polling

    Nilus continuously tails the oplog:

    - Uses a cursor to track the last processed entry.
    - Parses each entry and emits structured CDC events.
    - Keeps streaming aligned with replication order.
MongoDB System Databases & Access
Nilus requires specific database access:
- `local`
    - Source of the oplog (`local.oplog.rs`).
    - Requires `read` permissions.

- `admin`
    - Used for server metadata, discovery, and auth.
    - Requires `read` on commands like `replSetGetStatus`, `buildInfo`, `listDatabases`, and `isMaster`.

- `config`
    - Needed only in sharded clusters.

- Target Databases (Application Data)
    - Collections you want to capture.
    - Requires `read` permissions.
    - If snapshotting is enabled, Nilus reads all documents during startup.