38#include <unordered_map>
50 Error(
"RNTuple::Merge",
"Invalid inputs.");
64 Error(
"RNTuple::Merge",
"Second input parameter should be a TFile, but it's a %s.",
secondArg->ClassName());
74 Error(
"RNTuple::Merge",
"Output file already has key, but not of type RNTuple!");
87 "Passed both options \"default_compression\" and \"first_source_compression\": only the latter will apply.");
99 Info(
"RNTuple::Merge",
"Using the default compression: %d",
compression);
103 std::vector<std::unique_ptr<RPageSourceFile>>
sources;
106 while (
const auto &
pitr =
itr()) {
110 Error(
"RNTuple::Merge",
"Failed to retrieve RNTuple anchor named '%s' from file '%s'",
ntupleName.c_str(),
124 Error(
"RNTuple::Merge",
125 "Asked to use the first source's compression as the output compression, but the "
126 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
131 auto colRangeIter = (*firstCluster).GetColumnRangeIterable();
134 Error(
"RNTuple::Merge",
135 "Asked to use the first source's compression as the output compression, but the "
136 "first source (file '%s') has an empty RNTuple, therefore the output compression could not be "
141 compression = (*firstColRange).fCompressionSettings;
142 Info(
"RNTuple::Merge",
"Using the first RNTuple's compression: %d",
compression);
156 auto desc =
outSource->GetSharedDescriptorGuard();
162 for (
const auto &s :
sources) {
178 Error(
"RNTuple::Merge",
"Exception thrown while merging: %s",
ex.what());
184struct RChangeCompressionFunc {
199 sealConf.fElement = &fDstColElement;
216struct RDescriptorsComparison {
217 std::vector<const RFieldDescriptor *> fExtraDstFields;
218 std::vector<const RFieldDescriptor *> fExtraSrcFields;
219 std::vector<RCommonField> fCommonFields;
222struct RColumnOutInfo {
228using ColumnIdMap_t = std::unordered_map<std::string, RColumnOutInfo>;
230struct RColumnInfoGroup {
231 std::vector<RColumnMergeInfo> fExtraDstColumns;
232 std::vector<RColumnMergeInfo> fCommonColumns;
274 std::deque<RPageStorage::SealedPageSequence_t>
fPagesV;
275 std::vector<RPageStorage::RSealedPageGroup>
fGroups;
276 std::vector<std::unique_ptr<std::uint8_t[]>>
fBuffers;
279std::ostream &
operator<<(std::ostream &os,
const std::optional<RColumnDescriptor::RValueRange> &
x)
282 os <<
'(' <<
x->fMin <<
", " <<
x->fMax <<
')';
294 if (
a == EColumnType::kInt16 &&
b == EColumnType::kSplitInt16)
return true;
295 if (
a == EColumnType::kSplitInt16 &&
b == EColumnType::kInt16)
return true;
296 if (
a == EColumnType::kInt32 &&
b == EColumnType::kSplitInt32)
return true;
297 if (
a == EColumnType::kSplitInt32 &&
b == EColumnType::kInt32)
return true;
298 if (
a == EColumnType::kInt64 &&
b == EColumnType::kSplitInt64)
return true;
299 if (
a == EColumnType::kSplitInt64 &&
b == EColumnType::kInt64)
return true;
300 if (
a == EColumnType::kUInt16 &&
b == EColumnType::kSplitUInt16)
return true;
301 if (
a == EColumnType::kSplitUInt16 &&
b == EColumnType::kUInt16)
return true;
302 if (
a == EColumnType::kUInt32 &&
b == EColumnType::kSplitUInt32)
return true;
303 if (
a == EColumnType::kSplitUInt32 &&
b == EColumnType::kUInt32)
return true;
304 if (
a == EColumnType::kUInt64 &&
b == EColumnType::kSplitUInt64)
return true;
305 if (
a == EColumnType::kSplitUInt64 &&
b == EColumnType::kUInt64)
return true;
306 if (
a == EColumnType::kIndex32 &&
b == EColumnType::kSplitIndex32)
return true;
307 if (
a == EColumnType::kSplitIndex32 &&
b == EColumnType::kIndex32)
return true;
308 if (
a == EColumnType::kIndex64 &&
b == EColumnType::kSplitIndex64)
return true;
309 if (
a == EColumnType::kSplitIndex64 &&
b == EColumnType::kIndex64)
return true;
310 if (
a == EColumnType::kReal32 &&
b == EColumnType::kSplitReal32)
return true;
311 if (
a == EColumnType::kSplitReal32 &&
b == EColumnType::kReal32)
return true;
312 if (
a == EColumnType::kReal64 &&
b == EColumnType::kSplitReal64)
return true;
313 if (
a == EColumnType::kSplitReal64 &&
b == EColumnType::kReal64)
return true;
329 std::vector<std::string>
errors;
330 RDescriptorsComparison res;
334 for (
const auto &
dstField :
dst.GetTopLevelFields()) {
340 res.fExtraDstFields.emplace_back(&
dstField);
343 for (
const auto &
srcField :
src.GetTopLevelFields()) {
346 res.fExtraSrcFields.push_back(&
srcField);
357 std::stringstream
ss;
358 ss <<
"Field `" <<
fieldName <<
"` is incompatible with previously-seen field with that name because the "
359 << (
field.fSrc->IsProjectedField() ?
"new" :
"old") <<
" one is projected and the other isn't";
361 }
else if (
field.fSrc->IsProjectedField()) {
363 const auto srcName =
src.GetQualifiedFieldName(
field.fSrc->GetProjectionSourceId());
364 const auto dstName =
dst.GetQualifiedFieldName(
field.fDst->GetProjectionSourceId());
366 std::stringstream
ss;
368 <<
"` is projected to a different field than a previously-seen field with the same name (old: "
379 std::stringstream
ss;
381 <<
"` has a type incompatible with a previously-seen field with the same name: (old: " <<
dstTyName
390 std::stringstream
ss;
391 ss <<
"Field `" <<
field.fSrc->GetFieldName()
392 <<
"` has a different type checksum than previously-seen field with the same name";
400 std::stringstream
ss;
401 ss <<
"Field `" <<
field.fSrc->GetFieldName()
402 <<
"` has a different type version than previously-seen field with the same name (old: " <<
dstTyVer
408 const auto srcNCols =
field.fSrc->GetLogicalColumnIds().size();
409 const auto dstNCols =
field.fDst->GetLogicalColumnIds().size();
411 std::stringstream
ss;
412 ss <<
"Field `" <<
field.fSrc->GetFieldName()
413 <<
"` has a different number of columns than previously-seen field with the same name (old: " <<
dstNCols
427 std::stringstream
ss;
428 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
429 <<
"` has a different column type of the same column on the previously-seen field with the same name "
435 if (
srcCol.GetBitsOnStorage() !=
dstCol.GetBitsOnStorage()) {
436 std::stringstream
ss;
437 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
438 <<
"` has a different number of bits of the same column on the previously-seen field with the same "
441 <<
srcCol.GetBitsOnStorage() <<
", new: " <<
dstCol.GetBitsOnStorage() <<
")";
445 std::stringstream
ss;
446 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
447 <<
"` has a different value range of the same column on the previously-seen field with the same name "
449 <<
srcCol.GetValueRange() <<
", new: " <<
dstCol.GetValueRange() <<
")";
452 if (
srcCol.GetRepresentationIndex() > 0) {
453 std::stringstream
ss;
454 ss << i <<
"-th column of field `" <<
field.fSrc->GetFieldName()
455 <<
"` has a representation index higher than 0. This is not supported yet by the merger.";
463 for (
const auto &err :
errors)
464 errMsg += std::string(
"\n * ") + err;
492 std::string
msg =
"destination doesn't contain field";
497 return acc + (acc.length() ?
", " :
"") +
'`' + field->GetFieldName() +
'`';
499 Info(
"RNTuple::Merge",
"%s: adding %s to the destination model (entry #%" PRIu64 ").",
msg.c_str(),
541 const auto &
columnId = column.fInputId;
561 Info(
"RNTuple::Merge",
"Column %s: changing source compression from %d to %d", column.fColumnName.c_str(),
586 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
623 const auto &
columnId = column.fInputId;
637 if (parent.
GetStructure() == ENTupleStructure::kCollection ||
648 const auto structure =
field->GetStructure();
650 if (structure == ENTupleStructure::kStreamer) {
653 "Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
654 "Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.",
655 field->GetFieldName().c_str());
660 R__ASSERT(structure == ENTupleStructure::kCollection || structure == ENTupleStructure::kVariant ||
661 structure == ENTupleStructure::kLeaf);
669 for (
size_t i = 0; i <
nPages; ++i) {
756 return typeid(std::byte);
760 return typeid(std::int8_t);
761 }
else if (
fieldType ==
"std::uint8_t") {
762 return typeid(std::uint8_t);
763 }
else if (
fieldType ==
"std::int16_t") {
764 return typeid(std::int16_t);
765 }
else if (
fieldType ==
"std::uint16_t") {
766 return typeid(std::uint16_t);
767 }
else if (
fieldType ==
"std::int32_t") {
768 return typeid(std::int32_t);
769 }
else if (
fieldType ==
"std::uint32_t") {
770 return typeid(std::uint32_t);
771 }
else if (
fieldType ==
"std::int64_t") {
772 return typeid(std::int64_t);
773 }
else if (
fieldType ==
"std::uint64_t") {
774 return typeid(std::uint64_t);
776 return typeid(float);
817 info.fOutputId = it->second.fColumnId;
818 info.fColumnType = it->second.fColumnType;
834 if (
mergeData.fMergeOpts.fExtraVerbose) {
835 Info(
"RNTuple::Merge",
836 "Adding column %s with log.id %" PRIu64 ", phys.id %" PRIu64 ", type %s "
837 " -> log.id %" PRIu64 ", type %s",
862static RColumnInfoGroup
865 RColumnInfoGroup res;
895 return R__FAIL(std::string(
"The compression given to RNTupleMergeOptions is different from that of the "
898 ") This is currently unsupported.");
904 std::unique_ptr<RNTupleModel> model;
906#define SKIP_OR_ABORT(errMsg) \
908 if (mergeOpts.fErrBehavior == ENTupleMergeErrBehavior::kSkip) { \
909 Warning("RNTuple::Merge", "Skipping RNTuple due to: %s", (errMsg).c_str()); \
912 return R__FAIL(errMsg); \
925 opts.fReconstructProjections =
true;
936 std::string(
"Source RNTuple will be skipped due to incompatible schema with the destination:\n") +
944 std::string
msg =
"Source RNTuple is missing the following fields:";
946 msg +=
"\n " +
field->GetFieldName() +
" : " +
field->GetTypeName();
952 if (
descCmp.fExtraSrcFields.size()) {
958 std::string
msg =
"Source RNTuple has extra fields that the destination RNTuple doesn't have:";
960 msg +=
"\n " +
field->GetFieldName() +
" : " +
field->GetTypeName();
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
static void GenerateExtraDstColumns(size_t nClusterEntries, std::span< RColumnMergeInfo > extraDstColumns, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
static void ExtendDestinationModel(std::span< const RFieldDescriptor * > newFields, RNTupleModel &dstModel, RNTupleMergeData &mergeData, std::vector< RCommonField > &commonFields)
static RResult< RDescriptorsComparison > CompareDescriptorStructure(const RNTupleDescriptor &dst, const RNTupleDescriptor &src)
Compares the top level fields of dst and src and determines whether they can be merged or not.
#define SKIP_OR_ABORT(errMsg)
static RColumnInfoGroup GatherColumnInfos(const RDescriptorsComparison &descCmp, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData)
static bool IsSplitOrUnsplitVersionOf(EColumnType a, EColumnType b)
static std::optional< std::type_index > ColumnInMemoryType(std::string_view fieldType, EColumnType onDiskType)
static void AddColumnsFromField(std::vector< RColumnMergeInfo > &columns, const RNTupleDescriptor &srcDesc, RNTupleMergeData &mergeData, const RFieldDescriptor &srcFieldDesc, const RFieldDescriptor &dstFieldDesc, const std::string &prefix="")
ROOT::Experimental::RResult< T > RResult
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t src
TRObject operator()(const T1 &t1) const
The available trivial, native content types of a column.
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< DescriptorId_t > ColumnSet_t
A column element encapsulates the translation between basic C++ types and their column representation...
static const char * GetColumnTypeName(EColumnType type)
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
Given a set of RPageSources merge them into an RPageSink, optionally changing their compression.
std::optional< TTaskGroup > fTaskGroup
RResult< void > Merge(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts=RNTupleMergeOptions())
Merge a given set of sources into the destination.
void MergeSourceClusters(RPageSource &source, std::span< RColumnMergeInfo > commonColumns, std::span< RColumnMergeInfo > extraDstColumns, RNTupleMergeData &mergeData)
void MergeCommonColumns(RClusterPool &clusterPool, DescriptorId_t clusterId, std::span< RColumnMergeInfo > commonColumns, const RCluster::ColumnSet_t &commonColumnSet, RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData)
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
Abstract interface to write data into an ntuple.
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
Abstract interface to read data from an ntuple.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
std::deque< RSealedPage > SealedPageSequence_t
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
Holds the index and the tag of a kSwitch column.
Base class for all ROOT issued exceptions.
Meta-data stored for every field of an ntuple.
DescriptorId_t GetParentId() const
std::uint64_t GetNRepetitions() const
ENTupleStructure GetStructure() const
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for storing ntuples.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
A class to manage the asynchronous execution of work items.
Representation of an RNTuple data set in a ROOT file.
Long64_t Merge(TCollection *input, TFileMergeInfo *mergeInfo)
RNTuple implements the hadd MergeFile interface. Merge this NTuple with the input list entries.
const_iterator begin() const
const_iterator end() const
Collection abstract base class.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Mother of all ROOT objects.
@ kStrict
The merger will refuse to merge any 2 RNTuples whose schema doesn't match exactly.
@ kUnion
The merger will update the output model to include all columns from all sources.
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, EColumnType onDiskType)
std::ostream & operator<<(std::ostream &os, const std::optional< RColumnDescriptor::RValueRange > &x)
RClusterSize ClusterSize_t
constexpr int kUnknownCompressionSettings
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
const RFieldDescriptor * fParentField
std::optional< std::type_index > fInMemoryType
ColumnIdMap_t fColumnIdMap
RNTupleMergeData(std::span< RPageSource * > sources, RPageSink &destination, const RNTupleMergeOptions &mergeOpts)
std::span< RPageSource * > fSources
const RNTupleDescriptor & fDstDescriptor
const RNTupleDescriptor * fSrcDescriptor
const RNTupleMergeOptions & fMergeOpts
std::vector< RColumnMergeInfo > fColumns
NTupleSize_t fNumDstEntries
int fCompressionSettings
If fCompressionSettings == kUnknownCompressionSettings (the default), the merger will not change the ...
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Parameters for the SealPage() method.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
bool GetHasChecksum() const
std::vector< RPageStorage::RSealedPageGroup > fGroups
std::deque< RPageStorage::SealedPageSequence_t > fPagesV
std::vector< std::unique_ptr< std::uint8_t[]> > fBuffers
Modifiers passed to CreateModel
@ kUseGeneralPurpose
Use the new recommended general-purpose setting; it is a best trade-off between compression ratio/dec...