MongoDB Stats Pre-calculation and Historical Tracking
Temporary planning document — delete when implementation is complete.
Overview
This document is the revised technical design for pre-calculating project statistics. It supersedes the original 4-milestone embedded approach from GitHub Discussion #1823. User-facing context is in Discussion #1822. The epic is tracked as Issue #1831.
Problem: Loading Stage Overview charts is slow for larger projects because stats are recalculated from scratch via an 11-facet MongoDB aggregation pipeline (StudyStats.cs) on every page load and every SignalR push. This puts excessive load on the MongoDB Atlas cluster, increasing hosting costs. A MongoDB consultant confirmed the need for pre-calculation.
Solution: A single pmProjectStats collection with pre-calculated stats updated atomically via delta value objects returned from Study entity methods (DDD pattern). Real-time push via MongoDB change streams on the new collection. Historical trend data via daily snapshots in the same collection. Three-way stats audit job for correctness validation.
Key benefit: Reduced Atlas cluster load, potentially enabling tier reduction from M30 to M20, saving operational costs.
Design Decisions (vs Original Plan)
The original plan proposed embedding stats in the Project document across 4 progressive milestones. This design differs:
| Original plan | This design | Rationale |
|---|---|---|
| Embed stats in Project document | Separate pmProjectStats collection | Project document is already 1,263 lines with a TODO to refactor for SRP; embedding makes it larger and then requires migrating out later |
| 4 milestones (screening → annotation → historical → separate collection) | All stat types from the start, historical via same collection | Delta pattern is the same for screening and annotation; implementing twice is wasted effort |
| Two collections (ProjectStats + HistoricalStats) | Single pmProjectStats with current + dated documents | Simpler model; current = real-time stats, dated = frozen daily snapshots |
| External delta calculation service | Study entity methods return delta value objects | DDD-aligned: the aggregate that performs the mutation describes what changed |
| Nightly reconciliation recalculates from .Classify() | Three-way audit comparing stored vs legacy aggregation vs streamed classification | Independent implementations cross-check each other; catches bugs in either path |
| Full recalculation on settings change | Recompute derived fields from stored tally dictionaries | Tallies are settings-independent; reclassification is pure arithmetic with no DB reads |
Terminology
To avoid confusion between session-level and study-level concepts, and to avoid collision with the domain term "reconciliation" (which means resolving annotation disagreements between reviewers), this document uses the following terms:
| Concept | Term | Definition |
|---|---|---|
| Individual session saved but not finalised | Session incomplete | SessionStatus.Incomplete |
| Individual session finalised | Session completed | SessionStatus.Completed |
| Study has all required sessions completed | Study annotation fulfilled | CompletedCandidateSessions >= SessionCountTarget |
| Study has some but not all sessions completed | Study annotation in progress | CompletedCandidateSessions > 0 && CompletedCandidateSessions < SessionCountTarget |
| Study has no annotation sessions | Study annotation not started | CompletedCandidateSessions == 0 |
| Study has more sessions than required | Study over-annotated | CompletedCandidateSessions > SessionCountTarget |
| Background job that validates stats accuracy | Stats audit | Not "reconciliation" — that term is reserved for the domain concept of reviewer disagreement resolution |
Collection Design: pmProjectStats
One document per project per date, plus a current document updated in real-time:
// Real-time document — updated atomically on every review action
{ projectId: CSUUID("..."), date: "current", updatedAt: ISODate("..."), settings: {...}, ... }
// Daily snapshot — frozen copy created by snapshot job
{ projectId: CSUUID("..."), date: "2026-03-28", updatedAt: ISODate("..."), settings: {...}, ... }
Index: { projectId: 1, date: 1 } (unique compound)
- Current stats: find({ projectId, date: "current" }) — direct lookup
- Historical range: find({ projectId, date: { $gte: "2026-01-01", $lte: "2026-03-28" } }) — range scan (ISO date strings sort correctly)
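The range scan works because ISO `yyyy-MM-dd` strings compare lexicographically in chronological order, while a sentinel such as "current" sorts after any digit-led date and so falls outside the range. A minimal TypeScript sketch of that comparison, assuming plain binary string ordering (no custom collation); `inDateRange` is a hypothetical helper, not part of the codebase:

```typescript
// Hypothetical helper mirroring { date: { $gte: from, $lte: to } } on string keys.
function inDateRange(date: string, from: string, to: string): boolean {
  return date >= from && date <= to;
}

// Mixed keys as they might coexist for one project in pmProjectStats.
const dateKeys: string[] = ["2026-03-28", "current", "2025-12-31", "2026-01-01", "2026-02-10"];

// Keep only keys inside the requested window, in chronological order.
const matchedKeys: string[] = dateKeys
  .filter((k) => inDateRange(k, "2026-01-01", "2026-03-28"))
  .sort();

console.log(matchedKeys); // the three dated keys inside the window; "current" is excluded
```

A query that needs "the latest dated snapshot" cannot simply sort by date descending and take the first document, because "current" would sort first; it has to exclude the sentinel values explicitly.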
Document Schema
Top-level fields
| Field | Type | Notes |
|---|---|---|
| _id | ObjectId | Auto-generated |
| projectId | BinData(3) (CSUUID) | Project aggregate ID |
| date | string | "current" or ISO date "2026-03-28" |
| updatedAt | ISODate | Timestamp of last update |
settings — configuration snapshot for interpreting stats
These are the project/stage settings that affect how studies are classified. Stored so that historical snapshots can be interpreted in the context of the settings that were active when the snapshot was taken.
| Field | Type | Source |
|---|---|---|
| settings.agreementThreshold.absoluteAgreementRatio | double? | ProjectAgreementThreshold.AbsoluteAgreementRatio |
| settings.agreementThreshold.numberScreened | int | ProjectAgreementThreshold.NumberScreened |
| settings.agreementThreshold.agreementMode | string | Derived: "SingleScreening" / "ManualDualScreening" / "AutomatedDualScreening" / "Custom" |
| settings.stages.<stageId>.sessionCountTarget | int | Stage.SessionCountTarget |
| settings.stages.<stageId>.maxInProgress | int? | Stage.MaxInProgress |
| settings.stages.<stageId>.allowSelfReconciliation | bool | Stage.AllowSelfReconciliation |
| settings.stages.<stageId>.hideExcludedStudiesFromReviewers | bool | Stage.HideExcludedStudiesFromReviewers |
| settings.stages.<stageId>.excludedSessionStatsGrouping | string | "WithUnexcluded" / "Separate" / "Unavailable" |
| settings.stages.<stageId>.reviewMode | string | "Screening" / "Annotation" / "ScreeningAndAnnotation" |
projectScreening — project-wide screening stats
From ProjectScreeningStats. All integers updated via $inc deltas.
| Field | Type | Description |
|---|---|---|
| count | int | Total number of studies in the project |
| sufficientlyScreened | int | Studies where screening is complete per threshold |
| insufficientlyScreened | int | Studies with screening started but not meeting threshold |
| sufficientlyIncluded | int | Studies meeting threshold with inclusion decision |
| sufficientlyExcluded | int | Studies meeting threshold with exclusion decision |
| startedScreening | int | Studies with at least one screening |
| overscreened | int | Studies with more screenings than numberScreened |
| overscreenedYetInsufficientlyScreened | int | Overscreened but still incomplete (disagreement) |
| overscreenedAndSufficientlyIncluded | int | Overscreened and included |
| overscreenedAndSufficientlyExcluded | int | Overscreened and excluded |
| screeningTallyCounts | { [numScreenings]: { [numIncluded]: count } } | Settings-independent raw breakdown. Outer key = number of screenings on study, inner key = number of included decisions, value = study count |
stageAnnotation.<stageId> — per-stage annotation stats
From StageAnnotationStats + AnnotationSessionStats. One entry per stage.
For each of unexcludedSessionStats and excludedSessionStats:
| Field | Type | Description |
|---|---|---|
| totalCount | int | Total studies in this exclusion category for this stage |
| candidateSessionsCountLookup | { [numStarted]: { [numCompleted]: count } } | Settings-independent raw breakdown. Outer key = candidate sessions started, inner key = completed, value = study count |
| startedReconciliationCount | int | Studies with reconciliation started |
| completedReconciliationCount | int | Studies with reconciliation completed |
membershipScreening.<membershipId> — per-reviewer screening stats
From MembershipScreeningStats. One entry per active project membership.
| Field | Type | Description |
|---|---|---|
| investigatorId | BinData(3) | Reviewer's investigator ID |
| screened | int | Studies this reviewer has screened |
| included | int | Studies this reviewer marked as included |
| excluded | int | Studies this reviewer marked as excluded |
| sufficientlyIncluded | int | Studies screened by this reviewer that are project-level sufficiently included |
| sufficientlyExcluded | int | Studies screened by this reviewer that are project-level sufficiently excluded |
| sufficientlyIncludedAndAgree | int | Sufficiently included and this reviewer agreed (voted include) |
| sufficientlyExcludedAndAgree | int | Sufficiently excluded and this reviewer agreed (voted exclude) |
| sufficientlyIncludedAndDisagree | int | Sufficiently included but this reviewer voted exclude |
| sufficientlyExcludedAndDisagree | int | Sufficiently excluded but this reviewer voted include |
| sufficientlyScreened | int | Studies this reviewer screened that are sufficiently screened |
| insufficientlyScreened | int | Studies this reviewer screened that are insufficiently screened |
| overscreened | int | Studies this reviewer screened that are overscreened |
| overscreenedYetInsufficientlyScreened | int | Overscreened but insufficiently screened |
| overscreenedAndSufficientlyIncluded | int | Overscreened and sufficiently included |
| overscreenedAndSufficientlyExcluded | int | Overscreened and sufficiently excluded |
| available | int | Studies available for this reviewer to screen |
| unavailable | int | Studies unavailable (already sufficient, or reviewer already screened) |
| totalNumberStudies | int | Total study count (should equal screened + available + unavailable) |
| screeningTallyCounts | { [numScreenings]: { [numIncluded]: count } } | Per-membership settings-independent breakdown, scoped to studies this reviewer screened. Enables settings-change reclassification without querying studies. |
membershipAnnotation.<membershipId> — per-reviewer, per-stage annotation stats
From MembershipStageAnnotationStatsMap. One entry per membership with nested entries per stage.
| Field | Type | Description |
|---|---|---|
| investigatorId | BinData(3) | Reviewer's investigator ID |

For each stage — membershipAnnotation.<membershipId>.stages.<stageId>:
| Field | Type | Description |
|---|---|---|
| total | int | Total studies for this reviewer in this stage |

For each of unexcludedSessionStats and excludedSessionStats within each stage:
| Field | Type | Description |
|---|---|---|
| available | int | Studies available for annotation |
| inProgress | int | Studies where reviewer has a session with SessionStatus.Incomplete |
| completed | int | Studies where reviewer has a completed session |
| unavailable | int | Studies unavailable for annotation |
| total | int | Total studies in this exclusion category |
| candidateAnnotationSessionsGloballyCompleted | int | Studies that are annotation-fulfilled globally (all slots filled) |
| reconcileAvailable | int | Studies available for reconciliation |
| reconcileInProgress | int | Studies with reconciliation in progress |
| reconcileCompleted | int | Studies with reconciliation completed |
| reconcileUnavailable | int | Studies unavailable for reconciliation |
| selfReconciliationEnabled | bool | From Stage.AllowSelfReconciliation |
| candidateSessionsCountLookup | { [numStarted]: { [numCompleted]: count } } | Already stored at this level in the existing DTO |
Derived at Read Time (NOT stored)
These are computed when mapping from ProjectStats document to the FullStats DTO, using stored counters + settings:
- All percent* fields (e.g., percentIncluded, percentAvailable) — Math.Truncate(10000 * field / total) / 100
- mergedStats — combines unexcluded + excluded based on ExcludedSessionStatsGrouping setting
- separatedExcluded — extracted from excluded stats when grouping is Separate
- hasReachedMaxInProgress — from inProgress counts + MaxInProgress setting
- hasReachedMaxInProgressReconciliation — same pattern
- insufficientlyCompletedCandidateSessions — total - candidateAnnotationSessionsGloballyCompleted
- numberVerification — totalNumberStudies == screened + available + unavailable
- incompleteThreshold — directly from Stage.MaxInProgress
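Two of the derived fields above are simple enough to sketch directly. The following TypeScript is illustrative only: it mirrors the truncating percent formula and the numberVerification check, and the zero-total guard is an assumption, since the source does not say how an empty total is handled.

```typescript
// Truncate (not round) to two decimal places, mirroring
// Math.Truncate(10000 * field / total) / 100 from the list above.
// ASSUMPTION: an empty total yields 0 rather than NaN.
function percent(field: number, total: number): number {
  if (total === 0) return 0;
  return Math.trunc((10000 * field) / total) / 100;
}

// Subset of the membershipScreening counters needed for the check.
interface MembershipCounts {
  totalNumberStudies: number;
  screened: number;
  available: number;
  unavailable: number;
}

// numberVerification: the three buckets must account for every study.
function numberVerification(c: MembershipCounts): boolean {
  return c.totalNumberStudies === c.screened + c.available + c.unavailable;
}

console.log(percent(2, 3)); // 66.66 (truncated, where rounding would give 66.67)
```

Truncation means the displayed percentages can sum to slightly less than 100, which is the trade-off for never overstating progress.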
Delta Value Objects from Study Entity Methods
Following DDD Pattern B (functional domain modeling): the entity method that performs the mutation also returns a value object describing what changed. This guarantees the delta uses the same logic as the mutation — there is no separate classifier that could diverge.
Screening delta
Study.AddScreening() currently returns void. Change it to return a ScreeningDelta:
internal ScreeningDelta AddScreening(
Guid projectId, Guid screenerId, ScreeningDecision decision,
Guid stageId, ProjectAgreementThreshold threshold)
{
var before = ScreeningInfo.Classify(threshold);
ScreeningInfo = ScreeningInfo.ScreenStudy(Id, projectId, screenerId, decision, stageId);
var after = ScreeningInfo.Classify(threshold);
return ScreeningDelta.Between(before, after, screenerId);
}
ScreeningInfo.Classify(threshold) is a new pure method on the existing immutable ScreeningInfo record. It returns a ScreeningClassification value object describing how this study is classified given the threshold. This is the single source of truth for classification logic.
ScreeningDelta contains +1, -1, or 0 for each counter field in ProjectScreeningStats and MembershipScreeningStats, plus a TallyTransition describing which tally bucket the study moved from/to.
Note: count (total study count) is NOT in the screening delta — study count changes when studies are imported/removed, not when screened.
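To make the "+1, -1, or 0 per counter" idea concrete, here is a small sketch of what ScreeningDelta.Between might compute. It is written in TypeScript for brevity rather than the backend's C#, it covers only a subset of counters, and the flag-per-counter shape of the classification is a hypothetical simplification, not the actual ScreeningClassification type.

```typescript
// Counter names come from the projectScreening table; only a subset is shown.
const COUNTERS = [
  "startedScreening",
  "sufficientlyScreened",
  "insufficientlyScreened",
  "sufficientlyIncluded",
  "sufficientlyExcluded",
] as const;
type Counter = (typeof COUNTERS)[number];

// HYPOTHETICAL shape: one boolean per counter the study currently counts towards.
interface ScreeningClassification {
  numScreenings: number;
  numIncluded: number;
  flags: Record<Counter, boolean>;
}

interface ScreeningDelta {
  counters: Record<Counter, number>; // each entry is +1, -1 or 0
  tallyFrom: [number, number];       // [numScreenings, numIncluded] before the mutation
  tallyTo: [number, number];         // [numScreenings, numIncluded] after the mutation
}

// The delta is the per-counter difference between the two classifications.
function between(before: ScreeningClassification, after: ScreeningClassification): ScreeningDelta {
  const counters = {} as Record<Counter, number>;
  for (const name of COUNTERS) {
    counters[name] = Number(after.flags[name]) - Number(before.flags[name]);
  }
  return {
    counters,
    tallyFrom: [before.numScreenings, before.numIncluded],
    tallyTo: [after.numScreenings, after.numIncluded],
  };
}

// Example: one include vote so far, then a second include vote arrives
// (assuming numberScreened = 2, so the study becomes sufficiently included).
const beforeVote: ScreeningClassification = {
  numScreenings: 1, numIncluded: 1,
  flags: { startedScreening: true, sufficientlyScreened: false, insufficientlyScreened: true,
           sufficientlyIncluded: false, sufficientlyExcluded: false },
};
const afterVote: ScreeningClassification = {
  numScreenings: 2, numIncluded: 2,
  flags: { startedScreening: true, sufficientlyScreened: true, insufficientlyScreened: false,
           sufficientlyIncluded: true, sufficientlyExcluded: false },
};
const voteDelta = between(beforeVote, afterVote);
```

Because the delta is a pure difference of two classifications produced by the same Classify logic, a rescreening that leaves the classification unchanged yields an all-zero counter delta.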
Annotation delta
Study.AddSessionData() similarly returns an AnnotationDelta:
internal AnnotationDelta AddSessionData(
Guid investigatorId, SessionSubmissionDto sessionSubmission,
IEnumerable<Guid> stageQuestionIds, bool includesDataExtraction,
int sessionCountTarget)
{
var before = ExtractionInfo.ClassifyAnnotation(sessionSubmission.StageId, sessionCountTarget);
ExtractionInfo.AddSessionData(investigatorId, sessionSubmission, stageQuestionIds, includesDataExtraction);
var after = ExtractionInfo.ClassifyAnnotation(sessionSubmission.StageId, sessionCountTarget);
return AnnotationDelta.Between(before, after, investigatorId, sessionSubmission.StageId);
}
Domain service orchestration
A domain service orchestrates the atomic persist:
public class ReviewDomainService
{
public async Task<ScreeningDelta> ScreenStudy(
Project project, Study study, Guid screenerId,
ScreeningDecision decision, Guid stageId,
IPmUnitOfWork unitOfWork, IProjectStatsRepository statsRepo)
{
GuardActiveMembership(project, screenerId);
var delta = study.AddScreening(
project.Id, screenerId, decision, stageId,
project.ProjectAgreementThreshold);
using var session = unitOfWork.StartSession();
session.StartTransaction();
await unitOfWork.SaveAsync(study, session);
await statsRepo.ApplyDeltaAsync(project.Id, delta, session);
await session.CommitTransactionAsync();
return delta;
}
}
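ApplyDeltaAsync is not specified further in this document. One plausible shape, sketched here in TypeScript as an assumption, is to translate the delta into a single `$inc` update document so that all counters and both tally buckets change in one atomic single-document update. The `Delta` type and `toIncUpdate` function are hypothetical names; the field paths follow the schema tables above.

```typescript
// HYPOTHETICAL delta shape: counter name -> +1 / -1 / 0, plus the tally transition.
interface Delta {
  counters: Record<string, number>;
  tallyFrom: [number, number]; // [numScreenings, numIncluded] before
  tallyTo: [number, number];   // [numScreenings, numIncluded] after
}

// Build the update document for the project's "current" stats document.
function toIncUpdate(delta: Delta): { $inc: Record<string, number> } {
  const inc: Record<string, number> = {};

  // Zero entries are skipped so the update only touches fields that changed.
  for (const name of Object.keys(delta.counters)) {
    const value = delta.counters[name] ?? 0;
    if (value !== 0) inc[`projectScreening.${name}`] = value;
  }

  // Move the study from its old tally bucket to its new one (if it moved at all).
  const from = `projectScreening.screeningTallyCounts.${delta.tallyFrom[0]}.${delta.tallyFrom[1]}`;
  const to = `projectScreening.screeningTallyCounts.${delta.tallyTo[0]}.${delta.tallyTo[1]}`;
  if (from !== to) {
    inc[from] = -1;
    inc[to] = 1;
  }
  return { $inc: inc };
}

const update = toIncUpdate({
  counters: { startedScreening: 0, sufficientlyScreened: 1, insufficientlyScreened: -1 },
  tallyFrom: [1, 1],
  tallyTo: [2, 2],
});
console.log(JSON.stringify(update));
```

`$inc` creates a missing field and initialises it to the increment, so a previously unseen tally bucket needs no separate initialisation step. The membership-level counters would be added to the same `$inc` document under their membershipScreening.<membershipId> paths.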
All Events That Affect Stats
Every operation that changes which studies exist or their review state must include an atomic stats update in the same transaction.
| Event | Stats impact | Where it happens |
|---|---|---|
| Screening decision added | Project screening counts, membership screening counts, tally buckets | ReviewController.SubmitScreeningAndGetNextStudyForReview |
| Screening decision changed (rescreening) | Study moves between classification buckets | Same — ScreeningInfo.ScreenStudy detects existing screening |
| Annotation session saved (incomplete) | Membership annotation inProgress counts | ReviewController.SubmitSessionAndGetNextStudyForReview |
| Annotation session completed | Membership annotation completed, stage candidateSessionsCountLookup, possibly candidateAnnotationSessionsGloballyCompleted | Same controller |
| Reconciliation session saved/completed | reconcileInProgress, reconcileCompleted, startedReconciliationCount, completedReconciliationCount | Reconciliation review controller |
| Studies added (systematic search import) | Two-phase: studies inserted with PendingImportJobId (invisible, stats unchanged), then atomic commit clears flag + applies bulk delta | SearchImportJobConsumer (see two-phase import design below) |
| Studies removed (systematic search deletion) | Two-phase: studies marked PendingDeletion (invisible), then batched deletion with per-batch negative deltas | ProjectManagementService (see two-phase deletion design below) |
| Agreement threshold changed | Recompute all derived screening fields from screeningTallyCounts — no Study queries needed (see below) | ProjectAgreementThresholdUpdatedHandler |
| Stage settings changed (sessionCountTarget, maxInProgress, excludedSessionStatsGrouping) | Recompute derived annotation fields from candidateSessionsCountLookup — no Study queries needed | Stage update handler |
| Membership added/removed | Membership stats entries added/removed | Membership management |
Two-phase study import (systematic search upload)
Current behaviour: BatchedSaveManyAsync inserts studies in batches of 200 with no transaction wrapping the overall import. Studies become visible to reviewers as each batch completes. If the import fails partway, the project has a partial dataset with incorrect stats — users see incomplete data and lose confidence in the system.
Revised design: A two-phase commit ensures studies are either fully imported with correct stats, or not present at all.
New field on Study entity
// Null = study is available. Set = study is pending import, invisible to reviewers.
public Guid? PendingImportJobId { get; set; }
All study queries used for review add a filter:
// Added to Filters.cs — included in every review query
public static FilterDefinition<Study> NotPendingImport =>
Builders<Study>.Filter.Eq(s => s.PendingImportJobId, null);
Index: { PendingImportJobId: 1 } (sparse) — only indexes documents where the field exists.
Phase 1: Staged insert (studies invisible, stats unchanged)
Each batch of 200 studies is inserted with PendingImportJobId = jobId. Studies exist in the database but are invisible to all review queries. Stats are not updated — the studies don't count yet.
// During BatchedSaveManyAsync — each study in the batch has:
study.PendingImportJobId = jobId;
// Standard batch insert (no transaction needed — studies are invisible anyway)
await unitOfWork.BatchedSaveManyAsync(studies, saveBatchSize, ...);
Failure during Phase 1: Delete all pending studies and mark the job as failed. No stats to undo.
catch (Exception ex)
{
await studyRepo.DeletePendingStudiesAsync(jobId);
// DeleteMany({ PendingImportJobId: jobId })
job.SetStatus(SearchImportJobStatus.Error);
await unitOfWork.SaveAsync(project);
}
Phase 2: Atomic commit (studies become available + stats updated)
Once all batches have been saved successfully:
using var session = unitOfWork.StartSession();
session.StartTransaction();
// 1. Clear PendingImportJobId on all studies for this job (single UpdateMany)
await studyRepo.ClearPendingImportAsync(jobId, session);
// UpdateMany({ PendingImportJobId: jobId }, { $unset: { PendingImportJobId: "" } })
// 2. Apply bulk stats delta
await projectStatsRepo.ApplyBulkImportDeltaAsync(projectId, totalStudies, project, session);
// $inc count +totalStudies, $inc screeningTallyCounts.0.0 +totalStudies,
// $inc available +totalStudies per membership, etc.
// 3. Mark job as Complete
job.SetStatus(SearchImportJobStatus.Complete);
await unitOfWork.SaveAsync(project, session);
await session.CommitTransactionAsync();
This transaction is small — one UpdateMany (clear a single field), one $inc (stats), one project save. Well within transaction limits even for 50,000 studies.
Result: Studies become available and stats update in a single atomic step. At no point can a reviewer see a partial import or inconsistent stats.
Two-phase systematic search deletion
Same principle, reversed.
Phase 1: Mark unavailable (immediate, studies disappear from review)
// Mark all studies in this search as pending deletion — immediate, no transaction needed
await studyRepo.MarkPendingDeletionAsync(projectId, searchId);
// UpdateMany(
// { ProjectId: projectId, SystematicSearchId: searchId },
// { $set: { PendingDeletion: true } }
// )
New field on Study: a boolean PendingDeletion flag, set to true by the Phase 1 update above.
Study query filter extended:
public static FilterDefinition<Study> Available =>
NotPendingImport & Builders<Study>.Filter.Ne(s => s.PendingDeletion, true);
Studies immediately disappear from all review queries. Any reviewer who had one loaded gets a "study no longer available" response on their next action.
Phase 2: Batched deletion with per-batch stats updates
Process in batches. Each batch classifies studies to compute negative deltas, then deletes + updates stats atomically:
const int deletionBatchSize = 500;
while (true)
{
var batch = await studyRepo.GetPendingDeletionBatchAsync(
projectId, searchId, deletionBatchSize);
if (batch.Count == 0) break;
// Classify each study to compute negative delta
var deltaAccumulator = new StatsDeltaAccumulator();
foreach (var study in batch)
{
var screeningClass = study.ScreeningInfo.Classify(project.ProjectAgreementThreshold);
deltaAccumulator.SubtractProjectScreening(screeningClass);
foreach (var membership in project.ActiveMemberships)
deltaAccumulator.SubtractMembershipScreening(study, membership, screeningClass);
foreach (var stage in project.Stages)
{
var annotationClass = study.ExtractionInfo.ClassifyAnnotation(
stage.Id, stage.SessionCountTarget);
deltaAccumulator.SubtractAnnotation(study, stage, annotationClass);
}
}
// Atomic: delete batch + apply negative delta
using var session = unitOfWork.StartSession();
session.StartTransaction();
await studyRepo.DeleteBatchAsync(batch.Select(s => s.Id), session);
await projectStatsRepo.ApplyDeltaAsync(projectId, deltaAccumulator.ToDelta(), session);
await session.CommitTransactionAsync();
}
Failure during Phase 2: The process is resumable. Studies already deleted have had their stats subtracted correctly. Remaining studies are still marked PendingDeletion: true — invisible to reviewers. The process resumes by querying for more PendingDeletion studies.
Result: At no point during the deletion can a user see inconsistent stats or a partial deletion.
Consistency guarantees summary
| Operation | User sees | Stats | Failure mode |
|---|---|---|---|
| Import Phase 1 (batching) | Nothing — studies invisible | Unchanged | Delete all pending studies, no stats to undo |
| Import Phase 2 (commit) | All studies appear atomically | Updated atomically | Transaction rollback — studies stay invisible |
| Delete Phase 1 (mark) | Studies disappear immediately | Unchanged | Unmark studies — they reappear |
| Delete Phase 2 (batching) | Studies already invisible | Updated per batch | Resume from where it stopped — remaining studies still invisible |
Settings changes — recompute from tally dictionaries
The tally dictionaries (screeningTallyCounts, candidateSessionsCountLookup) are settings-independent raw breakdowns. They record facts about studies that don't depend on any threshold or target.
When settings change, iterate the tally dictionary and reclassify using the new settings — pure arithmetic, no database reads, no Study collection queries:
Settings changed
→ Read current pmProjectStats document
→ Iterate screeningTallyCounts, reclassify each bucket with new threshold
→ Iterate candidateSessionsCountLookup per stage, reclassify with new sessionCountTarget
→ Write updated current document with new settings snapshot and recomputed derived fields
Per-membership tally dictionaries (membershipScreening.<id>.screeningTallyCounts) enable the same fast reclassification at the membership level.
This settings-change handler updates the current document immediately. Because the snapshot job compares the full snapshot state, including the persisted settings snapshot and recomputed derived counters, a settings change that affects how stats are interpreted will be seen as a change and will therefore produce a dated snapshot on the next snapshot run, even if no new review actions occurred that day.
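The reclassification pass can be sketched as a loop over the tally buckets. The TypeScript below is a sketch under a stated assumption: the classification rule inside it (a study is sufficiently screened once it has at least numberScreened screenings and the include or exclude share reaches absoluteAgreementRatio) is a guess made for illustration and is not taken from the actual Classify implementation. Only the overall shape, deriving every counter from the stored buckets without touching studies, reflects the design described above.

```typescript
// screeningTallyCounts: [numScreenings][numIncluded] -> number of studies.
type Tally = Record<string, Record<string, number>>;

interface Threshold {
  numberScreened: number;
  absoluteAgreementRatio: number;
}

interface DerivedCounts {
  startedScreening: number;
  sufficientlyScreened: number;
  insufficientlyScreened: number;
  sufficientlyIncluded: number;
  sufficientlyExcluded: number;
  overscreened: number;
}

function reclassify(tally: Tally, t: Threshold): DerivedCounts {
  const d: DerivedCounts = {
    startedScreening: 0, sufficientlyScreened: 0, insufficientlyScreened: 0,
    sufficientlyIncluded: 0, sufficientlyExcluded: 0, overscreened: 0,
  };
  for (const nKey of Object.keys(tally)) {
    const n = Number(nKey);
    if (n === 0) continue; // unscreened studies feed none of these counters
    const inner = tally[nKey] ?? {};
    for (const kKey of Object.keys(inner)) {
      const k = Number(kKey);
      const count = inner[kKey] ?? 0;
      d.startedScreening += count;
      if (n > t.numberScreened) d.overscreened += count;

      // ASSUMED rule, for illustration only (see lead-in).
      const enough = n >= t.numberScreened;
      if (enough && k / n >= t.absoluteAgreementRatio) {
        d.sufficientlyScreened += count;
        d.sufficientlyIncluded += count;
      } else if (enough && (n - k) / n >= t.absoluteAgreementRatio) {
        d.sufficientlyScreened += count;
        d.sufficientlyExcluded += count;
      } else {
        d.insufficientlyScreened += count;
      }
    }
  }
  return d;
}

const sampleTally: Tally = {
  "0": { "0": 5 },
  "1": { "0": 1, "1": 2 },
  "2": { "0": 1, "1": 4, "2": 3 },
  "3": { "2": 2 },
};
const strict = reclassify(sampleTally, { numberScreened: 2, absoluteAgreementRatio: 1.0 });
const relaxed = reclassify(sampleTally, { numberScreened: 2, absoluteAgreementRatio: 0.6 });
```

With the sample tally, lowering the ratio from 1.0 to 0.6 moves the two studies in bucket 3/2 from insufficiently screened to sufficiently included, and nothing is read from the Study collection to work that out. The cost is proportional to the number of buckets, not the number of studies.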
Stats Reading — Remove Aggregation and Sync-over-Async from Project Details Loading
Currently ProjectStatsResolver runs StudyStats.GetFullProjectStatsAsync().Result (blocking synchronous call) inside AutoMapper on every ProjectDetailsDto mapping. This is the primary performance bottleneck.
Replace this with an async projection outside AutoMapper. Load the project and the pre-calculated current stats document explicitly, then assemble the DTO synchronously once both async reads have completed:
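// userId is passed in explicitly: ToFullStats needs it to select the per-reviewer stats
public async Task<ProjectDetailsDto> GetProjectDetails(Guid projectId, Guid userId)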
{
var project = await unitOfWork.Projects.GetByIdAsync(projectId);
var projectStats = await unitOfWork.ProjectStats.GetCurrentAsync(projectId);
var dto = SyrfMapper.Mapper.Map<ProjectDetailsDto>(project);
dto.FullStats = projectStats?.ToFullStats(userId) ?? FullStats.Empty;
return dto;
}
This removes both sources of avoidable overhead:
- Single document read (find({ projectId, date: "current" })) instead of an 11-facet aggregation pipeline
- No .Result sync-over-async call inside AutoMapper
Once this is in place, ProjectStatsResolver can be deleted or reduced to a pure synchronous mapper over already-loaded values.
SignalR — Change Stream on pmProjectStats
SyRF uses MongoDB as a SignalR backplane — all API replicas connect to the same database, so change streams ensure all replicas push notifications to their connected clients. Notifications must NOT be triggered from controller actions (which would only reach connections on the handling replica).
The backend code for SubscribeToFullStatsForProjectAndInvestigator already exists in StudyCollectionSubscriptionManager but was intentionally never wired from the frontend because it re-runs the heavy aggregation on every Study change. The new approach removes this bottleneck.
Revised implementation: Watch the pmProjectStats collection for changes to the current document:
public bool SubscribeToFullStatsForProjectAndInvestigator(
string connectionId, Guid projectId, Guid investigatorId,
Action<EntityUpdateNotification<FullStats>> notificationAction)
{
return CreateOrReplaceSubscriptionForKey(
FullStatsSubLookupKey(projectId, investigatorId), connectionId,
() => _pmUnitOfWork.ProjectStats.GetChangeStream(projectId, date: "current")
.Select(projectStats => projectStats.ToFullStats(investigatorId))
.DistinctUntilChanged()
.Select(fullStats => new EntityUpdateNotification<FullStats>(
projectId, DateTime.UtcNow, fullStats))
.RetryWithExponentialBackoff(...)
.Subscribe(notificationAction, exception => ...));
}
Frontend wiring needed: signal-r.service.ts already handles ProjectStatsNotification and already defines _subscribeToProjectStats() / _unsubscribeFromProjectStats() helpers. The missing piece is invoking those helpers alongside the existing project subscribe/unsubscribe flow when the current project changes.
Stats Audit Job
A MassTransit scheduled job that validates pre-calculated stats by comparing three independent sources against a single consistent point-in-time snapshot read (no transaction is held open). Uses the term "stats audit" to avoid confusion with the domain concept of reconciliation.
Three-way comparison
| Source | How computed | What it catches |
|---|---|---|
| A: Stored | Read current doc from pmProjectStats | — (this is what we're validating) |
| B: Legacy aggregation | Run existing StudyStats.GetFullProjectStatsAsync() | If A != B: delta logic has a bug |
| C: Streamed classification | Stream studies in batches, call .Classify() on each, accumulate | If B != C: aggregation pipeline diverges from C# classification logic |
All three are computed against a consistent point-in-time view using readConcern: "snapshot" with atClusterTime (MongoDB 5.0+, no multi-document transaction required). This avoids the 60-second transaction timeout limit on Atlas M20 and reduces WiredTiger cache pressure compared to holding a transaction open.
// Capture cluster time — all subsequent reads see this point in time
var clusterTime = await GetClusterTime();
var readConcern = new ReadConcern(ReadConcernLevel.Snapshot, atClusterTime: clusterTime);
// Source A: read current pre-calculated stats
var stored = await projectStatsRepo.GetCurrentAsync(projectId, readConcern);
// Source B: run legacy aggregation (sees same point in time)
var legacy = await unitOfWork.Studies.GetFullProjectStatsAsync(project, readConcern);
// Source C: stream + classify (sees same point in time)
var streamed = await ComputeStatsByStreamingClassification(projectId, project, readConcern);
// Compare — no transaction to commit/abort
var drift = DetectDrift(stored, legacy, streamed);
// Correction write is a separate operation (not in the snapshot)
if (drift.HasDrift)
await projectStatsRepo.ReplaceCurrentAsync(projectId, legacy, project);
The snapshot window is 300 seconds (minSnapshotHistoryWindowInSeconds on Atlas, configurable) — far more headroom than the 60-second transaction limit. For a project with 50,000 studies, the entire audit is estimated to take ~10-15 seconds including overhead, with the main costs being:
- MongoDB batch reads: ~1-5 seconds (100 batches of 500 studies)
- C# classification: ~1-2 seconds (microseconds per study, pure arithmetic)
- Legacy aggregation: ~2-5 seconds (single server-side operation)
Important: The audit only processes studies where PendingImportJobId is null and PendingDeletion is not true — pending studies are excluded from all three computations, ensuring the audit doesn't count studies that are not yet visible to users.
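Staggered execution
Projects are divided into time slots based on a non-negative hash of the project ID modulo TotalSlots, for example (projectId.GetHashCode() & int.MaxValue) % TotalSlots. The raw GetHashCode() value can be negative, and C#'s % operator keeps the sign of the dividend, so the unmasked expression could yield a negative slot. Each audit run processes only the batch assigned to the current slot, spreading load across the day. Every project is audited at least once per day.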
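The slot assignment can be illustrated with a short sketch. JavaScript's `%` has the same sign behaviour as C#'s for negative dividends, so the same care applies. `hash32` is a hypothetical stand-in for a signed 32-bit hash and `slotFor` is an illustrative name; neither is from the codebase.

```typescript
// HYPOTHETICAL stand-in for a signed 32-bit hash of a project ID.
function hash32(id: string): number {
  let h = 0;
  for (let i = 0; i < id.length; i++) {
    h = (h * 31 + id.charCodeAt(i)) | 0; // "| 0" wraps to a signed 32-bit integer
  }
  return h;
}

// Map any signed hash to a slot in [0, totalSlots).
function slotFor(hash: number, totalSlots: number): number {
  return ((hash % totalSlots) + totalSlots) % totalSlots;
}

console.log(-7 % 24);         // -7: the naive modulo is negative
console.log(slotFor(-7, 24)); // 17: normalised into the valid slot range
```

With 24 slots and an hourly trigger, each project lands in exactly one slot per day, which is one way to satisfy "every project is audited at least once per day".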
Drift detection and alerting
When drift is detected:
- Correct: Overwrite current document with the legacy aggregation result (the trusted, proven source)
- Record: Log drift details including which fields differed, magnitude, and all three values
- Alert (summarised, rate-limited):
  - Summary email listing all affected projects, not per-project notifications
  - Rate-limited to at most once per hour
  - Create or update a GitHub issue (labels: stats-drift, automated) for tracking
  - If an open drift issue exists and is < 7 days old, add a comment; otherwise create new
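DetectDrift is referenced but not defined in this document. A minimal sketch of a field-by-field three-way comparison follows, in TypeScript and over flat counter maps for simplicity; the real FullStats is nested. The `FieldDrift` shape and the treatment of a missing counter as 0 are assumptions.

```typescript
type Counters = Record<string, number>;

// One entry per counter on which the three sources do not all agree.
interface FieldDrift {
  field: string;
  stored: number;
  legacy: number;
  streamed: number;
}

function detectDrift(stored: Counters, legacy: Counters, streamed: Counters): FieldDrift[] {
  // Union of field names across all three sources, in first-seen order.
  const seen: Record<string, boolean> = {};
  const fields: string[] = [];
  for (const source of [stored, legacy, streamed]) {
    for (const field of Object.keys(source)) {
      if (!seen[field]) {
        seen[field] = true;
        fields.push(field);
      }
    }
  }

  const drifts: FieldDrift[] = [];
  for (const field of fields) {
    // ASSUMPTION: a counter absent from a source counts as 0.
    const a = stored[field] ?? 0;
    const b = legacy[field] ?? 0;
    const c = streamed[field] ?? 0;
    if (a !== b || b !== c) drifts.push({ field, stored: a, legacy: b, streamed: c });
  }
  return drifts;
}

const driftReport = detectDrift(
  { count: 100, sufficientlyScreened: 41 },
  { count: 100, sufficientlyScreened: 40 },
  { count: 100, sufficientlyScreened: 40 },
);
console.log(driftReport);
```

Keeping all three values per field lets the log distinguish "A differs from B and C" (delta logic suspected) from "B differs from C" (aggregation and classification logic disagree), matching the table in the three-way comparison.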
Streamed classification (Source C)
private async Task<FullStats> ComputeStatsByStreamingClassification(
Guid projectId, Project project, ReadConcern readConcern)
{
const int batchSize = 500;
var accumulator = new StatsAccumulator(project);
var processed = 0;
var totalStudies = await pmUnitOfWork.Studies
.CountAvailableForProjectAsync(projectId, readConcern);
// CountAvailable excludes PendingImportJobId != null and PendingDeletion == true
while (processed < totalStudies)
{
var batch = await pmUnitOfWork.Studies
.GetAvailableBatchForProjectAsync(
projectId, skip: processed, take: batchSize, readConcern);
foreach (var study in batch)
{
var screeningClass = study.ScreeningInfo.Classify(project.ProjectAgreementThreshold);
accumulator.AddProjectScreening(screeningClass);
foreach (var membership in project.ActiveMemberships)
accumulator.AddMembershipScreening(study, membership, screeningClass);
foreach (var stage in project.Stages)
{
var annotationClass = study.ExtractionInfo.ClassifyAnnotation(
stage.Id, stage.SessionCountTarget);
accumulator.AddAnnotation(study, stage, annotationClass);
}
}
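// Guard: stop if a batch comes back empty, so a count mismatch cannot loop forever
if (batch.Count == 0) break;
processed += batch.Count;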
}
return accumulator.ToFullStats();
}
Daily Snapshot Job
Separate from the stats audit. Runs once daily at 02:00 UTC.
Sparse snapshot optimization
Snapshots are recorded whenever the current document differs from the most recent dated snapshot. In practice this means a new snapshot is created when:
- screening or annotation activity changes the stored counters/tallies
- project or stage settings change in a way that updates the persisted settings snapshot and recomputed derived counters
If neither the persisted stats nor their interpretation settings have changed since the previous snapshot, no new dated document is created. This is more efficient than blindly copying every day, and still preserves the semantic points-in-time needed for historical interpretation. The frontend fill-forwards the last known snapshot for days with no entry.
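The fill-forward step on the frontend can be sketched as follows. This is an illustrative TypeScript version, not the existing frontend code: the `Snapshot` shape is reduced to one counter, and leaving out days before the first snapshot is an assumption.

```typescript
interface Snapshot {
  date: string;  // "yyyy-MM-dd"
  count: number; // stands in for the full stats payload
}

// Advance an ISO date by n days, working in UTC to avoid DST surprises.
function addDays(iso: string, n: number): string {
  const d = new Date(iso + "T00:00:00Z");
  d.setUTCDate(d.getUTCDate() + n);
  return d.toISOString().slice(0, 10);
}

// Produce one entry per day in [from, to], repeating the last known snapshot.
function fillForward(snapshots: Snapshot[], from: string, to: string): Snapshot[] {
  const sorted = snapshots.slice().sort((a, b) => (a.date < b.date ? -1 : a.date > b.date ? 1 : 0));
  const out: Snapshot[] = [];
  let i = 0;
  let last: Snapshot | null = null;
  for (let day = from; day <= to; day = addDays(day, 1)) {
    // Consume every snapshot dated on or before this day.
    while (i < sorted.length) {
      const s = sorted[i];
      if (s === undefined || s.date > day) break;
      last = s;
      i++;
    }
    // ASSUMPTION: days before the first snapshot are omitted rather than zero-filled.
    if (last !== null) out.push({ date: day, count: last.count });
  }
  return out;
}

const filled = fillForward(
  [{ date: "2026-03-27", count: 12 }, { date: "2026-03-25", count: 10 }],
  "2026-03-24",
  "2026-03-28",
);
console.log(filled.map((s) => s.date + "=" + s.count).join(" "));
```

For the sparse input above, 26 March inherits the value from 25 March and 28 March inherits the value from 27 March, so the chart shows a continuous series even though only two documents are stored.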
To distinguish "no snapshot because nothing changed" from "no snapshot because the job failed", the job maintains a lightweight tracking document per project:
// Snapshot tracking document: one per project, updated on every job run
{
  projectId: CSUUID("..."),
  date: "_snapshotTracking",        // sentinel value, same collection
  firstRunDate: "2026-03-01",       // when tracking began for this project
  failures: [                       // only populated on errors (should be rare)
    { date: "2026-03-15", error: "timeout after 30s" }
  ],
  lastRunDate: "2026-03-28",        // last successful job run (changed or unchanged)
  lastSnapshotDate: "2026-03-25"    // last date a snapshot was actually recorded
}
Index: The existing { projectId: 1, date: 1 } compound index covers this document via the _snapshotTracking sentinel.
Interpretation rules (used by frontend and API):
- Date has a snapshot document → data changed that day, use it
- Date is between firstRunDate and lastRunDate, no snapshot, not in failures → job ran successfully, and neither the persisted stats nor the persisted settings snapshot changed since the previous snapshot (carry forward)
- Date is in failures → job failed, state is unknown (can flag visually)
- Date is before firstRunDate → no tracking existed yet
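The interpretation rules can be expressed as a single lookup function. The sketch below is illustrative only: `classifyDay`, `DayStatus` and the `SnapshotTracking` interface are hypothetical names, and the `"pending"` result for dates after lastRunDate is an assumption, since the rules above do not cover that case. Dates are compared as ISO `yyyy-MM-dd` strings, which sort chronologically.

```typescript
// Mirrors the tracking document fields used by the rules (names from the document).
interface SnapshotTracking {
  firstRunDate: string;
  lastRunDate: string;
  failures: { date: string; error: string }[];
}

// "pending" is an assumption for dates the job has not reached yet.
type DayStatus = "changed" | "unchanged" | "failed" | "untracked" | "pending";

function classifyDay(
  date: string,
  snapshotDates: Set<string>,
  tracking: SnapshotTracking
): DayStatus {
  if (snapshotDates.has(date)) return "changed";          // snapshot exists: use it
  if (tracking.failures.some(f => f.date === date)) return "failed";
  if (date < tracking.firstRunDate) return "untracked";    // before tracking began
  if (date <= tracking.lastRunDate) return "unchanged";    // carry forward
  return "pending";
}
```

With the example tracking document above, 2026-03-15 would classify as failed and 2026-03-10 as unchanged.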
Job implementation¶
public class StatsDailySnapshotJobConsumer : IJobConsumer<ISnapshotProjectStatsCommand>
{
    public async Task Run(JobContext<ISnapshotProjectStatsCommand> context)
    {
        // Invariant culture keeps the date key stable regardless of server locale/calendar
        var today = DateTime.UtcNow.ToString(
            "yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture);
        var allProjectIds = await pmUnitOfWork.Projects.GetActiveProjectIds();

        foreach (var projectId in allProjectIds)
        {
            try
            {
                var current = await projectStatsRepo.GetCurrentAsync(projectId);
                if (current == null) continue;

                // Compare-and-skip: only snapshot when something changed
                var lastSnapshot = await projectStatsRepo.GetLatestSnapshotAsync(projectId);
                var hasChanged = lastSnapshot == null
                    || !current.StatsEqual(lastSnapshot);

                if (hasChanged)
                    await projectStatsRepo.SnapshotCurrentAsync(projectId, today);

                await projectStatsRepo.RecordSuccessfulRunAsync(projectId, today,
                    snapshotTaken: hasChanged);
            }
            catch (Exception ex)
            {
                // A failure for one project must not stop the remaining projects
                await projectStatsRepo.RecordFailedRunAsync(projectId, today, ex.Message);
            }
        }
    }
}
StatsEqual() compares the full historical snapshot state needed to interpret charts correctly:
- persisted settings snapshot
- tally dictionaries
- derived counter fields
Therefore, a settings-only change that affects interpretation still produces a new dated snapshot, even if there was no new review activity. Historical snapshots continue to preserve the settings that were active when that snapshot was recorded.
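StatsEqual() is a C# method whose body is not shown in this document. Purely to illustrate the comparison it describes, here is a minimal TypeScript sketch in which all three parts are modelled as flat string-to-number maps. The `StatsState` shape and the field names `settings`, `tallies` and `derived` are hypothetical simplifications, not the real document schema.

```typescript
type Counts = Record<string, number>;

// Hypothetical, flattened view of the three compared parts.
interface StatsState {
  settings: Counts; // persisted settings snapshot
  tallies: Counts;  // tally dictionaries
  derived: Counts;  // derived counter fields
}

// Two maps are equal when they have the same keys with the same values.
function countsEqual(a: Counts, b: Counts): boolean {
  const keysA = Object.keys(a);
  const keysB = Object.keys(b);
  return (
    keysA.length === keysB.length &&
    keysA.every(k => Object.prototype.hasOwnProperty.call(b, k) && a[k] === b[k])
  );
}

// All three parts must match; a difference in any one means "changed".
function statsEqual(a: StatsState, b: StatsState): boolean {
  return (
    countsEqual(a.settings, b.settings) &&
    countsEqual(a.tallies, b.tallies) &&
    countsEqual(a.derived, b.derived)
  );
}
```

Under this sketch, changing only a value in `settings` makes `statsEqual` return false, which is what causes a settings-only change to produce a new dated snapshot.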
Historical API response shape¶
{
  "snapshots": [ ... ],
  "firstRunDate": "2026-03-01",
  "failures": [
    { "date": "2026-03-15", "error": "timeout after 30s" }
  ]
}
- snapshots: sparse array of dated ProjectStats documents (only days where data changed)
- firstRunDate: when snapshot tracking began, so the frontend knows the boundary of reliable data
- failures: dates where the job failed, so the frontend can distinguish "unchanged" from "unknown"
The frontend fill-forwards the last known snapshot for missing dates when building area chart series.
Frontend Integration¶
Existing infrastructure (from PR #2469 refactoring)¶
PR #2469 creates clean extension points:
- screening-area-series.builders.ts: placeholder history helpers exist; the top-level selector path still returns null until real screening history counts are wired
- annotation-area-series.builders.ts: history-to-series transform helpers are implemented; the selector path still returns null until real annotation history counts are wired
- selectScreeningHistoryCountsForCurrentProject: returns null, ready for real selector
- selectAnnotationHistoryCountsForCurrentProject: returns null, ready for real selector
- fullStatsSchema normalizr schema already exists and handles all 4 stat entity types
- signal-r.service.ts already handles ProjectStatsNotification, dispatches stageActions.receivedFullStats, and defines project-stats subscribe/unsubscribe helpers
Wiring steps¶
- Invoke the existing project-stats subscribe/unsubscribe helpers when project context changes to enable live current-stats SignalR push
- New ngrx entity progressSnapshot + actions + effect to call historical API endpoint
- Replace selector stubs with real selectors reading from entity state
- Finish wiring the area series builders to consume real history data with fill-forward logic (see below)
- Enable area chart components (currently hidden because data is null)
Frontend fill-forward for sparse snapshots¶
The historical API returns sparse data: only dates where stats changed. The area series builders must fill forward the last known snapshot for missing dates to produce a continuous Highcharts series. This is a small transform in each builder, assuming the snapshots arrive sorted by ascending date:
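// Given sparse snapshots sorted by ascending date, produce a daily series by
// carrying forward the last known snapshot. Dates are "yyyy-MM-dd" strings, so
// string comparison matches chronological order. nextDay() is assumed to
// return the following calendar day in the same format.
function fillForward(
  snapshots: { date: string; tallyCounts: TallyCounts }[],
  from: string, to: string
): { date: string; tallyCounts: TallyCounts }[] {
  const result: { date: string; tallyCounts: TallyCounts }[] = [];
  let lastKnown: TallyCounts | null = null;
  let snapshotIndex = 0;
  for (let d = from; d <= to; d = nextDay(d)) {
    // Consume every snapshot dated on or before d, so a snapshot older than
    // `from` still seeds the carried-forward value instead of blocking the scan
    while (snapshotIndex < snapshots.length && snapshots[snapshotIndex].date <= d) {
      lastKnown = snapshots[snapshotIndex].tallyCounts;
      snapshotIndex++;
    }
    // Days before the first snapshot are omitted rather than filled with zeros
    if (lastKnown) result.push({ date: d, tallyCounts: lastKnown });
  }
  return result;
}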
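The fillForward transform relies on a nextDay helper that this document does not define. One possible implementation, offered as an assumption rather than the project's actual helper, does the date arithmetic in UTC so that daylight-saving changes cannot skip or repeat a day:

```typescript
// Returns the calendar day after an ISO "yyyy-MM-dd" date, in the same format.
function nextDay(isoDate: string): string {
  const [year, month, day] = isoDate.split("-").map(Number);
  // Date.UTC normalises overflow, so day + 1 rolls over months and years.
  const next = new Date(Date.UTC(year, month - 1, day + 1));
  return next.toISOString().slice(0, 10);
}
```

Because the result keeps the zero-padded `yyyy-MM-dd` form, the `d <= to` string comparison in the loop stays valid.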
Dates in the failures array can optionally be rendered with a visual indicator (e.g. a subtle gap or warning icon on the x-axis) to distinguish "unchanged" from "job failed".
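One way to draw such an indicator is to shade each failed day with a Highcharts `xAxis.plotBands` entry. The sketch below only builds the band objects; it assumes the chart uses a datetime x-axis with UTC millisecond values, and the `failureBands` name, the `PlotBand` interface (a subset of the Highcharts plot band options) and the default colour are illustrative choices.

```typescript
interface SnapshotFailure {
  date: string; // "yyyy-MM-dd"
  error: string;
}

// Subset of the Highcharts plot band options used here.
interface PlotBand {
  from: number;
  to: number;
  color: string;
}

const DAY_MS = 24 * 60 * 60 * 1000;

// Produces one band per failed day, spanning that day from 00:00 UTC to the next.
function failureBands(
  failures: SnapshotFailure[],
  color = "rgba(255, 165, 0, 0.15)"
): PlotBand[] {
  return failures.map(f => {
    const start = Date.parse(`${f.date}T00:00:00Z`);
    return { from: start, to: start + DAY_MS, color };
  });
}
```

The resulting array could be passed as `xAxis.plotBands` in the chart options alongside the filled-forward series.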
Implementation Order¶
| Step | What | Why this order |
|---|---|---|
| 1 | ScreeningInfo.Classify() and ExtractionInfo.ClassifyAnnotation() + tests | Foundation for delta logic; must be correct before anything else |
| 2 | ScreeningDelta and AnnotationDelta value objects | Define the contract between Study entity and stats repository |
| 3 | Modify Study.AddScreening() and Study.AddSessionData() to return deltas | DDD pattern: entity produces change description |
| 4 | ProjectStats document entity + IProjectStatsRepository + pmProjectStats collection | Infrastructure for storing pre-calculated stats |
| 5 | ReviewDomainService with transaction wiring | Atomic Study + Stats writes for screening and annotation |
| 6 | Two-phase study import (PendingImportJobId field, staged insert, atomic commit) | Consistency guarantee for bulk uploads |
| 7 | Two-phase systematic search deletion (PendingDeletion field, batched delete with deltas) | Consistency guarantee for bulk removals |
| 8 | Extend to remaining events (session save, reconciliation session) | Complete coverage of all stats-affecting operations |
| 9 | Settings change handler (recompute from tallies) | Fast reclassification without Study queries |
| 10 | Replace ProjectStatsResolver with collection read | Immediate performance win for page loads |
| 11 | Stats audit job (three-way comparison, staggered, readConcern: "snapshot" with atClusterTime) | Correctness safety net with drift alerting |
| 12 | Wire SubscribeToProjectFullStats to pmProjectStats change stream | Real-time push via MongoDB backplane |
| 13 | Wire frontend to call SubscribeToProjectFullStats | Enables SignalR stats push |
| 14 | Daily snapshot job (sparse: compare-and-skip, tracking document, failure recording) | Starts accumulating historical data |
| 15 | Historical API endpoint (sparse response with firstRunDate + failures) + frontend fill-forward + area chart wiring | Trend visualisation |
Related Issues¶
Epic¶
| # | Title | Status |
|---|---|---|
| #1831 | MongoDB Stats Pre-calculation and Historical Tracking | Open |
Original milestone issues (may need updating to reflect revised design)¶
| # | Title | Points | Status |
|---|---|---|---|
| #1836 | Data Model Changes | — | Open |
| #1837 | Incremental Update Logic | 5 | Open |
| #1838 | Client-Side Integration | 3 | Open |
| #1839 | Basic Reconciliation Job | 3 | Open |
| #1840-#1850 | Milestones 2-4 issues | Various | Open |
Related PRs¶
| PR | Title | Status | Relevance |
|---|---|---|---|
| #2469 | refactor(web): extract and reorganize stage overview chart code | Open | Frontend extension points for this work |
| #1457 | Stats-overhaul | Merged (2024-02) | Previous stats rework |
| #2391 | fix(web): use deduplicated study count | Closed | Analysis of ProjectScreeningStats.Count vs frontend |
| #2454 | fix(pm): remove unused MongoDB indexes | Open | Production $indexStats analysis |
| #1749 | Improving project stats feature | Open | Stats page layout improvements |
Discussions¶
| # | Title | Category |
|---|---|---|
| #1822 | Improving Performance for Project Statistics | Ideas |
| #1823 | Progressive Implementation Plan: MongoDB Stats Pre-calculation | Feature Spec |