Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
1/// \file RNTupleMerger.cxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>,
4/// Giacomo Parolini <giacomo.parolini@cern.ch>
5/// \date 2020-07-08
6/// \warning This is part of the ROOT 7 prototype! It will
7/// change without notice. It might trigger earthquakes. Feedback is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
17#include <ROOT/RError.hxx>
18#include <ROOT/RNTuple.hxx>
21#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleUtil.hxx>
25#include <ROOT/RPageStorage.hxx>
26#include <ROOT/RClusterPool.hxx>
28#include <ROOT/RNTupleZip.hxx>
30#include <TROOT.h>
31#include <TFileMergeInfo.h>
32#include <TError.h>
33#include <TFile.h>
34#include <TKey.h>
35
36#include <algorithm>
37#include <deque>
38#include <inttypes.h> // for PRIu64
39#include <initializer_list>
40#include <unordered_map>
41#include <vector>
42
53
54using namespace ROOT::Experimental::Internal;
55
56// TFile options parsing
57// -------------------------------------------------------------------------------------
58static bool BeginsWithDelimitedWord(const TString &str, const char *word)
59{
60 const Ssiz_t wordLen = strlen(word);
61 if (str.Length() < wordLen)
62 return false;
63 if (!str.BeginsWith(word, TString::ECaseCompare::kIgnoreCase))
64 return false;
65 return str.Length() == wordLen || str(wordLen) == ' ';
66}
67
68template <typename T>
69static std::optional<T> ParseStringOption(const TString &opts, const char *pattern,
70 std::initializer_list<std::pair<const char *, T>> validValues)
71{
72 const Ssiz_t patternLen = strlen(pattern);
73 assert(pattern[patternLen - 1] == '='); // we want to parse options with the format `option=Value`
74 if (auto idx = opts.Index(pattern, 0, TString::ECaseCompare::kIgnoreCase);
75 idx >= 0 && opts.Length() > idx + patternLen) {
76 auto sub = TString(opts(idx + patternLen, opts.Length() - idx - patternLen));
77 for (const auto &[name, value] : validValues) {
78 if (BeginsWithDelimitedWord(sub, name)) {
79 return value;
80 }
81 }
82 }
83 return std::nullopt;
84}
85
86static std::optional<ENTupleMergingMode> ParseOptionMergingMode(const TString &opts)
87{
88 return ParseStringOption<ENTupleMergingMode>(opts, "rntuple.MergingMode=",
89 {
90 {"Filter", ENTupleMergingMode::kFilter},
91 {"Union", ENTupleMergingMode::kUnion},
92 {"Strict", ENTupleMergingMode::kStrict},
93 });
94}
95
96static std::optional<ENTupleMergeErrBehavior> ParseOptionErrBehavior(const TString &opts)
97{
98 return ParseStringOption<ENTupleMergeErrBehavior>(opts, "rntuple.ErrBehavior=",
99 {
100 {"Abort", ENTupleMergeErrBehavior::kAbort},
101 {"Skip", ENTupleMergeErrBehavior::kSkip},
102 });
103}
104// -------------------------------------------------------------------------------------
105
106// Entry point for TFileMerger. Internally calls RNTupleMerger::Merge().
108// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
//
// NOTE(review): this listing was extracted from rendered documentation. The function signature (embedded
// line 107) and several statements (gaps in the embedded numbering: 117, 160, 176, 183, 193, 208-209, 215, 229)
// are not visible here. The comments below only describe what the visible lines do.
//
// Visible behaviour:
//  - validates `inputs` (at least 3 entries) and `mergeInfo`;
//  - takes the RNTuple name from the first entry and the output TFile from the second;
//  - selects the output compression from the options in `mergeInfo->fOptions`;
//  - gathers the remaining entries as input sources and hands them to RNTupleMerger::Merge().
// Returns 0 on success and -1 after reporting an error. Any std::exception raised in the body is caught by the
// function-try-block and converted into an Error() message plus a -1 return.
109try {
110 // Check the inputs
111 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo) {
112 Error("RNTuple::Merge", "Invalid inputs.");
113 return -1;
114 }
115
116 // Parse the input parameters
// (the declaration of the iterator `itr` used below is on an elided line)
118
119 // First entry is the RNTuple name
120 std::string ntupleName = std::string(itr()->GetName());
121
122 // Second entry is the output file
123 TObject *secondArg = itr();
124 TFile *outFile = dynamic_cast<TFile *>(secondArg);
125 if (!outFile) {
126 Error("RNTuple::Merge", "Second input parameter should be a TFile, but it's a %s.", secondArg->ClassName());
127 return -1;
128 }
129
130 // Check if the output file already has a key with that name
131 TKey *outKey = outFile->FindKey(ntupleName.c_str());
132 ROOT::RNTuple *outNTuple = nullptr;
133 if (outKey) {
134 outNTuple = outKey->ReadObject<ROOT::RNTuple>();
135 if (!outNTuple) {
136 Error("RNTuple::Merge", "Output file already has key, but not of type RNTuple!");
137 return -1;
138 }
139 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
140 // the pointer we just got.
141 }
142
// Compression-related and verbosity flags are detected by substring search in the option string.
143 const bool defaultComp = mergeInfo->fOptions.Contains("DefaultCompression");
144 const bool firstSrcComp = mergeInfo->fOptions.Contains("FirstSrcCompression");
145 const bool extraVerbose = mergeInfo->fOptions.Contains("rntuple.ExtraVerbose");
146 if (defaultComp && firstSrcComp) {
147 // this should never happen through hadd, but a user may call RNTuple::Merge() from custom code.
148 Warning("RNTuple::Merge", "Passed both options \"DefaultCompression\" and \"FirstSrcCompression\": "
149 "only the latter will apply.");
150 }
// `compression` stays empty in the FirstSrcCompression case until the first usable source is inspected below.
151 std::optional<std::uint32_t> compression;
152 if (firstSrcComp) {
153 // user passed -ff or -fk: use the same compression as the first RNTuple we find in the sources.
154 // (do nothing here, the compression will be fetched below)
155 } else if (!defaultComp) {
156 // compression was explicitly passed by the user: use it.
157 compression = outFile->GetCompressionSettings();
158 } else {
159 // user passed no compression-related options: use default
// NOTE(review): the statement assigning the default to `compression` (embedded line 160) is elided in this
// listing; the Info() below dereferences `compression` and relies on it.
161 Info("RNTuple::Merge", "Using the default compression: %u", *compression);
162 }
163
164 // The remaining entries are the input files
165 std::vector<std::unique_ptr<RPageSourceFile>> sources;
166 std::vector<RPageSource *> sourcePtrs;
167
168 while (const auto &pitr = itr()) {
169 TFile *inFile = dynamic_cast<TFile *>(pitr);
170 ROOT::RNTuple *anchor = inFile ? inFile->Get<ROOT::RNTuple>(ntupleName.c_str()) : nullptr;
171 if (!anchor) {
// NOTE(review): `anchor` is also null when `inFile` itself is null (the dynamic_cast above failed); in that
// case `inFile->GetName()` on the next line dereferences a null pointer.
172 Info("RNTuple::Merge", "No RNTuple anchor named '%s' from file '%s'", ntupleName.c_str(), inFile->GetName());
173 continue;
174 }
175
// (the creation of `source` for this anchor is on an elided line)
177 if (!compression) {
178 // Get the compression of this RNTuple and use it as the output compression.
179 // We currently assume all column ranges have the same compression, so we just peek at the first one.
180 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kRaw);
181 auto descriptor = source->GetSharedDescriptorGuard();
182 auto clusterIter = descriptor->GetClusterIterable();
// (the definition of `firstCluster` is on an elided line)
184 if (firstCluster == clusterIter.end()) {
185 Error("RNTuple::Merge",
186 "Asked to use the first source's compression as the output compression, but the "
187 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
188 "determined.",
189 inFile->GetName());
190 return -1;
191 }
192 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
// (the definition of `firstColRange` is on an elided line)
194 if (firstColRange == colRangeIter.end()) {
195 Error("RNTuple::Merge",
196 "Asked to use the first source's compression as the output compression, but the "
197 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
198 "determined.",
199 inFile->GetName());
200 return -1;
201 }
202 compression = (*firstColRange).GetCompressionSettings().value();
203 Info("RNTuple::Merge", "Using the first RNTuple's compression: %u", *compression);
204 }
205 sources.push_back(std::move(source));
206 }
207
// NOTE(review): with FirstSrcCompression and no input providing an anchor, `compression` looks like it is still
// empty when dereferenced below — confirm against the elided lines 208-209 (which also declare `writeOpts`).
210 writeOpts.SetCompression(*compression);
211 auto destination = std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, *outFile, writeOpts);
212 std::unique_ptr<ROOT::RNTupleModel> model;
213 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
214 if (outNTuple) {
// (the creation of `outSource` is on an elided line)
216 outSource->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
217 auto desc = outSource->GetSharedDescriptorGuard();
218 model = destination->InitFromDescriptor(desc.GetRef(), true /* copyClusters */);
219 }
220
221 // Interface conversion
// The merger is handed non-owning pointers; `sources` keeps ownership of the page sources.
222 sourcePtrs.reserve(sources.size());
223 for (const auto &s : sources) {
224 sourcePtrs.push_back(s.get());
225 }
226
227 // Now merge
228 RNTupleMerger merger{std::move(destination), std::move(model)};
// (the declaration of `mergerOpts` is on an elided line)
230 mergerOpts.fCompressionSettings = *compression;
231 mergerOpts.fExtraVerbose = extraVerbose;
// The merging mode and error behavior are only overridden when the corresponding option parses successfully.
232 if (auto mergingMode = ParseOptionMergingMode(mergeInfo->fOptions)) {
233 mergerOpts.fMergingMode = *mergingMode;
234 }
235 if (auto errBehavior = ParseOptionErrBehavior(mergeInfo->fOptions)) {
236 mergerOpts.fErrBehavior = *errBehavior;
237 }
// A failed merge is turned into an exception here and reported by the catch clause at the end.
238 merger.Merge(sourcePtrs, mergerOpts).ThrowOnError();
239
240 // Provide the caller with a merged anchor object (even though we've already
241 // written it).
// NOTE(review): the pointer returned by Get<>() is dereferenced without a null check.
242 *this = *outFile->Get<ROOT::RNTuple>(ntupleName.c_str());
243
244 return 0;
245} catch (const std::exception &ex) {
246 Error("RNTuple::Merge", "Exception thrown while merging: %s", ex.what());
247 return -1;
248}
249
250namespace {
251// Functor used to change the compression of a page to `options.fCompressionSettings`.
// The call operator unseals `fSealedPage` using `fSrcColElement`, reseals it into `fBuffer` using
// `fDstColElement` and the compression from `fMergeOptions`, and overwrites `fSealedPage` with the result.
// NOTE(review): the declarations of the `fPageAlloc` member (embedded line 258) and of the local `sealConf`
// (embedded line 264) are elided in this listing.
252struct RChangeCompressionFunc {
253 const RColumnElementBase &fSrcColElement;
254 const RColumnElementBase &fDstColElement;
255 const RNTupleMergeOptions &fMergeOptions;
256
// The page to convert; it is modified in place by operator().
257 RPageStorage::RSealedPage &fSealedPage;
// Destination buffer handed to the sealing step.
259 std::uint8_t *fBuffer;
260
261 void operator()() const
262 {
// Unwrap() is called on the unsealing result; presumably it throws on failure — TODO confirm.
263 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fPageAlloc).Unwrap();
265 sealConf.fElement = &fDstColElement;
266 sealConf.fPage = &page;
267 sealConf.fBuffer = fBuffer;
// Dereferences the optional compression setting; the caller must have set it.
268 sealConf.fCompressionSettings = *fMergeOptions.fCompressionSettings;
// The resealed page carries a checksum exactly when the original sealed page had one.
269 sealConf.fWriteChecksum = fSealedPage.GetHasChecksum();
270 auto refSealedPage = RPageSink::SealPage(sealConf);
271 fSealedPage = refSealedPage;
272 }
273};
274
275struct RCommonField {
276 const ROOT::RFieldDescriptor *fSrc;
277 const ROOT::RFieldDescriptor *fDst;
278
279 RCommonField(const ROOT::RFieldDescriptor *src, const ROOT::RFieldDescriptor *dst) : fSrc(src), fDst(dst) {}
280};
281
282struct RDescriptorsComparison {
283 std::vector<const ROOT::RFieldDescriptor *> fExtraDstFields;
284 std::vector<const ROOT::RFieldDescriptor *> fExtraSrcFields;
285 std::vector<RCommonField> fCommonFields;
286};
287
288struct RColumnOutInfo {
289 ROOT::DescriptorId_t fColumnId;
290 ENTupleColumnType fColumnType;
291};
292
293// { fully.qualified.fieldName.colInputId => colOutputInfo }
294using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
295
296struct RColumnInfoGroup {
297 std::vector<RColumnMergeInfo> fExtraDstColumns;
298 // These are sorted by InputId
299 std::vector<RColumnMergeInfo> fCommonColumns;
300};
301
302} // namespace
303
304// These structs cannot be in the anon namespace because they're used in RNTupleMerger's private interface.
307 // This column name is built as a dot-separated concatenation of the ancestry of
308 // the columns' parent fields' names plus the index of the column itself.
309 // e.g. "Muon.pt.x._0"
310 std::string fColumnName;
314 // If nullopt, use the default in-memory type
315 std::optional<std::type_index> fInMemoryType;
317};
318
319// Data related to a single call of RNTupleMerger::Merge()
337
339 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are
340 // never invalidated.
341 std::deque<RPageStorage::SealedPageSequence_t> fPagesV;
342 std::vector<RPageStorage::RSealedPageGroup> fGroups;
343 std::vector<std::unique_ptr<std::uint8_t[]>> fBuffers;
344};
345
346std::ostream &operator<<(std::ostream &os, const std::optional<ROOT::RColumnDescriptor::RValueRange> &x)
347{
348 if (x) {
349 os << '(' << x->fMin << ", " << x->fMax << ')';
350 } else {
351 os << "(null)";
352 }
353 return os;
354}
355
356} // namespace ROOT::Experimental::Internal
357
359{
360 // clang-format off
381 // clang-format on
382 return false;
383}
384
385/// Compares the top level fields of `dst` and `src` and determines whether they can be merged or not.
386/// In addition, returns the differences between `dst` and `src`'s structures
///
/// On success the result holds the fields found only in `dst`, the fields found only in `src`, and the pairs of
/// fields common to both. If any common field fails one of the checks below, all messages are collected and
/// returned together as a single failure (one bullet per problem).
///
/// NOTE(review): the function signature (embedded lines 387-388) and the `if` lines at embedded lines 403 and 412
/// are elided in this listing, as is one operand of an error message (embedded line 498).
389{
390 // Cases:
391 // 1. dst == src
392 // 2. dst has fields that src hasn't
393 // 3. src has fields that dst hasn't
394 // 4. dst and src have fields that differ (compatible or incompatible)
395
396 std::vector<std::string> errors;
397 RDescriptorsComparison res;
398
399 std::vector<RCommonField> commonFields;
400
// Pass 1: classify every top-level field of `dst` as common or destination-only.
401 for (const auto &dstField : dst.GetTopLevelFields()) {
402 const auto srcFieldId = src.FindFieldId(dstField.GetFieldName());
// (the condition selecting between the two branches below is on an elided line)
404 const auto &srcField = src.GetFieldDescriptor(srcFieldId);
405 commonFields.push_back({&srcField, &dstField});
406 } else {
407 res.fExtraDstFields.emplace_back(&dstField);
408 }
409 }
// Pass 2: collect the top-level fields of `src` that are source-only.
410 for (const auto &srcField : src.GetTopLevelFields()) {
411 const auto dstFieldId = dst.FindFieldId(srcField.GetFieldName());
// (the condition guarding the next statement is on an elided line)
413 res.fExtraSrcFields.push_back(&srcField);
414 }
415
416 // Check compatibility of common fields
// In the field-level messages below, "old" refers to the destination field and "new" to the source field.
417 for (const auto &field : commonFields) {
418 // NOTE: field.fSrc and field.fDst have the same name by construction
419 const auto &fieldName = field.fSrc->GetFieldName();
420
421 // Require that fields are both projected or both not projected
422 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
423 if (!projCompatible) {
424 std::stringstream ss;
425 ss << "Field `" << fieldName << "` is incompatible with previously-seen field with that name because the "
426 << (field.fSrc->IsProjectedField() ? "new" : "old") << " one is projected and the other isn't";
427 errors.push_back(ss.str());
428 } else if (field.fSrc->IsProjectedField()) {
429 // if both fields are projected, verify that they point to the same real field
430 const auto srcName = src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
431 const auto dstName = dst.GetQualifiedFieldName(field.fDst->GetProjectionSourceId());
432 if (srcName != dstName) {
433 std::stringstream ss;
434 ss << "Field `" << fieldName
435 << "` is projected to a different field than a previously-seen field with the same name (old: "
436 << dstName << ", new: " << srcName << ")";
437 errors.push_back(ss.str());
438 }
439 }
440
441 // Require that fields types match
442 // TODO(gparolini): allow non-identical but compatible types
443 const auto &srcTyName = field.fSrc->GetTypeName();
444 const auto &dstTyName = field.fDst->GetTypeName();
445 if (srcTyName != dstTyName) {
446 std::stringstream ss;
447 ss << "Field `" << fieldName
448 << "` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
449 << ", new: " << srcTyName << ")";
450 errors.push_back(ss.str());
451 }
452
453 // Require that type checksums match
// The checksums are only compared when both sides provide one.
454 const auto srcTyChk = field.fSrc->GetTypeChecksum();
455 const auto dstTyChk = field.fDst->GetTypeChecksum();
456 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
457 std::stringstream ss;
458 ss << "Field `" << field.fSrc->GetFieldName()
459 << "` has a different type checksum than previously-seen field with the same name";
460 errors.push_back(ss.str());
461 }
462
463 // Require that type versions match
464 const auto srcTyVer = field.fSrc->GetTypeVersion();
465 const auto dstTyVer = field.fDst->GetTypeVersion();
466 if (srcTyVer != dstTyVer) {
467 std::stringstream ss;
468 ss << "Field `" << field.fSrc->GetFieldName()
469 << "` has a different type version than previously-seen field with the same name (old: " << dstTyVer
470 << ", new: " << srcTyVer << ")";
471 errors.push_back(ss.str());
472 }
473
474 // Require that column representations match
// The per-column checks only run when both fields have the same number of logical columns, so the columns
// can be compared pairwise by position.
475 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
476 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
477 if (srcNCols != dstNCols) {
478 std::stringstream ss;
479 ss << "Field `" << field.fSrc->GetFieldName()
480 << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
481 << ", new: " << srcNCols << ")";
482 errors.push_back(ss.str());
483 } else {
// NOTE(review): in the column-level messages below the labels look inverted with respect to the field-level
// ones: the source column is printed as "old" and the destination column as "new".
484 for (auto i = 0u; i < srcNCols; ++i) {
485 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
486 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
487 const auto &srcCol = src.GetColumnDescriptor(srcColId);
488 const auto &dstCol = dst.GetColumnDescriptor(dstColId);
489 // TODO(gparolini): currently we refuse to merge columns of different types unless they are Split/non-Split
490 // version of the same type, because we know how to treat that specific case. We should also properly handle
491 // different but compatible types.
492 if (srcCol.GetType() != dstCol.GetType() &&
493 !IsSplitOrUnsplitVersionOf(srcCol.GetType(), dstCol.GetType())) {
494 std::stringstream ss;
495 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
496 << "` has a different column type of the same column on the previously-seen field with the same name "
497 "(old: "
// (the operand printed after "(old: " is on an elided line)
499 << ", new: " << RColumnElementBase::GetColumnTypeName(dstCol.GetType()) << ")";
500 errors.push_back(ss.str());
501 }
502 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
503 std::stringstream ss;
504 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
505 << "` has a different number of bits of the same column on the previously-seen field with the same "
506 "name "
507 "(old: "
508 << srcCol.GetBitsOnStorage() << ", new: " << dstCol.GetBitsOnStorage() << ")";
509 errors.push_back(ss.str());
510 }
511 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
512 std::stringstream ss;
513 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
514 << "` has a different value range of the same column on the previously-seen field with the same name "
515 "(old: "
516 << srcCol.GetValueRange() << ", new: " << dstCol.GetValueRange() << ")";
517 errors.push_back(ss.str());
518 }
519 if (srcCol.GetRepresentationIndex() > 0) {
520 std::stringstream ss;
521 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
522 << "` has a representation index higher than 0. This is not supported yet by the merger.";
523 errors.push_back(ss.str());
524 }
525 }
526 }
527 }
528
// Join all collected problems into one message, each on its own line prefixed by " * ".
529 std::string errMsg;
530 for (const auto &err : errors)
531 errMsg += std::string("\n * ") + err;
532
533 if (!errMsg.empty())
534 errMsg = errMsg.substr(1); // strip initial newline
535
536 if (errMsg.length())
537 return R__FAIL(errMsg);
538
// No incompatibility found: publish the common fields in the result.
539 res.fCommonFields.reserve(commonFields.size());
540 for (const auto &[srcField, dstField] : commonFields) {
541 res.fCommonFields.emplace_back(srcField, dstField);
542 }
543
544 // TODO(gparolini): we should exhaustively check the field tree rather than just the top level fields,
545 // in case the user forgets to change the version number on one field.
546
547 return ROOT::RResult(res);
548}
549
550// Applies late model extension to `destination`, adding all `newFields` to it.
//
// newFields:    descriptors (from the source) of the fields to add; must not be empty (asserted).
// dstModel:     the destination model; it is unfrozen, extended and frozen again.
// mergeData:    provides the source/destination descriptors, the destination sink and the merge options.
// commonFields: receives one (source field, new destination field) pair per added field.
//
// NOTE(review): a few statements are elided in this listing (embedded lines 557, 584, 595, 598), including the
// declarations of `changeset` and `fieldMap`.
551static void ExtendDestinationModel(std::span<const ROOT::RFieldDescriptor *> newFields, ROOT::RNTupleModel &dstModel,
552 RNTupleMergeData &mergeData, std::vector<RCommonField> &commonFields)
553{
554 assert(newFields.size() > 0); // no point in calling this with 0 new cols
555
556 dstModel.Unfreeze();
558
// In extra-verbose mode, log the backtick-quoted, comma-separated names of the fields about to be added.
559 if (mergeData.fMergeOpts.fExtraVerbose) {
560 std::string msg = "destination doesn't contain field";
561 if (newFields.size() > 1)
562 msg += 's';
563 msg += ' ';
564 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](const auto &acc, const auto *field) {
565 return acc + (acc.length() ? ", " : "") + '`' + field->GetFieldName() + '`';
566 });
567 Info("RNTuple::Merge", "%s: adding %s to the destination model (entry #%" PRIu64 ").", msg.c_str(),
568 (newFields.size() > 1 ? "them" : "it"), mergeData.fNumDstEntries);
569 }
570
571 changeset.fAddedFields.reserve(newFields.size());
572 // First add all non-projected fields...
573 for (const auto *fieldDesc : newFields) {
574 if (!fieldDesc->IsProjectedField()) {
575 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
576 changeset.AddField(std::move(field));
577 }
578 }
579 // ...then add all projected fields.
// The projection targets are looked up in `dstModel` by qualified name, which is why the non-projected
// fields are added first.
580 for (const auto *fieldDesc : newFields) {
581 if (!fieldDesc->IsProjectedField())
582 continue;
583
585 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
586 const auto sourceId = fieldDesc->GetProjectionSourceId();
587 const auto &sourceField = dstModel.GetConstField(mergeData.fSrcDescriptor->GetQualifiedFieldName(sourceId));
588 fieldMap[field.get()] = &sourceField;
589
// Resolve the projection source of every subfield as well.
590 for (const auto &subfield : *field) {
591 const auto &subFieldDesc = mergeData.fSrcDescriptor->GetFieldDescriptor(subfield.GetOnDiskId());
592 const auto subSourceId = subFieldDesc.GetProjectionSourceId();
593 const auto &subSourceField =
594 dstModel.GetConstField(mergeData.fSrcDescriptor->GetQualifiedFieldName(subSourceId));
// (the statement using `subSourceField` is on an elided line)
596 }
597 changeset.fAddedProjectedFields.emplace_back(field.get());
599 }
600 dstModel.Freeze();
601 mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries);
602
// After the schema update, the new fields are looked up by name in the destination descriptor and
// registered as common fields.
603 commonFields.reserve(commonFields.size() + newFields.size());
604 for (const auto *field : newFields) {
605 const auto newFieldInDstId = mergeData.fDstDescriptor.FindFieldId(field->GetFieldName());
606 const auto &newFieldInDst = mergeData.fDstDescriptor.GetFieldDescriptor(newFieldInDstId);
607 commonFields.emplace_back(field, &newFieldInDst);
608 }
609}
610
611// Generates default (zero) values for the given columns
//
// For every eligible column in `columns`, zero-filled pages are allocated, sealed, and appended to
// `sealedPageData` (buffers, page sequences and page groups) under the column's output id.
//
// NOTE(review): the rest of the parameter list (embedded lines 613-615) and several statements (embedded lines
// 617, 636-637, 659, 663, 668, 671, 680, 686) are elided in this listing; `sealedPageData`, `pageAlloc`,
// `srcDescriptor`, `dstDescriptor` and `mergeData` are presumably parameters — confirm against the full source.
612static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<const RColumnMergeInfo> columns,
616{
// (the condition guarding this early return is on an elided line)
618 return;
619
620 for (const auto &column : columns) {
621 const auto &columnId = column.fInputId;
622 const auto &columnDesc = dstDescriptor.GetColumnDescriptor(columnId);
623 const ROOT::RFieldDescriptor *field = column.fParentField;
624
625 // Skip all auxiliary columns
// Only the first logical column of the parent field is processed.
626 assert(!field->GetLogicalColumnIds().empty());
627 if (field->GetLogicalColumnIds()[0] != columnId)
628 continue;
629
630 // Check if this column is a child of a Collection or a Variant. If so, it has no data
631 // and can be skipped.
// While walking up the ancestors, the repetition counts (at least 1 each) are multiplied together.
632 bool skipColumn = false;
633 auto nRepetitions = std::max<std::uint64_t>(field->GetNRepetitions(), 1);
634 for (auto parentId = field->GetParentId(); parentId != ROOT::kInvalidDescriptorId;) {
635 const ROOT::RFieldDescriptor &parent = srcDescriptor.GetFieldDescriptor(parentId);
// (the condition that triggers the skip below is on elided lines)
638 skipColumn = true;
639 break;
640 }
641 nRepetitions *= std::max<std::uint64_t>(parent.GetNRepetitions(), 1);
642 parentId = parent.GetParentId();
643 }
644 if (skipColumn)
645 continue;
646
647 const auto structure = field->GetStructure();
648
649 if (structure == ROOT::ENTupleStructure::kStreamer) {
650 Fatal(
651 "RNTuple::Merge",
652 "Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
653 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.",
654 field->GetFieldName().c_str());
655 continue;
656 }
657
658 // NOTE: we cannot have a Record here because it has no associated columns.
660 structure == ROOT::ENTupleStructure::kLeaf);
661
662 const auto colElement = RColumnElementBase::Generate(columnDesc.GetType());
// (the definition of `nElements` is on an elided line)
664 const auto nBytesOnStorage = colElement->GetPackedSize(nElements);
665 // TODO(gparolini): make this configurable
666 constexpr auto kPageSizeLimit = 256 * 1024;
667 // TODO(gparolini): consider coalescing the last page if its size is less than some threshold
// (the definition of `nPages` is on an elided line)
669 for (size_t i = 0; i < nPages; ++i) {
// Every page but the last has kPageSizeLimit bytes; the last one takes the remainder.
670 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : nBytesOnStorage - kPageSizeLimit * (nPages - 1);
// (the definition of `checksumSize` is on an elided line)
672 const auto bufSize = pageSize + checksumSize;
673 assert(pageSize % colElement->GetSize() == 0);
674 const auto nElementsPerPage = pageSize / colElement->GetSize();
675 auto page = pageAlloc.NewPage(colElement->GetSize(), nElementsPerPage);
676 page.GrowUnchecked(nElementsPerPage);
677 memset(page.GetBuffer(), 0, page.GetNBytes());
678
// The buffer is stored in `sealedPageData.fBuffers`, which thereby owns the sealed bytes.
679 auto &buffer = sealedPageData.fBuffers.emplace_back(new unsigned char[bufSize]);
// (the declaration of `sealConf` is on an elided line)
681 sealConf.fElement = colElement.get();
682 sealConf.fPage = &page;
683 sealConf.fBuffer = buffer.get();
684 sealConf.fCompressionSettings = mergeData.fMergeOpts.fCompressionSettings.value();
685 sealConf.fWriteChecksum = mergeData.fDestination.GetWriteOptions().GetEnablePageChecksums();
// (the statement producing `sealedPage` is on an elided line)
687
// Each generated page is registered as its own single-page sequence and group.
688 sealedPageData.fPagesV.push_back({sealedPage});
689 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
690 sealedPageData.fPagesV.back().cend());
691 }
692 }
693}
694
695// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
696// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
//
// NOTE(review): most of the signature (embedded lines 697-698, 700-702) and several statements (embedded lines
// 704-706, 712, 722, 730, 762, 773, 777, 789) are elided in this listing, so the function name and the exact
// parameter list cannot be read from here. The comments below only describe the visible lines.
699 std::span<const RColumnMergeInfo> commonColumns,
703{
// (the condition guarding this early return is on elided lines)
707 return;
708
709 const RCluster *cluster = clusterPool.GetCluster(clusterDesc.GetId(), commonColumnSet);
710 // we expect the cluster pool to contain the requested set of columns, since they were
711 // validated by CompareDescriptorStructure().
713
// Only the first `nCommonColumnsInCluster` entries of `commonColumns` are processed.
714 for (size_t colIdx = 0; colIdx < nCommonColumnsInCluster; ++colIdx) {
715 const auto &column = commonColumns[colIdx];
716 const auto &columnId = column.fInputId;
717 R__ASSERT(clusterDesc.ContainsColumn(columnId));
718
// Build the column elements for the source (on-disk type from the source descriptor) and for the destination
// (type from `column.fColumnType`), using the explicit in-memory type when one is provided.
719 const auto &columnDesc = mergeData.fSrcDescriptor->GetColumnDescriptor(columnId);
720 const auto srcColElement = column.fInMemoryType
721 ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, columnDesc.GetType())
// (the alternative branch of this conditional expression is on an elided line)
723 const auto dstColElement = column.fInMemoryType
724 ? ROOT::Internal::GenerateColumnElement(*column.fInMemoryType, column.fColumnType)
725 : RColumnElementBase::Generate(column.fColumnType);
726
727 // Now get the pages for this column in this cluster
728 const auto &pages = clusterDesc.GetPageRange(columnId);
729
// (the declaration of `sealedPages` is on an elided line)
731 sealedPages.resize(pages.GetPageInfos().size());
732
733 // Each column range potentially has a distinct compression settings
734 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).GetCompressionSettings().value();
735 // If either the compression or the encoding of the source doesn't match that of the destination, we need
736 // to reseal the page. Otherwise, if both match, we can fast merge.
737 const bool needsResealing =
738 colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings.value() ||
739 srcColElement->GetIdentifier().fOnDiskType != dstColElement->GetIdentifier().fOnDiskType;
740
741 if (needsResealing && mergeData.fMergeOpts.fExtraVerbose) {
742 Info("RNTuple::Merge", "Resealing column %s: { compression: %d => %d, onDiskType: %s => %s }",
743 column.fColumnName.c_str(), colRangeCompressionSettings,
744 mergeData.fMergeOpts.fCompressionSettings.value(),
745 RColumnElementBase::GetColumnTypeName(srcColElement->GetIdentifier().fOnDiskType),
746 RColumnElementBase::GetColumnTypeName(dstColElement->GetIdentifier().fOnDiskType));
747 }
748
// Remember where this column's buffers start inside the shared buffer list.
749 size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
750 // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
751 // bother reserving memory for them.
752 if (needsResealing)
753 sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.GetPageInfos().size());
754
755 // If this column is deferred, we may need to fill "holes" until its real start. We fill any missing entry
756 // with zeroes, like we do for extraDstColumns.
757 // As an optimization, we don't do this for the first source (since we can rely on the FirstElementIndex and
758 // deferred column mechanism in that case).
759 // TODO: also avoid doing this if we added no real page of this column to the destination yet.
760 if (columnDesc.GetFirstElementIndex() > clusterDesc.GetFirstEntryIndex() && mergeData.fNumDstEntries > 0) {
761 const auto nMissingEntries = columnDesc.GetFirstElementIndex() - clusterDesc.GetFirstEntryIndex();
// (the start of the call whose trailing arguments follow is on an elided line)
763 *mergeData.fSrcDescriptor, mergeData.fDstDescriptor, mergeData);
764 }
765
766 // Loop over the pages
767 std::uint64_t pageIdx = 0;
768 for (const auto &pageInfo : pages.GetPageInfos()) {
769 assert(pageIdx < sealedPages.size());
770 assert(sealedPageData.fBuffers.size() == 0 || pageIdx < sealedPageData.fBuffers.size());
771 assert(pageInfo.GetLocator().GetType() != RNTupleLocator::kTypePageZero);
772
// (the definition of `key` is on an elided line)
774 auto onDiskPage = cluster->GetOnDiskPage(key);
775
776 const auto checksumSize = pageInfo.HasChecksum() * RPageStorage::kNBytesPageChecksum;
// (the definition of `sealedPage` is on an elided line)
778 sealedPage.SetNElements(pageInfo.GetNElements());
779 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
780 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + checksumSize);
// NOTE(review): `onDiskPage` is dereferenced here, before the R__ASSERT further down checks it for null.
781 sealedPage.SetBuffer(onDiskPage->GetAddress());
782 // TODO(gparolini): more graceful error handling (skip the page?)
783 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
784 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
785
786 if (needsResealing) {
787 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements();
788 auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx];
// (the statement that allocates `buffer` is presumably on the elided line 789 — confirm)
790 RChangeCompressionFunc compressTask{
791 *srcColElement, *dstColElement, mergeData.fMergeOpts, sealedPage, *fPageAlloc, buffer.get(),
792 };
793
// The resealing runs in the task group when one is available, otherwise inline.
794 if (fTaskGroup)
795 fTaskGroup->Run(compressTask);
796 else
797 compressTask();
798 }
799
800 ++pageIdx;
801
802 } // end of loop over pages
803
// All pending tasks for this column are awaited before its pages are moved into `sealedPageData`.
804 if (fTaskGroup)
805 fTaskGroup->Wait();
806
807 sealedPageData.fPagesV.push_back(std::move(sealedPages));
808 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
809 sealedPageData.fPagesV.back().cend());
810 } // end loop over common columns
811}
812
813// Iterates over all clusters of `source` and merges their pages into `destination`.
814// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
815// the destination's schemas.
816// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
817// compression is unspecified or matches the original compression settings.
//
// NOTE(review): the first line of the signature (embedded line 818) and several statements (embedded lines 821,
// 829, 832, 838, 845, 849-850, 860-861, 863) are elided in this listing, including the loop header over the
// clusters and the starts of the two calls whose trailing arguments are visible below. The function name
// cannot be read from here.
819 std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
820{
822
// Starts as a copy of `extraDstColumns`; the tail is rebuilt for every cluster (see below).
823 std::vector<RColumnMergeInfo> missingColumns{extraDstColumns.begin(), extraDstColumns.end()};
824
825 // Loop over all clusters in this file.
826 // descriptor->GetClusterIterable() doesn't guarantee any specific order, so we explicitly
827 // request the first cluster.
828 ROOT::DescriptorId_t clusterId = mergeData.fSrcDescriptor->FindClusterId(0, 0);
// (the loop header is on an elided line)
830 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
831 const auto nClusterEntries = clusterDesc.GetNEntries();
833
834 // NOTE: just because a column is in `commonColumns` it doesn't mean that each cluster in the source contains it,
835 // as it may be a deferred column that only has real data in a future cluster.
836 // We need to figure out which columns are actually present in this cluster so we only merge their pages (the
837 // missing columns are handled by synthesizing zero pages - see below).
// (the initialization of `nCommonColumnsInCluster` is on an elided line)
839 while (nCommonColumnsInCluster > 0) {
840 // Since `commonColumns` is sorted by column input id, we can simply traverse it from the back and stop as
841 // soon as we find a common column that appears in this cluster: we know that in that case all previous
842 // columns must appear as well.
843 if (clusterDesc.ContainsColumn(commonColumns[nCommonColumnsInCluster - 1].fInputId))
844 break;
// (the statement that advances the loop is on an elided line)
846 }
847
848 // Convert columns to a ColumnSet for the ClusterPool query
// (the declaration of `commonColumnSet` is on elided lines)
851 for (size_t i = 0; i < nCommonColumnsInCluster; ++i)
852 commonColumnSet.emplace(commonColumns[i].fInputId);
853
854 // For each cluster, the "missing columns" are the union of the extraDstColumns and the common columns
855 // that are not present in the cluster. We generate zero pages for all of them.
// resize() drops the entries appended for the previous cluster, keeping only the extraDstColumns prefix.
856 missingColumns.resize(extraDstColumns.size());
857 for (size_t i = nCommonColumnsInCluster; i < commonColumns.size(); ++i)
858 missingColumns.push_back(commonColumns[i]);
859
// (the declaration of `sealedPageData` and the starts of the two calls below are on elided lines)
862 sealedPageData, mergeData, *fPageAlloc);
864 *mergeData.fSrcDescriptor, mergeData.fDstDescriptor, mergeData);
865
866 // Commit the pages and the clusters
867 mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
868 mergeData.fDestination.CommitCluster(nClusterEntries);
869 mergeData.fNumDstEntries += nClusterEntries;
870
871 // Go to the next cluster
872 clusterId = mergeData.fSrcDescriptor->FindNextClusterId(clusterId);
873 }
874
875 // TODO(gparolini): when we get serious about huge file support (>~ 100GB) we might want to check here
876 // the size of the running page list and commit a cluster group when it exceeds some threshold,
877 // which would prevent the page list from getting too large.
878 // However, as of today, we aren't really handling such huge files, and even relatively big ones
879 // such as the CMS dataset have a page list size of about only 2 MB.
880 // So currently we simply merge all cluster groups into one.
881}
882
// Maps the type name of a field to the C++ type used for the in-memory representation of its column.
// Returns the `typeid` of the matching primitive for the names listed below and `std::nullopt` for every
// other name, in which case (per the closing comment) the default in-memory type is used.
// NOTE(review): the conditions guarding the RColumnIndex and RColumnSwitch returns are not present in this
// listing; presumably they test `onDiskType` -- confirm against the original file.
883static std::optional<std::type_index> ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
884{
887 return typeid(ROOT::Internal::RColumnIndex);
888
890 return typeid(ROOT::Internal::RColumnSwitch);
891
 // Exact string comparison against the spelled-out type names; one line per supported primitive.
892 // clang-format off
893 if (fieldType == "bool") return typeid(bool);
894 if (fieldType == "std::byte") return typeid(std::byte);
895 if (fieldType == "char") return typeid(char);
896 if (fieldType == "std::int8_t") return typeid(std::int8_t);
897 if (fieldType == "std::uint8_t") return typeid(std::uint8_t);
898 if (fieldType == "std::int16_t") return typeid(std::int16_t);
899 if (fieldType == "std::uint16_t") return typeid(std::uint16_t);
900 if (fieldType == "std::int32_t") return typeid(std::int32_t);
901 if (fieldType == "std::uint32_t") return typeid(std::uint32_t);
902 if (fieldType == "std::int64_t") return typeid(std::int64_t);
903 if (fieldType == "std::uint64_t") return typeid(std::uint64_t);
904 if (fieldType == "float") return typeid(float);
905 if (fieldType == "double") return typeid(double);
906 // clang-format on
907
908 // if the type is not one of those above, we use the default in-memory type.
909 return std::nullopt;
910}
911
912// Given a field, fill `columns` and `mergeData.fColumnIdMap` with information about all columns belonging to it and its
913// subfields. `mergeData.fColumnIdMap` is used to map matching columns from different sources to the same output column
914// in the destination. We match columns by their "fully qualified name", which is the concatenation of their ancestor
915// fields' names and the column index. By this point, since we called `CompareDescriptorStructure()` earlier, we should
916// be guaranteed that two matching columns will have at least compatible representations. NOTE: srcFieldDesc and
917// dstFieldDesc may alias.
//
// \param columns      Output vector; one RColumnMergeInfo is appended per non-projected column visited.
// \param srcDesc      Descriptor used to resolve the source field's column and child-field ids.
// \param dstFieldDesc Destination counterpart of the source field; consulted for the on-disk column type
//                     the first time a column name is seen.
// \param prefix       Qualified name of the parent field; the column name is built as
//                     "<prefix>.<field name>.<column index>".
918static void AddColumnsFromField(std::vector<RColumnMergeInfo> &columns, const ROOT::RNTupleDescriptor &srcDesc,
920 const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix = "")
921{
922 std::string name = prefix + '.' + srcFieldDesc.GetFieldName();
923
924 const auto &columnIds = srcFieldDesc.GetLogicalColumnIds();
925 columns.reserve(columns.size() + columnIds.size());
926 // NOTE: here we can match the src and dst columns by column index because we forbid merging fields with
927 // different column representations.
928 for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) {
929 // We don't want to try and merge alias columns
930 if (srcFieldDesc.IsProjectedField())
931 continue;
932
933 auto srcColumnId = srcFieldDesc.GetLogicalColumnIds()[i];
934 const auto &srcColumn = srcDesc.GetColumnDescriptor(srcColumnId);
935
937 info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex());
938 info.fInputId = srcColumn.GetPhysicalId();
939 // NOTE(gparolini): the parent field is used when synthesizing zero pages, which happens in 2 situations:
940 // 1. when adding extra dst columns (in which case we need to synthesize zero pages for the incoming src), and
941 // 2. when merging a deferred column into an existing column (in which case we need to fill the "hole" with
942 // zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this function),
943 // but for the second case they're not, and we need to pick the source field because we will then check the
944 // column's *input* id inside fParentField to see if it's a suppressed column (see GenerateZeroPagesForColumns()).
945 info.fParentField = &srcFieldDesc;
946
 // A column name that is already in the map reuses the output id and column type recorded for it.
