Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleMerger.cxx
Go to the documentation of this file.
1/// \file RNTupleMerger.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>, Max Orok <maxwellorok@gmail.com>, Alaettin Serhan Mete <amete@anl.gov>,
4/// Giacomo Parolini <giacomo.parolini@cern.ch>
5/// \date 2020-07-08
6/// \warning This is part of the ROOT 7 prototype! It will
7/// change without notice. It might trigger earthquakes. Feedback is welcome!
8
9/*************************************************************************
10 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
11 * All rights reserved. *
12 * *
13 * For the licensing terms see $ROOTSYS/LICENSE. *
14 * For the list of contributors see $ROOTSYS/README/CREDITS. *
15 *************************************************************************/
16
17#include <ROOT/RError.hxx>
18#include <ROOT/RNTuple.hxx>
21#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleUtil.hxx>
24#include <ROOT/RPageStorage.hxx>
25#include <ROOT/RClusterPool.hxx>
27#include <ROOT/RNTupleZip.hxx>
29#include <TROOT.h>
30#include <TFileMergeInfo.h>
31#include <TError.h>
32#include <TFile.h>
33#include <TKey.h>
34
35#include <algorithm>
36#include <deque>
37#include <inttypes.h> // for PRIu64
38#include <unordered_map>
39#include <vector>
40
41using namespace ROOT::Experimental;
42using namespace ROOT::Experimental::Internal;
43
44// Entry point for TFileMerger. Internally calls RNTupleMerger::Merge().
46// IMPORTANT: this function must not throw, as it is used in exception-unsafe code (TFileMerger).
47try {
48 // Check the inputs
49 if (!inputs || inputs->GetEntries() < 3 || !mergeInfo) {
50 Error("RNTuple::Merge", "Invalid inputs.");
51 return -1;
52 }
53
54 // Parse the input parameters
56
57 // First entry is the RNTuple name
58 std::string ntupleName = std::string(itr()->GetName());
59
60 // Second entry is the output file
62 TFile *outFile = dynamic_cast<TFile *>(secondArg);
63 if (!outFile) {
64 Error("RNTuple::Merge", "Second input parameter should be a TFile, but it's a %s.", secondArg->ClassName());
65 return -1;
66 }
67
68 // Check if the output file already has a key with that name
69 TKey *outKey = outFile->FindKey(ntupleName.c_str());
70 ROOT::RNTuple *outNTuple = nullptr;
71 if (outKey) {
72 outNTuple = outKey->ReadObject<ROOT::RNTuple>();
73 if (!outNTuple) {
74 Error("RNTuple::Merge", "Output file already has key, but not of type RNTuple!");
75 return -1;
76 }
77 // In principle, we should already be working on the RNTuple object from the output file, but just continue with
78 // the pointer we just got.
79 }
80
81 const bool defaultComp = mergeInfo->fOptions.Contains("default_compression");
82 const bool firstSrcComp = mergeInfo->fOptions.Contains("first_source_compression");
84 // this should never happen through hadd, but a user may call RNTuple::Merge() from custom code...
85 Warning(
86 "RNTuple::Merge",
87 "Passed both options \"default_compression\" and \"first_source_compression\": only the latter will apply.");
88 }
90 if (firstSrcComp) {
91 // user passed -ff or -fk: use the same compression as the first RNTuple we find in the sources.
92 // (do nothing here, the compression will be fetched below)
93 } else if (!defaultComp) {
94 // compression was explicitly passed by the user: use it.
95 compression = outFile->GetCompressionSettings();
96 } else {
97 // user passed no compression-related options: use default
99 Info("RNTuple::Merge", "Using the default compression: %d", compression);
100 }
101
102 // The remaining entries are the input files
103 std::vector<std::unique_ptr<RPageSourceFile>> sources;
104 std::vector<RPageSource *> sourcePtrs;
105
106 while (const auto &pitr = itr()) {
107 TFile *inFile = dynamic_cast<TFile *>(pitr);
108 ROOT::RNTuple *anchor = inFile ? inFile->Get<ROOT::RNTuple>(ntupleName.c_str()) : nullptr;
109 if (!anchor) {
110 Error("RNTuple::Merge", "Failed to retrieve RNTuple anchor named '%s' from file '%s'", ntupleName.c_str(),
111 inFile->GetName());
112 return -1;
113 }
114
117 // Get the compression of this RNTuple and use it as the output compression.
118 // We currently assume all column ranges have the same compression, so we just peek at the first one.
119 source->Attach();
120 auto descriptor = source->GetSharedDescriptorGuard();
121 auto clusterIter = descriptor->GetClusterIterable();
123 if (firstCluster == clusterIter.end()) {
124 Error("RNTuple::Merge",
125 "Asked to use the first source's compression as the output compression, but the "
126 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
127 "determined.",
128 inFile->GetName());
129 return -1;
130 }
131 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
133 if (firstColRange == colRangeIter.end()) {
134 Error("RNTuple::Merge",
135 "Asked to use the first source's compression as the output compression, but the "
136 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
137 "determined.",
138 inFile->GetName());
139 return -1;
140 }
141 compression = (*firstColRange).fCompressionSettings;
142 Info("RNTuple::Merge", "Using the first RNTuple's compression: %d", compression);
143 }
144 sources.push_back(std::move(source));
145 }
146
149 writeOpts.SetCompression(compression);
150 auto destination = std::make_unique<RPageSinkFile>(ntupleName, *outFile, writeOpts);
151
152 // If we already have an existing RNTuple, copy over its descriptor to support incremental merging
153 if (outNTuple) {
155 outSource->Attach();
156 auto desc = outSource->GetSharedDescriptorGuard();
157 destination->InitFromDescriptor(desc.GetRef());
158 }
159
160 // Interface conversion
161 sourcePtrs.reserve(sources.size());
162 for (const auto &s : sources) {
163 sourcePtrs.push_back(s.get());
164 }
165
166 // Now merge
169 mergerOpts.fCompressionSettings = compression;
170 merger.Merge(sourcePtrs, *destination, mergerOpts).ThrowOnError();
171
172 // Provide the caller with a merged anchor object (even though we've already
173 // written it).
174 *this = *outFile->Get<ROOT::RNTuple>(ntupleName.c_str());
175
176 return 0;
177} catch (const RException &ex) {
178 Error("RNTuple::Merge", "Exception thrown while merging: %s", ex.what());
179 return -1;
180}
181
182namespace {
183// Functor used to change the compression of a page to `options.fCompressionSettings`.
// Callable object that unseals `fSealedPage` using the source column element and then overwrites
// `fSealedPage` with a re-sealed page configured from the destination column element, the merge
// options' compression settings and `fBuffer`.
// All collaborators except `fOutputColumnId` and `fBuffer` are held by reference, so the referenced
// objects must stay alive until operator() has run.
// NOTE(review): two lines of this struct (the declaration of `sealConf` and the statement that
// produces `refSealedPage`) are not visible in this listing.
184struct RChangeCompressionFunc {
 // Column id passed to UnsealPage() for the unsealed page.
185 DescriptorId_t fOutputColumnId;
186
 // Column element used to unseal the incoming page.
187 const RColumnElementBase &fSrcColElement;
 // Column element used when sealing the page again.
188 const RColumnElementBase &fDstColElement;
 // Only `fCompressionSettings` is read from the options here.
189 const RNTupleMergeOptions &fMergeOptions;
190
 // In/out: the page to recompress; replaced by the re-sealed page at the end of operator().
191 RPageStorage::RSealedPage &fSealedPage;
 // Allocator handed to UnsealPage().
192 RPageAllocator &fPageAlloc;
 // Buffer handed to the seal configuration (`sealConf.fBuffer`); not owned by this struct.
193 std::uint8_t *fBuffer;
194
 // Declared const, yet it assigns to `fSealedPage`: this is legal because the member is a reference
 // and the assignment modifies the referenced object, not the functor itself.
195 void operator()() const
196 {
 // Unwrap() is called on the result of UnsealPage(); presumably it throws on failure -- TODO confirm.
197 auto page = RPageSource::UnsealPage(fSealedPage, fSrcColElement, fOutputColumnId, fPageAlloc).Unwrap();
199 sealConf.fElement = &fDstColElement;
200 sealConf.fPage = &page;
201 sealConf.fBuffer = fBuffer;
202 sealConf.fCompressionSetting = fMergeOptions.fCompressionSettings;
 // Keep the checksum presence of the input page.
203 sealConf.fWriteChecksum = fSealedPage.GetHasChecksum();
205 fSealedPage = refSealedPage;
206 }
207};
208
209struct RCommonField {
210 const RFieldDescriptor *fSrc;
211 const RFieldDescriptor *fDst;
212
213 RCommonField(const RFieldDescriptor *src, const RFieldDescriptor *dst) : fSrc(src), fDst(dst) {}
214};
215
// Outcome of comparing the top-level fields of a destination and a source descriptor.
// The vectors hold non-owning pointers to field descriptors.
216struct RDescriptorsComparison {
 // Top-level fields of the destination that were not matched in the source.
217 std::vector<const RFieldDescriptor *> fExtraDstFields;
 // Top-level fields of the source collected as extra -- presumably those missing from the
 // destination; the guarding condition is not visible in this listing.
218 std::vector<const RFieldDescriptor *> fExtraSrcFields;
 // (source, destination) pairs for fields present on both sides; filled only when the
 // comparison produced no errors.
219 std::vector<RCommonField> fCommonFields;
220};
221
// Output-side information stored per column name in ColumnIdMap_t: the id assigned to the column
// in the destination and the column type chosen for it.
222struct RColumnOutInfo {
 // Id of the column in the output.
223 DescriptorId_t fColumnId;
 // Column type used for this column in the output.
224 EColumnType fColumnType;
225};
226
227// { fully.qualified.fieldName.colInputId => colOutputInfo }
228using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
229
// Splits the column merge information for one source into two groups.
230struct RColumnInfoGroup {
 // Columns that exist in the destination but, presumably, not in the current source
 // (their contents have to be generated rather than copied) -- TODO confirm against the producer.
231 std::vector<RColumnMergeInfo> fExtraDstColumns;
 // Columns present in both the source and the destination.
232 std::vector<RColumnMergeInfo> fCommonColumns;
233};
234
235} // namespace
236
237// These structs cannot be in the anon namespace because they're used in RNTupleMerger's private interface.
240 // This column name is built as a dot-separated concatenation of the ancestry of
241 // the columns' parent fields' names plus the index of the column itself.
242 // e.g. "Muon.pt.x._0"
243 std::string fColumnName;
247 // If nullopt, use the default in-memory type
248 std::optional<std::type_index> fInMemoryType;
250};
251
252// Data related to a single call of RNTupleMerger::Merge()
270
272 // We use a std::deque so that references to the contained SealedPageSequence_t, and its iterators, are
273 // never invalidated.
274 std::deque<RPageStorage::SealedPageSequence_t> fPagesV;
275 std::vector<RPageStorage::RSealedPageGroup> fGroups;
276 std::vector<std::unique_ptr<std::uint8_t[]>> fBuffers;
277};
278
279std::ostream &operator<<(std::ostream &os, const std::optional<RColumnDescriptor::RValueRange> &x)
280{
281 if (x) {
282 os << '(' << x->fMin << ", " << x->fMax << ')';
283 } else {
284 os << "(null)";
285 }
286 return os;
287}
288
289} // namespace ROOT::Experimental::Internal
290
292{
293 // clang-format off
294 if (a == EColumnType::kInt16 && b == EColumnType::kSplitInt16) return true;
295 if (a == EColumnType::kSplitInt16 && b == EColumnType::kInt16) return true;
296 if (a == EColumnType::kInt32 && b == EColumnType::kSplitInt32) return true;
297 if (a == EColumnType::kSplitInt32 && b == EColumnType::kInt32) return true;
298 if (a == EColumnType::kInt64 && b == EColumnType::kSplitInt64) return true;
299 if (a == EColumnType::kSplitInt64 && b == EColumnType::kInt64) return true;
300 if (a == EColumnType::kUInt16 && b == EColumnType::kSplitUInt16) return true;
301 if (a == EColumnType::kSplitUInt16 && b == EColumnType::kUInt16) return true;
302 if (a == EColumnType::kUInt32 && b == EColumnType::kSplitUInt32) return true;
303 if (a == EColumnType::kSplitUInt32 && b == EColumnType::kUInt32) return true;
304 if (a == EColumnType::kUInt64 && b == EColumnType::kSplitUInt64) return true;
305 if (a == EColumnType::kSplitUInt64 && b == EColumnType::kUInt64) return true;
306 if (a == EColumnType::kIndex32 && b == EColumnType::kSplitIndex32) return true;
307 if (a == EColumnType::kSplitIndex32 && b == EColumnType::kIndex32) return true;
308 if (a == EColumnType::kIndex64 && b == EColumnType::kSplitIndex64) return true;
309 if (a == EColumnType::kSplitIndex64 && b == EColumnType::kIndex64) return true;
310 if (a == EColumnType::kReal32 && b == EColumnType::kSplitReal32) return true;
311 if (a == EColumnType::kSplitReal32 && b == EColumnType::kReal32) return true;
312 if (a == EColumnType::kReal64 && b == EColumnType::kSplitReal64) return true;
313 if (a == EColumnType::kSplitReal64 && b == EColumnType::kReal64) return true;
314 // clang-format on
315 return false;
316}
317
318/// Compares the top level fields of `dst` and `src` and determines whether they can be merged or not.
319/// In addition, returns the differences between `dst` and `src`'s structures
322{
323 // Cases:
324 // 1. dst == src
325 // 2. dst has fields that src hasn't
326 // 3. src has fields that dst hasn't
327 // 4. dst and src have fields that differ (compatible or incompatible)
328
329 std::vector<std::string> errors;
330 RDescriptorsComparison res;
331
332 std::vector<RCommonField> commonFields;
333
334 for (const auto &dstField : dst.GetTopLevelFields()) {
335 const auto srcFieldId = src.FindFieldId(dstField.GetFieldName());
337 const auto &srcField = src.GetFieldDescriptor(srcFieldId);
338 commonFields.push_back({&srcField, &dstField});
339 } else {
340 res.fExtraDstFields.emplace_back(&dstField);
341 }
342 }
343 for (const auto &srcField : src.GetTopLevelFields()) {
344 const auto dstFieldId = dst.FindFieldId(srcField.GetFieldName());
346 res.fExtraSrcFields.push_back(&srcField);
347 }
348
349 // Check compatibility of common fields
350 for (const auto &field : commonFields) {
351 // NOTE: field.fSrc and field.fDst have the same name by construction
352 const auto &fieldName = field.fSrc->GetFieldName();
353
354 // Require that fields are both projected or both not projected
355 bool projCompatible = field.fSrc->IsProjectedField() == field.fDst->IsProjectedField();
356 if (!projCompatible) {
357 std::stringstream ss;
358 ss << "Field `" << fieldName << "` is incompatible with previously-seen field with that name because the "
359 << (field.fSrc->IsProjectedField() ? "new" : "old") << " one is projected and the other isn't";
360 errors.push_back(ss.str());
361 } else if (field.fSrc->IsProjectedField()) {
362 // if both fields are projected, verify that they point to the same real field
363 const auto srcName = src.GetQualifiedFieldName(field.fSrc->GetProjectionSourceId());
364 const auto dstName = dst.GetQualifiedFieldName(field.fDst->GetProjectionSourceId());
365 if (srcName != dstName) {
366 std::stringstream ss;
367 ss << "Field `" << fieldName
368 << "` is projected to a different field than a previously-seen field with the same name (old: "
369 << dstName << ", new: " << srcName << ")";
370 errors.push_back(ss.str());
371 }
372 }
373
374 // Require that fields types match
375 // TODO(gparolini): allow non-identical but compatible types
376 const auto &srcTyName = field.fSrc->GetTypeName();
377 const auto &dstTyName = field.fDst->GetTypeName();
378 if (srcTyName != dstTyName) {
379 std::stringstream ss;
380 ss << "Field `" << fieldName
381 << "` has a type incompatible with a previously-seen field with the same name: (old: " << dstTyName
382 << ", new: " << srcTyName << ")";
383 errors.push_back(ss.str());
384 }
385
386 // Require that type checksums match
387 const auto srcTyChk = field.fSrc->GetTypeChecksum();
388 const auto dstTyChk = field.fDst->GetTypeChecksum();
389 if (srcTyChk && dstTyChk && *srcTyChk != *dstTyChk) {
390 std::stringstream ss;
391 ss << "Field `" << field.fSrc->GetFieldName()
392 << "` has a different type checksum than previously-seen field with the same name";
393 errors.push_back(ss.str());
394 }
395
396 // Require that type versions match
397 const auto srcTyVer = field.fSrc->GetTypeVersion();
398 const auto dstTyVer = field.fDst->GetTypeVersion();
399 if (srcTyVer != dstTyVer) {
400 std::stringstream ss;
401 ss << "Field `" << field.fSrc->GetFieldName()
402 << "` has a different type version than previously-seen field with the same name (old: " << dstTyVer
403 << ", new: " << srcTyVer << ")";
404 errors.push_back(ss.str());
405 }
406
407 // Require that column representations match
408 const auto srcNCols = field.fSrc->GetLogicalColumnIds().size();
409 const auto dstNCols = field.fDst->GetLogicalColumnIds().size();
410 if (srcNCols != dstNCols) {
411 std::stringstream ss;
412 ss << "Field `" << field.fSrc->GetFieldName()
413 << "` has a different number of columns than previously-seen field with the same name (old: " << dstNCols
414 << ", new: " << srcNCols << ")";
415 errors.push_back(ss.str());
416 } else {
417 for (auto i = 0u; i < srcNCols; ++i) {
418 const auto srcColId = field.fSrc->GetLogicalColumnIds()[i];
419 const auto dstColId = field.fDst->GetLogicalColumnIds()[i];
420 const auto &srcCol = src.GetColumnDescriptor(srcColId);
421 const auto &dstCol = dst.GetColumnDescriptor(dstColId);
422 // TODO(gparolini): currently we refuse to merge columns of different types unless they are Split/non-Split
423 // version of the same type, because we know how to treat that specific case. We should also properly handle
424 // different but compatible types.
425 if (srcCol.GetType() != dstCol.GetType() &&
426 !IsSplitOrUnsplitVersionOf(srcCol.GetType(), dstCol.GetType())) {
427 std::stringstream ss;
428 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
429 << "` has a different column type of the same column on the previously-seen field with the same name "
430 "(old: "
432 << ", new: " << RColumnElementBase::GetColumnTypeName(dstCol.GetType()) << ")";
433 errors.push_back(ss.str());
434 }
435 if (srcCol.GetBitsOnStorage() != dstCol.GetBitsOnStorage()) {
436 std::stringstream ss;
437 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
438 << "` has a different number of bits of the same column on the previously-seen field with the same "
439 "name "
440 "(old: "
441 << srcCol.GetBitsOnStorage() << ", new: " << dstCol.GetBitsOnStorage() << ")";
442 errors.push_back(ss.str());
443 }
444 if (srcCol.GetValueRange() != dstCol.GetValueRange()) {
445 std::stringstream ss;
446 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
447 << "` has a different value range of the same column on the previously-seen field with the same name "
448 "(old: "
449 << srcCol.GetValueRange() << ", new: " << dstCol.GetValueRange() << ")";
450 errors.push_back(ss.str());
451 }
452 if (srcCol.GetRepresentationIndex() > 0) {
453 std::stringstream ss;
454 ss << i << "-th column of field `" << field.fSrc->GetFieldName()
455 << "` has a representation index higher than 0. This is not supported yet by the merger.";
456 errors.push_back(ss.str());
457 }
458 }
459 }
460 }
461
462 std::string errMsg;
463 for (const auto &err : errors)
464 errMsg += std::string("\n * ") + err;
465
466 if (!errMsg.empty())
467 errMsg = errMsg.substr(1); // strip initial newline
468
469 if (errMsg.length())
470 return R__FAIL(errMsg);
471
472 res.fCommonFields.reserve(commonFields.size());
473 for (const auto &[srcField, dstField] : commonFields) {
474 res.fCommonFields.emplace_back(srcField, dstField);
475 }
476
477 // TODO(gparolini): we should exhaustively check the field tree rather than just the top level fields,
478 // in case the user forgets to change the version number on one field.
479
480 return RResult(res);
481}
482
483// Applies late model extension to `destination`, adding all `newFields` to it.
// Adds every field in `newFields` to the destination model and registers the resulting
// (source field, destination field) pairs in `commonFields`.
//
// newFields:    descriptors of the fields to add; must not be empty. Fields are instantiated from
//               `mergeData.fSrcDescriptor`.
// dstModel:     the destination model; it is unfrozen at the start and frozen again before the
//               schema update is pushed to the destination.
// mergeData:    provides the source/destination descriptors, the destination sink and the current
//               number of destination entries (used as the entry at which the schema changes).
// commonFields: in/out; one entry per new field is appended.
//
// NOTE(review): the declaration of `changeset` is on a line that is not visible in this listing.
484static void ExtendDestinationModel(std::span<const RFieldDescriptor *> newFields, RNTupleModel &dstModel,
485 RNTupleMergeData &mergeData, std::vector<RCommonField> &commonFields)
486{
487 assert(newFields.size() > 0); // no point in calling this with 0 new cols
488
489 dstModel.Unfreeze();
491
 // Build a human-readable, comma-separated list of the new field names for the log message.
 // NOTE(review): std::accumulate lives in <numeric>, which is not among the includes visible in
 // this listing -- confirm it is included (directly or transitively).
492 std::string msg = "destination doesn't contain field";
493 if (newFields.size() > 1)
494 msg += 's';
495 msg += ' ';
496 msg += std::accumulate(newFields.begin(), newFields.end(), std::string{}, [](const auto &acc, const auto *field) {
497 return acc + (acc.length() ? ", " : "") + '`' + field->GetFieldName() + '`';
498 });
499 Info("RNTuple::Merge", "%s: adding %s to the destination model (entry #%" PRIu64 ").", msg.c_str(),
500 (newFields.size() > 1 ? "them" : "it"), mergeData.fNumDstEntries);
501
 // Create each field from the source descriptor and record it in the changeset; projected fields
 // are tracked in a separate list from regular ones. The raw pointer is stored before ownership of
 // the field is moved into the model.
502 changeset.fAddedFields.reserve(newFields.size());
503 for (const auto *fieldDesc : newFields) {
504 auto field = fieldDesc->CreateField(*mergeData.fSrcDescriptor);
505 if (fieldDesc->IsProjectedField())
506 changeset.fAddedProjectedFields.emplace_back(field.get());
507 else
508 changeset.fAddedFields.emplace_back(field.get());
509 changeset.fModel.AddField(std::move(field));
510 }
511 dstModel.Freeze();
512 mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries);
513
 // Look the new fields up by name in the destination descriptor and pair them with their source
 // counterparts. This assumes `mergeData.fDstDescriptor` already reflects the schema update made
 // above -- TODO confirm.
