Coincidence Detection in Multi-Threaded Simulations#
This document describes the design of GateTimeSorter and
GateCoincidenceSorterActor and how they cooperate to perform coincidence
detection in a multi-threaded GATE simulation. It targets GATE developers
who want to understand, extend, or debug this subsystem.
Problem Statement#
In a multi-threaded simulation each worker thread runs an independent
event loop and produces its own per-thread digi stream. Those streams are
not globally time-sorted: a single EndOfEventAction call on thread A
may produce digis with a lower GlobalTime than digis that were produced
moments earlier on thread B.
Coincidence detection requires a single, globally time-ordered digi stream because a coincidence pair may consist of two singles originating from primary events simulated in different threads (random coincidences). Such a pair will never be found by an algorithm that only inspects one thread’s output at a time.
GateTimeSorter solves this by merging all per-thread digi streams into one
time-sorted stream. GateCoincidenceSorterActor consumes that stream to
identify coincidence pairs.
Component Overview#
GateTimeSorter(GateTimeSorter.h/.cpp)A self-contained merge-sorter. It is not a GATE actor but a helper object owned by
GateCoincidenceSorterActor. It manages several internal digi collections (see Data-Flow Diagram) and exposes three operations that callers use via the higher-level wrappersOnEndOfEventAction()andOnEndOfRunAction():Ingest()— copies the calling thread’s digis into a shared ingestion buffer. Always mutex-protected; deliberately minimal.Process()— sorts the buffered digis and drains the oldest ones to the output collection. Only one thread executes this at a time, and only the thread currently tracking the highestGlobalTimeis selected (see Phase 2 — Sorting and Output).Flush()— drains all remaining sorted digis at end-of-run, without applying the sorting window.
GateCoincidenceSorterActor(GateCoincidenceSorterActor.h/.cpp)A standard GATE actor. In
EndOfEventActionit passes a lambda toOnEndOfEventAction(). The time sorter first callsIngest()and, when conditions are met, callsProcess()followed by the lambda. The lambda callsProcessTimeSortedSingles()(copies new time-sorted digis into a local temporary storage) andDetectCoincidences()(scans that storage for coincidence pairs and writes them to the output collection).
Data-Flow Diagram#
Solid arrows show the forward data path. Dashed arrows (- ->) show
memory-reclamation paths.
+------------------ GateTimeSorter ----------------------------------+
| |
| Thread 0 Thread 1 ... Thread N |
| | | | |
| +--------- Ingest() [mutex] -+ |
| | |
| v |
| fIngestionBufferA <-- all threads write here |
| | |
| [buffer swap under mutex, inside Process()] |
| | |
| v |
| fIngestionBufferB <-- read only by Process() |
| | |
| [sort into min-heap] |
| | |
| v |
| fSortedCollectionA + fSortedIndicesA (min-heap) |
| | |
| [drain: most-recently-arrived time - oldest queued time |
| > fSortingWindow] |
| | |
| v |
| fOutputCollection |
| |
| - - Memory reclamation (Prune): - - |
| surviving digis: fSortedCollectionA - -> fSortedCollectionB, |
| fSortedCollectionA cleared, fSortedCollectionA <-> B swapped |
| |
+------------------------------+-------------------------------------+
|
ProcessTimeSortedSingles()
|
+------- GateCoincidenceSorterActor ---------------------------------+
| | |
| v |
| TemporaryStorage::digis (fCurrentStorage) |
| | |
| DetectCoincidences() |
| | |
| v |
| Coincidence output collection |
| |
| - - Memory reclamation (ClearProcessedSingles): - - |
| unprocessed digis: fCurrentStorage - -> fFutureStorage, |
| fCurrentStorage cleared, fCurrentStorage <-> fFutureStorage |
| swapped |
+--------------------------------------------------------------------+
GateTimeSorter: Design and Threading Model#
Phase 1 — Ingestion#
Every call to OnEndOfEventAction() begins with Ingest(). If the
calling thread’s input collection has no new digis, Ingest() returns false
and OnEndOfEventAction() returns immediately — an event with no digis
contributes nothing to the ingestion counter and requires no further work.
Otherwise, under the protection of fIngestionMutex, Ingest():
Iterates over all digis in the calling thread’s input collection.
Copies each digi into
fIngestionBufferAvia a pre-builtGateDigiAttributesFiller.Updates
fMaxGlobalTimePerThread[tid], a cache-line-padded atomicdouble(PaddedAtomicDouble) indexed by thread ID, to record the highestGlobalTimevalue seen by this thread so far.If the per-thread maximum actually increased, extends
fSortingWindowto cover the current spread ofGlobalTimeacross all threads (see The Sorting Window).
The critical section contains a sequential digi copy plus the (rare) sorting window extension. Threads therefore block each other for only a very short time.
Phase 2 — Sorting and Output#
After returning from Ingest(), OnEndOfEventAction() increments the
shared counter fNumIngestions (a relaxed atomic) and returns early unless
the counter has reached the threshold numIngestionsPerProcessCall (= 10).
This throttle avoids the overhead of sorting after every single ingestion.
Once the threshold is reached, the thread attempts to become the exclusive
processor by performing a compare-and-swap (CAS) on fProcessingOngoing:
bool expected = false;
fProcessingOngoing.compare_exchange_strong(
expected, true,
std::memory_order_acquire, // success ordering
std::memory_order_relaxed); // failure ordering
If the CAS fails — either because another thread already holds the token or the preliminary relaxed load showed it was taken — the calling thread simply returns and continues simulating. It is never blocked.
If the CAS succeeds, the thread checks whether it is the one currently
tracking the highest GlobalTime:
if (tid == fFastestThread.load()) {
Process();
work(); // actor lambda: ProcessTimeSortedSingles + DetectCoincidences
}
fProcessingOngoing.store(false, std::memory_order_release);
A thread that wins the CAS but is not the fastest one releases the token
immediately without calling Process(). The intent is that sorting work
migrates to whichever thread is furthest ahead in simulation time: the fastest
thread occasionally pauses to do sorting and coincidence detection work,
which gives the slower threads time to catch up, reducing GlobalTime
divergence.
Inside Process():
Buffer swap (under mutex).
fIngestionBufferAandfIngestionBufferBare pointer-swapped and the newly designatedfIngestionBufferAis cleared. This is the only point inProcess()wherefIngestionMutexis held. After the mutex is released, other threads can immediately resume ingesting into the cleared buffer, whileProcess()owns the data infIngestionBufferBexclusively (see Exclusive Ownership of fIngestionBufferB After the Swap).Sort. Digis from
fIngestionBufferBare iterated in arrival order. Each digi is appended tofSortedCollectionAand its (index, time) pair is pushed onto the min-heapfSortedIndicesA. The heap thereby provides a globally time-sorted view of all digis ingested so far.Drain. Digis are popped from the heap and copied to
fOutputCollectionas long as:fMostRecentTimeArrived - fSortedIndicesA.top().time > fSortingWindow
This sorting window guarantee ensures time-monotonicity in the output stream: no future ingestion can produce a digi with a low enough
GlobalTimeto be inserted before anything already drained.Fastest-thread re-evaluation. Because the current thread has been doing sorting and coincidence detection work since it was identified as the fastest, it may no longer hold that distinction.
IdentifyFastestThread()refreshesfFastestThreadso that subsequent processing calls are routed to the correct thread.
The Sorting Window#
GlobalTime in the input stream is not guaranteed to increase
monotonically. Sources of non-monotonicity include:
the travel time of a secondary particle from its creation point to the detector,
an upstream
DigitizerBlurringActorwithblur_attribute="GlobalTime",GlobalTimedivergence between worker threads.
fSortingWindow is the minimum GlobalTime gap required between the
oldest unsorted digi and the most recently arrived digi before the oldest
digi is considered safe to emit. A user-specified minimum
fMinimumSortingWindow provides the baseline. In multi-threaded mode the
window is automatically extended inside Ingest(), whenever the ingesting
thread observes a new maximum GlobalTime difference across threads:
fSortingWindow = max(fSortingWindow,
fMinimumSortingWindow
+ max(fMaxGlobalTimePerThread)
- min(fMaxGlobalTimePerThread))
The window grows monotonically and never shrinks, so a temporary burst of thread divergence leaves a permanent safety margin.
End of Run — Flush#
OnEndOfRunAction() atomically decrements fNumActiveWorkingThreads.
The thread that takes the counter to zero is the last active worker; it calls
Process() and Flush(), which drains all remaining digis from
fSortedIndicesA into fOutputCollection without applying the sorting
window, since no further digis will arrive.
It then calls lastThreadWork(). Every thread subsequently calls
anyThreadWork(). Before OnEndOfRunAction() returns,
each thread calls MarkThreadAsFinished(), which zeroes its
fMaxGlobalTimePerThread entry so it is no longer eligible to be selected
as the fastest thread.
Correctness of the Lock-Free Processing Path#
After the buffer swap inside Process(), the remainder of that function
reads and writes fIngestionBufferB, fSortedCollectionA,
fSortedIndicesA, fOutputCollection, fMostRecentTimeArrived,
fMostRecentTimeDeparted, and fFastestThread
without holding any mutex, and reads fSortingWindow via an atomic
load (fSortingWindow is std::atomic<double> because Ingest()
writes to it concurrently). This section explains why that is safe.
Exclusive Ownership of fIngestionBufferB After the Swap#
Process() is the only function that swaps fIngestionBufferA and
fIngestionBufferB. After the swap (performed under fIngestionMutex):
fIngestionBufferAnow points to the previously empty buffer. All subsequentIngest()calls will write there, becauseIngest()looks up its filler using the current pointer value offIngestionBufferAas the map key — the lookup therefore automatically redirects to the newly designated ingestion buffer.fIngestionBufferBnow points to the buffer that held the accumulated digis. No other code path writes to this buffer until the next call toProcess().
The mutex release inside Process() happens-before the mutex acquire inside
the next Ingest() call, guaranteeing that every subsequent Ingest()
observes the post-swap pointer values and writes into the correct buffer.
Serialisation of Process() Invocations#
The CAS on fProcessingOngoing acts as a lightweight, spinless critical
section around the body of Process():
The acquire ordering on a successful CAS establishes a happens-before relationship with the release store at the end of the previous
Process()invocation. All writes made by that invocation tofSortedCollectionA,fSortedIndicesA,fOutputCollection,fMostRecentTimeArrived,fMostRecentTimeDeparted, andfFastestThreadare therefore visible to the current invocation, even though no mutex guards those fields directly. (fSortingWindowis excluded from this list because it is an atomic written byIngest()independently of the CAS.)The release store at the end of
Process()makes all writes of the current invocation visible to the next thread that successfully acquires the token.
Because the CAS is an atomic read-modify-write, at most one thread can succeed
at a time, so all Process() invocations are globally serialised.
The actor lambda (ProcessTimeSortedSingles and DetectCoincidences) is
also serialised by this mechanism, because it is only ever called from within
the CAS-protected block. The actor-side fields fCurrentStorage,
fFutureStorage, and fIterPosition are therefore safe to access without
additional synchronisation.
Non-Processing Threads Are Never Blocked#
A thread that fails the CAS — or that wins it but is not the fastest thread
and immediately releases the token — returns from OnEndOfEventAction()
and continues simulating without ever waiting on a lock or spinning. The
entire fast path (ingestion counter increment → preliminary flag check → CAS
attempt → early return) is wait-free: it completes in a bounded number of
steps regardless of contention.
Visibility of fMaxGlobalTimePerThread#
fMaxGlobalTimePerThread is an array of PaddedAtomicDouble values, each
padded to one cache-line (64 bytes) to prevent false sharing. Each thread
writes only to its own element inside Ingest(). Ingest() also reads
all elements (under fIngestionMutex) to compute the sorting window
extension. Process() reads all elements to re-evaluate the fastest thread.
Concurrent access is safe because:
Per-thread ownership of writes. Each element is written by exactly one thread (its owner), so there are no write-write races.
Read ordering for window extension. The reads of all elements in
Ingest()happen underfIngestionMutex, which serialises concurrentIngest()calls. The resultingfSortingWindowupdate is an atomic store, immediately visible to any subsequent atomic load inProcess().Benign races for fastest-thread selection.
Ingest()can run concurrently withProcess()after the buffer swap. A load offMaxGlobalTimePerThread[tid]insideIdentifyFastestThread()may therefore see a value that is one ingestion stale. This is harmless: fastest-thread selection is heuristic and routing processing to a slightly suboptimal thread does not affect correctness.
GateCoincidenceSorterActor: Consuming the Sorted Stream#
BeginOfRunActionMasterThread creates and configures a GateTimeSorter
(sorting window = fSortingTime) and two TemporaryStorage objects
(fCurrentStorage and fFutureStorage). Each TemporaryStorage holds
a GateDigiCollection of time-sorted singles together with the
GlobalTime of the earliest and latest digi it currently contains.
At each EndOfEventAction, the actor passes a lambda to
fTimeSorter->OnEndOfEventAction(). When the time sorter decides that the
current thread should do processing, the lambda executes two steps:
- ProcessTimeSortedSingles()
Iterates over the new digis in
fOutputCollectionusingfTimeSorter->OutputIterator()(which tracks the position of the last processed digi) and appends each digi tofCurrentStorage->digis, updatingearliestTimeandlatestTime.MarkOutputAsProcessed()then advances the iterator so the same digis are not revisited.- DetectCoincidences()
Scans
fCurrentStorage->digisstarting atfIterPosition(the first not-yet-processed single). For each opening single at indexi0with timet0it searches forward for singles whoseGlobalTimefalls in the coincidence window:[t0 + fWindowOffset, t0 + fWindowOffset + fWindowSize]
and whose detector volume (at depth
fGroupVolumeDepth) differs from that of the opening single. Accepted coincidence pairs, after applying the configuredMultiplesPolicy, are written to the coincidence output collection.The outer loop terminates once the
GlobalTimespan currently held infCurrentStorageis too small to guarantee that all possible coincidence partners of the current opening single have been seen — i.e., when:latestTime - earliestTime <= fWindowSize + fWindowOffset
This condition ensures that
DetectCoincidences()never emits a coincidence that is later found to be incomplete due to a missing single that had not yet arrived.
At EndOfRunAction, OnEndOfRunAction ensures Flush() is called by
the last remaining thread. The last-thread lambda then calls
ProcessTimeSortedSingles followed by DetectCoincidences(true) (the
lastCall = true argument relaxes the termination condition to drain all
remaining singles).
Thread Lifecycle Summary#
Simulation phase |
Thread |
Actions and synchronisation |
|---|---|---|
|
Master only |
Creates |
|
All workers |
|
|
All workers |
Decrement |