Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleDS.cxx
Go to the documentation of this file.
1/// \file RNTupleDS.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \author Enrico Guiraud <enrico.guiraud@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
15#include <ROOT/RDataFrame.hxx>
16#include <ROOT/RField.hxx>
17#include <ROOT/RFieldUtils.hxx>
20#include <ROOT/RNTupleDS.hxx>
21#include <ROOT/RNTupleUtil.hxx>
22#include <ROOT/RPageStorage.hxx>
23#include <string_view>
24
25#include <TError.h>
26#include <TSystem.h>
27
28#include <cassert>
29#include <memory>
30#include <mutex>
31#include <string>
32#include <vector>
33#include <typeinfo>
34#include <utility>
35
36// clang-format off
37/**
38* \class ROOT::RDF::RNTupleDS
39* \ingroup dataframe
40* \brief The RDataSource implementation for RNTuple. It lets RDataFrame read RNTuple data.
41*
42* An RDataFrame that reads RNTuple data can be constructed using FromRNTuple().
43*
44* For each column containing an array or a collection, a corresponding column `R_rdf_sizeof_colname` is available to access
45* `colname.size()` without reading and deserializing the collection values.
46*
47**/
48// clang-format on
49
50namespace ROOT::Internal::RDF {
51/// An artificial field that transforms an RNTuple column that contains the offset of collections into
52/// collection sizes. It is used to provide the "number of" RDF columns for collections, e.g.
53/// `R_rdf_sizeof_jets` for a collection named `jets`.
54///
55/// This field owns the collection offset field but instead of exposing the collection offsets it exposes
56/// the collection sizes (offset(N+1) - offset(N)). For the time being, we offer this functionality only in RDataFrame.
57/// TODO(jblomer): consider providing a general set of useful virtual fields as part of RNTuple.
59protected:
60 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view /* newName */) const final
61 {
62 return std::make_unique<RRDFCardinalityField>();
63 }
64 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
65
66public:
67 RRDFCardinalityField() : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kLeaf, false /* isSimple */) {}
70 ~RRDFCardinalityField() override = default;
71
81 // Field is only used for reading
82 void GenerateColumns() final { assert(false && "Cardinality fields must only be used for reading"); }
87
88 size_t GetValueSize() const final { return sizeof(std::size_t); }
89 size_t GetAlignment() const final { return alignof(std::size_t); }
90
91 /// Get the number of elements of the collection identified by globalIndex
99
100 /// Get the number of elements of the collection identified by clusterIndex
108};
109
110/**
111 * @brief An artificial field that provides the size of a fixed-size array
112 *
113 * This is the implementation of `R_rdf_sizeof_column` in case `column` contains
114 * fixed-size arrays on disk.
115 */
117private:
118 std::size_t fArrayLength;
119
120 std::unique_ptr<ROOT::RFieldBase> CloneImpl(std::string_view) const final
121 {
122 return std::make_unique<RArraySizeField>(fArrayLength);
123 }
124 void GenerateColumns() final { assert(false && "RArraySizeField fields must only be used for reading"); }
126 void ReadGlobalImpl(ROOT::NTupleSize_t /*globalIndex*/, void *to) final
127 {
128 *static_cast<std::size_t *>(to) = fArrayLength;
129 }
130 void ReadInClusterImpl(RNTupleLocalIndex /*localIndex*/, void *to) final
131 {
132 *static_cast<std::size_t *>(to) = fArrayLength;
133 }
134
135public:
137 : ROOT::RFieldBase("", "std::size_t", ROOT::ENTupleStructure::kLeaf, false /* isSimple */),
139 {
140 }
146
147 void ConstructValue(void *where) const final { *static_cast<std::size_t *>(where) = 0; }
148 std::size_t GetValueSize() const final { return sizeof(std::size_t); }
149 std::size_t GetAlignment() const final { return alignof(std::size_t); }
150};
151
152/// Every RDF column is represented by exactly one RNTuple field
156
157 RNTupleDS *fDataSource; ///< The data source that owns this column reader
158 RFieldBase *fProtoField; ///< The prototype field from which fField is cloned
159 std::unique_ptr<RFieldBase> fField; ///< The field backing the RDF column
160 std::unique_ptr<RFieldBase::RValue> fValue; ///< The memory location used to read from fField
161 std::shared_ptr<void> fValuePtr; ///< Used to reuse the object created by fValue when reconnecting sources
162 Long64_t fLastEntry = -1; ///< Last entry number that was read
163 /// For chains, the logical entry and the physical entry in any particular file can be different.
164 /// The entry offset stores the logical entry number (sum of all previous physical entries) when a file of the corresponding
165 /// data source was opened.
167
168public:
170 ~RNTupleColumnReader() override = default;
171
172 /// Connect the field and its subfields to the page source
174 {
175 assert(fLastEntry == -1);
177
178 // Create a new, real field from the prototype and set its field ID in the context of the given page source
180 {
181 auto descGuard = source.GetSharedDescriptorGuard();
182 // Set the on-disk field IDs for the field and the subfield
183 fField->SetOnDiskId(
185 auto iProto = fProtoField->cbegin();
186 auto iReal = fField->begin();
187 for (; iReal != fField->end(); ++iProto, ++iReal) {
188 iReal->SetOnDiskId(descGuard->FindFieldId(fDataSource->fFieldId2QualifiedName.at(iProto->GetOnDiskId())));
189 }
190 }
191
192 try {
194 } catch (const ROOT::RException &err) {
195 auto onDiskType = source.GetSharedDescriptorGuard()->GetFieldDescriptor(fField->GetOnDiskId()).GetTypeName();
196 std::string msg = "RNTupleDS: invalid type \"" + fField->GetTypeName() + "\" for column \"" +
197 fDataSource->fFieldId2QualifiedName[fField->GetOnDiskId()] + "\" with on-disk type \"" +
198 onDiskType + "\"";
199 throw std::runtime_error(msg);
200 }
201
202 if (fValuePtr) {
203 // When the reader reconnects to a new file, the fValuePtr is already set
204 fValue = std::make_unique<RFieldBase::RValue>(fField->BindValue(fValuePtr));
205 fValuePtr = nullptr;
206 } else {
207 // For the first file, create a new object for this field (reader)
208 fValue = std::make_unique<RFieldBase::RValue>(fField->CreateValue());
209 }
210 }
211
213 {
214 if (fValue && keepValue) {
215 fValuePtr = fValue->GetPtr<void>();
216 }
217 fValue = nullptr;
218 fField = nullptr;
219 fLastEntry = -1;
220 }
221
222 void *GetImpl(Long64_t entry) final
223 {
224 if (entry != fLastEntry) {
225 fValue->Read(entry - fEntryOffset);
227 }
228 return fValue->GetPtr<void>().get();
229 }
230};
231} // namespace ROOT::Internal::RDF
232
234
236 ROOT::DescriptorId_t fieldId, std::vector<RNTupleDS::RFieldInfo> fieldInfos,
237 bool convertToRVec)
238{
 // Registers the RDF column(s) for the field `fieldId`, exposed under the column name `colName`.
 // For records, the sub fields are registered (recursively) before the record's own column.
 // `fieldInfos` holds the enclosing collection / fixed-size-array levels, outermost first. It is taken by value, so
 // every recursive call below receives its own copy.
239 // As an example for the mapping of RNTuple fields to RDF columns, let's consider an RNTuple
240 // using the following types and with a top-level field named "event" of type Event:
241 //
242 // struct Event {
243 // int id;
244 // std::vector<Track> tracks;
245 // };
246 // struct Track {
247 // std::vector<Hit> hits;
248 // };
249 // struct Hit {
250 // float x;
251 // float y;
252 // };
253 //
254 // AddField() will be called from the constructor with the RNTuple root field (ENTupleStructure::kRecord).
255 // From there, we recurse into the "event" sub field (also ENTupleStructure::kRecord) and further down the
256 // tree of sub fields and expose the following RDF columns:
257 //
258 // "event" [Event]
259 // "event.id" [int]
260 // "event.tracks" [RVec<Track>]
261 // "R_rdf_sizeof_event.tracks" [std::size_t]
262 // "event.tracks.hits" [RVec<RVec<Hit>>]
263 // "R_rdf_sizeof_event.tracks.hits" [RVec<std::size_t>]
264 // "event.tracks.hits.x" [RVec<RVec<float>>]
265 // "R_rdf_sizeof_event.tracks.hits.x" [RVec<std::size_t>]
266 // "event.tracks.hits.y" [RVec<RVec<float>>]
267 // "R_rdf_sizeof_event.tracks.hits.y" [RVec<std::size_t>]
268
269 const auto &fieldDesc = desc.GetFieldDescriptor(fieldId);
270 const auto &nRepetitions = fieldDesc.GetNRepetitions();
271 if ((fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) || (nRepetitions > 0)) {
272 // The field is a collection or a fixed-size array.
273 // We open a new collection scope with fieldID being the inner most collection. E.g. for "event.tracks.hits",
274 // fieldInfos would already contain the fieldID of "event.tracks"
275 fieldInfos.emplace_back(fieldId, nRepetitions);
276 }
277
278 if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kCollection) {
279 // Inner fields of collections are provided as projected collections of only that inner field,
280 // E.g. we provide a projected collection RVec<RVec<float>> for "event.tracks.hits.x" in the example
281 // above.
283 convertToRVec && (fieldDesc.GetTypeName().substr(0, 19) == "ROOT::VecOps::RVec<" ||
284 fieldDesc.GetTypeName().substr(0, 12) == "std::vector<" || fieldDesc.GetTypeName() == "");
285 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
287
288 // Note that at the end of the recursion, we handled the inner sub collections as well as the
289 // collection as whole, so we are done.
290 return;
291
292 } else if (nRepetitions > 0) {
293 // Fixed-size array, same logic as ROOT::RVec.
294 const auto &f = *desc.GetFieldIterable(fieldDesc.GetId()).begin();
295 AddField(desc, colName, f.GetId(), fieldInfos);
296 return;
297 } else if (fieldDesc.GetStructure() == ROOT::ENTupleStructure::kRecord) {
298 // Inner fields of records are provided as individual RDF columns, e.g. "event.id"
299 for (const auto &f : desc.GetFieldIterable(fieldDesc.GetId())) {
300 auto innerName = colName.empty() ? f.GetFieldName() : (std::string(colName) + "." + f.GetFieldName());
301 // Inner fields of collections of records are always exposed as ROOT::RVec
302 AddField(desc, innerName, f.GetId(), fieldInfos);
303 }
304 }
305
306 // The fieldID could be the root field or the class of fieldId might not be loaded.
307 // In these cases, only the inner fields are exposed as RDF columns.
308 auto fieldOrException = ROOT::RFieldBase::Create(fieldDesc.GetFieldName(), fieldDesc.GetTypeName());
309 if (!fieldOrException)
310 return;
311 auto valueField = fieldOrException.Unwrap();
312 valueField->SetOnDiskId(fieldId);
313 for (auto &f : *valueField) {
314 f.SetOnDiskId(desc.FindFieldId(f.GetFieldName(), f.GetParent()->GetOnDiskId()));
315 }
316 std::unique_ptr<ROOT::RFieldBase> cardinalityField;
317 // Collections get the additional "number of" RDF column (e.g. "R_rdf_sizeof_tracks")
318 if (!fieldInfos.empty()) {
319 const auto &info = fieldInfos.back();
320 if (info.fNRepetitions > 0) {
321 cardinalityField = std::make_unique<ROOT::Internal::RDF::RArraySizeField>(info.fNRepetitions);
322 } else {
323 cardinalityField = std::make_unique<ROOT::Internal::RDF::RRDFCardinalityField>();
324 }
325 cardinalityField->SetOnDiskId(info.fFieldId);
326 }
327
 // Wrap the value field (and, except for the inner-most level, the cardinality field) once per enclosing
 // collection level. The reverse iteration goes from the inner-most level outwards.