947 if (auto it = mergeData.fColumnIdMap.find(info.fColumnName); it != mergeData.fColumnIdMap.end()) {
948 info.fOutputId = it->second.fColumnId;
949 info.fColumnType = it->second.fColumnType;
950 } else {
 // First occurrence of this column name: the current map size is used as the new output id.
951 info.fOutputId = mergeData.fColumnIdMap.size();
952 // NOTE(gparolini): map the type of src column to the type of dst column.
953 // This mapping is only relevant for common columns and it's done to ensure we keep a consistent
954 // on-disk representation of the same column.
955 // This is also important to do for the first source when it is used to generate the destination sink,
956 // because even in that case their column representations may differ.
957 // e.g. if the destination has a different compression than the source, an integer column might be
958 // zigzag-encoded in the source but not in the destination.
959 auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[i];
960 const auto &dstColumn = mergeData.fDstDescriptor.GetColumnDescriptor(dstColumnId);
961 info.fColumnType = dstColumn.GetType();
962 mergeData.fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
963 }
964
965 if (mergeData.fMergeOpts.fExtraVerbose) {
966 Info("RNTuple::Merge",
967 "Adding column %s with log.id %" PRIu64 ", phys.id %" PRIu64 ", type %s "
968 " -> log.id %" PRIu64 ", type %s",
969 info.fColumnName.c_str(), srcColumnId, srcColumn.GetPhysicalId(),
972 }
973
974 // Since we disallow merging fields of different types, src and dstFieldDesc must have the same type name.
975 assert(srcFieldDesc.GetTypeName() == dstFieldDesc.GetTypeName());
976 info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), info.fColumnType);
977 columns.emplace_back(info);
978 }
979
 // Visit the child fields pairwise; src and dst are expected to have the same number of children.
 // NOTE(review): the statement that consumes srcChild/dstChild is not present in this listing --
 // presumably a recursive call passing `name` as the new prefix; confirm against the original file.
