What Morphium Offers

Morphium includes a MongoDB-backed message queue out of the box. No Kafka, no RabbitMQ, no extra infrastructure. Messaging supports topics, exclusive messages, broadcast, priorities and time-to-live — all stored in MongoDB with change-stream-based delivery for near-realtime processing.

The Challenge

Inter-service messaging normally requires a separate broker (Kafka, RabbitMQ, ActiveMQ) with its own deployment, configuration and operational overhead. If you already run MongoDB, Morphium gives you messaging without adding another moving part.

Morphium Features Used

  • Morphium Messaging: Morphium includes a built-in messaging system based on the Msg class and MessageListener interface. This showcase uses a custom ChatMessage entity instead, but the built-in system supports topics, priorities, TTL, and exclusive message processing out of the box. [MongoDB, Atlas, CosmosDB]
  • Topics: Topics allow messages to be filtered by channel or category. In the built-in system, use addListenerForTopic(). In this custom entity, topics are plain String fields filtered with query.f(field).eq(value). [MongoDB, Atlas, CosmosDB]
  • query.set() ($set): Performs a targeted field-level update using MongoDB's $set operator. Only the specified field is modified on the server; the document is never loaded into Java memory. Signature: query.set(field, value, upsert, multiple, AsyncOperationCallback). [MongoDB, Atlas, CosmosDB]
  • @Entity: Maps a Java class to a MongoDB collection. The collectionName attribute sets the collection explicitly. Import: de.caluga.morphium.annotations.Entity [MongoDB, Atlas, CosmosDB]
  • @CreationTime: Automatically populated by Morphium with the current timestamp on first store(). Supports Date, long (epoch millis), and String. Not modified on subsequent updates. Import: de.caluga.morphium.annotations.CreationTime [MongoDB, Atlas, CosmosDB]
  • @FieldNameConstants: Lombok annotation that generates ChatMessage.Fields.sender, .topic, .sentAt etc. at compile time. Used in Morphium queries for type-safe field references instead of hard-coded strings. Import: lombok.experimental.FieldNameConstants [MongoDB, Atlas, CosmosDB]
  • sort / countAll: sort(Map.of(field, -1)) orders results descending. countAll() executes a server-side count without transferring documents. Both are methods on Morphium's Query object. [MongoDB, Atlas, CosmosDB]

Prerequisites & Key Concepts

  • Built-in Messaging vs. Custom Entity — Morphium ships with a full messaging system based on the Msg class, SingleCollectionMessaging manager, and MessageListener interface. It supports topics, priorities, TTL, exclusive consumers, and pause/unpause. This showcase uses a custom ChatMessage entity instead to demonstrate how Morphium's ORM features (queries, sort, set) work with any entity. Both approaches store documents in MongoDB.
  • query.set() updates a single field without loading the document. Morphium translates query.set("read", true, false, false, null) into MongoDB's {"$set": {"read": true}}. The 5th parameter is an async callback (AsyncOperationCallback), or null for synchronous execution. Because there is no read-modify-write cycle, concurrent changes to other fields of the same document are not overwritten, and only the changed field travels over the network.
  • Topic Filtering — Uses query.f(ChatMessage.Fields.topic).eq(value) which translates to {"topic": "general"} in MongoDB. Multiple .f().eq() calls are combined with AND logic.
  • @CreationTime fires during the store() lifecycle, before the document reaches MongoDB. It only sets the field when the entity is new (no _id yet).
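The translations described in the list above can be made concrete with a small, Morphium-free sketch. It models the filter and update documents as plain Java maps; the class FilterSketch and its helpers matches and applyUpdate are hypothetical names used only for illustration, and they mimic just the equality/AND and $set semantics described above, not Morphium's or MongoDB's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, Morphium-free model of the filter and update documents.
class FilterSketch {

    // Equality filter: a document matches only when every filter entry
    // equals the document's value for that field (AND logic).
    static boolean matches(Map<String, Object> doc, Map<String, Object> filter) {
        for (Map.Entry<String, Object> e : filter.entrySet()) {
            if (!e.getValue().equals(doc.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Applies an update of the form {"$set": {...}}:
    // only the listed fields are overwritten, all others stay untouched.
    @SuppressWarnings("unchecked")
    static void applyUpdate(Map<String, Object> doc, Map<String, Object> update) {
        Map<String, Object> fields = (Map<String, Object>) update.get("$set");
        doc.putAll(fields);
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("topic", "general");
        doc.put("sender", "alice");
        doc.put("read", false);

        // Two .f().eq() conditions correspond to two keys in one filter document.
        Map<String, Object> filter = new LinkedHashMap<>();
        filter.put("topic", "general");
        filter.put("sender", "alice");

        // Shape of the update that query.set("read", true, ...) is described as producing.
        Map<String, Object> update = Map.of("$set", Map.of("read", true));

        if (matches(doc, filter)) {
            applyUpdate(doc, update);
        }
        System.out.println(doc); // {topic=general, sender=alice, read=true}
    }
}
```

In the real system both steps happen inside MongoDB; the sketch only shows which documents are selected and which fields change.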

Built-in Messaging System

For reference, here is how Morphium's built-in messaging API works. This showcase uses a custom entity, but the built-in system is available for production message queue patterns.

Built-in Messaging API Java
import de.caluga.morphium.messaging.SingleCollectionMessaging;
import de.caluga.morphium.messaging.Msg;
import de.caluga.morphium.messaging.MessageListener;

// Create a SingleCollectionMessaging instance (wraps a Morphium instance)
SingleCollectionMessaging messaging = new SingleCollectionMessaging(morphium, 100, true); // (1)
messaging.start();

// Listen for messages on a specific topic
messaging.addListenerForTopic("chat-topic", (msg, m) -> { // (2)
    System.out.println("Received: " + m.getValue());
    return null; // return a Msg to send a reply, or null
});

// Send a message to a topic
Msg msg = new Msg("chat-topic", "Hello!", "Message body"); // (3)
msg.setTtl(300000); // (4) 5 min TTL
messaging.sendMessage(msg); // (5)
1 Create a SingleCollectionMessaging instance wrapping a Morphium instance; the second arg is the polling interval in ms.
2 addListenerForTopic() subscribes to a named topic; the lambda receives each incoming Msg.
3 Msg constructor takes topic name, subject, and body; the built-in system stores this in MongoDB automatically.
4 setTtl() sets a time-to-live in milliseconds; expired messages are ignored by consumers.
5 sendMessage() persists the message to MongoDB and notifies all matching listeners.
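Callout 4 states that expired messages are ignored by consumers. The rule can be illustrated with a minimal, Morphium-free sketch, assuming each message records its creation timestamp and a TTL in milliseconds. TtlSketch, SketchMsg, and deliverable are hypothetical names; this is a model of the rule, not how Morphium implements expiry internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of TTL-based filtering on the consumer side.
class TtlSketch {

    // Stand-in for a queued message: creation time and TTL, both in ms.
    static class SketchMsg {
        final String topic;
        final String value;
        final long timestamp;
        final long ttl;

        SketchMsg(String topic, String value, long timestamp, long ttl) {
            this.topic = topic;
            this.value = value;
            this.timestamp = timestamp;
            this.ttl = ttl;
        }

        // A message is expired once `now` lies past timestamp + ttl.
        boolean isExpired(long now) {
            return now > timestamp + ttl;
        }
    }

    // Returns the messages on `topic` that a consumer would still process at `now`.
    static List<SketchMsg> deliverable(List<SketchMsg> queue, String topic, long now) {
        List<SketchMsg> result = new ArrayList<>();
        for (SketchMsg m : queue) {
            if (m.topic.equals(topic) && !m.isExpired(now)) {
                result.add(m);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<SketchMsg> queue = List.of(
            new SketchMsg("chat-topic", "fresh", now - 1_000, 300_000),    // sent 1 s ago
            new SketchMsg("chat-topic", "stale", now - 600_000, 300_000)); // sent 10 min ago
        System.out.println(deliverable(queue, "chat-topic", now).size()); // prints 1
    }
}
```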

Entity Source Code

ChatMessage.java Java
import de.caluga.morphium.annotations.CreationTime;
import de.caluga.morphium.annotations.Entity;
import de.caluga.morphium.annotations.Id;
import de.caluga.morphium.driver.MorphiumId;
import lombok.Data;
import lombok.Builder;
import lombok.NoArgsConstructor;
import lombok.AllArgsConstructor;
import lombok.experimental.FieldNameConstants;
import java.time.LocalDateTime;

@Entity(collectionName = "chat_messages") // (1)
@Data @NoArgsConstructor @AllArgsConstructor @Builder
@FieldNameConstants // (2)
public class ChatMessage {

    @Id // (3)
    private MorphiumId id;

    private String sender;
    private String recipient;
    private String topic;
    private String text;

    @CreationTime // (4)
    private LocalDateTime sentAt;

    private boolean read; // (5)
}
1 Maps to the chat_messages collection. This is a custom entity — separate from Morphium's built-in SingleCollectionMessaging which uses its own internal collection.
2 Lombok: generates ChatMessage.Fields.sender, .topic, .sentAt etc. for type-safe query field references
3 Primary key → MongoDB _id. Auto-generated on first store().
4 Automatically set to current time on first store(). Never updated on subsequent saves.
5 Used with query.set("read", true, false, false, null) for atomic field-level updates without rewriting the full document
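Callouts 3 and 4 describe a store() lifecycle in which the id and the creation timestamp are assigned once and then left alone. A simplified, Morphium-free model of that behaviour might look as follows. CreationTimeSketch, its Message class, and the store helper are hypothetical; a String id stands in for MorphiumId, and the current time is passed in explicitly so the behaviour is easy to follow.

```java
import java.time.LocalDateTime;
import java.util.UUID;

// Hypothetical model of "set on first store(), never updated afterwards".
class CreationTimeSketch {

    static class Message {
        String id;             // stands in for the @Id field
        String text;
        LocalDateTime sentAt;  // stands in for the @CreationTime field
    }

    // Simplified store(): a missing id marks the entity as new.
    static void store(Message m, LocalDateTime now) {
        if (m.id == null) {
            m.id = UUID.randomUUID().toString(); // id generated on first store
            m.sentAt = now;                      // creation time set exactly once
        }
        // A real store() would now write the document to MongoDB.
    }

    public static void main(String[] args) {
        Message m = new Message();
        m.text = "Hello";

        LocalDateTime first = LocalDateTime.of(2024, 1, 1, 12, 0);
        store(m, first);               // first store: id and sentAt are assigned

        m.text = "Hello (edited)";
        store(m, first.plusHours(1));  // later store: sentAt is left untouched

        System.out.println(m.sentAt.equals(first)); // prints true
    }
}
```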

Service Code

Morphium API — Topic Filtering & Sorting Java
import de.caluga.morphium.Morphium;
import de.caluga.morphium.query.Query;
import java.util.List;
import java.util.Map;

@Inject Morphium morphium;

// Find all messages sorted by sentAt descending (newest first)
List<ChatMessage> newest = morphium.createQueryFor(ChatMessage.class)
    .sort(Map.of(ChatMessage.Fields.sentAt, -1)) // (1)
    .asList();

// Filter by topic
List<ChatMessage> general = morphium.createQueryFor(ChatMessage.class)
    .f(ChatMessage.Fields.topic).eq("general") // (2)
    .sort(Map.of(ChatMessage.Fields.sentAt, -1))
    .asList();

// Server-side count (no documents transferred)
long total = morphium.createQueryFor(ChatMessage.class).countAll(); // (3)
1 sort(Map.of(field, -1)) orders results descending by the given field; -1 = descending, 1 = ascending.
2 .f(field).eq(value) adds a filter condition; chaining multiple calls combines them with AND logic.
3 countAll() executes a server-side count without transferring any documents to the Java application.
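To show what results these queries are expected to produce, here is an in-memory equivalent written with Java streams. It is only a semantic model: the real queries run inside MongoDB, and QuerySemanticsSketch, its Message class, and the helper methods are hypothetical names that are not part of Morphium. The sentAt field is simplified to an epoch-millis long.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory model of filter + descending sort + count.
class QuerySemanticsSketch {

    static class Message {
        final String topic;
        final long sentAt; // epoch millis, simplified

        Message(String topic, long sentAt) {
            this.topic = topic;
            this.sentAt = sentAt;
        }
    }

    // Equivalent of .f(topic).eq(value) followed by .sort(Map.of(sentAt, -1)).
    static List<Message> byTopicNewestFirst(List<Message> all, String topic) {
        return all.stream()
            .filter(m -> m.topic.equals(topic))
            .sorted(Comparator.comparingLong((Message m) -> m.sentAt).reversed())
            .collect(Collectors.toList());
    }

    // Equivalent of countAll(): only a number is returned, no documents.
    static long count(List<Message> all) {
        return all.size();
    }

    public static void main(String[] args) {
        List<Message> all = List.of(
            new Message("general", 10),
            new Message("random", 20),
            new Message("general", 30));

        List<Message> result = byTopicNewestFirst(all, "general");
        System.out.println(result.get(0).sentAt); // prints 30
        System.out.println(count(all));           // prints 3
    }
}
```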
Mark as Read (query.set) Java
import de.caluga.morphium.driver.MorphiumId;

// Build a query targeting a single document by _id
// (id is assumed here to be the hex-string form of the message's _id)
var query = morphium.createQueryFor(ChatMessage.class)
    .f(ChatMessage.Fields.id).eq(new MorphiumId(id)); // (1)

// set() performs a MongoDB $set — only the "read" field is updated
// Parameters: field, value, upsert=false, multiple=false, callback=null
query.set(ChatMessage.Fields.read, true, false, false, null); // (2)
1 The query targets a single document by its _id; Morphium translates this into a {"_id": ObjectId(...)} filter.
2 query.set() issues a MongoDB $set update — only the read field is written; no document round-trip to Java.
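Why a field-level update is safer than loading, modifying, and rewriting the whole document can be shown with a small sequential simulation. The sketch below is Morphium-free and uses a plain map as the stored document; LostUpdateSketch and its helpers replace, set, and newDoc are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation: full-document rewrite vs. field-level $set.
class LostUpdateSketch {

    // Full-document write: the stored document is replaced by the caller's copy.
    static void replace(Map<String, Object> stored, Map<String, Object> copy) {
        stored.clear();
        stored.putAll(copy);
    }

    // Field-level write, comparable to {"$set": {field: value}}.
    static void set(Map<String, Object> stored, String field, Object value) {
        stored.put(field, value);
    }

    static Map<String, Object> newDoc() {
        Map<String, Object> doc = new HashMap<>();
        doc.put("text", "Hello");
        doc.put("read", false);
        return doc;
    }

    public static void main(String[] args) {
        // Scenario 1: read-modify-write with a stale copy.
        Map<String, Object> stored = newDoc();
        Map<String, Object> staleCopy = new HashMap<>(stored); // client A loads the document
        set(stored, "text", "Hello (edited)");                 // client B edits the text meanwhile
        staleCopy.put("read", true);                           // client A marks its copy as read
        replace(stored, staleCopy);                            // client A writes the whole document back
        System.out.println(stored.get("text")); // prints Hello (client B's edit is lost)

        // Scenario 2: both clients use field-level updates.
        stored = newDoc();
        set(stored, "text", "Hello (edited)");                 // client B
        set(stored, "read", true);                             // client A
        System.out.println(stored.get("text")); // prints Hello (edited)
    }
}
```

The second scenario corresponds to the query.set() call above: each writer touches only its own field, so neither overwrites the other's change.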

Related Documentation