328 for (auto i = fieldInfos.rbegin(); i != fieldInfos.rend(); ++i) {
329 const auto &fieldInfo = *i;
330
331 if (fieldInfo.fNRepetitions > 0) {
332 // Fixed-size array, read it as ROOT::RVec in memory
333 valueField = std::make_unique<ROOT::RArrayAsRVecField>("", std::move(valueField), fieldInfo.fNRepetitions);
334 } else {
335 // Actual collection. A std::vector or ROOT::RVec gets added as a ROOT::RVec. All other collection types keep
336 // their original type.
337 if (convertToRVec) {
338 valueField = std::make_unique<ROOT::RRVecField>("", std::move(valueField));
339 } else {
340 auto outerFieldType = desc.GetFieldDescriptor(fieldInfo.fFieldId).GetTypeName();
342 }
343 }
344
345 valueField->SetOnDiskId(fieldInfo.fFieldId);
346
347 // Skip the inner-most collection level to construct the cardinality column
348 // It's taken care of by the `if (!fieldInfos.empty())` scope above
349 if (i != fieldInfos.rbegin()) {
350 if (fieldInfo.fNRepetitions > 0) {
351 // This collection level refers to a fixed-size array
353 std::make_unique<ROOT::RArrayAsRVecField>("", std::move(cardinalityField), fieldInfo.fNRepetitions);
354 } else {
355 // This collection level refers to an RVec
356 cardinalityField = std::make_unique<ROOT::RRVecField>("", std::move(cardinalityField));
357 }
358
359 cardinalityField->SetOnDiskId(fieldInfo.fFieldId);
360 }
361 }
362
 // The size column is registered before the value column; its type is whatever the wrapped cardinality field
 // reports (std::size_t at the inner-most level, wrapped in RVec for every further level).