980 const auto &srcChildrenIds = srcFieldDesc.GetLinkIds();
981 const auto &dstChildrenIds = dstFieldDesc.GetLinkIds();
982 assert(srcChildrenIds.size() == dstChildrenIds.size());
983 for (auto i = 0u; i < srcChildrenIds.size(); ++i) {
984 const auto &srcChild = srcDesc.GetFieldDescriptor(srcChildrenIds[i]);
985 const auto &dstChild = mergeData.fDstDescriptor.GetFieldDescriptor(dstChildrenIds[i]);
987 }
988}
989
990// Converts the fields comparison data to the corresponding column information.
991// While doing so, it collects such information in `mergeData.fColumnIdMap`, which is used by later calls to this
992// function to map already-seen column names to their chosen outputId, type and so on.
//
// Returns a RColumnInfoGroup whose `fExtraDstColumns` come from `descCmp.fExtraDstFields` and whose
// `fCommonColumns` come from `descCmp.fCommonFields`, the latter sorted by ascending input id.
993static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc,
995{
996 RColumnInfoGroup res;
 // Extra dst fields are described by the destination only, so the destination descriptor and the same
 // field are passed as both the "src" and the "dst" side.
997 for (const ROOT::RFieldDescriptor *field : descCmp.fExtraDstFields) {
998 AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, mergeData, *field, *field);
999 }
 // Common fields are visited as (source field, destination field) pairs.
