ntpl014_framework.C
/// \file
/// \ingroup tutorial_ntuple
/// \notebook
///
/// Example of framework usage for writing RNTuples:
/// 1. Creation of (bare) RNTupleModels and RFieldTokens.
/// 2. Creation of RNTupleWriter and RNTupleParallelWriter when appending to a single TFile.
/// 3. Creation of RNTupleFillContext and RRawPtrWriteEntry per thread, and usage of BindRawPtr.
/// 4. Usage of FillNoFlush(), RNTupleFillStatus::ShouldFlushCluster(), FlushColumns(), and FlushCluster().
///
/// Please note that this tutorial has very simplified versions of classes that could be found in a framework, such as
/// DataProduct, FileService, ParallelOutputter, and SerializingOutputter. They try to mimic the usage in a framework
/// (for example, Outputters are agnostic of the data written, which is encapsulated in std::vector<DataProduct>), but
/// are not meant for production usage!
///
/// Also note that this tutorial uses std::thread and std::mutex directly instead of a task scheduling library such as
/// Threading Building Blocks (TBB). For that reason, turning on ROOT's implicit multithreading (IMT) would not be very
/// efficient with the simplified code in this tutorial because a thread blocking to acquire a std::mutex cannot "help"
/// the other thread that is currently in the critical section by executing its tasks. If that is wanted, the framework
/// should use synchronization methods provided by TBB directly (which goes beyond the scope of this tutorial).
///
/// \macro_code
///
/// \date September 2024
/// \author The ROOT Team

// NOTE: The RNTupleParallelWriter, RNTupleFillContext, and RRawPtrWriteEntry are experimental at this point.
// Functionality and interface are still subject to changes.

#include <ROOT/RFieldToken.hxx>
#include <ROOT/RNTupleFillContext.hxx>
#include <ROOT/RNTupleFillStatus.hxx> // RNTupleFillStatus; in some ROOT versions declared by the fill context header
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleParallelWriter.hxx>
#include <ROOT/RNTupleWriteOptions.hxx>
#include <ROOT/RNTupleWriter.hxx>
#include <ROOT/RRawPtrWriteEntry.hxx> // RRawPtrWriteEntry; in some ROOT versions declared alongside REntry

#include <TFile.h>

#include <cassert>
#include <cstddef>    // for std::size_t
#include <cstdint>    // for std::uint32_t
#include <functional> // for std::ref
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <string_view>
#include <thread>
#include <utility> // for std::pair
#include <vector>

// Import classes from Experimental namespace for the time being
using ROOT::Experimental::RNTupleFillContext;
using ROOT::Experimental::RNTupleParallelWriter;
using ROOT::Experimental::Detail::RRawPtrWriteEntry; // in some ROOT versions directly in ROOT::Experimental

using ModelTokensPair = std::pair<std::unique_ptr<ROOT::RNTupleModel>, std::vector<ROOT::RFieldToken>>;

// A DataProduct associates an arbitrary address to an index in the model.
struct DataProduct {
   std::size_t index;
   const void *address;

   DataProduct(std::size_t i, const void *a) : index(i), address(a) {}
};

// The FileService opens a TFile and provides synchronization.
class FileService {
   std::unique_ptr<TFile> fFile;
   std::mutex fMutex;

public:
   FileService(std::string_view url, std::string_view options = "")
   {
      fFile.reset(TFile::Open(std::string(url).c_str(), std::string(options).c_str()));
      // The file is automatically closed when destructing the std::unique_ptr.
   }

   TFile &GetFile() { return *fFile; }
   std::mutex &GetMutex() { return fMutex; }
};

// An Outputter provides the interface to fill DataProducts into an RNTuple.
class Outputter {
public:
   virtual ~Outputter() = default;

   virtual void InitSlot(unsigned slot) = 0;
   virtual void Fill(unsigned slot, const std::vector<DataProduct> &products) = 0;
};

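// Note on how the Outputters are used in this tutorial: InitSlot() is called for every slot before the worker
// threads are started, and each worker thread then only fills the slot matching its thread id, so concurrent
// Fill() calls never share a slot.
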
// A ParallelOutputter uses an RNTupleParallelWriter to append an RNTuple to a TFile.
class ParallelOutputter final : public Outputter {
   FileService &fFileService;
   std::unique_ptr<RNTupleParallelWriter> fParallelWriter;
   std::vector<ROOT::RFieldToken> fTokens;

   struct SlotData {
      std::shared_ptr<RNTupleFillContext> fillContext;
      std::unique_ptr<RRawPtrWriteEntry> entry;
   };
   std::vector<SlotData> fSlots;

public:
   ParallelOutputter(FileService &fileService, ModelTokensPair modelTokens, std::string_view ntupleName,
                     const ROOT::RNTupleWriteOptions &options)
      : fFileService(fileService), fTokens(std::move(modelTokens.second))
   {
      auto &model = modelTokens.first;

      std::lock_guard g(fileService.GetMutex());
      fParallelWriter = RNTupleParallelWriter::Append(std::move(model), ntupleName, fFileService.GetFile(), options);
   }

   void InitSlot(unsigned slot) final
   {
      if (slot >= fSlots.size()) {
         fSlots.resize(slot + 1);
      }
      // Create an RNTupleFillContext and RRawPtrWriteEntry that are used for all fills from this slot.
      fSlots[slot].fillContext = fParallelWriter->CreateFillContext();
      fSlots[slot].entry = fSlots[slot].fillContext->GetModel().CreateRawPtrWriteEntry();
   }

   void Fill(unsigned slot, const std::vector<DataProduct> &products) final
   {
      assert(slot < fSlots.size());
      auto &fillContext = *fSlots[slot].fillContext;
      auto &entry = *fSlots[slot].entry;

      // Use the field tokens to bind the products' raw pointers.
      for (auto &&product : products) {
         entry.BindRawPtr(fTokens[product.index], product.address);
      }

      // Fill the entry without triggering an implicit flush.
      ROOT::RNTupleFillStatus status;
      fillContext.FillNoFlush(entry, status);
      if (status.ShouldFlushCluster()) {
         // If we are asked to flush, first try to do as much work as possible outside of the critical section:
         // FlushColumns() will flush column data and trigger compression, but not actually write to storage.
         // (A framework may of course also decide to flush more often.)
         fillContext.FlushColumns();

         {
            // FlushCluster() will flush data to the underlying TFile, so it requires synchronization.
            std::lock_guard g(fFileService.GetMutex());
            fillContext.FlushCluster();
         }
      }
   }
};

// A SerializingOutputter uses a sequential RNTupleWriter to append an RNTuple to a TFile and a std::mutex to
// synchronize multiple threads. Note that ROOT's implicit multithreading would not be very efficient with this
// implementation because a thread blocking to acquire a std::mutex cannot "help" the other thread that is currently
// in the critical section by executing its tasks. See also the note at the top of the file.
class SerializingOutputter final : public Outputter {
   FileService &fFileService;
   std::unique_ptr<ROOT::RNTupleWriter> fWriter;
   std::mutex fWriterMutex;
   std::vector<ROOT::RFieldToken> fTokens;

   struct SlotData {
      std::unique_ptr<RRawPtrWriteEntry> entry;
   };
   std::vector<SlotData> fSlots;

public:
   SerializingOutputter(FileService &fileService, ModelTokensPair modelTokens, std::string_view ntupleName,
                        const ROOT::RNTupleWriteOptions &options)
      : fFileService(fileService), fTokens(std::move(modelTokens.second))
   {
      auto &model = modelTokens.first;

      std::lock_guard g(fileService.GetMutex());
      fWriter = ROOT::RNTupleWriter::Append(std::move(model), ntupleName, fileService.GetFile(), options);
   }

   void InitSlot(unsigned slot) final
   {
      if (slot >= fSlots.size()) {
         fSlots.resize(slot + 1);
      }
      // Create an RRawPtrWriteEntry that is used for all fills from this slot.
      fSlots[slot].entry = fWriter->GetModel().CreateRawPtrWriteEntry();
   }

   void Fill(unsigned slot, const std::vector<DataProduct> &products) final
   {
      assert(slot < fSlots.size());
      auto &entry = *fSlots[slot].entry;

      // Use the field tokens to bind the products' raw pointers.
      for (auto &&product : products) {
         entry.BindRawPtr(fTokens[product.index], product.address);
      }

      {
         // Fill the entry without triggering an implicit flush. This requires synchronization with other threads using
         // the same writer, but not (yet) with the underlying TFile.
         std::lock_guard g(fWriterMutex);
         ROOT::RNTupleFillStatus status;
         fWriter->FillNoFlush(entry, status);
         if (status.ShouldFlushCluster()) {
            // If we are asked to flush, first try to do as much work as possible outside of the critical section:
            // FlushColumns() will flush column data and trigger compression, but not actually write to storage.
            // (A framework may of course also decide to flush more often.)
            fWriter->FlushColumns();

            {
               // FlushCluster() will flush data to the underlying TFile, so it requires synchronization.
               std::lock_guard g(fFileService.GetMutex());
               fWriter->FlushCluster();
            }
         }
      }
   }
};

// === END OF TUTORIAL FRAMEWORK CODE ===

// Simple structs to store events
struct Track {
   float eta;
   float mass;
   float pt;
   float phi;
};

struct ChargedTrack : public Track {
   std::int8_t charge;
};

struct Event {
   std::uint32_t eventId;
   std::uint32_t runId;
   std::vector<ChargedTrack> electrons;
   std::vector<Track> photons;
   std::vector<ChargedTrack> muons;
};

// RNTupleModel for Events; in a real framework, this would likely be dynamic (a sketch of that follows after this
// function).
ModelTokensPair CreateEventModel()
{
   // We recommend creating a bare model if the default entry is not used.
   auto model = ROOT::RNTupleModel::CreateBare();
   // For more efficient access, also create field tokens.
   std::vector<ROOT::RFieldToken> tokens;

   model->MakeField<decltype(Event::eventId)>("eventId");
   tokens.push_back(model->GetToken("eventId"));

   model->MakeField<decltype(Event::runId)>("runId");
   tokens.push_back(model->GetToken("runId"));

   model->MakeField<decltype(Event::electrons)>("electrons");
   tokens.push_back(model->GetToken("electrons"));

   model->MakeField<decltype(Event::photons)>("photons");
   tokens.push_back(model->GetToken("photons"));

   model->MakeField<decltype(Event::muons)>("muons");
   tokens.push_back(model->GetToken("muons"));

   return {std::move(model), std::move(tokens)};
}

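// The following is a sketch, not part of the original tutorial, of what a dynamic model setup could look like:
// field names and type names come from some framework configuration instead of compile-time types. It assumes
// ROOT::RFieldBase::Create() and ROOT::RNTupleModel::AddField() (in older ROOT versions these classes live in the
// ROOT::Experimental namespace); the function name and the fieldSpecs parameter are illustrative only.
ModelTokensPair CreateDynamicModel(const std::vector<std::pair<std::string, std::string>> &fieldSpecs)
{
   auto model = ROOT::RNTupleModel::CreateBare();
   std::vector<ROOT::RFieldToken> tokens;
   for (const auto &[fieldName, typeName] : fieldSpecs) {
      // Create a field from a run-time type name; Unwrap() throws if the type name is not known to ROOT.
      model->AddField(ROOT::RFieldBase::Create(fieldName, typeName).Unwrap());
      tokens.push_back(model->GetToken(fieldName));
   }
   return {std::move(model), std::move(tokens)};
}
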
// DataProducts with addresses that point into the Event object.
std::vector<DataProduct> CreateEventDataProducts(Event &event)
{
   std::vector<DataProduct> products;
   // The indices have to match the order of std::vector<ROOT::RFieldToken> above.
   products.emplace_back(0, &event.eventId);
   products.emplace_back(1, &event.runId);
   products.emplace_back(2, &event.electrons);
   products.emplace_back(3, &event.photons);
   products.emplace_back(4, &event.muons);
   return products;
}

// Simple struct to store runs
struct Run {
   std::uint32_t runId;
   std::uint32_t nEvents;
};

// RNTupleModel for Runs; in a real framework, this would likely be dynamic.
ModelTokensPair CreateRunModel()
{
   // We recommend creating a bare model if the default entry is not used.
   auto model = ROOT::RNTupleModel::CreateBare();
   // For more efficient access, also create field tokens.
   std::vector<ROOT::RFieldToken> tokens;

   model->MakeField<decltype(Run::runId)>("runId");
   tokens.push_back(model->GetToken("runId"));

   model->MakeField<decltype(Run::nEvents)>("nEvents");
   tokens.push_back(model->GetToken("nEvents"));

   return {std::move(model), std::move(tokens)};
}

// DataProducts with addresses that point into the Run object.
std::vector<DataProduct> CreateRunDataProducts(Run &run)
{
   std::vector<DataProduct> products;
   // The indices have to match the order of std::vector<ROOT::RFieldToken> above.
   products.emplace_back(0, &run.runId);
   products.emplace_back(1, &run.nEvents);
   return products;
}

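// A small sketch, not part of the original tutorial, of how the index coupling between the model creation and the
// DataProduct helpers above could be made less error-prone: name the indices once and use the same constants in
// both places (the enumerators below are illustrative only).
//
//    enum ERunField : std::size_t { kRunId = 0, kNEvents = 1 };
//
//    // in CreateRunModel():        push the tokens in enumerator order
//    // in CreateRunDataProducts(): products.emplace_back(kRunId, &run.runId);
//    //                             products.emplace_back(kNEvents, &run.nEvents);
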
constexpr unsigned kNRunsPerThread = 100;
constexpr unsigned kMeanNEventsPerRun = 400;
constexpr unsigned kStddevNEventsPerRun = 100;
constexpr unsigned kMeanNTracks = 5;

void ProcessRunsAndEvents(unsigned threadId, Outputter &eventOutputter, Outputter &runOutputter)
{
   std::mt19937 gen(threadId);
   std::normal_distribution<double> nEventsDist(kMeanNEventsPerRun, kStddevNEventsPerRun);
   std::poisson_distribution<> nTracksDist(kMeanNTracks);
   std::uniform_real_distribution<float> floatDist;

   for (std::uint32_t runId = threadId * kNRunsPerThread; runId < (threadId + 1) * kNRunsPerThread; runId++) {
      double nEventsD = nEventsDist(gen);
      std::uint32_t nEvents = 0;
      if (nEventsD > 0) {
         nEvents = static_cast<std::uint32_t>(nEventsD);
      }

      // Process events, reusing a single Event object.
      Event event;
      event.runId = runId;
      // The product addresses stay valid while the Event object is reused.
      auto eventProducts = CreateEventDataProducts(event);
      for (std::uint32_t eventId = 0; eventId < nEvents; eventId++) {
         event.eventId = eventId;

         // Produce some data; eta, phi, and pt are just filled with uniformly distributed data.
         event.electrons.resize(nTracksDist(gen));
         for (auto &electron : event.electrons) {
            electron.eta = floatDist(gen);
            electron.mass = 0.511 /* MeV */;
            electron.phi = floatDist(gen);
            electron.pt = floatDist(gen);
            electron.charge = (gen() % 2 ? 1 : -1);
         }
         event.photons.resize(nTracksDist(gen));
         for (auto &photon : event.photons) {
            photon.eta = floatDist(gen);
            photon.mass = 0;
            photon.phi = floatDist(gen);
            photon.pt = floatDist(gen);
         }
         event.muons.resize(nTracksDist(gen));
         for (auto &muon : event.muons) {
            muon.eta = floatDist(gen);
            muon.mass = 105.658 /* MeV */;
            muon.phi = floatDist(gen);
            muon.pt = floatDist(gen);
            muon.charge = (gen() % 2 ? 1 : -1);
         }

         eventOutputter.Fill(threadId, eventProducts);
      }

      // Fill the Run data.
      Run run;
      run.runId = runId;
      run.nEvents = nEvents;

      auto runProducts = CreateRunDataProducts(run);
      runOutputter.Fill(threadId, runProducts);
   }
}

constexpr unsigned kNThreads = 4;

void ntpl014_framework()
{
   FileService fileService("ntpl014_framework.root", "RECREATE");

   ROOT::RNTupleWriteOptions options;
   // Parallel writing requires buffered writing; force it on (even if it is the default).
   options.SetUseBufferedWrite(true);
   // For demonstration purposes, reduce the cluster size to 2 MiB.
   options.SetApproxZippedClusterSize(2 * 1024 * 1024);
   ParallelOutputter eventOutputter(fileService, CreateEventModel(), "events", options);

   // SerializingOutputter also relies on buffered writing; force it on (even if it is the default).
   options.SetUseBufferedWrite(true);
   // For demonstration purposes, reduce the cluster size for the very simple Run data to 1 KiB.
   options.SetApproxZippedClusterSize(1024);
   SerializingOutputter runOutputter(fileService, CreateRunModel(), "runs", options);

   // Initialize slots in the two Outputters.
   for (unsigned i = 0; i < kNThreads; i++) {
      eventOutputter.InitSlot(i);
      runOutputter.InitSlot(i);
   }

   std::vector<std::thread> threads;
   for (unsigned i = 0; i < kNThreads; i++) {
      threads.emplace_back(ProcessRunsAndEvents, i, std::ref(eventOutputter), std::ref(runOutputter));
   }
   for (unsigned i = 0; i < kNThreads; i++) {
      threads[i].join();
   }
}
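
// A sketch, not part of the original tutorial, of how the written RNTuples could be inspected after running this
// macro. It assumes ROOT::RNTupleReader from <ROOT/RNTupleReader.hxx> (in older ROOT versions the reader lives in
// the ROOT::Experimental namespace); the ntuple name "events" matches the name used when appending above.
//
//    #include <ROOT/RNTupleReader.hxx>
//    #include <iostream>
//
//    auto reader = ROOT::RNTupleReader::Open("events", "ntpl014_framework.root");
//    std::cout << "events entries: " << reader->GetNEntries() << std::endl;
//    reader->PrintInfo(); // prints a summary of the fields and clusters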