514 commonFields.reserve(commonFields.size() + newFields.size());
515 for (const auto *field : newFields) {
516 const auto newFieldInDstId = mergeData.fDstDescriptor.FindFieldId(field->GetFieldName());
517 const auto &newFieldInDst = mergeData.fDstDescriptor.GetFieldDescriptor(newFieldInDstId);
518 commonFields.emplace_back(field, &newFieldInDst);
519 }
520}
521
522// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
523// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
525 std::span<RColumnMergeInfo> commonColumns,
528{
529 assert(commonColumns.size() == commonColumnSet.size());
530 if (commonColumns.empty())
531 return;
532
534 // we expect the cluster pool to contain the requested set of columns, since they were
535 // validated by CompareDescriptorStructure().
537
538 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
539
540 for (const auto &column : commonColumns) {
541 const auto &columnId = column.fInputId;
542 R__ASSERT(clusterDesc.ContainsColumn(columnId));
543
544 const auto &columnDesc = mergeData.fSrcDescriptor->GetColumnDescriptor(columnId);
545 const auto srcColElement = column.fInMemoryType
546 ? GenerateColumnElement(*column.fInMemoryType, columnDesc.GetType())
548 const auto dstColElement = column.fInMemoryType ? GenerateColumnElement(*column.fInMemoryType, column.fColumnType)
549 : RColumnElementBase::Generate(column.fColumnType);
550
551 // Now get the pages for this column in this cluster
552 const auto &pages = clusterDesc.GetPageRange(columnId);
553
555 sealedPages.resize(pages.fPageInfos.size());
556
557 // Each column range potentially has a distinct compression settings
558 const auto colRangeCompressionSettings = clusterDesc.GetColumnRange(columnId).fCompressionSettings;
559 const bool needsCompressionChange = colRangeCompressionSettings != mergeData.fMergeOpts.fCompressionSettings;
560 if (needsCompressionChange && mergeData.fMergeOpts.fExtraVerbose)
561 Info("RNTuple::Merge", "Column %s: changing source compression from %d to %d", column.fColumnName.c_str(),
562 colRangeCompressionSettings, mergeData.fMergeOpts.fCompressionSettings);
563
564 size_t pageBufferBaseIdx = sealedPageData.fBuffers.size();
565 // If the column range already has the right compression we don't need to allocate any new buffer, so we don't
566 // bother reserving memory for them.
568 sealedPageData.fBuffers.resize(sealedPageData.fBuffers.size() + pages.fPageInfos.size());
569
570 // Loop over the pages
571 std::uint64_t pageIdx = 0;
572 for (const auto &pageInfo : pages.fPageInfos) {
573 assert(pageIdx < sealedPages.size());
574 assert(sealedPageData.fBuffers.size() == 0 || pageIdx < sealedPageData.fBuffers.size());
575
577 auto onDiskPage = cluster->GetOnDiskPage(key);
578
579 const auto checksumSize = pageInfo.fHasChecksum * RPageStorage::kNBytesPageChecksum;
581 sealedPage.SetNElements(pageInfo.fNElements);
582 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
583 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + checksumSize);
584 sealedPage.SetBuffer(onDiskPage->GetAddress());
585 // TODO(gparolini): more graceful error handling (skip the page?)
586 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
587 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
588
590 const auto uncompressedSize = srcColElement->GetSize() * sealedPage.GetNElements();
591 auto &buffer = sealedPageData.fBuffers[pageBufferBaseIdx + pageIdx];
592 buffer = std::make_unique<std::uint8_t[]>(uncompressedSize + checksumSize);
593 RChangeCompressionFunc compressTask{
594 column.fOutputId, *srcColElement, *dstColElement, mergeData.fMergeOpts,
595 sealedPage, *fPageAlloc, buffer.get(),
596 };
597
598 if (fTaskGroup)
599 fTaskGroup->Run(compressTask);
600 else
601 compressTask();
602 }
603
604 ++pageIdx;
605
606 } // end of loop over pages
607
608 if (fTaskGroup)
609 fTaskGroup->Wait();
610
611 sealedPageData.fPagesV.push_back(std::move(sealedPages));
612 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
613 sealedPageData.fPagesV.back().cend());
614 } // end loop over common columns
615}
616
617// Generates default values for columns that are not present in the current source RNTuple
618// but are present in the destination's schema.
619static void GenerateExtraDstColumns(size_t nClusterEntries, std::span<RColumnMergeInfo> extraDstColumns,
621{
622 for (const auto &column : extraDstColumns) {
623 const auto &columnId = column.fInputId;
624 const auto &columnDesc = mergeData.fDstDescriptor.GetColumnDescriptor(columnId);
625 const RFieldDescriptor *field = column.fParentField;
626
627 // Skip all auxiliary columns
628 if (field->GetLogicalColumnIds()[0] != columnId)
629 continue;
630
631 // Check if this column is a child of a Collection or a Variant. If so, it has no data
632 // and can be skipped.
633 bool skipColumn = false;
634 auto nRepetitions = std::max<std::uint64_t>(field->GetNRepetitions(), 1);
635 for (auto parentId = field->GetParentId(); parentId != kInvalidDescriptorId;) {
636 const RFieldDescriptor &parent = mergeData.fSrcDescriptor->GetFieldDescriptor(parentId);
637 if (parent.GetStructure() == ENTupleStructure::kCollection ||
638 parent.GetStructure() == ENTupleStructure::kVariant) {
639 skipColumn = true;
640 break;
641 }
642 nRepetitions *= std::max<std::uint64_t>(parent.GetNRepetitions(), 1);
643 parentId = parent.GetParentId();
644 }
645 if (skipColumn)
646 continue;
647
648 const auto structure = field->GetStructure();
649
650 if (structure == ENTupleStructure::kStreamer) {
651 Fatal(
652 "RNTuple::Merge",
653 "Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
654 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.",
655 field->GetFieldName().c_str());
656 continue;
657 }
658
659 // NOTE: we cannot have a Record here because it has no associated columns.
660 R__ASSERT(structure == ENTupleStructure::kCollection || structure == ENTupleStructure::kVariant ||
661 structure == ENTupleStructure::kLeaf);
662
663 const auto colElement = RColumnElementBase::Generate(columnDesc.GetType());
665 const auto bytesOnStorage = colElement->GetPackedSize(nElements);
666 constexpr auto kPageSizeLimit = 256 * 1024;
667 // TODO(gparolini): consider coalescing the last page if its size is less than some threshold
669 for (size_t i = 0; i < nPages; ++i) {
670 const auto pageSize = (i < nPages - 1) ? kPageSizeLimit : bytesOnStorage - kPageSizeLimit * (nPages - 1);
672 const auto bufSize = pageSize + checksumSize;
673 auto &buffer = sealedPageData.fBuffers.emplace_back(new unsigned char[bufSize]);
674
675 RPageStorage::RSealedPage sealedPage{buffer.get(), bufSize, static_cast<std::uint32_t>(nElements), true};
676 memset(buffer.get(), 0, pageSize);
677 sealedPage.ChecksumIfEnabled();
678
679 sealedPageData.fPagesV.push_back({sealedPage});
680 }
681
682 sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
683 sealedPageData.fPagesV.back().cend());
684 }
685}
686
687// Iterates over all clusters of `source` and merges their pages into `destination`.
688// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
689// the destination's schemas.
690// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
691// compression is unspecified or matches the original compression settings.
693 std::span<RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
694{
696
697 // Convert columns to a ColumnSet for the ClusterPool query
699 commonColumnSet.reserve(commonColumns.size());
700 for (const auto &column : commonColumns)
701 commonColumnSet.emplace(column.fInputId);
702
704 extraDstColumnSet.reserve(extraDstColumns.size());
705 for (const auto &column : extraDstColumns)
706 extraDstColumnSet.emplace(column.fInputId);
707
708 // Loop over all clusters in this file.
709 // descriptor->GetClusterIterable() doesn't guarantee any specific order, so we explicitly
710 // request the first cluster.
711 DescriptorId_t clusterId = mergeData.fSrcDescriptor->FindClusterId(0, 0);
713 const auto &clusterDesc = mergeData.fSrcDescriptor->GetClusterDescriptor(clusterId);
714 const auto nClusterEntries = clusterDesc.GetNEntries();
716
718
719 if (!commonColumnSet.empty()) {
721 }
722
723 if (!extraDstColumnSet.empty()) {
725 }
726
727 // Commit the pages and the clusters
728 mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
729 mergeData.fDestination.CommitCluster(nClusterEntries);
730 mergeData.fNumDstEntries += nClusterEntries;
731
732 // Go to the next cluster
733 clusterId = mergeData.fSrcDescriptor->FindNextClusterId(clusterId);
734 }
735
736 // TODO(gparolini): when we get serious about huge file support (>~ 100GB) we might want to check here
737 // the size of the running page list and commit a cluster group when it exceeds some threshold,
738 // which would prevent the page list from getting too large.
739 // However, as of today, we aren't really handling such huge files, and even relatively big ones
740 // such as the CMS dataset have a page list size of about only 2 MB.
741 // So currently we simply merge all cluster groups into one.
742}
743
// Determines the in-memory C++ type to use for a column, given the type name of its field and its
// on-disk column type.
//
// fieldType:  type name of the field, compared verbatim against a fixed list of primitive type names.
// onDiskType: on-disk type of the column.
// Returns the std::type_index of the selected type, or std::nullopt when none of the cases match,
// meaning the default in-memory type should be used.
744static std::optional<std::type_index> ColumnInMemoryType(std::string_view fieldType, EColumnType onDiskType)
745{
 // Index columns (32/64 bit, split or not) always map to ClusterSize_t, whatever the field type.
746 if (onDiskType == EColumnType::kIndex32 || onDiskType == EColumnType::kSplitIndex32 ||
747 onDiskType == EColumnType::kIndex64 || onDiskType == EColumnType::kSplitIndex64)
748 return typeid(ClusterSize_t);
749
 // NOTE(review): the statement controlled by this condition is not visible in this listing (a line
 // appears to have been dropped); presumably it returns a dedicated type for switch columns --
 // confirm against the real source.
750 if (onDiskType == EColumnType::kSwitch)
752
 // Primitive field types map to the C++ type of the same name.
753 if (fieldType == "bool") {
754 return typeid(bool);
755 } else if (fieldType == "std::byte") {
756 return typeid(std::byte);
757 } else if (fieldType == "char") {
758 return typeid(char);
759 } else if (fieldType == "std::int8_t") {
760 return typeid(std::int8_t);
761 } else if (fieldType == "std::uint8_t") {
762 return typeid(std::uint8_t);
763 } else if (fieldType == "std::int16_t") {
764 return typeid(std::int16_t);
765 } else if (fieldType == "std::uint16_t") {
766 return typeid(std::uint16_t);
767 } else if (fieldType == "std::int32_t") {
768 return typeid(std::int32_t);
769 } else if (fieldType == "std::uint32_t") {
770 return typeid(std::uint32_t);
771 } else if (fieldType == "std::int64_t") {
772 return typeid(std::int64_t);
773 } else if (fieldType == "std::uint64_t") {
774 return typeid(std::uint64_t);
775 } else if (fieldType == "float") {
776 return typeid(float);
777 } else if (fieldType == "double") {
778 return typeid(double);
779 }
780
781 // if the type is not one of those above, we use the default in-memory type.
782 return std::nullopt;
783}
784
785// Given a field, fill `columns` and `mergeData.fColumnIdMap` with information about all columns belonging to it and its subfields.
786// `mergeData.fColumnIdMap` is used to map matching columns from different sources to the same output column in the destination.
787// We match columns by their "fully qualified name", which is the concatenation of their ancestor fields' names
788// and the column index.
789// By this point, since we called `CompareDescriptorStructure()` earlier, we should be guaranteed that two matching
790// columns will have at least compatible representations.
791// NOTE: srcFieldDesc and dstFieldDesc may alias.
// `prefix` is prepended (followed by a '.') to the field name; with the default empty prefix the resulting
// names therefore start with a '.'.
// NOTE(review): the embedded listing numbers skip 793, 809, 839-840 and 855, so some source lines appear to be
// missing from this excerpt (part of the parameter list, the declaration of `info`, the tail of the Info() call
// and the body of the loop over child fields) -- confirm against the upstream file before editing the code.
792static void AddColumnsFromField(std::vector<RColumnMergeInfo> &columns, const RNTupleDescriptor &srcDesc,
794 const RFieldDescriptor &dstFieldDesc, const std::string &prefix = "")
795{
 // Qualified name of this field: "<prefix>.<field name>".
796 std::string name = prefix + '.' + srcFieldDesc.GetFieldName();
797
798 const auto &columnIds = srcFieldDesc.GetLogicalColumnIds();
 // Make room for one entry per logical column of the source field.
799 columns.reserve(columns.size() + columnIds.size());
800 // NOTE: here we can match the src and dst columns by column index because we forbid merging fields with
801 // different column representations.
802 for (auto i = 0u; i < srcFieldDesc.GetLogicalColumnIds().size(); ++i) {
803 // We don't want to try and merge alias columns
 // (the check does not depend on `i`: for a projected field every iteration is skipped, so this loop adds
 // no column for it).
804 if (srcFieldDesc.IsProjectedField())
805 continue;
806
807 auto srcColumnId = srcFieldDesc.GetLogicalColumnIds()[i];
808 const auto &srcColumn = srcDesc.GetColumnDescriptor(srcColumnId);
 // NOTE(review): listing line 809 is absent; `info` is used below without a visible declaration.
 // Column name used as the matching key: "<qualified field name>.<column index>".
810 info.fColumnName = name + '.' + std::to_string(srcColumn.GetIndex());
811 info.fInputId = srcColumn.GetPhysicalId();
812 // Since the parent field is only relevant for extra dst columns, the choice of src or dstFieldDesc as a parent
813 // is arbitrary (they're the same field).
814 info.fParentField = &dstFieldDesc;
815
 // Name already registered in the map: reuse the output id and column type stored for it.
816 if (auto it = mergeData.fColumnIdMap.find(info.fColumnName); it != mergeData.fColumnIdMap.end()) {
817 info.fOutputId = it->second.fColumnId;
818 info.fColumnType = it->second.fColumnType;
819 } else {
 // Name not seen yet: the new output id is the current number of entries in the map.
820 info.fOutputId = mergeData.fColumnIdMap.size();
821 // NOTE(gparolini): map the type of src column to the type of dst column.
822 // This mapping is only relevant for common columns and it's done to ensure we keep a consistent
823 // on-disk representation of the same column.
824 // This is also important to do for the first source when it is used to generate the destination sink,
825 // because even in that case their column representations may differ.
826 // e.g. if the destination has a different compression than the source, an integer column might be
827 // zigzag-encoded in the source but not in the destination.
828 auto dstColumnId = dstFieldDesc.GetLogicalColumnIds()[i];
829 const auto &dstColumn = mergeData.fDstDescriptor.GetColumnDescriptor(dstColumnId);
830 info.fColumnType = dstColumn.GetType();
 // Register the name so that later lookups of the same column name resolve to this id and type.
831 mergeData.fColumnIdMap[info.fColumnName] = {info.fOutputId, info.fColumnType};
832 }
833
834 if (mergeData.fMergeOpts.fExtraVerbose) {
835 Info("RNTuple::Merge",
836 "Adding column %s with log.id %" PRIu64 ", phys.id %" PRIu64 ", type %s "
837 " -> log.id %" PRIu64 ", type %s",
838 info.fColumnName.c_str(), srcColumnId, srcColumn.GetPhysicalId(),
841 }
842
843 // Since we disallow merging fields of different types, src and dstFieldDesc must have the same type name.
844 assert(srcFieldDesc.GetTypeName() == dstFieldDesc.GetTypeName());
845 info.fInMemoryType = ColumnInMemoryType(srcFieldDesc.GetTypeName(), info.fColumnType);
846 columns.emplace_back(info);
847 }
848
 // Child fields of src and dst are paired by position in their link-id lists; the assert checks that both
 // lists have the same length.
849 const auto &srcChildrenIds = srcFieldDesc.GetLinkIds();
850 const auto &dstChildrenIds = dstFieldDesc.GetLinkIds();
851 assert(srcChildrenIds.size() == dstChildrenIds.size());
852 for (auto i = 0u; i < srcChildrenIds.size(); ++i) {
853 const auto &srcChild = srcDesc.GetFieldDescriptor(srcChildrenIds[i]);
854 const auto &dstChild = mergeData.fDstDescriptor.GetFieldDescriptor(dstChildrenIds[i]);
 // NOTE(review): listing line 855 is absent; no visible statement uses `srcChild`/`dstChild` (presumably the
 // recursive call on the child pair with `name` as prefix -- confirm upstream).
856 }
857}
858
859// Converts the fields comparison data to the corresponding column information.
860// While doing so, it collects such information in `mergeData.fColumnIdMap`, which is used by later calls to this function
861// to map already-seen column names to their chosen outputId, type and so on.
// The result groups the columns of the destination-only fields (`fExtraDstColumns`) separately from the columns
// of the fields shared by source and destination (`fCommonColumns`).
// NOTE(review): listing line 863 (the declarator with the parameter list) is absent from this excerpt; the body
// reads `descCmp`, `srcDesc` and `mergeData` -- confirm the exact signature upstream.
862static RColumnInfoGroup
864{
865 RColumnInfoGroup res;
 // Fields that exist only in the destination: both the "source" descriptor and the "source" field passed down
 // are the destination's own, i.e. src and dst alias.
866 for (const RFieldDescriptor *field : descCmp.fExtraDstFields) {
867 AddColumnsFromField(res.fExtraDstColumns, mergeData.fDstDescriptor, mergeData, *field, *field);
868 }
 // Fields present on both sides: columns are looked up in the source descriptor and paired with the
 // corresponding destination field.
869 for (const auto &[srcField, dstField] : descCmp.fCommonFields) {
870 AddColumnsFromField(res.fCommonColumns, srcDesc, mergeData, *srcField, *dstField);
871 }
872 return res;
873}
874
// Constructor body (presumably RNTupleMerger::RNTupleMerger -- the signature, listing line 875, is absent from this
// excerpt): initializes `fPageAlloc` with a heap-backed page allocator.
876 // TODO(gparolini): consider using an arena allocator instead, since we know the precise lifetime
877 // of the RNTuples we are going to handle (e.g. we can reset the arena at every source)
878 : fPageAlloc(std::make_unique<RPageAllocatorHeap>())
879{
 // NOTE(review): listing lines 881-882 (the code compiled only when R__USE_IMT is defined) are absent from this
 // excerpt -- confirm upstream what is set up here.
880#ifdef R__USE_IMT
883#endif
884}
885
// Body of the merge entry point: checks the requested compression against the sink, then merges every page
// source in `sources` into `destination` and finally commits the cluster group and the dataset.
// Returns RResult<void>::Success() when all sources were processed, or an error created with R__FAIL.
// NOTE(review): the embedded listing numbers skip 886-887, 889, 902, 924, 935, 948, 962 and 967, so some source
// lines are missing from this excerpt (the signature, the definitions of `mergeOpts`, `mergeData`, `opts` and
// `columnInfos`, and the SKIP_OR_ABORT invocations themselves) -- confirm against the upstream file.
888{
890 {
 // If the options leave the compression unspecified, adopt the sink's setting; an explicit setting that
 // differs from the sink's is rejected.
891 const auto dstCompSettings = destination.GetWriteOptions().GetCompression();
892 if (mergeOpts.fCompressionSettings == kUnknownCompressionSettings) {
893 mergeOpts.fCompressionSettings = dstCompSettings;
894 } else if (mergeOpts.fCompressionSettings != dstCompSettings) {
895 return R__FAIL(std::string("The compression given to RNTupleMergeOptions is different from that of the "
896 "sink! (opts: ") +
897 std::to_string(mergeOpts.fCompressionSettings) + ", sink: " + std::to_string(dstCompSettings) +
898 ") This is currently unsupported.");
899 }
900 }
901
903
904 std::unique_ptr<RNTupleModel> model; // used to initialize the schema of the output RNTuple
905
 // Error policy helper, meant to be expanded directly inside the loop over `sources`:
 // - kSkip: emit a warning and `continue` with the next source;
 // - otherwise: abort the whole merge by returning R__FAIL(errMsg).
 // It is deliberately NOT wrapped in `do { ... } while (0)`: a `continue` inside such a wrapper targets the
 // wrapper itself (the innermost enclosing loop), so the source would not be skipped and execution would fall
 // through to the code following the macro invocation. The if/else form is still a single statement that
 // requires the trailing ';' at the point of use.
906#define SKIP_OR_ABORT(errMsg) \
907 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
908 Warning("RNTuple::Merge", "Skipping RNTuple due to: %s", (errMsg).c_str()); \
909 continue; \
910 } else \
911 return R__FAIL(errMsg)
915
916 // Merge main loop
917 for (RPageSource *source : sources) {
918 source->Attach();
 // Hold the source descriptor for the duration of this iteration and publish it through `mergeData`.
919 auto srcDescriptor = source->GetSharedDescriptorGuard();
920 mergeData.fSrcDescriptor = &srcDescriptor.GetRef();
921
922 // Create sink from the input model if not initialized
 // NOTE(review): `model` is only assigned in this branch; if the destination is already initialized when the
 // loop starts, `model` stays null and `*model` in the Union-mode extension below would dereference a null
 // pointer -- confirm how callers guarantee this cannot happen.
923 if (!destination.IsInitialized()) {
925 opts.fReconstructProjections = true;
926 model = srcDescriptor->CreateModel(opts);
927 destination.Init(*model);
928 }
929
 // Forward every extra type info record of this source to the destination.
930 for (const auto &extraTypeInfoDesc : srcDescriptor->GetExtraTypeInfoIterable())
931 destination.UpdateExtraTypeInfo(extraTypeInfoDesc);
932
 // Compare the destination schema with the source schema; a failed comparison is handled according to the
 // error policy (the arguments below belong to a SKIP_OR_ABORT invocation whose first line is absent).
933 auto descCmpRes = CompareDescriptorStructure(mergeData.fDstDescriptor, srcDescriptor.GetRef());
934 if (!descCmpRes) {
936 std::string("Source RNTuple will be skipped due to incompatible schema with the destination:\n") +
937 descCmpRes.GetError()->GetReport());
938 }
939 auto descCmp = descCmpRes.Unwrap();
940
941 // If the current source is missing some fields and we're not in Union mode, error
942 // (if we are in Union mode, MergeSourceClusters will fill the missing fields with default values).
943 if (mergeOpts.fMergingMode != ENTupleMergingMode::kUnion && !descCmp.fExtraDstFields.empty()) {
944 std::string msg = "Source RNTuple is missing the following fields:";
945 for (const auto *field : descCmp.fExtraDstFields) {
946 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
947 }
949 }
950
951 // handle extra src fields
952 if (descCmp.fExtraSrcFields.size()) {
953 if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) {
954 // late model extension for all fExtraSrcFields in Union mode
955 ExtendDestinationModel(descCmp.fExtraSrcFields, *model, mergeData, descCmp.fCommonFields);
956 } else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) {
957 // If the current source has extra fields and we're in Strict mode, error
958 std::string msg = "Source RNTuple has extra fields that the destination RNTuple doesn't have:";
959 for (const auto *field : descCmp.fExtraSrcFields) {
960 msg += "\n " + field->GetFieldName() + " : " + field->GetTypeName();
961 }
963 }
964 }
965
966 // handle extra dst fields & common fields
968 MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
969 } // end loop over sources
970
971 // Commit the output
972 destination.CommitClusterGroup();
973 destination.CommitDataset();
974
975 return RResult<void>::Success();
976}
fBuffer
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
static void GenerateExtraDstColumns(size_t nClusterEntries, std::span< RColumnMergeInfo > extraDstColumns, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
static void ExtendDestinationModel(std::span< const RFieldDescriptor * > newFields, RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static RResult< RDescriptorsComparison > CompareDescriptorStructure(const RNTupleDescriptor &dst, const RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
#define SKIP_OR_ABORT(errMsg)
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static bool IsSplitOrUnsplitVersionOf(EColumnType a, EColumnType b)
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, EColumnType onDiskType)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const RFieldDescriptor &srcFieldDesc, const RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
ROOT::Experimental::RResult< T > RResult
#define b(i)
Definition RSha256.hxx:100
#define a(i)
Definition RSha256.hxx:99
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:218
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:244
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
char name[80]
Definition TGX11.cxx:110
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(EColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
Given a set of RPageSources merge them into an RPageSink, optionally changing their compression.
RResult< void > Merge(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
void MergeSourceClusters(RPageSource &source, std::span< RColumnMergeInfo > commonColumns, std::span< RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
void MergeCommonColumns(RClusterPool &clusterPool, DescriptorId_t clusterId, std::span< RColumnMergeInfo > commonColumns, const RCluster::ColumnSet_t &commonColumnSet, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
std::deque< RSealedPage > SealedPageSequence_t
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
Holds the index and the tag of a kSwitch column.
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Meta-data stored for every field of an ntuple.
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for storing ntuples.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:194
A class to manage the asynchronous execution of work items.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:69
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
const_iterator begin() const
const_iterator end() const
Collection abstract base class.
Definition TCollection.h:65
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
Mother of all ROOT objects.
Definition TObject.h:41
Double_t x[n]
Definition legend1.C:17
Double_t ex[n]
Definition legend1.C:17
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, EColumnType onDiskType)
std::ostream & operator<<(std::ostream &os, const std::optional< RColumnDescriptor::RValueRange > &x)
RClusterSize ClusterSize_t
constexpr int kUnknownCompressionSettings
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:595
std::optional< std::type_index > fInMemoryType
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
int fCompressionSettings
If fCompressionSettings == kUnknownCompressionSettings (the default), the merger will not change the ...
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
A sealed page contains the bytes of a page as written to storage (packed & compressed).
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...
Definition Compression.h:58