1000 for (const auto &[srcField, dstField] : descCmp.fCommonFields) {
1001 AddColumnsFromField(res.fCommonColumns, srcDesc, mergeData, *srcField, *dstField);
1002 }
1003
1004 // Sort the commonColumns by ID so we can more easily tell how many common columns each cluster has
1005 // (since each cluster must contain all columns of the previous cluster plus potentially some new ones)
1006 std::sort(res.fCommonColumns.begin(), res.fCommonColumns.end(),
1007 [](const auto &a, const auto &b) { return a.fInputId < b.fInputId; });
1008
1009 return res;
1010}
1011
// For every column of `fieldDesc`, builds the name "<prefix>.<field name>.<column index>" and an
// RColumnOutInfo holding the column's logical id and type, then visits the field's sub-fields.
// NOTE(review): the head of the signature, the statement that stores `info` under `colName` and the
// statement that handles each sub-field are not present in this listing -- presumably an insertion into
// `colIdMap` and a recursive call with `name` as prefix; confirm against the original file.
1013 ColumnIdMap_t &colIdMap, const std::string &prefix = "")
1014{
1015 std::string name = prefix + '.' + fieldDesc.GetFieldName();
1016 for (const auto &colId : fieldDesc.GetLogicalColumnIds()) {
1017 const auto &colDesc = desc.GetColumnDescriptor(colId);
1018 RColumnOutInfo info{};
 // Same naming scheme ("<qualified field name>.<column index>") that AddColumnsFromField() looks up.
1019 const auto colName = name + '.' + std::to_string(colDesc.GetIndex());
1020 info.fColumnId = colDesc.GetLogicalId();
1021 info.fColumnType = colDesc.GetType();
1023 }
1024
1025 for (const auto &subId : fieldDesc.GetLinkIds()) {
1026 const auto &subfield = desc.GetFieldDescriptor(subId);
1028 }
1029}
1030
// Constructs a merger that takes ownership of `destination` and `model` and allocates its pages through a
// freshly created RPageAllocatorHeap.
// NOTE(review): the statements in the body (including the one inside the R__USE_IMT section) are not
// present in this listing; confirm against the original file.
1031RNTupleMerger::RNTupleMerger(std::unique_ptr<ROOT::Internal::RPagePersistentSink> destination,
1032 std::unique_ptr<ROOT::RNTupleModel> model)
1033 // TODO(gparolini): consider using an arena allocator instead, since we know the precise lifetime
1034 // of the RNTuples we are going to handle (e.g. we can reset the arena at every source)
1035 : fDestination(std::move(destination)),
1036 fPageAlloc(std::make_unique<ROOT::Internal::RPageAllocatorHeap>()),
1037 fModel(std::move(model))
1038{
1040
1041#ifdef R__USE_IMT
1044#endif
1045}
1046
// Convenience constructor: delegates to the two-argument constructor with a null model.
1047RNTupleMerger::RNTupleMerger(std::unique_ptr<ROOT::Internal::RPagePersistentSink> destination)
1048 : RNTupleMerger(std::move(destination), nullptr)
1049{
1050}
1051
// Merges every source in `sources` into the destination sink.
//
// Steps performed by the visible code:
//   - If the merge options carry no compression setting, the sink's one is adopted; a setting that differs
//     from the sink's is reported as an error.
//   - A model must be present if and only if the destination is already initialized (incremental merging);
//     in that case the column id map is pre-filled from the destination's top-level fields.
//   - For each source: attach it, initialize the destination from the first source's descriptor when no
//     model exists yet, forward its extra type info, compare its schema with the destination's, handle
//     missing/extra fields according to the merging mode, then merge its clusters.
//   - Finally a warning is issued if the output has no entries, and the cluster group and the dataset are
//     committed.
// Returns RResult<void>::Success() on success, or an error built with R__FAIL.
// NOTE(review): the signature and a few statements of this function are not present in this listing;
// confirm details against the original file.
1053{
1055
1057
1058 // Set compression settings if unset and verify it's compatible with the sink
1059 {
1060 const auto dstCompSettings = fDestination->GetWriteOptions().GetCompression();
1061 if (!mergeOpts.fCompressionSettings) {
1062 mergeOpts.fCompressionSettings = dstCompSettings;
1063 } else if (*mergeOpts.fCompressionSettings != dstCompSettings) {
1064 return R__FAIL(std::string("The compression given to RNTupleMergeOptions is different from that of the "
1065 "sink! (opts: ") +
1066 std::to_string(*mergeOpts.fCompressionSettings) + ", sink: " + std::to_string(dstCompSettings) +
1067 ") This is currently unsupported.");
1068 }
1069 }
1070
1071 // we should have a model if and only if the destination is initialized.
1072 if (!!fModel != fDestination->IsInitialized()) {
1073 return R__FAIL(
1074 "passing an already-initialized destination to RNTupleMerger::Merge (i.e. trying to do incremental "
1075 "merging) can only be done by providing a valid ROOT::RNTupleModel when constructing the RNTupleMerger.");
1076 }
1077
1079 mergeData.fNumDstEntries = mergeData.fDestination.GetNEntries();
1080
1081 if (fModel) {
1082 // If this is an incremental merging, pre-fill the column id map with the existing destination ids.
1083 // Otherwise we would generate new output ids that may not match the ones in the destination!
1084 for (const auto &field : mergeData.fDstDescriptor.GetTopLevelFields()) {
1085 PrefillColumnMap(fDestination->GetDescriptor(), field, mergeData.fColumnIdMap);
1086 }
1087 }
1088
 // Depending on mergeOpts.fErrBehavior, either skips the current source (warning + `continue`) or aborts the
 // whole merge with an error. Only usable directly inside the body of the loop over sources below.
 // The branch lives in a plain compound statement on purpose: a `continue` placed inside `do { } while (0)`
 // would bind to that do-while instead of the loop over sources, so the source would not be skipped and
 // execution would fall through to the code after the macro. The trailing empty `do { } while (0)` only
 // serves to require a semicolon at the point of use.
1089#define SKIP_OR_ABORT(errMsg) \
1090 { \
1091 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
1092 Warning("RNTuple::Merge", "Skipping RNTuple due to: %s", (errMsg).c_str()); \
1093 continue; \
1094 } else { \
1095 return R__FAIL(errMsg); \
1096 } \
1097 } \
 do { \
 } while (0)
