Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorage.cxx
Go to the documentation of this file.
1/// \file RPageStorage.cxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
14#include <ROOT/RPageStorage.hxx>
16#include <ROOT/RColumn.hxx>
17#include <ROOT/RFieldBase.hxx>
20#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleZip.hxx>
24#include <ROOT/RPageSinkBuf.hxx>
26#ifdef R__ENABLE_DAOS
28#endif
29
30#include <Compression.h>
31#include <TError.h>
32
33#include <algorithm>
34#include <atomic>
35#include <cassert>
36#include <cstring>
37#include <functional>
38#include <memory>
39#include <string_view>
40#include <unordered_map>
41#include <utility>
42
52
56
62
64 : fMetrics(""), fPageAllocator(std::make_unique<ROOT::Internal::RPageAllocatorHeap>()), fNTupleName(name)
65{
66}
67
69
71{
72 if (!fHasChecksum)
73 return;
74
75 auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
76 auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
77 std::uint64_t xxhash3;
79}
80
82{
83 if (!fHasChecksum)
85
86 auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
87 if (!success)
88 return R__FAIL("page checksum verification failed, data corruption detected");
90}
91
93{
94 if (!fHasChecksum)
95 return R__FAIL("invalid attempt to extract non-existing page checksum");
96
97 assert(fBufferSize >= kNBytesPageChecksum);
98 std::uint64_t checksum;
100 reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
101 return checksum;
102}
103
104//------------------------------------------------------------------------------
105
// NOTE(review): the signature lines were lost in extraction; from the fColumnInfos
// member this appears to be RPageSource::RActivePhysicalColumns::Insert(physicalColumnId,
// elementId) -- confirm against the upstream RPageStorage.cxx.
108{
// Get (or create) the per-column vector of tracked in-memory element types.
109 auto [itr, _] = fColumnInfos.emplace(physicalColumnId, std::vector<RColumnInfo>());
// If this (column, element) pair is already tracked, only bump its reference count.
// Linear scan: a column typically has very few distinct in-memory element types.
110 for (auto &columnInfo : itr->second) {
111 if (columnInfo.fElementId == elementId) {
112 columnInfo.fRefCounter++;
113 return;
114 }
115 }
// First use of this element type for the column: start with a refcount of 1.
116 itr->second.emplace_back(RColumnInfo{elementId, 1});
117}
118
// NOTE(review): signature lines lost in extraction; appears to be the counterpart
// RActivePhysicalColumns::Erase(physicalColumnId, elementId) -- confirm upstream.
// Decrements the refcount of the matching (column, element) entry and removes the
// bookkeeping once it drops to zero.
121{
122 auto itr = fColumnInfos.find(physicalColumnId);
// Erase must only be called for a column previously registered via Insert.
123 R__ASSERT(itr != fColumnInfos.end());
124 for (std::size_t i = 0; i < itr->second.size(); ++i) {
125 if (itr->second[i].fElementId != elementId)
126 continue;
127
128 itr->second[i].fRefCounter--;
// Last reference gone: drop the element entry, and drop the whole column slot
// when no element types remain for it.
129 if (itr->second[i].fRefCounter == 0) {
130 itr->second.erase(itr->second.begin() + i);
131 if (itr->second.empty()) {
132 fColumnInfos.erase(itr);
133 }
134 }
// At most one entry can match the element id, so stop after the first hit.
135 break;
136 }
137}
138
146
// NOTE(review): signature lost in extraction; the fFirstEntry/fNEntries members and
// the cluster-descriptor parameter suggest RPageSource::REntryRange::IntersectsWith
// (half-open interval overlap test) -- confirm upstream.
148{
149 if (fFirstEntry == ROOT::kInvalidNTupleIndex) {
150 /// Entry range unset, we assume that the entry range covers the complete source
151 return true;
152 }
153
// Empty clusters are reported as intersecting (nothing to exclude).
154 if (clusterDesc.GetNEntries() == 0)
155 return true;
// Cluster ends at or before the range starts -> disjoint.
156 if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
157 return false;
// Cluster starts at or after the range ends -> disjoint.
158 if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
159 return false;
160 return true;
161}
162
164 : RPageStorage(name), fOptions(options)
165{
166}
167
169
170std::unique_ptr<ROOT::Internal::RPageSource>
171ROOT::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
172 const ROOT::RNTupleReadOptions &options)
173{
174 if (ntupleName.empty()) {
175 throw RException(R__FAIL("empty RNTuple name"));
176 }
177 if (location.empty()) {
178 throw RException(R__FAIL("empty storage location"));
179 }
180 if (location.find("daos://") == 0)
181#ifdef R__ENABLE_DAOS
182 return std::make_unique<ROOT::Experimental::Internal::RPageSourceDaos>(ntupleName, location, options);
183#else
184 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
185#endif
186
187 return std::make_unique<ROOT::Internal::RPageSourceFile>(ntupleName, location, options);
188}
189
192{
194 auto physicalId =
195 GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
197 fActivePhysicalColumns.Insert(physicalId, column.GetElement()->GetIdentifier());
198 return ColumnHandle_t{physicalId, &column};
199}
200
202{
203 fActivePhysicalColumns.Erase(columnHandle.fPhysicalId, columnHandle.fColumn->GetElement()->GetIdentifier());
204}
205
// NOTE(review): signature lost in extraction; the fEntryRange member points to
// RPageSource::SetEntryRange(REntryRange range) -- confirm upstream.
// Restricts the source to a sub-range of entries after validating it fits the data.
207{
208 if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
209 throw RException(R__FAIL("invalid entry range"));
210 }
211 fEntryRange = range;
212}
213
// NOTE(review): signature lost in extraction; presumably RPageSource::LoadStructure().
// Idempotent lazy load: the expensive LoadStructureImpl() runs only on the first call;
// the flag is (re)set unconditionally afterwards.
215{
216 if (!fHasStructure)
217 LoadStructureImpl();
218 fHasStructure = true;
219}
220
// NOTE(review): signature lost in extraction; presumably RPageSource::Attach(mode).
// Ensures the structure is loaded, then attaches once: the descriptor produced by
// AttachImpl() is moved in under the exclusive guard. Idempotent on repeat calls.
222{
223 LoadStructure();
224 if (!fIsAttached)
225 GetExclDescriptorGuard().MoveIn(AttachImpl(mode));
226 fIsAttached = true;
227}
228
229std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Internal::RPageSource::Clone() const
230{
231 auto clone = CloneImpl();
232 if (fIsAttached) {
233 clone->GetExclDescriptorGuard().MoveIn(GetSharedDescriptorGuard()->Clone());
234 clone->fHasStructure = true;
235 clone->fIsAttached = true;
236 }
237 return clone;
238}
239
241{
242 return GetSharedDescriptorGuard()->GetNEntries();
243}
244
246{
247 return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
248}
249
251{
252 if (fTaskScheduler)
253 UnzipClusterImpl(cluster);
254}
255
257{
258 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
259
260 const auto clusterId = cluster->GetId();
261 auto descriptorGuard = GetSharedDescriptorGuard();
262 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
263
264 fPreloadedClusters[clusterDescriptor.GetFirstEntryIndex()] = clusterId;
265
266 std::atomic<bool> foundChecksumFailure{false};
267
268 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
269 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
270 for (const auto columnId : columnsInCluster) {
271 // By the time we unzip a cluster, the set of active columns may have already changed wrt. to the moment when
272 // we requested reading the cluster. That doesn't matter much, we simply decompress what is now in the list
273 // of active columns.
274 if (!fActivePhysicalColumns.HasColumnInfos(columnId))
275 continue;
276 const auto &columnInfos = fActivePhysicalColumns.GetColumnInfos(columnId);
277
278 allElements.reserve(allElements.size() + columnInfos.size());
279 for (const auto &info : columnInfos) {
280 allElements.emplace_back(GenerateColumnElement(info.fElementId));
281
282 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
283 std::uint64_t pageNo = 0;
284 std::uint64_t firstInPage = 0;
285 for (const auto &pi : pageRange.GetPageInfos()) {
286 auto onDiskPage = cluster->GetOnDiskPage(ROnDiskPage::Key{columnId, pageNo});
288 sealedPage.SetNElements(pi.GetNElements());
289 sealedPage.SetHasChecksum(pi.HasChecksum());
290 sealedPage.SetBufferSize(pi.GetLocator().GetNBytesOnStorage() + pi.HasChecksum() * kNBytesPageChecksum);
291 sealedPage.SetBuffer(onDiskPage->GetAddress());
292 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
293
294 auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
296 indexOffset = clusterDescriptor.GetColumnRange(columnId).GetFirstElementIndex()]() {
297 const ROOT::Internal::RPagePool::RKey keyPagePool{columnId, element->GetIdentifier().fInMemoryType};
298 auto rv = UnsealPage(sealedPage, *element);
299 if (!rv) {
301 return;
302 }
303 auto newPage = rv.Unwrap();
304 fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());
305
306 newPage.SetWindow(indexOffset + firstInPage,
308 fPagePool.PreloadPage(std::move(newPage), keyPagePool);
309 };
310
311 fTaskScheduler->AddTask(taskFunc);
312
313 firstInPage += pi.GetNElements();
314 pageNo++;
315 } // for all pages in column
316
317 fCounters->fNPageUnsealed.Add(pageNo);
318 } // for all in-memory types of the column
319 } // for all columns in cluster
320
321 fTaskScheduler->Wait();
322
324 throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
325 }
326}
327
332{
333 auto descriptorGuard = GetSharedDescriptorGuard();
334 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);
335
336 for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
337 if (clusterDesc.GetColumnRange(physicalColumnId).IsSuppressed())
338 continue;
339
340 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
342 for (const auto &pageInfo : pageRange.GetPageInfos()) {
343 if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
346 pageInfo.GetLocator().GetNBytesOnStorage()));
347 } else {
349 }
350 ++pageNo;
351 }
352 }
353}
354
356{
357 if (fLastUsedCluster == clusterId)
358 return;
359
361 GetSharedDescriptorGuard()->GetClusterDescriptor(clusterId).GetFirstEntryIndex();
362 auto itr = fPreloadedClusters.begin();
363 while ((itr != fPreloadedClusters.end()) && (itr->first < firstEntryIndex)) {
364 fPagePool.Evict(itr->second);
365 itr = fPreloadedClusters.erase(itr);
366 }
367 std::size_t poolWindow = 0;
368 while ((itr != fPreloadedClusters.end()) &&
370 ++itr;
371 ++poolWindow;
372 }
373 while (itr != fPreloadedClusters.end()) {
374 fPagePool.Evict(itr->second);
375 itr = fPreloadedClusters.erase(itr);
376 }
377
378 fLastUsedCluster = clusterId;
379}
380
383{
384 const auto columnId = columnHandle.fPhysicalId;
385 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
386 auto cachedPageRef =
387 fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, globalIndex);
388 if (!cachedPageRef.Get().IsNull()) {
389 UpdateLastUsedCluster(cachedPageRef.Get().GetClusterInfo().GetId());
390 return cachedPageRef;
391 }
392
393 std::uint64_t idxInCluster;
395 {
396 auto descriptorGuard = GetSharedDescriptorGuard();
397 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
398
399 if (clusterInfo.fClusterId == ROOT::kInvalidDescriptorId)
400 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
401
402 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
403 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
404 if (columnRange.IsSuppressed())
406
407 clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
408 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
409 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
410 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
411 }
412
413 if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
414 throw RException(R__FAIL("tried to read a page with an unknown locator"));
415
416 UpdateLastUsedCluster(clusterInfo.fClusterId);
417 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
418}
419
422{
423 const auto clusterId = localIndex.GetClusterId();
424 const auto idxInCluster = localIndex.GetIndexInCluster();
425 const auto columnId = columnHandle.fPhysicalId;
426 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
427 auto cachedPageRef =
428 fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, localIndex);
429 if (!cachedPageRef.Get().IsNull()) {
430 UpdateLastUsedCluster(clusterId);
431 return cachedPageRef;
432 }
433
435 throw RException(R__FAIL("entry out of bounds"));
436
438 {
439 auto descriptorGuard = GetSharedDescriptorGuard();
440 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
441 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
442 if (columnRange.IsSuppressed())
444
445 clusterInfo.fClusterId = clusterId;
446 clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
447 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
448 }
449
450 if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
451 throw RException(R__FAIL("tried to read a page with an unknown locator"));
452
453 UpdateLastUsedCluster(clusterInfo.fClusterId);
454 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
455}
456
458{
459 fMetrics = RNTupleMetrics(prefix);
460 fCounters = std::make_unique<RCounters>(RCounters{
461 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
462 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
463 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadPayload", "B", "volume read from storage (required)"),
464 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadOverhead", "B", "volume read from storage (overhead)"),
465 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
466 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nClusterLoaded", "",
467 "number of partial clusters preloaded from storage"),
468 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
469 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageUnsealed", "", "number of pages unzipped and decoded"),
470 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
471 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallUnzip", "ns", "wall clock time spent decompressing"),
472 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuRead", "ns", "CPU time spent reading"),
473 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
474 "CPU time spent decompressing"),
475 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
476 "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
477 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
478 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
479 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
480 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
481 if (auto walltime = timeWallRead->GetValueAsInt()) {
482 double payload = szReadPayload->GetValueAsInt();
483 double overhead = szReadOverhead->GetValueAsInt();
484 // unit: bytes / nanosecond = GB/s
485 return {true, (1000. * (payload + overhead) / walltime)};
486 }
487 }
488 }
489 }
490 return {false, -1.};
491 }),
492 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
493 "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
494 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
495 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
496 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
497 if (auto walltime = timeWallRead->GetValueAsInt()) {
498 double unzip = szUnzip->GetValueAsInt();
499 // unit: bytes / nanosecond = GB/s
500 return {true, 1000. * unzip / walltime};
501 }
502 }
503 }
504 return {false, -1.};
505 }),
506 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
507 "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
508 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
509 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
510 if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
511 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
512 double unzip = szUnzip->GetValueAsInt();
513 // unit: bytes / nanosecond = GB/s
514 return {true, 1000. * unzip / walltime};
515 }
516 }
517 }
518 return {false, -1.};
519 }),
520 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
521 "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
522 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
523 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
524 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
525 if (auto payload = szReadPayload->GetValueAsInt()) {
526 // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
527 return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
528 }
529 }
530 }
531 return {false, -1.};
532 }),
533 *fMetrics.MakeCounter<RNTupleCalcPerf *>("rtCompression", "", "ratio of compressed bytes / uncompressed bytes",
534 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
535 if (const auto szReadPayload =
536 metrics.GetLocalCounter("szReadPayload")) {
537 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
538 if (auto unzip = szUnzip->GetValueAsInt()) {
539 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
540 }
542 }
543 return {false, -1.};
544 })});
545}
546
549{
550 return UnsealPage(sealedPage, element, *fPageAllocator);
551}
552
556{
557 // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
558 // large enough to hold `sealedPage.fNElements`
560 auto page = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
561 page.GrowUnchecked(sealedPage.GetNElements());
562 memset(page.GetBuffer(), 0, page.GetNBytes());
563 return page;
564 }
565
566 auto rv = sealedPage.VerifyChecksumIfEnabled();
567 if (!rv)
568 return R__FORWARD_ERROR(rv);
569
570 const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
571 auto page = pageAlloc.NewPage(element.GetPackedSize(), sealedPage.GetNElements());
572 if (sealedPage.GetDataSize() != bytesPacked) {
574 page.GetBuffer());
575 } else {
576 // We cannot simply map the sealed page as we don't know its life time. Specialized page sources
577 // may decide to implement to not use UnsealPage but to custom mapping / decompression code.
578 // Note that usually pages are compressed.
579 memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
580 }
581
582 if (!element.IsMappable()) {
583 auto tmp = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
584 element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
585 page = std::move(tmp);
586 }
587
588 page.GrowUnchecked(sealedPage.GetNElements());
589 return page;
590}
591
593{
594 if (fHasStreamerInfosRegistered)
595 return;
596
597 for (const auto &extraTypeInfo : fDescriptor.GetExtraTypeInfoIterable()) {
599 continue;
600 // We don't need the result, it's enough that during deserialization, BuildCheck() is called for every
601 // streamer info record.
603 }
604
605 fHasStreamerInfosRegistered = true;
606}
607
608//------------------------------------------------------------------------------
609
611{
612 // Make the sort order unique by adding the physical on-disk column id as a secondary key
613 if (fCurrentPageSize == other.fCurrentPageSize)
614 return fColumn->GetOnDiskId() > other.fColumn->GetOnDiskId();
615 return fCurrentPageSize > other.fCurrentPageSize;
616}
617
619{
620 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
621 return true;
622
623 auto itr = fColumnsSortedByPageSize.begin();
624 while (itr != fColumnsSortedByPageSize.end()) {
625 if (itr->fCurrentPageSize <= pageSizeLimit)
626 break;
627 if (itr->fCurrentPageSize == itr->fInitialPageSize) {
628 ++itr;
629 continue;
630 }
631
632 // Flushing the current column will invalidate itr
633 auto itrFlush = itr++;
634
635 RColumnInfo next;
636 if (itr != fColumnsSortedByPageSize.end())
637 next = *itr;
638
639 itrFlush->fColumn->Flush();
640 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
641 return true;
642
643 if (next.fColumn == nullptr)
644 return false;
645 itr = fColumnsSortedByPageSize.find(next);
646 };
647
648 return false;
649}
650
652{
653 const RColumnInfo key{&column, column.GetWritePageCapacity(), 0};
654 auto itr = fColumnsSortedByPageSize.find(key);
655 if (itr == fColumnsSortedByPageSize.end()) {
656 if (!TryEvict(newWritePageSize, 0))
657 return false;
658 fColumnsSortedByPageSize.insert({&column, newWritePageSize, newWritePageSize});
659 fCurrentAllocatedBytes += newWritePageSize;
660 return true;
661 }
662
664 assert(newWritePageSize >= elem.fInitialPageSize);
665
666 if (newWritePageSize == elem.fCurrentPageSize)
667 return true;
668
669 fColumnsSortedByPageSize.erase(itr);
670
671 if (newWritePageSize < elem.fCurrentPageSize) {
672 // Page got smaller
673 fCurrentAllocatedBytes -= elem.fCurrentPageSize - newWritePageSize;
674 elem.fCurrentPageSize = newWritePageSize;
675 fColumnsSortedByPageSize.insert(elem);
676 return true;
677 }
678
679 // Page got larger, we may need to make space available
680 const auto diffBytes = newWritePageSize - elem.fCurrentPageSize;
681 if (!TryEvict(diffBytes, elem.fCurrentPageSize)) {
682 // Don't change anything, let the calling column flush itself
683 // TODO(jblomer): we may consider skipping the column in TryEvict and thus avoiding erase+insert
684 fColumnsSortedByPageSize.insert(elem);
685 return false;
686 }
687 fCurrentAllocatedBytes += diffBytes;
688 elem.fCurrentPageSize = newWritePageSize;
689 fColumnsSortedByPageSize.insert(elem);
690 return true;
691}
692
693//------------------------------------------------------------------------------
694
696 : RPageStorage(name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
697{
699}
700
702
704{
705 assert(config.fPage);
706 assert(config.fElement);
707 assert(config.fBuffer);
708
709 unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
710 bool isAdoptedBuffer = true;
711 auto nBytesPacked = config.fPage->GetNBytes();
712 auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;
713
714 if (!config.fElement->IsMappable()) {
715 nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
716 pageBuf = new unsigned char[nBytesPacked];
717 isAdoptedBuffer = false;
718 config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
719 }
721
722 if ((config.fCompressionSettings != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
723 config.fWriteChecksum) {
726 if (!isAdoptedBuffer)
727 delete[] pageBuf;
728 pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
729 isAdoptedBuffer = true;
730 }
731
733
735 sealedPage.ChecksumIfEnabled();
736
737 return sealedPage;
738}
739
742{
743 const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
744 if (fSealPageBuffer.size() < nBytes)
745 fSealPageBuffer.resize(nBytes);
746
747 RSealPageConfig config;
748 config.fPage = &page;
749 config.fElement = &element;
750 config.fCompressionSettings = GetWriteOptions().GetCompression();
751 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
752 config.fAllowAlias = true;
753 config.fBuffer = fSealPageBuffer.data();
754
755 return SealPage(config);
756}
757
// NOTE(review): signature lost in extraction; presumably RPageSink::CommitDataset().
// Runs all registered commit callbacks (in registration order) before delegating
// the actual dataset commit to the backend implementation.
759{
760 for (const auto &cb : fOnDatasetCommitCallbacks)
761 cb(*this);
762 CommitDatasetImpl();
763}
764
// NOTE(review): signature lost in extraction; presumably RPageSink::ReservePage
// (columnHandle, nElements). Reserves a write page for the column, subject to the
// page-buffer memory budget.
766{
767 R__ASSERT(nElements > 0);
768 const auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
769 const auto nBytes = elementSize * nElements;
// If the memory manager cannot make room for the requested bytes, return a
// null page so the caller can react (e.g. flush and retry).
770 if (!fWritePageMemoryManager.TryUpdate(*columnHandle.fColumn, nBytes))
771 return ROOT::Internal::RPage();
772 return fPageAllocator->NewPage(elementSize, nElements);
773}
774
775//------------------------------------------------------------------------------
776
777std::unique_ptr<ROOT::Internal::RPageSink>
778ROOT::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
779 const ROOT::RNTupleWriteOptions &options)
780{
781 if (ntupleName.empty()) {
782 throw RException(R__FAIL("empty RNTuple name"));
783 }
784 if (location.empty()) {
785 throw RException(R__FAIL("empty storage location"));
786 }
787 if (location.find("daos://") == 0) {
788#ifdef R__ENABLE_DAOS
789 return std::make_unique<ROOT::Experimental::Internal::RPageSinkDaos>(ntupleName, location, options);
790#else
791 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
792#endif
793 }
794
795 // Otherwise assume that the user wants us to create a file.
796 return std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, location, options);
797}
798
800 const ROOT::RNTupleWriteOptions &options)
801 : RPageSink(name, options)
802{
803}
804
806
809{
810 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
812 columnBuilder.LogicalColumnId(columnId)
813 .PhysicalColumnId(columnId)
814 .FieldId(fieldId)
815 .BitsOnStorage(column.GetBitsOnStorage())
816 .ValueRange(column.GetValueRange())
817 .Type(column.GetType())
818 .Index(column.GetIndex())
819 .RepresentationIndex(column.GetRepresentationIndex())
820 .FirstElementIndex(column.GetFirstElementIndex());
821 // For late model extension, we assume that the primary column representation is the active one for the
822 // deferred range. All other representations are suppressed.
823 if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
824 columnBuilder.SetSuppressedDeferred();
825 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
826 return ColumnHandle_t{columnId, &column};
827}
828
831{
832 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
833
834 if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
835 // If we already have alias columns, add an offset to the alias columns so that the new physical columns
836 // of the changeset follow immediately the already existing physical columns
837 auto getNColumns = [](const ROOT::RFieldBase &f) -> std::size_t {
838 const auto &reps = f.GetColumnRepresentatives();
839 if (reps.empty())
840 return 0;
841 return reps.size() * reps[0].size();
842 };
843 std::uint32_t nNewPhysicalColumns = 0;
844 for (auto f : changeset.fAddedFields) {
846 for (const auto &descendant : *f)
848 }
849 fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
850 }
851
852 auto addField = [&](ROOT::RFieldBase &f) {
853 auto fieldId = descriptor.GetNFields();
854 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
855 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
856 f.SetOnDiskId(fieldId);
857 ROOT::Internal::CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
858 };
860 auto fieldId = descriptor.GetNFields();
861 auto sourceFieldId =
863 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
864 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
865 fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
866 f.SetOnDiskId(fieldId);
867 for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
868 auto targetId = descriptor.GetNLogicalColumns();
870 columnBuilder.LogicalColumnId(targetId)
871 .PhysicalColumnId(source.GetLogicalId())
872 .FieldId(fieldId)
873 .BitsOnStorage(source.GetBitsOnStorage())
874 .ValueRange(source.GetValueRange())
875 .Type(source.GetType())
876 .Index(source.GetIndex())
877 .RepresentationIndex(source.GetRepresentationIndex());
878 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
879 }
880 };
881
882 R__ASSERT(firstEntry >= fPrevClusterNEntries);
883 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
884 for (auto f : changeset.fAddedFields) {
885 addField(*f);
886 for (auto &descendant : *f)
888 }
889 for (auto f : changeset.fAddedProjectedFields) {
891 for (auto &descendant : *f)
893 }
894
895 const auto nColumns = descriptor.GetNPhysicalColumns();
896 fOpenColumnRanges.reserve(fOpenColumnRanges.size() + (nColumns - nColumnsBeforeUpdate));
897 fOpenPageRanges.reserve(fOpenPageRanges.size() + (nColumns - nColumnsBeforeUpdate));
900 columnRange.SetPhysicalColumnId(i);
901 // We set the first element index in the current cluster to the first element that is part of a materialized page
902 // (i.e., that is part of a page list). For columns created during late model extension, however, the column range
903 // is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
904 columnRange.SetFirstElementIndex(descriptor.GetColumnDescriptor(i).GetFirstElementIndex());
905 columnRange.SetNElements(0);
906 columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
907 fOpenColumnRanges.emplace_back(columnRange);
909 pageRange.SetPhysicalColumnId(i);
910 fOpenPageRanges.emplace_back(std::move(pageRange));
911 }
912
913 // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
914 // header was already serialized, this has to be done manually as it is required for page list serialization.
915 if (fSerializationContext.GetHeaderSize() > 0)
916 fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
917}
918
920{
922 throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));
923
924 fStreamerInfos.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
925}
926
928{
929 fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
930 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
931
933 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
934 fieldZero.SetOnDiskId(0);
936 projectedFields.GetFieldZero().SetOnDiskId(0);
937
939 initialChangeset.fAddedFields.reserve(fieldZero.GetMutableSubfields().size());
940 for (auto f : fieldZero.GetMutableSubfields())
941 initialChangeset.fAddedFields.emplace_back(f);
942 initialChangeset.fAddedProjectedFields.reserve(projectedFields.GetFieldZero().GetMutableSubfields().size());
943 for (auto f : projectedFields.GetFieldZero().GetMutableSubfields())
944 initialChangeset.fAddedProjectedFields.emplace_back(f);
945 UpdateSchema(initialChangeset, 0U);
946
947 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
948 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
949 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
950 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
951
952 fDescriptorBuilder.BeginHeaderExtension();
953}
954
// NOTE(review): This listing is a rendered Doxygen page; hyperlinked source
// lines were dropped by the extraction (gaps in the embedded numbering:
// 956, 969, 975, 983, 1003, 1007). The missing lines carry the function
// signature and local declarations -- restore them from the original
// RPageStorage.cxx before attempting to compile this extract.
//
// InitFromDescriptor: initializes this persistent sink from an existing
// descriptor (e.g. when writing back an already-existing RNTuple), sets up
// the per-column open column/page ranges, optionally clones the source's
// cluster descriptors, serializes the header, and returns a model re-created
// from the descriptor.
 955std::unique_ptr<ROOT::RNTupleModel>
 957{
 958 // Create new descriptor
 959 fDescriptorBuilder.SetSchemaFromExisting(srcDescriptor);
 960 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
 961
 962 // Create column/page ranges
 963 const auto nColumns = descriptor.GetNPhysicalColumns();
 964 R__ASSERT(fOpenColumnRanges.empty() && fOpenPageRanges.empty());
 965 fOpenColumnRanges.reserve(nColumns);
 966 fOpenPageRanges.reserve(nColumns);
 967 for (ROOT::DescriptorId_t i = 0; i < nColumns; ++i) {
 968 const auto &column = descriptor.GetColumnDescriptor(i);
// NOTE(review): declaration of `columnRange` (line 969) was lost in extraction.
 970 columnRange.SetPhysicalColumnId(i);
 971 columnRange.SetFirstElementIndex(column.GetFirstElementIndex());
 972 columnRange.SetNElements(0);
 973 columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
 974 fOpenColumnRanges.emplace_back(columnRange);
// NOTE(review): declaration of `pageRange` (line 975) was lost in extraction.
 976 pageRange.SetPhysicalColumnId(i);
 977 fOpenPageRanges.emplace_back(std::move(pageRange));
 978 }
 979
 980 if (copyClusters) {
 981 // Clone and add all cluster descriptors
 982 auto clusterId = srcDescriptor.FindClusterId(0, 0);
// NOTE(review): loop head (line 983, presumably iterating until
// kInvalidDescriptorId) was lost in extraction -- confirm against the
// original source.
 984 auto &cluster = srcDescriptor.GetClusterDescriptor(clusterId);
 985 auto nEntries = cluster.GetNEntries();
 986 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
 987 R__ASSERT(fOpenColumnRanges[i].GetPhysicalColumnId() == i);
 988 if (!cluster.ContainsColumn(i)) // a cluster may not contain a column if that column is deferred
 989 break;
 990 const auto &columnRange = cluster.GetColumnRange(i);
 991 R__ASSERT(columnRange.GetPhysicalColumnId() == i);
 992 // TODO: properly handle suppressed columns (check MarkSuppressedColumnRange())
 993 fOpenColumnRanges[i].IncrementFirstElementIndex(columnRange.GetNElements());
 994 }
 995 fDescriptorBuilder.AddCluster(cluster.Clone());
 996 fPrevClusterNEntries += nEntries;
 997
 998 clusterId = srcDescriptor.FindNextClusterId(clusterId);
 999 }
1000 }
1001
1002 // Create model
// NOTE(review): declaration of `modelOpts` (line 1003) was lost in extraction.
1004 modelOpts.SetReconstructProjections(true);
1005 auto model = descriptor.CreateModel(modelOpts);
1006 if (!copyClusters) {
// NOTE(review): declaration of `projectedFields` (line 1007) was lost in
// extraction; it presumably obtains the model's projected fields.
1008 projectedFields.GetFieldZero().SetOnDiskId(model->GetConstFieldZero().GetOnDiskId());
1009 }
1010
1011 // Serialize header and init from it
// First pass with a nullptr buffer computes the header size only; the second
// pass writes into the allocated buffer.
1012 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
1013 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
1014 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
1015 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
1016
1017 fDescriptorBuilder.BeginHeaderExtension();
1018
1019 // mark this sink as initialized
1020 fIsInitialized = true;
1021
1022 return model;
1023}
1024
// RPagePersistentSink::CommitSuppressedColumn -- the signature line (1025)
// was dropped by the HTML extraction; per the member list it is
// `void CommitSuppressedColumn(ColumnHandle_t columnHandle)`.
// Marks the column's currently open column range as suppressed for this
// cluster; the flag is consumed and reset when the cluster is staged
// (see StageCluster()).
1026{
1027 fOpenColumnRanges.at(columnHandle.fPhysicalId).SetIsSuppressed(true);
1028}
1029
// RPagePersistentSink::CommitPage -- the signature line (1030) and the
// declaration of `pageInfo` (1034) were dropped by the HTML extraction.
// Writes one page for the given column and records its metadata in the
// currently open page range.
1031{
// Account for the page's elements in the open column range of this column.
1032 fOpenColumnRanges.at(columnHandle.fPhysicalId).IncrementNElements(page.GetNElements());
1033
1035 pageInfo.SetNElements(page.GetNElements());
// CommitPageImpl() is the backend-specific write; its return value is the
// on-disk locator stored in the page info.
1036 pageInfo.SetLocator(CommitPageImpl(columnHandle, page));
1037 pageInfo.SetHasChecksum(GetWriteOptions().GetEnablePageChecksums());
1038 fOpenPageRanges.at(columnHandle.fPhysicalId).GetPageInfos().emplace_back(pageInfo);
1039}
1040
// RPagePersistentSink::CommitSealedPage -- the signature lines (1041-1042)
// and the declaration of `pageInfo` (1046) were dropped by the HTML
// extraction; per the member list the signature is
// `void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage)`.
// Same bookkeeping as CommitPage(), but for an already packed & compressed
// (sealed) page; the checksum flag comes from the sealed page itself.
1043{
1044 fOpenColumnRanges.at(physicalColumnId).IncrementNElements(sealedPage.GetNElements());
1045
1047 pageInfo.SetNElements(sealedPage.GetNElements());
1048 pageInfo.SetLocator(CommitSealedPageImpl(physicalColumnId, sealedPage));
1049 pageInfo.SetHasChecksum(sealedPage.GetHasChecksum());
1050 fOpenPageRanges.at(physicalColumnId).GetPageInfos().emplace_back(pageInfo);
1051}
1052
1053std::vector<ROOT::RNTupleLocator>
1054ROOT::Internal::RPagePersistentSink::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
1055 const std::vector<bool> &mask)
1056{
1057 std::vector<ROOT::RNTupleLocator> locators;
1058 locators.reserve(mask.size());
1059 std::size_t i = 0;
1060 for (auto &range : ranges) {
1061 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1062 if (mask[i++])
1063 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
1064 }
1065 }
1066 locators.shrink_to_fit();
1067 return locators;
1068}
1069
// Vector commit of sealed pages with optional same-page merging: pages whose
// checksum and byte content match an earlier page in this batch are not
// written again; they reuse the first occurrence's locator. The mask handed
// to CommitSealedPageVImpl() marks which pages are physically written, and
// `locatorIndexes` maps every page back to its entry in the returned locator
// vector.
// NOTE(review): the declaration of `pageInfo` (line 1130) was dropped by the
// HTML extraction of this listing.
1070void ROOT::Internal::RPagePersistentSink::CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges)
1071{
1072 /// Used in the `originalPages` map
1073 struct RSealedPageLink {
1074 const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
1075 std::size_t fLocatorIdx = 0; ///< The index in the locator vector returned by CommitSealedPageVImpl()
1076 };
1077
1078 std::vector<bool> mask;
1079 // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
1080 std::vector<std::size_t> locatorIndexes;
1081 // Maps page checksums to the first sealed page with that checksum
1082 std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
1083 std::size_t iLocator = 0;
1084 for (auto &range : ranges) {
1085 const auto rangeSize = std::distance(range.fFirst, range.fLast);
1086 mask.reserve(mask.size() + rangeSize);
1087 locatorIndexes.reserve(locatorIndexes.size() + rangeSize);
1088
1089 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
// Merging disabled (by the sink's capabilities or the write options):
// every page is written.
1090 if (!fFeatures.fCanMergePages || !fOptions->GetEnableSamePageMerging()) {
1091 mask.emplace_back(true);
1092 locatorIndexes.emplace_back(iLocator++);
1093 continue;
1094 }
1095 // Same page merging requires page checksums - this is checked in the write options
1096 R__ASSERT(sealedPageIt->GetHasChecksum());
1097
1098 const auto chk = sealedPageIt->GetChecksum().Unwrap();
1099 auto itr = originalPages.find(chk);
1100 if (itr == originalPages.end()) {
// First page with this checksum: remember it and write it.
1101 originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
1102 mask.emplace_back(true);
1103 locatorIndexes.emplace_back(iLocator++);
1104 continue;
1105 }
1106
// Checksum collision guard: only merge when the bytes really match.
1107 const auto *p = itr->second.fSealedPage;
1108 if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
1109 memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
1110 mask.emplace_back(true);
1111 locatorIndexes.emplace_back(iLocator++);
1112 continue;
1113 }
1114
// Identical page: skip the write, point at the original's locator.
1115 mask.emplace_back(false);
1116 locatorIndexes.emplace_back(itr->second.fLocatorIdx);
1117 }
1118
1119 mask.shrink_to_fit();
1120 locatorIndexes.shrink_to_fit();
1121 }
1122
1123 auto locators = CommitSealedPageVImpl(ranges, mask);
1124 unsigned i = 0;
1125
// Second pass: record page metadata for every page (written or merged).
1126 for (auto &range : ranges) {
1127 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1128 fOpenColumnRanges.at(range.fPhysicalColumnId).IncrementNElements(sealedPageIt->GetNElements());
1129
// NOTE(review): declaration of `pageInfo` (line 1130) was lost in extraction.
1131 pageInfo.SetNElements(sealedPageIt->GetNElements());
1132 pageInfo.SetLocator(locators[locatorIndexes[i++]]);
1133 pageInfo.SetHasChecksum(sealedPageIt->GetHasChecksum());
1134 fOpenPageRanges.at(range.fPhysicalColumnId).GetPageInfos().emplace_back(pageInfo);
1135 }
1136 }
1137}
1138
// RPagePersistentSink::StageCluster(ROOT::NTupleSize_t nNewEntries) -- the
// signature lines (1139-1140) and the declaration of `stagedCluster` (1142)
// were dropped by the HTML extraction of this listing.
// Flushes the current open cluster into an RStagedCluster (not yet appended
// to the descriptor) and resets the per-column open ranges so the next
// cluster can be filled.
1141{
1143 stagedCluster.fNBytesWritten = StageClusterImpl();
1144 stagedCluster.fNEntries = nNewEntries;
1145
1146 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1147 RStagedCluster::RColumnInfo columnInfo;
1148 columnInfo.fCompressionSettings = fOpenColumnRanges[i].GetCompressionSettings().value();
1149 if (fOpenColumnRanges[i].IsSuppressed()) {
1150 assert(fOpenPageRanges[i].GetPageInfos().empty());
1151 columnInfo.fPageRange.SetPhysicalColumnId(i);
1152 columnInfo.fIsSuppressed = true;
1153 // We reset suppressed columns to the state they would have if they were active (not suppressed).
1154 fOpenColumnRanges[i].SetNElements(0);
1155 fOpenColumnRanges[i].SetIsSuppressed(false);
1156 } else {
// Hand the accumulated page range over to the staged cluster and leave an
// empty range (with the same column id) behind.
1157 std::swap(columnInfo.fPageRange, fOpenPageRanges[i]);
1158 fOpenPageRanges[i].SetPhysicalColumnId(i);
1159
1160 columnInfo.fNElements = fOpenColumnRanges[i].GetNElements();
1161 fOpenColumnRanges[i].SetNElements(0);
1162 }
1163 stagedCluster.fColumnInfos.push_back(std::move(columnInfo));
1164 }
1165
1166 return stagedCluster;
1167}
1168
// RPagePersistentSink::CommitStagedClusters(std::span<RStagedCluster>) -- the
// signature line (1169) and the declaration of `clusterBuilder` (1172) were
// dropped by the HTML extraction of this listing.
// Logically appends previously staged clusters to the ntuple descriptor,
// committing per-column ranges and updating the open ranges' first element
// indexes for the next cluster.
1170{
1171 for (const auto &cluster : clusters) {
// NOTE(review): declaration of `clusterBuilder` (line 1172) was lost in
// extraction.
1173 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
1174 .FirstEntryIndex(fPrevClusterNEntries)
1175 .NEntries(cluster.fNEntries);
1176 for (const auto &columnInfo : cluster.fColumnInfos) {
1177 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1178 if (columnInfo.fIsSuppressed) {
1179 assert(columnInfo.fPageRange.GetPageInfos().empty());
1180 clusterBuilder.MarkSuppressedColumnRange(colId);
1181 } else {
1182 clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].GetFirstElementIndex(),
1183 columnInfo.fCompressionSettings, columnInfo.fPageRange);
1184 fOpenColumnRanges[colId].IncrementFirstElementIndex(columnInfo.fNElements);
1185 }
1186 }
1187
1188 clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
1189 for (const auto &columnInfo : cluster.fColumnInfos) {
1190 if (!columnInfo.fIsSuppressed)
1191 continue;
1192 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1193 // For suppressed columns, we need to reset the first element index to the first element of the next (upcoming)
1194 // cluster. This information has been determined for the committed cluster descriptor through
1195 // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
1196 const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
1197 fOpenColumnRanges[colId].SetFirstElementIndex(columnRangeFromDesc.GetFirstElementIndex() +
1198 columnRangeFromDesc.GetNElements());
1199 }
1200
1201 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
1202 fPrevClusterNEntries += cluster.fNEntries;
1203 }
1204}
1205
// RPagePersistentSink::CommitClusterGroup() -- the signature line (1206), the
// page-list buffer allocation and second SerializePageList() call
// (1219-1220), and the declaration of `cgBuilder` (1224) were dropped by the
// HTML extraction of this listing.
// Serializes the page-list envelope for all clusters committed since the
// last call and registers a new cluster group in the descriptor.
1207{
1208 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1209
1210 const auto nClusters = descriptor.GetNActiveClusters();
1211 std::vector<ROOT::DescriptorId_t> physClusterIDs;
1212 physClusterIDs.reserve(nClusters);
1213 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1214 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
1215 }
1216
// First pass (nullptr buffer) computes the page list size only.
1217 auto szPageList =
1218 RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext).Unwrap();
// NOTE(review): allocation of `bufPageList` and the second, writing
// SerializePageList() call (lines 1219-1220) were lost in extraction.
1221
1222 const auto clusterGroupId = descriptor.GetNClusterGroups();
1223 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
// NOTE(review): declaration of `cgBuilder` (line 1224) was lost in extraction.
1225 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
1226 if (fNextClusterInGroup == nClusters) {
// No new clusters since the last call: record an empty cluster group.
1227 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
1228 } else {
1229 const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
1230 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
1231 cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
1232 .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
1233 firstClusterDesc.GetFirstEntryIndex())
1234 .NClusters(nClusters - fNextClusterInGroup);
1235 }
1236 std::vector<ROOT::DescriptorId_t> clusterIds;
1237 clusterIds.reserve(nClusters);
1238 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1239 clusterIds.emplace_back(i);
1240 }
1241 cgBuilder.AddSortedClusters(clusterIds);
1242 fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
1243 fSerializationContext.MapClusterGroupId(clusterGroupId);
1244
1245 fNextClusterInGroup = nClusters;
1246}
1247
// RPagePersistentSink::CommitDatasetImpl() -- the signature line (1248), the
// `extraInfoBuilder` declaration with its first chained calls (1264-1265),
// and the `bufFooter` allocation (1273) were dropped by the HTML extraction
// of this listing.
// Finalizes the dataset: merges streamer infos into the extra type info
// record, serializes the footer, and hands it to the backend via
// CommitDatasetImpl(buffer, length).
1249{
1250 if (!fStreamerInfos.empty()) {
1251 // De-duplicate extra type infos before writing. Usually we won't have them already in the descriptor, but
1252 // this may happen when we are writing back an already-existing RNTuple, e.g. when doing incremental merging.
1253 for (const auto &etDesc : fDescriptorBuilder.GetDescriptor().GetExtraTypeInfoIterable()) {
1254 if (etDesc.GetContentId() == EExtraTypeInfoIds::kStreamerInfo) {
1255 // The specification mandates that the type name for a kStreamerInfo should be empty and the type version
1256 // should be zero.
1257 R__ASSERT(etDesc.GetTypeName().empty());
1258 R__ASSERT(etDesc.GetTypeVersion() == 0);
1259 auto etInfo = RNTupleSerializer::DeserializeStreamerInfos(etDesc.GetContent()).Unwrap();
1260 fStreamerInfos.merge(etInfo);
1261 }
1262 }
1263
// NOTE(review): declaration of `extraInfoBuilder` and the start of its call
// chain (lines 1264-1265) were lost in extraction.
1266 .Content(RNTupleSerializer::SerializeStreamerInfos(fStreamerInfos));
1267 fDescriptorBuilder.ReplaceExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
1268 }
1269
1270 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1271
// First pass (nullptr buffer) computes the footer size only.
1272 auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext).Unwrap();
// NOTE(review): allocation of `bufFooter` (line 1273) was lost in extraction.
1274 RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);
1275
1276 CommitDatasetImpl(bufFooter.get(), szFooter);
1277}
1278
// RPagePersistentSink::EnableDefaultMetrics -- the signature line (1279) was
// dropped by the HTML extraction; per the member list it is
// `void EnableDefaultMetrics(const std::string &prefix)`.
// Re-creates fMetrics under the given prefix and registers the default set
// of write-path performance counters (page count, payload/zip volume, wall
// and CPU times).
1280{
1281 fMetrics = RNTupleMetrics(prefix);
1282 fCounters = std::make_unique<RCounters>(RCounters{
1283 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageCommitted", "", "number of pages committed to storage"),
1284 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szWritePayload", "B", "volume written for committed pages"),
1285 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
1286 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
1287 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
1288 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuWrite", "ns", "CPU time spent writing"),
1289 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuZip", "ns",
1290 "CPU time spent compressing")});
1291}
fBuffer
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:303
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define f(i)
Definition RSha256.hxx:104
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char mode
char name[80]
Definition TGX11.cxx:110
#define _(A, B)
Definition cfortran.h:108
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
A helper class for piece-wise construction of an RClusterDescriptor.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:148
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:150
A helper class for piece-wise construction of an RColumnDescriptor.
A column element encapsulates the translation between basic C++ types and their column representation...
virtual RIdentifier GetIdentifier() const =0
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:38
std::optional< std::pair< double, double > > GetValueRange() const
Definition RColumn.hxx:346
std::uint16_t GetRepresentationIndex() const
Definition RColumn.hxx:352
ROOT::Internal::RColumnElementBase * GetElement() const
Definition RColumn.hxx:339
ROOT::ENTupleColumnType GetType() const
Definition RColumn.hxx:340
ROOT::NTupleSize_t GetFirstElementIndex() const
Definition RColumn.hxx:354
std::size_t GetWritePageCapacity() const
Definition RColumn.hxx:361
std::uint16_t GetBitsOnStorage() const
Definition RColumn.hxx:341
std::uint32_t GetIndex() const
Definition RColumn.hxx:351
A helper class for piece-wise construction of an RExtraTypeInfoDescriptor.
A helper class for piece-wise construction of an RFieldDescriptor.
static RFieldDescriptorBuilder FromField(const ROOT::RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live RNTuple field.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
static unsigned int GetClusterBunchSize(const RNTupleReadOptions &options)
A helper class for serializing and deserialization of the RNTuple binary format.
static std::uint32_t SerializeXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3, void *buffer)
Writes a XxHash-3 64bit checksum of the byte range given by data and length.
static RResult< StreamerInfoMap_t > DeserializeStreamerInfos(const std::string &extraTypeInfoContent)
static RResult< void > VerifyXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3)
Expects an xxhash3 checksum in the 8 bytes following data + length and verifies it.
static RResult< std::uint32_t > SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< ROOT::DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< std::uint32_t > SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static RResult< RContext > SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::string SerializeStreamerInfos(const StreamerInfoMap_t &infos)
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:99
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:41
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
std::unique_ptr< RNTupleModel > InitFromDescriptor(const ROOT::RNTupleDescriptor &descriptor, bool copyClusters)
Initialize sink based on an existing descriptor and fill into the descriptor builder,...
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) final
Register a new column.
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask)
Vector commit of preprocessed pages.
RPagePersistentSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Guess the concrete derived page source from the location.
void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final
Write a page to the storage. The column must have been added before.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
Reference to a page stored in the page pool.
Abstract interface to write data into an ntuple.
virtual ROOT::Internal::RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
RPageSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
void Insert(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId)
ROOT::Internal::RCluster::ColumnSet_t ToColumnSet() const
void Erase(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId)
void LoadStructure()
Loads header and footer without decompressing or deserializing them.
virtual ROOT::Internal::RPageRef LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleSize_t globalIndex)
Allocates and fills a page that contains the index-th element.
void RegisterStreamerInfos()
Builds the streamer info records from the descriptor's extra type info section.
void Attach(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode=ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode::kForReading)
Open the physical storage container and deserialize header and footer.
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) override
Register a new column.
void UnzipCluster(ROOT::Internal::RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
void PrepareLoadCluster(const ROOT::Internal::RCluster::RKey &clusterKey, ROOT::Internal::ROnDiskPageMap &pageZeroMap, std::function< void(ROOT::DescriptorId_t, ROOT::NTupleSize_t, const ROOT::RClusterDescriptor::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
ROOT::NTupleSize_t GetNEntries()
void UpdateLastUsedCluster(ROOT::DescriptorId_t clusterId)
Does nothing if fLastUsedCluster == clusterId.
ROOT::NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
virtual void UnzipClusterImpl(ROOT::Internal::RCluster *cluster)
RPageSource(std::string_view ntupleName, const ROOT::RNTupleReadOptions &fOptions)
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
std::unique_ptr< RPageSource > Clone() const
Open the same storage multiple time, e.g.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
static RResult< ROOT::Internal::RPage > UnsealPage(const RSealedPage &sealedPage, const ROOT::Internal::RColumnElementBase &element, ROOT::Internal::RPageAllocator &pageAlloc)
Helper for unstreaming a page.
Common functionality of an ntuple storage for both reading and writing.
RPageStorage(std::string_view name)
Stores information about the cluster in which this page resides.
Definition RPage.hxx:53
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:44
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:23
const ROOT::RFieldBase * GetSourceField(const ROOT::RFieldBase *target) const
bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
Flush columns in order of allocated write page size until the sum of all write page allocations leave...
bool TryUpdate(ROOT::Internal::RColumn &column, std::size_t newWritePageSize)
Try to register the new write page size for the given column.
The window of element indexes of a particular column in a particular cluster.
Records the partition of data into pages for a particular column in a particular cluster.
Metadata for RNTuple clusters.
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
Field-specific extra type information from the header / extension header.
A field translates read and write calls from/to underlying columns to/from tree values.
The on-storage metadata of an RNTuple.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
The RNTupleModel encapsulates the schema of an RNTuple.
const std::string & GetDescription() const
Common user-tunable settings for reading RNTuples.
Common user-tunable settings for storing RNTuples.
const_iterator begin() const
const_iterator end() const
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:289
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:197
ROOT::RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
RResult< void > EnsureValidNameForRNTuple(std::string_view name, std::string_view where)
Check whether a given string is a valid name according to the RNTuple specification.
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
void CallConnectPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &, ROOT::NTupleSize_t firstEntry=0)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:152
Every concrete RColumnElement type is identified by its on-disk type (column type) and the in-memory ...
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:51
Default I/O performance counters that get registered in fMetrics.
Parameters for the SealPage() method.
bool fWriteChecksum
Adds a 8 byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
Cluster that was staged, but not yet logically appended to the RNTuple.
Summarizes cluster-level information that are necessary to load a certain page.
Default I/O performance counters that get registered in fMetrics
Used in SetEntryRange / GetEntryRange.
bool IntersectsWith(const ROOT::RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
RResult< std::uint64_t > GetChecksum() const
Returns a failure if the sealed page has no checksum.
bool operator>(const RColumnInfo &other) const
Information about a single page in the context of a cluster's page range.