Chat / Messaging
Topic-based Pub/Sub, query.set(), Built-in Morphium Messaging
What Morphium Offers
Morphium includes a MongoDB-backed message queue out of the box. No Kafka, no RabbitMQ, no extra
infrastructure. Messaging supports topics, exclusive messages, broadcast, priorities and
time-to-live — all stored in MongoDB with change-stream-based delivery for near-realtime
processing.
The Challenge
Inter-service messaging normally requires a separate broker (Kafka, RabbitMQ, ActiveMQ) with its own deployment, configuration and operational overhead. If you already run MongoDB, Morphium gives you messaging without adding another moving part.
Prerequisites & Key Concepts
- Built-in Messaging vs. Custom Entity: Morphium ships with a full messaging system based on the Msg class, the SingleCollectionMessaging manager, and the MessageListener interface. It supports topics, priorities, TTL, exclusive consumers, and pause/unpause. This showcase uses a custom ChatMessage entity instead, to demonstrate how Morphium's ORM features (queries, sort, set) work with any entity. Both approaches store documents in MongoDB.
- query.set() updates a single field without loading the document. Morphium translates query.set("read", true, false, false, null) into MongoDB's {"$set": {"read": true}}. The 3rd and 4th parameters are the upsert and multiple flags; the 5th is an async callback (AsyncOperationCallback), or null for synchronous execution. Because the field is changed on the server, there is no load-modify-store cycle that could race with other writers, and less data crosses the network.
- Topic Filtering: uses query.f(ChatMessage.Fields.topic).eq(value), which for the value "general" translates to {"topic": "general"} in MongoDB. Multiple .f().eq() calls are combined with AND logic.
- @CreationTime fires during the store() lifecycle, before the document reaches MongoDB. It only sets the field when the entity is new (no _id yet).
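The filter and update translations described in the list above can be illustrated with a minimal in-memory sketch. This is not Morphium or MongoDB driver code: the class SetUpdateSketch and its methods matches, applySet and message are hypothetical names invented for this illustration, and documents are modelled as plain Java maps. It only mirrors the semantics the list describes, namely that several equality conditions combine with AND and that a $set update changes the named field while leaving the rest of the document alone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Illustration only: hypothetical helper names, not Morphium or MongoDB driver API.
final class SetUpdateSketch {

    // A document matches when every filter entry equals the document's value (AND logic).
    static boolean matches(Map<String, Object> doc, Map<String, Object> filter) {
        return filter.entrySet().stream()
                .allMatch(e -> Objects.equals(doc.get(e.getKey()), e.getValue()));
    }

    // Applies {"$set": {...}} to matching documents and returns how many were modified.
    // With multiple == false only the first match is updated.
    static int applySet(List<Map<String, Object>> collection,
                        Map<String, Object> filter,
                        Map<String, Object> update,
                        boolean multiple) {
        @SuppressWarnings("unchecked")
        Map<String, Object> fields = (Map<String, Object>) update.get("$set");
        int modified = 0;
        for (Map<String, Object> doc : collection) {
            if (!matches(doc, filter)) {
                continue;
            }
            doc.putAll(fields); // only the listed fields change
            modified++;
            if (!multiple) {
                break;
            }
        }
        return modified;
    }

    // Builds a mutable chat message document with read = false.
    static Map<String, Object> message(String topic, String sender, String text) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("topic", topic);
        doc.put("sender", sender);
        doc.put("text", text);
        doc.put("read", false);
        return doc;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> collection = new ArrayList<>();
        collection.add(message("general", "alice", "hello"));
        collection.add(message("general", "bob", "hi"));
        collection.add(message("random", "alice", "off-topic"));

        // Two equality conditions, combined with AND
        Map<String, Object> filter = Map.of("topic", "general", "sender", "alice");
        // The update document that query.set("read", true, ...) is described as producing
        Map<String, Object> update = Map.of("$set", Map.of("read", true));

        int modified = applySet(collection, filter, update, false);
        System.out.println("modified=" + modified + " first=" + collection.get(0));
    }
}
```

In the real system both the matching and the update happen inside MongoDB; the sketch only mirrors the outcome, where the sender and text of the matched message stay untouched and the other messages are not modified.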
Built-in Messaging System
For reference, here is how Morphium's built-in messaging API works. This showcase uses a custom entity, but the built-in system is available for production message queue patterns.
    import de.caluga.morphium.messaging.MessageListener;
    import de.caluga.morphium.messaging.Msg;
    import de.caluga.morphium.messaging.SingleCollectionMessaging;

    // Create a SingleCollectionMessaging instance (wraps a Morphium instance)
    SingleCollectionMessaging messaging = new SingleCollectionMessaging(morphium, 100, true); // (1)
    messaging.start();

    // Listen for messages on a specific topic
    messaging.addListenerForTopic("chat-topic", (msg, m) -> { // (2)
        System.out.println("Received: " + m.getValue());
        return null; // return a Msg to send a reply, or null
    });

    // Send a message to a topic
    Msg msg = new Msg("chat-topic", "Hello!", "Message body"); // (3)
    msg.setTtl(300000); // (4) 5 min TTL
    messaging.sendMessage(msg); // (5)
(1) SingleCollectionMessaging instance wrapping a Morphium instance; the second arg is the polling interval in ms.
(2) addListenerForTopic() subscribes to a named topic; the lambda receives each incoming Msg.
(3) Msg constructor takes topic name, subject, and body; the built-in system stores this in MongoDB automatically.
(4) setTtl() sets a time-to-live in milliseconds; expired messages are ignored by consumers.
(5) sendMessage() persists the message to MongoDB and notifies all matching listeners.
Entity Source Code
    import de.caluga.morphium.annotations.CreationTime;
    import de.caluga.morphium.annotations.Entity;
    import de.caluga.morphium.annotations.Id;
    import de.caluga.morphium.driver.MorphiumId;
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.experimental.FieldNameConstants;

    import java.time.LocalDateTime;

    @Entity(collectionName = "chat_messages") // (1)
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder
    @FieldNameConstants // (2)
    public class ChatMessage {
        @Id // (3)
        private MorphiumId id;
        private String sender;
        private String recipient;
        private String topic;
        private String text;
        @CreationTime // (4)
        private LocalDateTime sentAt;
        private boolean read; // (5)
    }
(1) Maps the class to the chat_messages collection. This is a custom entity, separate from Morphium's built-in SingleCollectionMessaging, which uses its own internal collection.
(2) Generates ChatMessage.Fields.sender, .topic, .sentAt etc. for type-safe query field references.
(3) Maps to MongoDB's _id. Auto-generated on first store().
(4) Set automatically on the first store(). Never updated on subsequent saves.
(5) Updated via query.set("read", true, false, false, null) for atomic field-level updates without rewriting the full document.
Service Code
    import de.caluga.morphium.Morphium;
    import de.caluga.morphium.query.Query;

    import java.util.Map;

    @Inject
    Morphium morphium;

    // Find all messages sorted by sentAt descending (newest first)
    morphium.createQueryFor(ChatMessage.class)
        .sort(Map.of(ChatMessage.Fields.sentAt, -1)) // (1)
        .asList();

    // Filter by topic
    morphium.createQueryFor(ChatMessage.class)
        .f(ChatMessage.Fields.topic).eq("general") // (2)
        .sort(Map.of(ChatMessage.Fields.sentAt, -1))
        .asList();

    // Server-side count (no documents transferred)
    morphium.createQueryFor(ChatMessage.class).countAll(); // (3)
(1) sort(Map.of(field, -1)) orders results descending by the given field; -1 = descending, 1 = ascending.
(2) .f(field).eq(value) adds a filter condition; chaining multiple calls combines them with AND logic.
(3) countAll() executes a server-side count without transferring any documents to the Java application.

    // Build a query targeting a single document by _id
    var query = morphium.createQueryFor(ChatMessage.class)
        .f(ChatMessage.Fields.id).eq(new MorphiumId(id)); // (1)

    // set() performs a MongoDB $set: only the "read" field is updated
    // Parameters: field, value, upsert=false, multiple=false, callback=null
    query.set(ChatMessage.Fields.read, true, false, false, null); // (2)
(1) The query matches on _id; Morphium translates this into a {"_id": ObjectId(...)} filter.
(2) query.set() issues a MongoDB $set update: only the read field is written, with no document round-trip to Java.
Related Documentation
- Messaging System — Built-in Queue, Topics, Listeners
- Messaging Implementations — sendMessage(), addListenerForTopic()
- API Reference — query.set(), countAll(), sort()