363 if (cardinalityField) {
364 fColumnNames.emplace_back("R_rdf_sizeof_" + std::string(colName));
365 fColumnTypes.emplace_back(cardinalityField->GetTypeName());
366 fProtoFields.emplace_back(std::move(cardinalityField));
367 }
368
 // NOTE(review): fieldInfos is a by-value parameter and is not read after this point, so this emplace_back has
 // no visible effect; it looks like a candidate for removal.
369 fieldInfos.emplace_back(fieldId, nRepetitions);
370 fColumnNames.emplace_back(colName);
371 fColumnTypes.emplace_back(valueField->GetTypeName());
372 fProtoFields.emplace_back(std::move(valueField));
373}
374
375ROOT::RDF::RNTupleDS::RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource)
376{
377 pageSource->Attach();
378 fPrincipalDescriptor = pageSource->GetSharedDescriptorGuard()->Clone();
379 fStagingArea.emplace_back(std::move(pageSource));
380
381 AddField(fPrincipalDescriptor, "", fPrincipalDescriptor.GetFieldZeroId(),
382 std::vector<ROOT::RDF::RNTupleDS::RFieldInfo>());
383}
384
385namespace {
386
388{
389 // The setting is for now a global one, must be decided before running the
390 // program by setting the appropriate environment variable. Make sure that
391 // option configuration is thread-safe and happens only once.
393 static std::once_flag flag;
394 std::call_once(flag, []() {
395 if (auto env = gSystem->Getenv("ROOT_RNTUPLE_CLUSTERBUNCHSIZE"); env != nullptr && strlen(env) > 0) {
396 std::string envStr{env};
397 auto envNum{std::stoul(envStr)};
398 envNum = envNum == 0 ? 1 : envNum;
400 }
401 });
402 return opts;
403}
404
405std::unique_ptr<ROOT::Internal::RPageSource> CreatePageSource(std::string_view ntupleName, std::string_view fileName)
406{
408}
409} // namespace
410
411ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, std::string_view fileName)
412 : RNTupleDS(CreatePageSource(ntupleName, fileName))
413{
414}
415
416ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames)
417 : RNTupleDS(CreatePageSource(ntupleName, fileNames[0]))
418{
421 fStagingArea.resize(fFileNames.size());
422}
423
425ROOT::RDF::RNTupleDS::GetColumnReadersImpl(std::string_view /* name */, const std::type_info & /* ti */)
426{
427 // This datasource uses the newer GetColumnReaders() API
428 return {};
429}
430
431std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
432ROOT::RDF::RNTupleDS::GetColumnReaders(unsigned int slot, std::string_view name, const std::type_info &tid)
433{
 // Creates a column reader for column `name` in processing slot `slot`.
 // The reader is built on a prototype field: either fProtoFields[index], or a cached alternative prototype when a
 // different type is requested for a non-cardinality column.
 // The reader is also recorded in fActiveColumnReaders[slot], which is what the data source later iterates over to
 // connect and disconnect readers to and from page sources.
434 // At this point we can assume that `name` will be found in fColumnNames
435 const auto index = std::distance(fColumnNames.begin(), std::find(fColumnNames.begin(), fColumnNames.end(), name));
 // NOTE(review): if `name` were not found, index would equal fColumnNames.size(), and the fColumnTypes[index] /
 // fProtoFields[index] accesses below would be out of bounds. Nothing in this function checks for that.
437
439 // If the field corresponding to the provided name is not a cardinality column and the requested type is different
440 // from the proto field that was created when the data source was constructed, we first have to create an
441 // alternative proto field for the column reader. Otherwise, we can directly use the existing proto field.
442 if (name.substr(0, 13) != "R_rdf_sizeof_" && requestedType != fColumnTypes[index]) {
443 auto &altProtoFields = fAlternativeProtoFields[index];
 // Alternative prototypes are cached per column index and looked up by type name, so a repeated request for
 // the same type reuses the existing field.
444 auto altProtoField = std::find_if(altProtoFields.begin(), altProtoFields.end(),
445 [&requestedType](const std::unique_ptr<ROOT::RFieldBase> &fld) {
446 return fld->GetTypeName() == requestedType;
447 });
449 field = altProtoField->get();
450 } else {
 // NOTE(review): unlike the message in GetTypeName(), this one does not close the quote after the column
 // name. Consider appending "\"" to the message.
453 throw std::runtime_error("RNTupleDS: Could not create field with type \"" + requestedType +
454 "\" for column \"" + std::string(name));
455 }
457 newAltProtoField->SetOnDiskId(fProtoFields[index]->GetOnDiskId());
458 field = newAltProtoField.get();
459 altProtoFields.emplace_back(std::move(newAltProtoField));
460 }
461 } else {
462 field = fProtoFields[index].get();
463 }
464
465 // Map the field's and subfields' IDs to qualified names so that we can later connect the fields to
466 // other page sources from the chain
467 fFieldId2QualifiedName[field->GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(field->GetOnDiskId());
468 for (const auto &s : *field) {
469 fFieldId2QualifiedName[s.GetOnDiskId()] = fPrincipalDescriptor.GetQualifiedFieldName(s.GetOnDiskId());
470 }
471
472 auto reader = std::make_unique<ROOT::Internal::RDF::RNTupleColumnReader>(this, field);
473 fActiveColumnReaders[slot].emplace_back(reader.get());
474
475 return reader;
476}
477
479{
480 while (true) {
481 std::unique_lock lock(fMutexStaging);
482 fCvStaging.wait(lock, [this] { return fIsReadyForStaging || fStagingThreadShouldTerminate; });
483 if (fStagingThreadShouldTerminate)
484 return;
485
486 assert(!fHasNextSources);
487 StageNextSources();
488 fHasNextSources = true;
489 fIsReadyForStaging = false;
490
491 lock.unlock();
492 fCvStaging.notify_one();
493 }
494}
495
497{
498 const auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
499 for (auto i = fNextFileIndex; (i < nFiles) && ((i - fNextFileIndex) < fNSlots); ++i) {
500 if (fStagingThreadShouldTerminate)
501 return;
502
503 if (fStagingArea[i]) {
504 // The first file is already open and was used to read the schema
505 assert(i == 0);
506 } else {
507 fStagingArea[i] = CreatePageSource(fNTupleName, fFileNames[i]);
508 fStagingArea[i]->LoadStructure();
509 }
510 }
511}
512
514{
515 assert(fNextRanges.empty());
516 auto nFiles = fFileNames.empty() ? 1 : fFileNames.size();
517 auto nRemainingFiles = nFiles - fNextFileIndex;
518 if (nRemainingFiles == 0)
519 return;
520
521 // Easy work scheduling: one file per slot. We skip empty files (files without entries).
522 if (nRemainingFiles >= fNSlots) {
523 while ((fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles)) {
525
526 std::swap(fStagingArea[fNextFileIndex], range.fSource);
527
528 if (!range.fSource) {
529 // Typically, the prestaged source should have been present. Only if some of the files are empty, we need
530 // to open and attach files here.
531 range.fSource = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
532 }
533 range.fSource->Attach();
534 fNextFileIndex++;
535
536 auto nEntries = range.fSource->GetNEntries();
537 if (nEntries == 0)
538 continue;
539
540 range.fLastEntry = nEntries; // whole file per slot, i.e. entry range [0..nEntries - 1]
541 fNextRanges.emplace_back(std::move(range));
542 }
543 return;
544 }
545
546 // Work scheduling of the tail: multiple slots work on the same file.
547 // Every slot still has its own page source but these page sources may open the same file.
548 // Again, we need to skip empty files.
549 unsigned int nSlotsPerFile = fNSlots / nRemainingFiles;
550 for (std::size_t i = 0; (fNextRanges.size() < fNSlots) && (fNextFileIndex < nFiles); ++i) {
551 std::unique_ptr<ROOT::Internal::RPageSource> source;
552 std::swap(fStagingArea[fNextFileIndex], source);
553 if (!source) {
554 // Empty files trigger this condition
555 source = CreatePageSource(fNTupleName, fFileNames[fNextFileIndex]);
556 }
557 source->Attach();
558 fNextFileIndex++;
559
560 auto nEntries = source->GetNEntries();
561 if (nEntries == 0)
562 continue;
563
564 // If last file: use all remaining slots
565 if (i == (nRemainingFiles - 1))
566 nSlotsPerFile = fNSlots - fNextRanges.size();
567
568 std::vector<std::pair<ULong64_t, ULong64_t>> rangesByCluster;
569 {
570 auto descriptorGuard = source->GetSharedDescriptorGuard();
571 auto clusterId = descriptorGuard->FindClusterId(0, 0);
573 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterId);
574 rangesByCluster.emplace_back(std::make_pair<ULong64_t, ULong64_t>(
575 clusterDesc.GetFirstEntryIndex(), clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()));
576 clusterId = descriptorGuard->FindNextClusterId(clusterId);
577 }
578 }
579 const unsigned int nRangesByCluster = rangesByCluster.size();
580
581 // Distribute slots equidistantly over the entry range, aligned on cluster boundaries
583 const auto remainder = nRangesByCluster % nSlotsPerFile;
584 std::size_t iRange = 0;
585 unsigned int iSlot = 0;
586 const unsigned int N = std::min(nSlotsPerFile, nRangesByCluster);
587 for (; iSlot < N; ++iSlot) {
588 auto start = rangesByCluster[iRange].first;
589 iRange += nClustersPerSlot + static_cast<int>(iSlot < remainder);
590 assert(iRange > 0);
591 auto end = rangesByCluster[iRange - 1].second;
592
594 // The last range for this file just takes the already opened page source. All previous ranges clone.
595 if (iSlot == N - 1) {
596 range.fSource = std::move(source);
597 } else {
598 range.fSource = source->Clone();
599 }
600 range.fSource->SetEntryRange({start, end - start});
601 range.fFirstEntry = start;
602 range.fLastEntry = end;
603 fNextRanges.emplace_back(std::move(range));
604 }
605 } // loop over tail of remaining files
606}
607
608std::vector<std::pair<ULong64_t, ULong64_t>> ROOT::RDF::RNTupleDS::GetEntryRanges()
609{
610 std::vector<std::pair<ULong64_t, ULong64_t>> ranges;
611
612 // We need to distinguish between single threaded and multi-threaded runs.
613 // In single threaded mode, InitSlot is only called once and column readers have to be rewired
614 // to new page sources of the chain in GetEntryRanges. In multi-threaded mode, on the other hand,
615 // InitSlot is called for every returned range, thus rewiring the column readers takes place in
616 // InitSlot and FinalizeSlot.
617
618 if (fNSlots == 1) {
619 for (auto r : fActiveColumnReaders[0]) {
620 r->Disconnect(true /* keepValue */);
621 }
622 }
623
624 // If we have fewer files than slots and we run multiple event loops, we can reuse fCurrentRanges and don't need
625 // to worry about loading the fNextRanges. I.e., in this case we don't enter the if block.
626 if (fCurrentRanges.empty() || (fSeenEntries > 0)) {
627 // Otherwise, i.e. start of the first event loop or in the middle of the event loop, prepare the next ranges
628 // and swap with the current ones.
629 {
630 std::unique_lock lock(fMutexStaging);
631 fCvStaging.wait(lock, [this] { return fHasNextSources; });
632 }
633 PrepareNextRanges();
634 if (fNextRanges.empty()) {
635 // No more data
636 return ranges;
637 }
638
639 assert(fNextRanges.size() <= fNSlots);
640
641 fCurrentRanges.clear();
642 std::swap(fCurrentRanges, fNextRanges);
643 }
644
645 // Stage next batch of files for the next call to GetEntryRanges()
646 {
647 std::lock_guard _(fMutexStaging);
648 fIsReadyForStaging = true;
649 fHasNextSources = false;
650 }
651 fCvStaging.notify_one();
652
653 // Create ranges for the RDF loop manager from the list of REntryRangeDS records.
654 // The entry ranges that are relative to the page source in REntryRangeDS are translated into absolute
655 // entry ranges, given the current state of the entry cursor.
656 // We remember the connection from first absolute entry index of a range to its REntryRangeDS record
657 // so that we can properly rewire the column reader in InitSlot
658 fFirstEntry2RangeIdx.clear();
660 for (std::size_t i = 0; i < fCurrentRanges.size(); ++i) {
661 // Several consecutive ranges may operate on the same file (each with their own page source clone).
662 // We can detect a change of file when the first entry number jumps back to 0.
663 if (fCurrentRanges[i].fFirstEntry == 0) {
664 // New source
665 fSeenEntries += nEntriesPerSource;
667 }
668 auto start = fCurrentRanges[i].fFirstEntry + fSeenEntries;
669 auto end = fCurrentRanges[i].fLastEntry + fSeenEntries;
670 nEntriesPerSource += end - start;
671
672 fFirstEntry2RangeIdx[start] = i;
673 ranges.emplace_back(start, end);
674 }
675 fSeenEntries += nEntriesPerSource;
676
677 if ((fNSlots == 1) && (fCurrentRanges[0].fSource)) {
678 for (auto r : fActiveColumnReaders[0]) {
679 r->Connect(*fCurrentRanges[0].fSource, ranges[0].first);
680 }
681 }
682
683 return ranges;
684}
685
687{
688 if (fNSlots == 1)
689 return;
690
691 auto idxRange = fFirstEntry2RangeIdx.at(firstEntry);
692 for (auto r : fActiveColumnReaders[slot]) {
693 r->Connect(*fCurrentRanges[idxRange].fSource, firstEntry - fCurrentRanges[idxRange].fFirstEntry);
694 }
695}
696
698{
699 if (fNSlots == 1)
700 return;
701
702 for (auto r : fActiveColumnReaders[slot]) {
703 r->Disconnect(true /* keepValue */);
704 }
705}
706
707std::string ROOT::RDF::RNTupleDS::GetTypeName(std::string_view colName) const
708{
709 auto colNamePos = std::find(fColumnNames.begin(), fColumnNames.end(), colName);
710
711 if (colNamePos == fColumnNames.end()) {
712 auto msg = std::string("RNTupleDS: There is no column with name \"") + std::string(colName) + "\"";
713 throw std::runtime_error(msg);
714 }
715
716 const auto index = std::distance(fColumnNames.begin(), colNamePos);
717 return fColumnTypes[index];
718}
719
720bool ROOT::RDF::RNTupleDS::HasColumn(std::string_view colName) const
721{
722 return std::find(fColumnNames.begin(), fColumnNames.end(), colName) != fColumnNames.end();
723}
724
726{
727 fSeenEntries = 0;
728 fNextFileIndex = 0;
729 fIsReadyForStaging = fHasNextSources = fStagingThreadShouldTerminate = false;
730 fThreadStaging = std::thread(&RNTupleDS::ExecStaging, this);
731 assert(fNextRanges.empty());
732
733 if (fCurrentRanges.empty() || (fFileNames.size() > fNSlots)) {
734 // First event loop or large number of files: start the staging process.
735 {
736 std::lock_guard _(fMutexStaging);
737 fIsReadyForStaging = true;
738 }
739 fCvStaging.notify_one();
740 } else {
741 // Otherwise, we will reuse fCurrentRanges. Make sure that staging and preparing next ranges will be a noop
742 // (already at the end of the list of files).
743 fNextFileIndex = std::max(fFileNames.size(), std::size_t(1));
744 }
745}
746
748{
749 for (unsigned int i = 0; i < fNSlots; ++i) {
750 for (auto r : fActiveColumnReaders[i]) {
751 r->Disconnect(false /* keepValue */);
752 }
753 }
754 {
755 std::lock_guard _(fMutexStaging);
756 fStagingThreadShouldTerminate = true;
757 }
758 fCvStaging.notify_one();
759 fThreadStaging.join();
760 // If we have a chain with more files than the number of slots, the files opened at the end of the
761 // event loop won't be reused when the event loop restarts, so we can close them.
762 if (fFileNames.size() > fNSlots) {
763 fCurrentRanges.clear();
764 fNextRanges.clear();
765 fStagingArea.clear();
766 fStagingArea.resize(fFileNames.size());
767 }
768}
769
771{
772 assert(fNSlots == 0);
773 assert(nSlots > 0);
774 fNSlots = nSlots;
775 fActiveColumnReaders.resize(fNSlots);
776}
777
778ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, std::string_view fileName)
779{
780 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileName));
781}
782
783ROOT::RDataFrame ROOT::RDF::FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames)
784{
785 return ROOT::RDataFrame(std::make_unique<ROOT::RDF::RNTupleDS>(ntupleName, fileNames));
786}
#define f(i)
Definition RSha256.hxx:104
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:69
unsigned long long ULong64_t
Definition RtypesCore.h:70
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define _(A, B)
Definition cfortran.h:108
void GetCollectionInfo(const ROOT::NTupleSize_t globalIndex, RNTupleLocalIndex *collectionStart, ROOT::NTupleSize_t *collectionSize)
For offset columns only, look at the two adjacent values that define a collection's coordinates.
Definition RColumn.hxx:283
An artificial field that provides the size of a fixed-size array.
void GenerateColumns(const ROOT::RNTupleDescriptor &) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
void ReadGlobalImpl(ROOT::NTupleSize_t, void *to) final
RArraySizeField(const RArraySizeField &other)=delete
RArraySizeField(RArraySizeField &&other)=default
RArraySizeField & operator=(RArraySizeField &&other)=default
void ReadInClusterImpl(RNTupleLocalIndex, void *to) final
std::size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
RArraySizeField(std::size_t arrayLength)
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
RArraySizeField & operator=(const RArraySizeField &other)=delete
std::size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Every RDF column is represented by exactly one RNTuple field.
void * GetImpl(Long64_t entry) final
void Connect(RPageSource &source, Long64_t entryOffset)
Connect the field and its subfields to the page source.
RNTupleColumnReader(RNTupleDS *ds, RFieldBase *protoField)
std::unique_ptr< RFieldBase::RValue > fValue
The memory location used to read from fField.
std::unique_ptr< RFieldBase > fField
The field backing the RDF column.
Long64_t fEntryOffset
For chains, the logical entry and the physical entry in any particular file can be different.
std::shared_ptr< void > fValuePtr
Used to reuse the object created by fValue when reconnecting sources.
RNTupleDS * fDataSource
The data source that owns this column reader.
RFieldBase * fProtoField
The prototype field from which fField is cloned.
Long64_t fLastEntry
Last entry number that was read.
An artificial field that transforms an RNTuple column that contains the offset of collections into co...
Definition RNTupleDS.cxx:58
RRDFCardinalityField(RRDFCardinalityField &&other)=default
size_t GetAlignment() const final
As a rule of thumb, the alignment is equal to the size of the type.
Definition RNTupleDS.cxx:89
void GenerateColumns() final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:82
void ReadGlobalImpl(ROOT::NTupleSize_t globalIndex, void *to) final
Get the number of elements of the collection identified by globalIndex.
Definition RNTupleDS.cxx:92
RRDFCardinalityField & operator=(RRDFCardinalityField &&other)=default
void GenerateColumns(const ROOT::RNTupleDescriptor &desc) final
Implementations in derived classes should create the backing columns corresponding to the field type ...
Definition RNTupleDS.cxx:83
const RColumnRepresentations & GetColumnRepresentations() const final
Implementations in derived classes should return a static RColumnRepresentations object.
Definition RNTupleDS.cxx:72
size_t GetValueSize() const final
The number of bytes taken by a value of the appropriate type.
Definition RNTupleDS.cxx:88
void ReadInClusterImpl(ROOT::RNTupleLocalIndex localIndex, void *to) final
Get the number of elements of the collection identified by clusterIndex.
std::unique_ptr< ROOT::RFieldBase > CloneImpl(std::string_view) const final
Called by Clone(), which additionally copies the on-disk ID.
Definition RNTupleDS.cxx:60
void ConstructValue(void *where) const final
Constructs value in a given location of size at least GetValueSize(). Called by the base class' Creat...
Definition RNTupleDS.cxx:64
static void SetClusterBunchSize(RNTupleReadOptions &options, unsigned int val)
Abstract interface to read data from an ntuple.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
std::vector< void * > Record_t
The RDataSource implementation for RNTuple.
Definition RNTupleDS.hxx:47
void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, std::vector< RFieldInfo > fieldInfos, bool convertToRVec=true)
Provides the RDF column "colName" given the field identified by fieldID.
std::vector< std::pair< ULong64_t, ULong64_t > > GetEntryRanges() final
Return ranges of entries to distribute to tasks.
void ExecStaging()
The main function of the fThreadStaging background thread.
std::vector< std::unique_ptr< ROOT::Internal::RPageSource > > fStagingArea
The staging area is relevant for chains of files, i.e.
Definition RNTupleDS.hxx:82
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > GetColumnReaders(unsigned int, std::string_view, const std::type_info &) final
If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
std::vector< std::unique_ptr< ROOT::RFieldBase > > fProtoFields
We prepare a prototype field for every column.
Definition RNTupleDS.hxx:89
void SetNSlots(unsigned int nSlots) final
Inform RDataSource of the number of processing slots (i.e.
std::vector< std::string > fFileNames
Definition RNTupleDS.hxx:65
void InitSlot(unsigned int slot, ULong64_t firstEntry) final
Convenience method called at the start of the data processing associated to a slot.
RNTupleDS(std::unique_ptr< ROOT::Internal::RPageSource > pageSource)
std::string GetTypeName(std::string_view colName) const final
Type of a column as a string, e.g.
std::unordered_map< ROOT::DescriptorId_t, std::string > fFieldId2QualifiedName
Connects the IDs of active proto fields and their subfields to their fully qualified name (a....
Definition RNTupleDS.hxx:98
std::string fNTupleName
The data source may be constructed with an ntuple name and a list of files.
Definition RNTupleDS.hxx:64
void PrepareNextRanges()
Populates fNextRanges with the next set of entry ranges.
void StageNextSources()
Starting from fNextFileIndex, opens the next fNSlots files.
void Finalize() final
Convenience method called after concluding an event-loop.
std::vector< std::string > fColumnTypes
void Initialize() final
Convenience method called before starting an event-loop.
std::vector< std::string > fColumnNames
Definition RNTupleDS.hxx:99
bool HasColumn(std::string_view colName) const final
Checks if the dataset has a certain column.
Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final
type-erased vector of pointers to pointers to column values - one per slot
void FinalizeSlot(unsigned int slot) final
Convenience method called at the end of the data processing associated to a slot.
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
The list of column representations a field can have.
A field translates read and write calls from/to underlying columns to/from tree values.
ROOT::Internal::RColumn * fPrincipalColumn
All fields that have columns have a distinct main column.
RConstSchemaIterator cbegin() const
const std::string & GetFieldName() const
static RResult< std::unique_ptr< RFieldBase > > Create(const std::string &fieldName, const std::string &typeName, const ROOT::RCreateFieldOptions &options, const ROOT::RNTupleDescriptor *desc, ROOT::DescriptorId_t fieldId)
Factory method to resurrect a field from the stored on-disk type information.
ROOT::DescriptorId_t GetOnDiskId() const
std::unique_ptr< RFieldBase > Clone(std::string_view newName) const
Copies the field and its subfields using a possibly new name and a new, unconnected set of columns.
The on-storage metadata of an RNTuple.
RFieldDescriptorIterable GetFieldIterable(const RFieldDescriptor &fieldDesc) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
ROOT::DescriptorId_t FindFieldId(std::string_view fieldName, ROOT::DescriptorId_t parentId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Common user-tunable settings for reading RNTuples.
const_iterator begin() const
const_iterator end() const
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1677
std::string TypeID2TypeName(const std::type_info &id)
Returns the name of a type starting from its type_info. An empty string is returned in case of failure...
Definition RDFUtils.cxx:123
void CallConnectPageSourceOnField(RFieldBase &, ROOT::Internal::RPageSource &)
std::string GetRenormalizedTypeName(const std::string &metaNormalizedName)
Given a type name normalized by ROOT meta, renormalize it for RNTuple. E.g., insert the `std::` prefix.
RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ENTupleStructure
The fields in the ntuple model tree can carry different structural information about the type system.
The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
Definition RNTupleDS.hxx:53