Putting MongoDB Change Streams to Work

Pranay Singhal
Feb 19, 2020

MongoDB comes with a change stream feature that lets applications subscribe to a stream of change events for data stored in MongoDB. Receiving data changes as events makes it simple to implement use cases that need to react to those changes.

One useful application of MongoDB change streams is data change auditing: keeping track of changes to the fields of a collection (who changed a field, when, and what change was made). Such granular field-level auditing is often necessary for regulatory and compliance reasons in domains such as healthcare and finance. With a reactive model based on change streams, the applications producing the changes do not have to be modified or instrumented to create the change audits. Instead, you can create decoupled audit logging applications that subscribe to change streams and perform the auditing independently.

In this post, I’ll illustrate the design and implementation of a data change auditing solution that utilizes MongoDB’s change stream feature.

Change streams in MongoDB

The details of how MongoDB change streams work are available in MongoDB documentation: https://docs.mongodb.com/manual/changeStreams/

Here are a few key features:

  • Change streams are available in replica sets and sharded clusters.
  • Change streams are available for a single collection, a database (all collections in the database), or an entire deployment (all collections in all databases).
  • Change stream events contain a variety of data about the change. The subscriber can filter, refine and reshape that data to extract information of interest to them.
  • Each change stream event has a unique token associated with it. In case a subscriber gets disconnected from the stream, it can resume receiving change events from the point it left off by supplying the token for the last change event it received. This makes change stream subscribers resilient to subscription outages/disconnects.
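The resume behaviour in the last point can be illustrated with a toy model. The sketch below is not the MongoDB driver API: the `Event` type, the `eventsAfter` function and the `t1`/`t2`/`t3` tokens are hypothetical stand-ins, and an in-memory slice plays the role of the server-side event history. It only shows the idea that a subscriber which remembers the token of the last event it processed can pick up from the event that follows it.

```go
package main

import "fmt"

// Event is a toy stand-in for a change stream event: a token plus a payload.
// Real resume tokens are opaque values issued by MongoDB.
type Event struct {
	Token   string
	Payload string
}

// eventsAfter returns the events that follow the one carrying token.
// An empty token means "start from the beginning of the log".
// It reports false when the token is unknown, in which case resuming
// from that point is not possible.
func eventsAfter(log []Event, token string) ([]Event, bool) {
	if token == "" {
		return log, true
	}
	for i, e := range log {
		if e.Token == token {
			return log[i+1:], true
		}
	}
	return nil, false
}

func main() {
	log := []Event{
		{Token: "t1", Payload: "insert A"},
		{Token: "t2", Payload: "update A"},
		{Token: "t3", Payload: "update B"},
	}

	// The subscriber processed t1 and then lost its connection.
	lastSeen := "t1"

	missed, ok := eventsAfter(log, lastSeen)
	if !ok {
		fmt.Println("token no longer available; cannot resume")
		return
	}
	for _, e := range missed {
		fmt.Println(e.Token, e.Payload)
		lastSeen = e.Token // remember progress after each processed event
	}
}
```

Running this prints the two events that were missed (`t2 update A` and `t3 update B`). The unknown-token branch mirrors a real constraint: a change stream can only be resumed while the server still holds the history that the token points to.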

Implementing a change stream watcher

The diagram below illustrates the basic design of the change stream watcher that implements the audit logger use-case.

The watcher subscribes to the change stream of the Application collection it is configured to watch. If audit records already exist in the Audit collection, it assumes it needs to resume a previously disrupted subscription, which it does by fetching the resume token from the latest audit record and supplying it when creating the subscription. As change events come through, the watcher filters them to keep the ones it is interested in (inserts and updates in this case). It also transforms the change data into a structure that makes sense for audit records, and then persists each audit record in the Audit collection.
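The filter and transform steps can be sketched in plain Go without a database connection. This is a minimal illustration, not the code from the watcher repository: `ChangeEvent` is a simplified, hypothetical view of a change event (with the document key flattened to a string), `toAuditRecord` is a made-up name, and the sketch assumes the event carries the full document so that the user field can be read from it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ChangeEvent is a simplified, hypothetical view of a change stream event.
type ChangeEvent struct {
	ResumeToken   string
	OperationType string
	Database      string
	Collection    string
	DocumentKey   string
	Timestamp     string
	FullDocument  map[string]interface{}
	UpdatedFields map[string]interface{}
	RemovedFields []string
}

// UpdateDescription mirrors the "updateDescription" part of an audit record.
type UpdateDescription struct {
	UpdatedFields map[string]interface{} `json:"updatedFields"`
	RemovedFields []string               `json:"removedFields"`
}

// AuditRecord uses the field names of the sample audit record in this post.
type AuditRecord struct {
	ID                map[string]string      `json:"_id"`
	Collection        string                 `json:"collection"`
	Database          string                 `json:"database"`
	DocumentKey       string                 `json:"documentKey"`
	FullDocument      map[string]interface{} `json:"fullDocument"`
	OperationType     string                 `json:"operationType"`
	Timestamp         string                 `json:"timestamp"`
	UpdateDescription *UpdateDescription     `json:"updateDescription,omitempty"`
	User              string                 `json:"user"`
}

// toAuditRecord keeps only inserts and updates and reshapes them.
// userField is a top-level field name here; fullDocOps lists the operation
// types for which the complete source record is stored.
func toAuditRecord(ev ChangeEvent, userField string, fullDocOps map[string]bool) (AuditRecord, bool) {
	if ev.OperationType != "insert" && ev.OperationType != "update" {
		return AuditRecord{}, false // filtered out
	}
	rec := AuditRecord{
		ID:            map[string]string{"_data": ev.ResumeToken},
		Collection:    ev.Collection,
		Database:      ev.Database,
		DocumentKey:   ev.DocumentKey,
		OperationType: ev.OperationType,
		Timestamp:     ev.Timestamp,
	}
	if user, ok := ev.FullDocument[userField].(string); ok {
		rec.User = user
	}
	if fullDocOps[ev.OperationType] {
		rec.FullDocument = ev.FullDocument
	}
	if ev.OperationType == "update" {
		rec.UpdateDescription = &UpdateDescription{
			UpdatedFields: ev.UpdatedFields,
			RemovedFields: ev.RemovedFields,
		}
	}
	return rec, true
}

func main() {
	ev := ChangeEvent{
		ResumeToken:   "token-123", // hypothetical token value
		OperationType: "update",
		Database:      "test",
		Collection:    "streamtest",
		DocumentKey:   "5c5d85c62fef357a165ccabf",
		Timestamp:     "2020-02-14T15:03:27Z",
		FullDocument:  map[string]interface{}{"updatedBy": "tcadmin"},
		UpdatedFields: map[string]interface{}{"lineItems.0.procedures.0.procedureModCodes.0": "332"},
		RemovedFields: []string{},
	}
	rec, ok := toAuditRecord(ev, "updatedBy", map[string]bool{"insert": true})
	if !ok {
		fmt.Println("event skipped")
		return
	}
	out, err := json.MarshalIndent(rec, "", "  ")
	if err != nil {
		fmt.Println("marshal:", err)
		return
	}
	fmt.Println(string(out))
}
```

Using the resume token as the record's `_id` keeps the token and the audit data in one place, so the latest audit record is also the resume point after a restart.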

Configurability of the change stream watcher

The watcher allows configuration of several things: the collection whose change stream it subscribes to, the collection in which audit records are stored, the change types for which the complete source record is stored as part of the audit record, and so on. A sample is shown below:

{
"appDbUrl": "mongodb://@localhost:27017",
"appDatabaseName": "test",
"appDatabaseCollection": "streamtest",
"userFieldPath": "updatedBy",
"auditDbUrl": "mongodb://@localhost:27017",
"auditDatabaseName": "test",
"auditDatabaseCollection": "audit",
"fullDocRecordOperations": {
"insert": true
},
"version": "1.0"
}
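A configuration like this maps naturally onto a Go struct. The sketch below is an illustration rather than the watcher's actual loader: the `Config` type and `parseConfig` function are hypothetical names, and the check that the audit collection differs from the watched collection is my own addition (writing audit records into the watched collection would feed the watcher its own writes).

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// sampleConfig is the sample configuration shown above.
const sampleConfig = `{
  "appDbUrl": "mongodb://@localhost:27017",
  "appDatabaseName": "test",
  "appDatabaseCollection": "streamtest",
  "userFieldPath": "updatedBy",
  "auditDbUrl": "mongodb://@localhost:27017",
  "auditDatabaseName": "test",
  "auditDatabaseCollection": "audit",
  "fullDocRecordOperations": {"insert": true},
  "version": "1.0"
}`

// Config mirrors the JSON keys of the sample; the Go names are hypothetical.
type Config struct {
	AppDBURL                string          `json:"appDbUrl"`
	AppDatabaseName         string          `json:"appDatabaseName"`
	AppDatabaseCollection   string          `json:"appDatabaseCollection"`
	UserFieldPath           string          `json:"userFieldPath"`
	AuditDBURL              string          `json:"auditDbUrl"`
	AuditDatabaseName       string          `json:"auditDatabaseName"`
	AuditDatabaseCollection string          `json:"auditDatabaseCollection"`
	FullDocRecordOperations map[string]bool `json:"fullDocRecordOperations"`
	Version                 string          `json:"version"`
}

// parseConfig decodes the JSON and applies a few basic sanity checks.
func parseConfig(data []byte) (Config, error) {
	var cfg Config
	if err := json.Unmarshal(data, &cfg); err != nil {
		return Config{}, fmt.Errorf("parse config: %w", err)
	}
	if cfg.AppDatabaseName == "" || cfg.AppDatabaseCollection == "" {
		return Config{}, errors.New("source database and collection are required")
	}
	if cfg.AuditDatabaseName == "" || cfg.AuditDatabaseCollection == "" {
		return Config{}, errors.New("audit database and collection are required")
	}
	// Assumed safeguard: never audit into the collection being watched.
	if cfg.AppDBURL == cfg.AuditDBURL &&
		cfg.AppDatabaseName == cfg.AuditDatabaseName &&
		cfg.AppDatabaseCollection == cfg.AuditDatabaseCollection {
		return Config{}, errors.New("audit collection must differ from the watched collection")
	}
	return cfg, nil
}

func main() {
	cfg, err := parseConfig([]byte(sampleConfig))
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Printf("watching %s.%s, auditing into %s.%s\n",
		cfg.AppDatabaseName, cfg.AppDatabaseCollection,
		cfg.AuditDatabaseName, cfg.AuditDatabaseCollection)
	fmt.Println("store full document on insert:", cfg.FullDocRecordOperations["insert"])
}
```

Modelling `fullDocRecordOperations` as a map keeps lookups simple: an operation type that is absent from the configuration reads as `false`, so the full document is stored only where it was explicitly requested.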

One of the important configuration points for the watcher is the field in the source records that holds the user ID of the user making the data change. Since the identity of the MongoDB connection cannot be used for business auditing, the audit trail relies on the incoming change event containing a field with the identity of the business user who made the change. Most business applications maintain this information in their data model. The path of this field in the source records can be configured via the “userFieldPath” attribute. This could be the path to a field in an embedded subdocument.
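Resolving such a path against a source record could look like the sketch below. It assumes a dot-separated path and a document decoded into nested `map[string]interface{}` values; the `lookupPath` function and the `audit` subdocument are hypothetical, and array indexes in the path are not handled.

```go
package main

import (
	"fmt"
	"strings"
)

// lookupPath walks a dot-separated path through nested maps and returns
// the string found at the end of it. It reports false when any segment is
// missing or when the final value is not a string.
func lookupPath(doc map[string]interface{}, path string) (string, bool) {
	var cur interface{} = doc
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return "", false // tried to descend into a non-document value
		}
		cur, ok = m[key]
		if !ok {
			return "", false // segment not present
		}
	}
	s, ok := cur.(string)
	return s, ok
}

func main() {
	doc := map[string]interface{}{
		"updatedBy": "tcadmin",
		"audit": map[string]interface{}{ // hypothetical embedded subdocument
			"updatedBy": "jdoe",
		},
	}

	top, _ := lookupPath(doc, "updatedBy")
	nested, _ := lookupPath(doc, "audit.updatedBy")
	_, found := lookupPath(doc, "audit.missing")

	fmt.Println(top, nested, found)
}
```

This prints `tcadmin jdoe false`. Returning a boolean alongside the value lets the caller decide what to do with changes that carry no user, for example recording them with an empty user rather than dropping them.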

Structure of the audit records

Following is a sample of an audit record created by the watcher:

{ 
"_id" : {
"_data" : "825E446661000000012B022C0100296E5A1004EC1E76078DCE4C489A2BFE17218EC79F46645F696400645C5D85C62FEF357A165CCABF0004"
},
"collection" : "streamtest",
"database" : "test",
"documentKey" : "5c5d85c62fef357a165ccabf",
"fullDocument" : null,
"operationType" : "update",
"timestamp" : "2020-02-14T15:03:27Z",
"updateDescription" : {
"updatedFields" : {
"lineItems.0.procedures.0.procedureModCodes.0" : "332"
},
"removedFields" : [

]
},
"user" : "tcadmin"
}
  • The unique change token (“resume token”) received from the change stream is used as the ID for the record. This allows the watcher to fetch the latest resume token in order to resume a broken subscription, if necessary.
  • The record contains details of the audited record, such as the source database and collection, the ID for the audited record, the timestamp of the change, the business user who made the change, and the specific details of the changes (such as updated fields, and the updated value for those fields).
  • For “insert” operation types, the entire source record is stored in the “fullDocument” field.

Benefits of using change streams for auditing

The biggest benefit of using the change stream watcher for capturing data change audit logs is the ability to completely decouple the audit functionality from the business transactions that cause the changes. The change watcher, along with the persistence of audit records, can be “plugged” into any MongoDB collection in a configurable manner, without requiring any changes to, or redeployment of, the applications that write those changes, and without affecting their performance or throughput. Essentially, this makes the audit logger an independently deployable microservice.

REST API to fetch field audit trails

Once you start logging audit records for changes happening to data in your source collection, you can use this audit data in a variety of ways. For example, given a particular record, you might need to fetch the audit trail of a particular field — who changed what, and when?

The included source code provides the implementation of a REST API that can fetch field-level audit trails. Following is a sample of an audit trail request to this API:

GET /auditrecords/5c5d85c62fef357a165ccabf/lineItems.0.procedures.0.procedureModCodes.0

This request fetches the audit trail for the field identified by the path “lineItems.0.procedures.0.procedureModCodes.0” for the record identified by the key “5c5d85c62fef357a165ccabf”.

Following is the sample response for this request:

[
{
"fieldId": "lineItems.0.procedures.0.procedureModCodes.0",
"fieldValue": "332",
"updatedBy": "jdoe",
"updatedAt": "2020-02-12T15:56:01-05:00"
},
{
"fieldId": "lineItems.0.procedures.0.procedureModCodes.0",
"fieldValue": "453",
"updatedBy": "ssmith",
"updatedAt": "2020-02-12T15:32:33-05:00"
},
{
"fieldId": "lineItems.0.procedures.0.procedureModCodes.0",
"fieldValue": "444",
"updatedBy": "mwilson",
"updatedAt": "2020-02-12T12:36:40-05:00"
},
{
"fieldId": "lineItems.0.procedures.0.procedureModCodes.0",
"fieldValue": "303",
"updatedBy": "tparker",
"updatedAt": "2020-01-30T12:06:52-05:00"
}
]
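The core of such an endpoint can be sketched as two small functions: one that splits the request path into record key and field path, and one that builds the trail from stored audit records. This is an illustration under assumptions, not the code from the API repository: `parseTrailPath`, `fieldTrail`, `mustTime` and the simplified `AuditRecord` type are hypothetical, and the audit records are held in memory instead of being queried from MongoDB. The newest-first ordering follows the sample response above.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"time"
)

// AuditRecord is a simplified, hypothetical view of a stored audit record.
type AuditRecord struct {
	DocumentKey   string
	User          string
	Timestamp     time.Time
	UpdatedFields map[string]string
}

// TrailEntry uses the field names of the sample response.
type TrailEntry struct {
	FieldID    string `json:"fieldId"`
	FieldValue string `json:"fieldValue"`
	UpdatedBy  string `json:"updatedBy"`
	UpdatedAt  string `json:"updatedAt"`
}

// parseTrailPath splits "/auditrecords/<key>/<fieldPath>" into its parts.
func parseTrailPath(p string) (docKey, fieldPath string, ok bool) {
	const prefix = "/auditrecords/"
	if !strings.HasPrefix(p, prefix) {
		return "", "", false
	}
	parts := strings.SplitN(strings.TrimPrefix(p, prefix), "/", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", false
	}
	return parts[0], parts[1], true
}

// fieldTrail returns the changes made to fieldPath on the record docKey,
// newest first.
func fieldTrail(records []AuditRecord, docKey, fieldPath string) []TrailEntry {
	var matches []AuditRecord
	for _, r := range records {
		if r.DocumentKey != docKey {
			continue
		}
		if _, changed := r.UpdatedFields[fieldPath]; changed {
			matches = append(matches, r)
		}
	}
	sort.SliceStable(matches, func(i, j int) bool {
		return matches[i].Timestamp.After(matches[j].Timestamp)
	})
	trail := make([]TrailEntry, 0, len(matches))
	for _, r := range matches {
		trail = append(trail, TrailEntry{
			FieldID:    fieldPath,
			FieldValue: r.UpdatedFields[fieldPath],
			UpdatedBy:  r.User,
			UpdatedAt:  r.Timestamp.Format(time.RFC3339),
		})
	}
	return trail
}

// mustTime parses an RFC 3339 timestamp and panics on malformed input.
func mustTime(s string) time.Time {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		panic(err)
	}
	return t
}

func main() {
	const key = "5c5d85c62fef357a165ccabf"
	const field = "lineItems.0.procedures.0.procedureModCodes.0"
	records := []AuditRecord{
		{DocumentKey: key, User: "ssmith", Timestamp: mustTime("2020-02-12T15:32:33-05:00"),
			UpdatedFields: map[string]string{field: "453"}},
		{DocumentKey: key, User: "jdoe", Timestamp: mustTime("2020-02-12T15:56:01-05:00"),
			UpdatedFields: map[string]string{field: "332"}},
	}

	docKey, fieldPath, ok := parseTrailPath("/auditrecords/" + key + "/" + field)
	if !ok {
		fmt.Println("bad request path")
		return
	}
	for _, e := range fieldTrail(records, docKey, fieldPath) {
		fmt.Println(e.UpdatedAt, e.UpdatedBy, e.FieldValue)
	}
}
```

Because the field path itself contains dots but no slashes, splitting the remainder of the URL on the first slash is enough to separate the record key from the field path.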

Source code

The Golang implementation of the change stream watcher described in this post is available here: https://github.com/psinghal04/mongo-changestream-watcher-go

The Golang implementation of the audit trail REST API is available here: https://github.com/psinghal04/mongo-auditapi-go

Each of these comes with a README that provides all the details you would need to know to get them running in your own environment. Try it out!