1098
1099 // Merge main loop
1100 for (RPageSource *source : sources) {
1101 source->Attach(RNTupleSerializer::EDescriptorDeserializeMode::kForWriting);
1102 auto srcDescriptor = source->GetSharedDescriptorGuard();
1103 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
1104
1105 // Create sink from the input model if not initialized
1106 if (!fModel) {
1107 fModel = fDestination->InitFromDescriptor(srcDescriptor.GetRef(), false /* copyClusters */);
1108 }
1109
1110 for (const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
1111 fDestination->UpdateExtraTypeInfo(extraTypeInfoDesc);
1112
1113 auto descCmpRes = CompareDescriptorStructure(mergeData.fDstDescriptor, srcDescriptor.GetRef());
1114 if (!descCmpRes) {
1115 SKIP_OR_ABORT(std::string("Source RNTuple has an incompatible schema with the destination:\n") +
1116 descCmpRes.GetError()->GetReport());
1117 }
 // Only reached with a successful comparison: on failure SKIP_OR_ABORT above either continued or returned.
1118 auto descCmp = descCmpRes.Unwrap();
1119
1120 // If the current source is missing some fields and we're not in Union mode, error
1121 // (if we are in Union mode, MergeSourceClusters will fill the missing fields with default values).
1122 if (mergeOpts.fMergingMode != ENTupleMergingMode::kUnion && !descCmp.fExtraDstFields.empty()) {
1123 std::string msg = "Source RNTuple is missing the following fields:";
1124 for (const auto *field : descCmp.fExtraDstFields) {
1125 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
1126 }
1128 }
1129
1130 // handle extra src fields
1131 if (descCmp.fExtraSrcFields.size()) {
1132 if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) {
1133 // late model extension for all fExtraSrcFields in Union mode
1134 ExtendDestinationModel(descCmp.fExtraSrcFields, *fModel, mergeData, descCmp.fCommonFields);
1135 } else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) {
1136 // If the current source has extra fields and we're in Strict mode, error
1137 std::string msg = "Source RNTuple has extra fields that the destination RNTuple doesn't have:";
1138 for (const auto *field : descCmp.fExtraSrcFields) {
1139 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
1140 }
1142 }
1143 }
1144
1145 // handle extra dst fields & common fields
1147 MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
1148 } // end loop over sources
1149
1150 if (fDestination->GetNEntries() == 0)
1151 Warning("RNTuple::Merge", "Output RNTuple '%s' has no entries.", fDestination->GetNTupleName().c_str());
1152
1153 // Commit the output
1154 fDestination->CommitClusterGroup();
1155 fDestination->CommitDataset();
1156
1157 return RResult<void>::Success();
1158}
fBuffer
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
static ROOT::RResult< RDescriptorsComparison > CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
static std::optional< ENTupleMergeErrBehavior > ParseOptionErrBehavior(const TString &opts)
static void ExtendDestinationModel(std::span< const ROOT::RFieldDescriptor * > newFields, ROOT::RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span< const RColumnMergeInfo > columns, RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc, const ROOT::RNTupleDescriptor &srcDescriptor, const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
#define SKIP_OR_ABORT(errMsg)
static std::optional< T > ParseStringOption(const TString &opts, const char *pattern, std::initializer_list< std::pair< const char *, T > > validValues)
static bool IsSplitOrUnsplitVersionOf(ENTupleColumnType a, ENTupleColumnType b)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const ROOT::RFieldDescriptor &srcFieldDesc, const ROOT::RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
static std::optional< ENTupleMergingMode > ParseOptionMergingMode(const TString &opts)
static void PrefillColumnMap(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &fieldDesc, ColumnIdMap_t &colIdMap, const std::string &prefix="")
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const ROOT::RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static bool BeginsWithDelimitedWord(const TString &str, const char *word)
#define b(i)
Definition RSha256.hxx:100
#define a(i)
Definition RSha256.hxx:99
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:244
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
char name[80]
Definition TGX11.cxx:110
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Given a set of RPageSources merge them into an RPagePersistentSink, optionally changing their compres...
void MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc, std::span< const RColumnMergeInfo > commonColumns, const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
std::unique_ptr< ROOT::RNTupleModel > fModel
RNTupleMerger(std::unique_ptr< ROOT::Internal::RPagePersistentSink > destination, std::unique_ptr< ROOT::RNTupleModel > model)
Creates a RNTupleMerger with the given destination.
void MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span< const RColumnMergeInfo > commonColumns, std::span< const RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
std::unique_ptr< ROOT::Internal::RPagePersistentSink > fDestination
RResult< void > Merge(std::span< ROOT::Internal::RPageSource * > sources, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
A class to manage the asynchronous execution of work items.
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:148
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:150
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(ROOT::ENTupleColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(ROOT::ENTupleColumnType type)
If CppT == void, use the default C++ type for the given column type.
The in-memory representation of a 32bit or 64bit on-disk index column.
Holds the index and the tag of a kSwitch column.
A helper class for serializing and deserialization of the RNTuple binary format.
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
Common functionality of an ntuple storage for both reading and writing.
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
std::deque< RSealedPage > SealedPageSequence_t
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Metadata for RNTuple clusters.
Metadata stored for every field of an RNTuple.
ROOT::ENTupleStructure GetStructure() const
ROOT::DescriptorId_t GetParentId() const
std::uint64_t GetNRepetitions() const
The on-storage metadata of an RNTuple.
const RColumnDescriptor & GetColumnDescriptor(ROOT::DescriptorId_t columnId) const
const RFieldDescriptor & GetFieldDescriptor(ROOT::DescriptorId_t fieldId) const
The RNTupleModel encapsulates the schema of an RNTuple.
Common user-tunable settings for storing RNTuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:65
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface Merge this NTuple with the input list entries.
const_iterator begin() const
const_iterator end() const
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:197
Collection abstract base class.
Definition TCollection.h:65
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:131
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
Mother of all ROOT objects.
Definition TObject.h:41
Basic string class.
Definition TString.h:139
@ kIgnoreCase
Definition TString.h:277
std::ostream & Info()
Definition hadd.cxx:171
Double_t x[n]
Definition legend1.C:17
Double_t ex[n]
Definition legend1.C:17
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::ostream & operator<<(std::ostream &os, const std::optional< ROOT::RColumnDescriptor::RValueRange > &x)
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:595
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
ENTupleColumnType
const ROOT::RFieldDescriptor * fParentField
std::optional< std::type_index > fInMemoryType
const ROOT::RNTupleDescriptor * fSrcDescriptor
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
const ROOT::RNTupleDescriptor & fDstDescriptor
Set of merging options to pass to RNTupleMerger.
std::optional< std::uint32_t > fCompressionSettings
If fCompressionSettings is empty (the default), the merger will not change the compression of any of ...
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:51
Parameters for the SealPage() method.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...
Definition Compression.h:58