Sales Analytics
Aggregation Pipelines, $group, $project, $sort, $bucket
What Morphium Offers
Morphium's Aggregation Pipeline API builds complex analytics with a fluent, type-safe builder instead of raw BSON documents. Aggregator<T, R> chains stages like $group, $project, $sort, $bucket and $limit, keeping aggregation code readable, refactor-safe and compile-time checked.
The Challenge
Aggregation pipelines with the raw MongoDB driver mean deeply nested Map structures that are hard to read, impossible to refactor and invisible to the compiler. A typo in a field name silently produces wrong results.
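To make the nesting concrete, here is a minimal sketch of a revenue-by-region pipeline assembled by hand from plain Java collections. The class and method names (RawPipelineSketch, revenueByRegion) are hypothetical and only illustrate the shape of the structure; the field names unit_price, quantity and region are assumed to match the sales entity used on this page.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a raw aggregation pipeline written as nested Maps. */
final class RawPipelineSketch {

    /** Returns a two-stage pipeline: $group by region, then $sort by revenue. */
    static List<Map<String, Object>> revenueByRegion() {
        // unit_price * quantity as a MongoDB expression document.
        // A typo in either string compiles fine and only shows up in the results.
        Map<String, Object> revenue =
                Map.<String, Object>of("$multiply", List.of("$unit_price", "$quantity"));

        // LinkedHashMap keeps the accumulators in insertion order for readability
        Map<String, Object> groupBody = new LinkedHashMap<>();
        groupBody.put("_id", "$region");
        groupBody.put("totalRevenue", Map.of("$sum", revenue));
        groupBody.put("totalOrders", Map.of("$sum", 1));
        groupBody.put("avgOrderValue", Map.of("$avg", revenue));

        return List.of(
                Map.<String, Object>of("$group", groupBody),
                Map.<String, Object>of("$sort", Map.of("totalRevenue", -1)));
    }
}
```

Even for two stages the structure is three levels deep, and nothing but the reader's eye checks the keys and operator names.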
Morphium Features Used
Prerequisites & Key Concepts
- MongoDB field names vs Java field names: Aggregation pipelines operate on MongoDB field names (snake_case by default), not Java field names. Morphium converts camelCase to snake_case automatically, so productName becomes "product_name" and unitPrice becomes "unit_price" in the database. Pipeline expressions must use the MongoDB names (e.g. "$unit_price").
- Fluent group() builder: Morphium's group() method returns a builder that chains accumulators: agg.group("$region").sum("total", "$field").avg("average", "$field").end(). The .end() call finalizes the $group stage and returns to the pipeline builder.
- Pipeline stage order matters: Stages execute sequentially. $project first reshapes documents, then $group aggregates, then $sort orders, then $limit caps output. Re-ordering stages changes the result.
- Result type: Aggregator<In, Out> takes an input entity type and an output type. The output type can be a custom class or Map for ad-hoc results.
Aggregation Pipeline
1. $project: extract month/year from sale_date; compute revenue as unit_price * quantity
2. $group: group by region / product_name / month; accumulators: $sum, $avg
3. $sort: order by totalRevenue descending
4. $limit: return top N results
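The same four steps can be mimicked in memory with Java streams, which can help when reasoning about what each stage contributes. This is only an analogy under stated assumptions: StageOrderSketch and its methods are hypothetical, the input documents are plain Maps with the keys region, unit_price and quantity, and nothing here talks to MongoDB or Morphium.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative only: an in-memory analogue of $project, $group, $sort and $limit. */
final class StageOrderSketch {

    /** Analogue of the $project step: revenue = unit_price * quantity. */
    static double revenue(Map<String, Object> doc) {
        double unitPrice = ((Number) doc.get("unit_price")).doubleValue();
        int quantity = ((Number) doc.get("quantity")).intValue();
        return unitPrice * quantity;
    }

    /** Returns the n regions with the highest summed revenue, best first. */
    static List<Map.Entry<String, Double>> topRegions(List<Map<String, Object>> docs, int n) {
        // $group analogue: key by region, accumulate with a sum
        Map<String, Double> revenueByRegion = docs.stream()
                .collect(Collectors.groupingBy(
                        d -> (String) d.get("region"),
                        Collectors.summingDouble(StageOrderSketch::revenue)));

        return revenueByRegion.entrySet().stream()
                // $sort analogue: total revenue descending
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                // $limit analogue: keep only the first n groups
                .limit(n)
                .collect(Collectors.toList());
    }
}
```

Applying the limit before the grouping step would cap the number of input documents instead of the number of groups, which is why the order of stages is part of the query's meaning.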
Entity Source Code
SalesRecord.java
Java
    import de.caluga.morphium.annotations.Entity;
    import de.caluga.morphium.annotations.Id;
    import de.caluga.morphium.annotations.Index;
    import de.caluga.morphium.driver.MorphiumId;
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.experimental.FieldNameConstants;

    import java.time.LocalDateTime;
    import java.util.List;

    @Entity(collectionName = "sales_records")   // (1)
    @Index({"region, product_name"})            // (2)
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder
    @FieldNameConstants                         // (3)
    public class SalesRecord {

        @Id                                     // (4)
        private MorphiumId id;

        @Index                                  // (5)
        private String productName;

        private double unitPrice;
        private int quantity;
        private double discount;
        private String region;
        private String salesRep;

        @Index                                  // (6)
        private LocalDateTime saleDate;

        private List<String> categories;
    }
(1) Maps this class to the sales_records MongoDB collection.
(2) Class-level @Index defines a compound index on (region, product_name). Uses the MongoDB snake_case field name.
(3) Lombok: @Builder enables the builder pattern, @FieldNameConstants generates a Fields inner class with constants for each field name.
(4) Designates this field as the MongoDB document _id; Morphium auto-generates a MorphiumId on first store.
(5) Field-level @Index creates a single-field index on productName (stored as product_name).
(6) Field-level @Index on saleDate enables efficient time-range queries and date sorting.
Service Code
Aggregation Pipeline
Morphium API — Aggregation Pipeline
Java
    import de.caluga.morphium.Morphium;
    import de.caluga.morphium.aggregation.Aggregator;

    import java.util.List;
    import java.util.Map;

    @Inject
    Morphium morphium;

    // Create an aggregator for SalesRecord → Map
    Aggregator<SalesRecord, Map> agg = morphium.createAggregator(        // (1)
            SalesRecord.class, Map.class);

    // $group: group by region using the fluent builder
    agg.group("$region")                                                 // (2)
        .sum("totalRevenue",                                             // (3)
            Map.of("$multiply", List.of("$unit_price", "$quantity")))
        .sum("totalOrders", 1)                                           // (4)
        .avg("avgOrderValue",                                            // (5)
            Map.of("$multiply", List.of("$unit_price", "$quantity")))
        .end();                                                          // (6)

    // $sort: order by totalRevenue descending
    agg.sort(Map.of("totalRevenue", -1));                                // (7)

    // Execute the pipeline
    List<Map> results = agg.aggregate();                                 // (8)
(1) Creates a typed aggregation pipeline: input documents are SalesRecord, output is a plain Map.
(2) group("$region") starts a $group stage with region as the grouping key (_id). Returns a fluent builder for chaining accumulators.
(3) .sum("totalRevenue", expr) accumulates the sum of unit_price * quantity using a $multiply expression. Note: uses MongoDB field names ($unit_price, $quantity).
(4) .sum("totalOrders", 1) counts documents by summing 1 for each document in the group.
(5) .avg("avgOrderValue", expr) computes the average of the $multiply expression across the group.
(6) .end() finalizes the $group stage and returns to the pipeline builder.
(7) $sort stage: orders groups by totalRevenue descending (-1).
(8) Sends the assembled pipeline to MongoDB and returns the results as a List<Map>.
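Because the output type is a plain Map, each result row has to be read by key. The sketch below shows one cautious way to do that; ResultRowSketch and formatRow are hypothetical names, the keys are the ones defined in the $group stage above, and the values are read through Number on the assumption that the exact numeric type (Integer, Long or Double) coming back from the database is not guaranteed.

```java
import java.util.Locale;
import java.util.Map;

/** Illustrative only: reading one row of a Map-typed aggregation result. */
final class ResultRowSketch {

    /** Formats one $group result document as "region: revenue (n orders)". */
    static String formatRow(Map<?, ?> row) {
        // The grouping key always arrives under "_id"
        Object region = row.get("_id");
        // Go through Number instead of casting to a specific boxed type
        double revenue = ((Number) row.get("totalRevenue")).doubleValue();
        long orders = ((Number) row.get("totalOrders")).longValue();
        return String.format(Locale.ROOT, "%s: %.2f (%d orders)", region, revenue, orders);
    }
}
```

A caller could loop over the result list and print formatRow(row) for each entry. A dedicated result class as the output type avoids the casts entirely, at the cost of one more class to maintain.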
Morphium API — Distinct Values & Count
Java
    // Get distinct regions
    List<String> regions = morphium.createQueryFor(     // (1)
            SalesRecord.class).distinct("region");

    // Get distinct products (uses MongoDB field name!)
    List<String> products = morphium.createQueryFor(    // (2)
            SalesRecord.class).distinct("product_name");

    // Count total records
    long count = morphium.createQueryFor(SalesRecord.class).countAll();   // (3)
(1) createQueryFor().distinct("region") returns all unique values for a field, equivalent to MongoDB's db.collection.distinct().
(2) Uses the MongoDB field name "product_name" (snake_case), not the Java field name "productName". This is a common gotcha with Morphium's default name mapping.
(3) countAll() executes a server-side count without loading any documents into memory.
Related Documentation
- Aggregation Examples — $group, $project, $sort, $bucket
- API Reference — Aggregator, Expr, aggregate()
- Developer Guide — Aggregation Framework