基于spring-data 实现mongodb changestream

最近项目要求对 MongoDB 的数据变更记录操作日志。首先想到的是基于 Spring AOP 对执行变更的接口进行拦截处理,但由于调用点很多、不便逐一梳理,最终考虑使用 MongoDB 的 CDC(Change Data Capture)机制,即 Change Stream,来实时监听数据的变更。

首先是 Spring Boot 集成 MongoDB。MongoDB 需要 3.6 及以上版本才支持 Change Stream,并且必须以副本集(replica set)或分片集群方式部署,单机(standalone)实例无法使用。

<!-- Maven setup: inherit from spring-boot-starter-parent 2.1.3.RELEASE and add the Spring Data MongoDB starter. No version is given on the dependency here; it is expected to come from the parent's dependency management. -->
<parent>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-parent</artifactId>     <version>2.1.3.RELEASE</version> </parent>  <dependencies>    <dependency>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-data-mongodb</artifactId>     </dependency> </dependencies>

接着配置 Mongo 消息监听器,用于接收数据库的变更信息

import com.mongodb.client.model.changestream.ChangeStreamDocument; import lombok.extern.slf4j.Slf4j; import org.bson.Document; import org.springframework.data.mongodb.core.messaging.Message; import org.springframework.data.mongodb.core.messaging.MessageListener; import org.springframework.stereotype.Component;  @Component @Slf4j public class DocumnetMessageListener implements MessageListener<ChangeStreamDocument<Document>, Document> {      @Override     public void onMessage(Message<ChangeStreamDocument<Document>, Document> message) {         log.info("Received Message in collection: {},message raw: {}, message body:{}",                 message.getProperties().getCollectionName(), message.getRaw(), message.getBody());      } }


配置 Mongo 监听器的容器 MessageListenerContainer。Spring 启动时会自动启动监听任务,用于接收 Change Stream 事件

import com.mongodb.client.model.changestream.FullDocument; import org.bson.Document; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest; import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer; import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;  import java.util.concurrent.Executor; import java.util.concurrent.Executors;  import static org.springframework.data.mongodb.core.aggregation.Aggregation.match; import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation; import static org.springframework.data.mongodb.core.query.Criteria.where;  @Configuration public class MongoConfig {     @Bean     MessageListenerContainer messageListenerContainer(MongoTemplate template, DocumnetMessageListener documnetMessageListener) {         Executor executor = Executors.newSingleThreadExecutor();         MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, executor) {             @Override             public boolean isAutoStartup() {                 return true;             }         };          ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(documnetMessageListener)                 .collection("topic")  //需要监听的集合名,不指定默认监听数据库的                 .filter(newAggregation(match(where("operationType").in("insert", "update", "replace")))) //过滤需要监听的操作类型,可以根据需求指定过滤条件                 .fullDocumentLookup(FullDocument.UPDATE_LOOKUP) //不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息                 .build();         messageListenerContainer.register(request, Document.class);          return messageListenerContainer;     } }


返回的数据格式类似下面这样(示例取自 test 库的 bar 集合的一次 insert 事件):

{ 	"_id": { 		"_data": { 			"$binary": "glzquiIAAAACRmRfaWQAZFzquiK0lDNo+K0DpwBaEARUMrm0ruVACoftuxjt1RtCBA==", 			"$type": "00" 		} 	}, 	"operationType": "insert", 	"fullDocument": { 		"_id": { 			"$oid": "5ceaba22b4943368f8ad03a7" 		}, 		"y": 1 	}, 	"ns": { 		"db": "test", 		"coll": "bar" 	}, 	"documentKey": { 		"_id": { 			"$oid": "5ceaba22b4943368f8ad03a7" 		} 	} }




您可能还会对下面的文章感兴趣: