What Morphium Offers

Morphium's Aggregation Pipeline API builds complex analytics with a fluent, type-safe builder instead of raw BSON documents. Aggregator<T, R> chains stages such as $group, $project, $sort, $bucket and $limit, keeping aggregation code readable and refactor-safe, with the stage structure and the input/output types checked at compile time (field references inside expressions, such as "$unit_price", remain plain strings).

The Challenge

Aggregation pipelines with the raw MongoDB driver mean deeply nested Map structures that are hard to read, impossible to refactor and invisible to the compiler. A typo in a field name silently produces wrong results.

Morphium Features Used

  • Aggregator: Entry point for building aggregation pipelines. Created via morphium.createAggregator(InputType.class, OutputType.class); stages are chained fluently. Import: de.caluga.morphium.aggregation.Aggregator. Supported on MongoDB, Atlas, CosmosDB.
  • $group: Groups documents by a key expression and applies accumulators ($sum, $avg, $min, $max) to each group. Equivalent to SQL GROUP BY. Called via aggregator.group("$field").sum(...).avg(...).end(). Supported on MongoDB, Atlas, CosmosDB.
  • $project: Reshapes documents by including/excluding fields or computing new ones. Called via aggregator.project(...). Only projected fields appear in subsequent stages. Supported on MongoDB, Atlas, CosmosDB.
  • $sort: Orders documents by one or more fields (1 = ascending, -1 = descending). Called via aggregator.sort(...). Place after $group to sort aggregated results. Supported on MongoDB, Atlas, CosmosDB.
  • $limit: Restricts the pipeline output to N documents. Called via aggregator.limit(n). Use after $sort to get "top N" results. Supported on MongoDB, Atlas, CosmosDB.
  • $bucket: Categorises documents into buckets defined by boundary values. Useful for histograms (e.g. price ranges, quantity ranges). Called via aggregator.bucket(...). Supported on MongoDB, Atlas, CosmosDB.
  • @Entity: Maps a Java class to a MongoDB collection. The collectionName attribute sets the collection explicitly. Import: de.caluga.morphium.annotations.Entity. Supported on MongoDB, Atlas, CosmosDB.
  • @Index: Creates indexes for faster queries. At class level it defines a compound index on multiple fields (e.g. "region, product_name"); at field level, a single-field index. Import: de.caluga.morphium.annotations.Index. Supported on MongoDB, Atlas, CosmosDB.
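To make the $bucket semantics concrete, here is a plain-Java model of what the server does with a set of boundaries. It is an illustration only, not Morphium's API; BucketSketch and bucketCounts are hypothetical names. Following MongoDB's $bucket rules, boundaries are sorted ascending, each bucket covers [lower, upper) and is identified by its lower boundary, and only non-empty buckets appear in the output. This sketch always routes out-of-range values to a default bucket; on the server, omitting default makes such values an error.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of MongoDB's $bucket semantics (hypothetical helper, not Morphium code).
public class BucketSketch {

    // Counts values per bucket; boundaries must be sorted ascending.
    static Map<String, Integer> bucketCounts(List<Double> values, double[] boundaries, String defaultBucket) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (double v : values) {
            String key = defaultBucket;
            for (int i = 0; i < boundaries.length - 1; i++) {
                // Lower bound inclusive, upper bound exclusive
                if (v >= boundaries[i] && v < boundaries[i + 1]) {
                    // The bucket is identified by its lower boundary
                    key = String.valueOf(boundaries[i]);
                    break;
                }
            }
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Double> prices = List.of(5.0, 12.5, 49.99, 50.0, 120.0);
        System.out.println(bucketCounts(prices, new double[]{0, 10, 50, 100}, "other"));
    }
}
```

With these sample prices the program prints {0.0=1, 10.0=2, 50.0=1, other=1}: 49.99 still falls in the 10 to 50 bucket, while 50.0 starts the next one.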

Prerequisites & Key Concepts

  • MongoDB field names vs Java field names — Aggregation pipelines operate on MongoDB field names (snake_case by default), not Java field names. Morphium converts camelCase to snake_case automatically, so productName becomes "product_name" and unitPrice becomes "unit_price" in the database. Pipeline expressions must use the MongoDB names (e.g. "$unit_price").
  • Fluent group() builder — Morphium's group() method returns a builder that chains accumulators: agg.group("$region").sum("total", "$field").avg("average", "$field").end(). The .end() call finalizes the $group stage and returns to the pipeline builder.
  • Pipeline stage order matters — Stages execute in the order they are added, each consuming the previous stage's output. In the pipeline below, $project first reshapes documents, then $group aggregates, $sort orders and $limit caps the output. Re-ordering stages changes the result: a $limit placed before $sort returns N arbitrary documents rather than the top N.
  • Result type: Aggregator<In, Out> takes an input entity type and an output type. The output type can be a custom class or Map for ad-hoc results.
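The camelCase to snake_case mapping described above can be approximated in a few lines. This is a hypothetical helper, not Morphium's own converter; the real implementation may treat edge cases such as acronyms or digits differently. It only shows how Java field names turn into the "$field" references a pipeline expects.

```java
// Hypothetical approximation of Morphium's default camelCase -> snake_case mapping.
// Assumption: the library's real converter may handle edge cases differently.
public class FieldNames {

    // "unitPrice" -> "unit_price"
    static String toSnakeCase(String javaName) {
        StringBuilder sb = new StringBuilder();
        for (char c : javaName.toCharArray()) {
            if (Character.isUpperCase(c)) {
                sb.append('_').append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Builds a pipeline field reference such as "$unit_price"
    static String ref(String javaName) {
        return "$" + toSnakeCase(javaName);
    }

    public static void main(String[] args) {
        System.out.println(toSnakeCase("productName")); // product_name
        System.out.println(ref("unitPrice"));            // $unit_price
    }
}
```

A helper like ref() could be combined with Lombok's generated SalesRecord.Fields constants (e.g. ref(SalesRecord.Fields.unitPrice)) so that renaming a Java field also updates the pipeline expression.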

Aggregation Pipeline

  • $project: extract month/year from sale_date; compute revenue as unit_price * quantity
  • $group: group by region / product_name / month, with $sum and $avg accumulators
  • $sort: order by totalRevenue descending
  • $limit: return the top N results
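The four stages can be modelled with Java streams over in-memory objects, which is a convenient way to work out expected results for a unit test. Sale, Projected, GroupKey, GroupResult and topN are hypothetical names; the sketch mirrors the stage semantics, not Morphium's API. Records and Stream.toList() require Java 16 or later.

```java
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// In-memory model of $project -> $group -> $sort -> $limit (hypothetical types, not Morphium).
public class PipelineSketch {

    record Sale(String region, String productName, double unitPrice, int quantity, LocalDateTime saleDate) {}

    // Output of $project: grouping keys plus computed revenue
    record Projected(String region, String productName, int year, int month, double revenue) {}

    record GroupKey(String region, String productName, int year, int month) {}

    record GroupResult(GroupKey key, double totalRevenue, double avgRevenue) {}

    static List<GroupResult> topN(List<Sale> sales, int n) {
        // $project: extract year/month, compute unit_price * quantity
        List<Projected> projected = sales.stream()
            .map(s -> new Projected(s.region(), s.productName(),
                s.saleDate().getYear(), s.saleDate().getMonthValue(),
                s.unitPrice() * s.quantity()))
            .toList();

        // $group by region / product / month
        Map<GroupKey, List<Projected>> groups = projected.stream()
            .collect(Collectors.groupingBy(
                p -> new GroupKey(p.region(), p.productName(), p.year(), p.month())));

        return groups.entrySet().stream()
            // $sum and $avg accumulators
            .map(e -> new GroupResult(e.getKey(),
                e.getValue().stream().mapToDouble(Projected::revenue).sum(),
                e.getValue().stream().mapToDouble(Projected::revenue).average().orElse(0)))
            // $sort by totalRevenue descending
            .sorted(Comparator.comparingDouble(GroupResult::totalRevenue).reversed())
            // $limit
            .limit(n)
            .toList();
    }

    public static void main(String[] args) {
        List<Sale> sales = List.of(
            new Sale("EU", "Widget", 10.0, 3, LocalDateTime.of(2024, 1, 5, 10, 0)),
            new Sale("EU", "Widget", 10.0, 1, LocalDateTime.of(2024, 1, 20, 9, 0)),
            new Sale("US", "Gadget", 25.0, 2, LocalDateTime.of(2024, 1, 7, 12, 0)),
            new Sale("US", "Widget", 10.0, 1, LocalDateTime.of(2024, 2, 1, 8, 0)));
        topN(sales, 2).forEach(System.out::println);
    }
}
```

Swapping the .sorted(...) and .limit(n) calls reproduces the stage-order pitfall: the stream would keep two arbitrary groups and only then sort them.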

Entity Source Code

SalesRecord.java Java
import de.caluga.morphium.annotations.Entity;
import de.caluga.morphium.annotations.Id;
import de.caluga.morphium.annotations.Index;
import de.caluga.morphium.driver.MorphiumId;
import java.time.LocalDateTime;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.FieldNameConstants;

@Entity(collectionName = "sales_records")   // (1)
@Index({"region, product_name"})            // (2)
@Data @NoArgsConstructor @AllArgsConstructor
@Builder @FieldNameConstants                // (3)
public class SalesRecord {

    @Id                                     // (4)
    private MorphiumId id;

    @Index                                  // (5)
    private String productName;

    private double unitPrice;
    private int quantity;
    private double discount;
    private String region;
    private String salesRep;

    @Index                                  // (6)
    private LocalDateTime saleDate;

    private List<String> categories;
}
1 Maps this class to the sales_records MongoDB collection.
2 Class-level @Index defines a compound index on (region, product_name). Uses the MongoDB snake_case field name.
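3 Lombok: @Builder enables the builder pattern, and @FieldNameConstants generates a Fields inner class with one String constant per field. The constants hold the Java names (SalesRecord.Fields.productName is "productName"), not the snake_case MongoDB names, so they cannot be used directly in pipeline expressions.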
4 Designates this field as the MongoDB document _id; Morphium auto-generates a MorphiumId on first store.
5 Field-level @Index creates a single-field index on productName (stored as product_name).
6 Field-level @Index on saleDate enables efficient time-range queries and date sorting.

Service Code

Aggregation Pipeline

Morphium API — Aggregation Pipeline Java
import de.caluga.morphium.Morphium;
import de.caluga.morphium.aggregation.Aggregator;
import jakarta.inject.Inject; // javax.inject.Inject on older CDI stacks
import java.util.List;
import java.util.Map;

// Class and method names are illustrative
public class SalesAnalyticsService {

    @Inject Morphium morphium;

    public List<Map> revenueByRegion() {
        // Create an aggregator for SalesRecord → Map
        Aggregator<SalesRecord, Map> agg = morphium.createAggregator(  // (1)
            SalesRecord.class, Map.class);

        // $group: group by region using the fluent builder
        agg.group("$region")                                           // (2)
            .sum("totalRevenue",                                       // (3)
                Map.of("$multiply", List.of("$unit_price", "$quantity")))
            .sum("totalOrders", 1)                                     // (4)
            .avg("avgOrderValue",                                      // (5)
                Map.of("$multiply", List.of("$unit_price", "$quantity")))
            .end();                                                    // (6)

        // $sort: order by totalRevenue descending
        agg.sort(Map.of("totalRevenue", -1));                          // (7)

        // Execute the pipeline
        List<Map> results = agg.aggregate();                           // (8)
        return results;
    }
}
1 Creates a typed aggregation pipeline: input documents are SalesRecord, output is a plain Map.
2 group("$region") starts a $group stage with region as the grouping key (_id). Returns a fluent builder for chaining accumulators.
3 .sum("totalRevenue", expr) accumulates the sum of unit_price * quantity using a $multiply expression. Note: uses MongoDB field names ($unit_price, $quantity).
4 .sum("totalOrders", 1) counts documents by summing 1 for each document in the group.
5 .avg("avgOrderValue", expr) computes the average of the $multiply expression across the group.
6 .end() finalizes the $group stage and returns to the pipeline builder.
7 $sort stage — orders groups by totalRevenue descending (-1).
8 Sends the assembled pipeline to MongoDB and returns the results as a List<Map>.
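For comparison with the raw-driver approach described under The Challenge, the fluent calls above describe roughly the following pipeline when written out as nested Maps. This is a hand-written sketch: RawPipelineSketch is a hypothetical name, and the document Morphium actually sends may differ in form or key order.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hand-written equivalent of the fluent $group + $sort pipeline as nested Maps.
// Assumption: Morphium's generated pipeline may differ in form or key order.
public class RawPipelineSketch {

    static List<Map<String, Object>> revenueByRegionPipeline() {
        // unit_price * quantity, using MongoDB field names
        Map<String, Object> revenue = Map.of("$multiply", List.of("$unit_price", "$quantity"));

        Map<String, Object> group = new LinkedHashMap<>();
        group.put("_id", "$region");                       // grouping key
        group.put("totalRevenue", Map.of("$sum", revenue));
        group.put("totalOrders", Map.of("$sum", 1));       // counts documents
        group.put("avgOrderValue", Map.of("$avg", revenue));

        return List.of(
            Map.of("$group", group),
            Map.of("$sort", Map.of("totalRevenue", -1)));
    }

    public static void main(String[] args) {
        revenueByRegionPipeline().forEach(System.out::println);
    }
}
```

Every field reference here is an unchecked string, which is exactly the readability and refactoring burden the fluent builder is meant to reduce.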
Morphium API — Distinct Values & Count Java
// Assumes the injected Morphium instance from the previous example and java.util.List imported
// Get distinct regions
List<String> regions = morphium.createQueryFor(   // (1)
    SalesRecord.class).distinct("region");

// Get distinct products (uses MongoDB field name!)
List<String> products = morphium.createQueryFor(  // (2)
    SalesRecord.class).distinct("product_name");

// Count total records
long count = morphium.createQueryFor(SalesRecord.class).countAll();  // (3)
1 createQueryFor().distinct("region") returns all unique values for a field, equivalent to MongoDB's db.collection.distinct().
2 Uses the MongoDB field name "product_name" (snake_case), not the Java field name "productName". This is a common gotcha with Morphium's default name mapping.
3 countAll() executes a server-side count without loading any documents into memory.
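When testing code that relies on distinct(), an in-memory equivalent over Map-shaped documents makes the field-name gotcha easy to see: asking for "productName" on snake_case documents simply finds nothing. DistinctSketch is a hypothetical helper. Unlike the server, it does not unwrap array fields such as categories into individual values, and it returns values in first-seen order, whereas MongoDB does not guarantee any order.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

// In-memory stand-in for distinct() (hypothetical; documents keyed by MongoDB field names).
public class DistinctSketch {

    static List<Object> distinct(List<Map<String, Object>> docs, String field) {
        // LinkedHashSet drops duplicates and keeps first-seen order
        LinkedHashSet<Object> seen = new LinkedHashSet<>();
        for (Map<String, Object> doc : docs) {
            if (doc.containsKey(field)) {
                seen.add(doc.get(field));
            }
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<Map<String, Object>> docs = List.of(
            Map.of("region", "EU", "product_name", "Widget"),
            Map.of("region", "US", "product_name", "Gadget"),
            Map.of("region", "EU", "product_name", "Gadget"));
        System.out.println(distinct(docs, "product_name")); // [Widget, Gadget]
        System.out.println(distinct(docs, "productName"));  // [] (Java name, no match)
    }
}
```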

Related Documentation