Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageSinkBuf.cxx
Go to the documentation of this file.
1/// \file RPageSinkBuf.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \author Max Orok <maxwellorok@gmail.com>
4/// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
5/// \date 2021-03-17
6
7/*************************************************************************
8 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
9 * All rights reserved. *
10 * *
11 * For the licensing terms see $ROOTSYS/LICENSE. *
12 * For the list of contributors see $ROOTSYS/README/CREDITS. *
13 *************************************************************************/
14
15#include <ROOT/RNTupleModel.hxx>
16#include <ROOT/RNTupleUtils.hxx>
18#include <ROOT/RNTupleZip.hxx>
19#include <ROOT/RPageSinkBuf.hxx>
20
21#include <algorithm>
22#include <memory>
23
31
33{
// NOTE(review): the signature line is absent from this listing; presumably this is the body of
// RColumnBuf::DropBufferedPages() — confirm against the original source.
// Releases everything buffered for this column: first the zip items, then the sealed-page records.
34 fBufferedPages.clear();
35 // Each RSealedPage points to the same region as `fBuf` for some element in `fBufferedPages`; thus, no further
36 // clean-up is required
37 fSealedPages.clear();
38}
39
42{
// NOTE(review): the signature / initializer list (lines 40-41) is absent from this listing; presumably this is
// the body of the RPageSinkBuf(std::unique_ptr<RPageSink> inner) constructor — confirm.
// Give this sink its own metrics group named "RPageSinkBuf".
43 fMetrics = RNTupleMetrics("RPageSinkBuf");
// Register the performance counters. The counter list below is incomplete in this listing: lines 49 and 51,
// which open the last two MakeCounter calls, are missing, so only their trailing arguments are visible.
44 fCounters = std::make_unique<RCounters>(
45 RCounters{*fMetrics.MakeCounter<RNTuplePlainCounter *>("ParallelZip", "", "compressing pages in parallel"),
46 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
47 *fMetrics.MakeCounter<RNTuplePlainCounter *>("timeWallCriticalSection", "ns",
48 "wall clock time spent in critical sections"),
50 "CPU time spent compressing"),
52 "timeCpuCriticalSection", "ns", "CPU time spent in critical section")});
// Attach the inner sink's metrics as an observee of this sink's metrics object.
53 fMetrics.ObserveMetrics(fInnerSink->GetMetrics());
54}
55
57{
// NOTE(review): the signature line is absent from this listing; the comment below identifies this as a
// destructor body, presumably ~RPageSinkBuf() — confirm.
58 // Wait for unterminated tasks, if any, as they may still hold a reference to `this`.
59 // This cannot be moved to the base class destructor, given non-static members have been destroyed by the time the
60 // base class destructor is invoked.
61 WaitForAllTasks();
62}
63
66{
// NOTE(review): the signature lines are absent from this listing; presumably this is the body of
// AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) — confirm.
// The returned handle carries the current value of fNColumns as its physical id; the counter is
// post-incremented, so ids are handed out consecutively. The inner sink is not called here.
67 return ColumnHandle_t{fNColumns++, &column};
68}
69
// Assigns on-disk ids to fields and connects them to this sink, then sizes fBufferedColumns to the number of
// columns registered so far (fNColumns).
// NOTE(review): line 71 (the rest of the parameter list; `firstEntry` is used below) is absent from this listing.
70void ROOT::Internal::RPageSinkBuf::ConnectFields(const std::vector<ROOT::RFieldBase *> &fields,
72{
// Helper: give the field the next id and connect it to this sink.
73 auto connectField = [&](ROOT::RFieldBase &f) {
74 // Field Zero would have id 0.
// Pre-increment: the counter is advanced before the id is assigned.
75 ++fNFields;
76 f.SetOnDiskId(fNFields);
77 CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
78 };
// NOTE(review): lines 80 and 82 are absent from this listing; presumably they invoke connectField on `f` and on
// each `descendant` — confirm.
79 for (auto *f : fields) {
81 for (auto &descendant : *f) {
83 }
84 }
// One buffer slot per column known at this point.
85 fBufferedColumns.resize(fNColumns);
86}
87
89{
// NOTE(review): the signature line is absent from this listing; presumably GetDescriptor() const — confirm.
// The descriptor is owned by the inner sink; this simply forwards to it.
90 return fInnerSink->GetDescriptor();
91}
92
94{
// NOTE(review): the signature line is absent from this listing; presumably InitImpl(ROOT::RNTupleModel &model)
// — confirm.
// Connect the subfields of the model's field zero to this (buffering) sink, with first entry 0.
95 ConnectFields(GetFieldZeroOfModel(model).GetMutableSubfields(), 0U);
96
// The inner sink is initialized with a separate clone of the model, which is kept in fInnerModel.
97 fInnerModel = model.Clone();
98 fInnerSink->Init(*fInnerModel);
99}
100
103{
// NOTE(review): the signature lines are absent from this listing; presumably
// UpdateSchema(const RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) — confirm.
// Step 1: connect the newly added fields to this sink.
104 ConnectFields(changeset.fAddedFields, firstEntry);
105
106 // The buffered page sink maintains a copy of the RNTupleModel for the inner sink; replicate the changes there
107 // TODO(jalopezg): we should be able, in general, to simplify the buffered sink.
// Helper: clone an added field into fInnerModel and return a raw pointer to the clone.
108 auto cloneAddField = [&](const ROOT::RFieldBase *field) {
109 auto cloned = field->Clone(field->GetFieldName());
// Remember the raw pointer now; `cloned` is moved from below.
110 auto p = &(*cloned);
111
112 auto parent = field->GetParent();
113 assert(parent);
// A parent that is not RFieldZero means the field was added under an existing field: look up the matching
// parent in the inner model (asserted to be an RRecordField) and attach the clone as a record item.
// Otherwise the clone is added directly to the inner model.
114 if (typeid(*parent) != typeid(RFieldZero)) {
115 auto &innerParent = fInnerModel->GetMutableField(parent->GetQualifiedFieldName());
116 assert(dynamic_cast<RRecordField *>(&innerParent));
117 AddItemToRecord(static_cast<RRecordField &>(innerParent), std::move(cloned));
118 } else {
119 fInnerModel->AddField(std::move(cloned));
120 }
121 return p;
122 };
// NOTE(review): line 123 (opening of the `cloneAddProjectedField` lambda, used below) and lines 126-127
// (presumably the declarations of `projectedFields` and `fieldMap`) are absent from this listing.
124 auto cloned = field->Clone(field->GetFieldName());
125 auto p = &(*cloned);
// Map the cloned projected field to its source field, looked up by qualified name in the inner model.
128 fieldMap[p] = &fInnerModel->GetConstField(projectedFields.GetSourceField(field)->GetQualifiedFieldName());
// Walk the original field's iteration range and the clone's in lock-step to map each sub-field as well.
129 auto targetIt = cloned->begin();
130 for (auto &f : *field)
131 fieldMap[&(*targetIt++)] =
132 &fInnerModel->GetConstField(projectedFields.GetSourceField(&f)->GetQualifiedFieldName());
133 GetProjectedFieldsOfModel(*fInnerModel).Add(std::move(cloned), fieldMap);
134 return p;
135 };
// NOTE(review): line 136 (presumably the declaration of `innerChangeset`) is absent from this listing.
// Step 2: replay the changeset on the unfrozen inner model, collecting the cloned fields, then freeze it again
// and forward the resulting changeset to the inner sink.
137 fInnerModel->Unfreeze();
138 std::transform(changeset.fAddedFields.cbegin(), changeset.fAddedFields.cend(),
139 std::back_inserter(innerChangeset.fAddedFields), cloneAddField);
140 std::transform(changeset.fAddedProjectedFields.cbegin(), changeset.fAddedProjectedFields.cend(),
141 std::back_inserter(innerChangeset.fAddedProjectedFields), cloneAddProjectedField);
142 fInnerModel->Freeze();
143 fInnerSink->UpdateSchema(innerChangeset, firstEntry);
144}
145
147{
// NOTE(review): the signature line is absent from this listing; presumably
// UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) — confirm.
// Forward to the inner sink while holding its sink guard; the timer charges the call to the
// critical-section wall/CPU counters.
148 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
149 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
150 fInnerSink->UpdateExtraTypeInfo(extraTypeInfo);
151}
152
157
159{
// NOTE(review): the signature line is absent from this listing; presumably
// CommitPage(ColumnHandle_t columnHandle, const RPage &page) — confirm.
// Overview: the page is registered in the column's buffer and then sealed either immediately on the calling
// thread, or copied and handed to the task scheduler to be sealed later.
160 auto colId = columnHandle.fPhysicalId;
161 const auto &element = *columnHandle.fColumn->GetElement();
162
163 // Safety: References are guaranteed to be valid until the element is destroyed. In other words, all buffered page
164 // elements are valid until DropBufferedPages().
165 auto &zipItem = fBufferedColumns.at(colId).BufferPage(columnHandle);
// Upper bound for the sealed output: the page's byte size plus kNBytesPageChecksum when checksums are
// enabled (the boolean option acts as a 0/1 multiplier).
166 std::size_t maxSealedPageBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
167 // Do not allocate the buffer yet, in case of IMT we only need it once the task is started.
168 auto &sealedPage = fBufferedColumns.at(colId).RegisterSealedPage();
169
// NOTE(review): lines 170-171 are absent from this listing; presumably they open the `allocateBuf` lambda
// (used below) and allocate zipItem.fBuf — confirm. Only the post-allocation check is visible.
172 R__ASSERT(zipItem.fBuf);
173 };
// NOTE(review): lines 174 and 178-179 are absent from this listing; presumably they open the `shrinkSealedPage`
// lambda (captured by the task below), compare sizes and allocate `buf` — confirm.
175 // If the sealed page is smaller than the maximum size (with compression), allocate what is needed and copy the
176 // sealed page content to save memory.
177 auto sealedBufferSize = sealedPage.GetBufferSize();
180 memcpy(buf.get(), sealedPage.GetBuffer(), sealedBufferSize);
// Swap in the smaller buffer and re-point the sealed page at it.
181 zipItem.fBuf = std::move(buf);
182 sealedPage.SetBuffer(zipItem.fBuf.get());
183 }
184 };
185
186 // If we already buffer more uncompressed bytes than the approximate zipped cluster size, we assume there is enough
187 // work for other threads to pick up. This limits the buffer usage when sealing / compression tasks are not processed
188 // fast enough, and heuristically reduces the memory usage, especially for big compression factors.
189 std::size_t bufferedUncompressed = fBufferedUncompressed.load();
190 bool enoughWork = bufferedUncompressed > GetWriteOptions().GetApproxZippedClusterSize();
191
// Synchronous path: taken when there is no task scheduler, or when enough work is already queued.
192 if (!fTaskScheduler || enoughWork) {
193 allocateBuf();
194 // Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
195 RSealPageConfig config;
196 config.fPage = &page;
197 config.fElement = &element;
198 config.fCompressionSettings = GetWriteOptions().GetCompression();
199 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
200 config.fAllowAlias = false;
201 config.fBuffer = zipItem.fBuf.get();
// Inner scope so that the timer only covers the SealPage() call.
202 {
203 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
204 sealedPage = SealPage(config);
205 }
// NOTE(review): line 206 is absent from this listing; presumably a call to shrinkSealedPage() — confirm.
207 zipItem.fSealedPage = &sealedPage;
208 return;
209 }
210
211 // We will buffer the uncompressed page. Unless work is consumed fast enough, the next page might be compressed
212 // directly.
213 fBufferedUncompressed += page.GetNBytes();
214
215 // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
216 zipItem.fPage = fPageAllocator->NewPage(page.GetElementSize(), page.GetNElements());
217 // make sure the page is aware of how many elements it will have
218 zipItem.fPage.GrowUnchecked(page.GetNElements());
219 assert(zipItem.fPage.GetNBytes() == page.GetNBytes());
// Copy the caller's page contents into the newly allocated page held by the zip item.
220 memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
221
// Record that the parallel compression path has been used.
222 fCounters->fParallelZip.SetValue(1);
223 // Thread safety: Each thread works on a distinct zipItem which owns its
224 // compression buffer.
// zipItem, sealedPage and element are captured by reference; see the "Safety" note above for why the
// buffered elements stay valid. The two helper lambdas are captured by copy.
225 fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
226 // The task will consume the uncompressed page. Decrease the atomic counter early so that more work has arrived
227 // when we are done.
228 fBufferedUncompressed -= zipItem.fPage.GetNBytes();
229
230 allocateBuf();
231 RSealPageConfig config;
232 config.fPage = &zipItem.fPage;
233 config.fElement = &element;
234 config.fCompressionSettings = GetWriteOptions().GetCompression();
235 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
236 // Make sure the page buffer is not aliased so that we can free the uncompressed page.
237 config.fAllowAlias = false;
238 config.fBuffer = zipItem.fBuf.get();
239 // TODO: Somehow expose the time spent in zipping via the metrics. Wall time is tricky because the tasks run
240 // in parallel...
241 sealedPage = SealPage(config);
// NOTE(review): line 242 is absent from this listing; presumably a call to shrinkSealedPage() — confirm.
243 zipItem.fSealedPage = &sealedPage;
244 // Release the uncompressed page. This works because the "page allocator must be thread-safe."
245 zipItem.fPage = RPage();
246 });
247}
248
250 const RSealedPage & /*sealedPage*/)
251{
// NOTE(review): the first signature line (249) is absent from this listing; presumably CommitSealedPage() — confirm.
// Unsupported on this sink: always throws; the (unnamed) sealed-page argument is not used.
252 throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
253}
254
256 std::span<ROOT::Internal::RPageStorage::RSealedPageGroup> /*ranges*/)
257{
// NOTE(review): the first signature line (255) is absent from this listing; presumably CommitSealedPageV() — confirm.
// Unsupported on this sink: always throws; the (unnamed) ranges argument is not used.
258 throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
259}
260
261// We implement both StageCluster() and CommitCluster() because we can call CommitCluster() on the inner sink more
262// efficiently in a single critical section. For parallel writing, it also guarantees that we produce a fully sequential
263// file.
// NOTE(review): the signature line (264) is absent from this listing; presumably
// FlushClusterImpl(std::function<void(void)> FlushClusterFn) — confirm.
265{
// All sealing tasks must have finished before the sealed pages are collected.
266 WaitForAllTasks();
267 assert(fBufferedUncompressed == 0 && "all buffered pages should have been processed");
268
// Build one sealed-page group per buffered column: its physical column id plus the range of its sealed pages.
269 std::vector<RSealedPageGroup> toCommit;
270 toCommit.reserve(fBufferedColumns.size());
271 for (auto &bufColumn : fBufferedColumns) {
272 R__ASSERT(bufColumn.HasSealedPagesOnly());
273 const auto &sealedPages = bufColumn.GetSealedPages();
274 toCommit.emplace_back(bufColumn.GetHandle().fPhysicalId, sealedPages.cbegin(), sealedPages.cend());
275 }
276
// Critical section: the inner sink is only touched while holding its sink guard; the timer charges this
// scope to the critical-section counters.
277 {
278 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
279 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
280 fInnerSink->CommitSealedPageV(toCommit);
281
// Forward the suppressed columns recorded for this cluster, then reset the list.
282 for (auto handle : fSuppressedColumns)
283 fInnerSink->CommitSuppressedColumn(handle);
284 fSuppressedColumns.clear();
285
// NOTE(review): line 286 is absent from this listing; presumably the FlushClusterFn() callback is invoked
// here, still inside the critical section — confirm.
287 }
288
// The pages have been handed to the inner sink; release the per-column buffers.
289 for (auto &bufColumn : fBufferedColumns)
290 bufColumn.DropBufferedPages();
291}
292
294{
// NOTE(review): the signature line is absent from this listing; presumably
// CommitCluster(ROOT::NTupleSize_t nNewEntries) — confirm.
// `nbytes` is only assigned inside the callback, so the return value relies on FlushClusterImpl() invoking
// the callback (that invocation is not visible in this listing — confirm).
295 std::uint64_t nbytes;
296 FlushClusterImpl([&] { nbytes = fInnerSink->CommitCluster(nNewEntries); });
297 return nbytes;
298}
299
306
308{
// NOTE(review): the signature line is absent from this listing; presumably
// CommitStagedClusters(std::span<RStagedCluster> clusters) — confirm.
// Forward to the inner sink while holding its sink guard; the timer charges the call to the
// critical-section counters.
309 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
310 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
311 fInnerSink->CommitStagedClusters(clusters);
312}
313
315{
// NOTE(review): the signature line is absent from this listing; presumably CommitClusterGroup() — confirm.
// Forward to the inner sink while holding its sink guard; the timer charges the call to the
// critical-section counters.
316 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
317 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
318 fInnerSink->CommitClusterGroup();
319}
320
322{
// NOTE(review): the signature line is absent from this listing; presumably CommitDatasetImpl() — confirm.
// Forward to the inner sink's CommitDataset() while holding its sink guard and return its result; the timer
// charges the call to the critical-section counters.
323 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
324 RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
325 return fInnerSink->CommitDataset();
326}
327
332
// NOTE(review): line 334 (function name and parameter list) is absent from this listing; presumably
// CloneAsHidden(std::string_view name, const RNTupleWriteOptions &opts) const — confirm.
// Delegates to the inner sink and returns whatever sink it produces.
333std::unique_ptr<ROOT::Internal::RPageSink>
335{
336 return fInnerSink->CloneAsHidden(name, opts);
337}
338
340{
// NOTE(review): the signature line is absent from this listing; presumably
// CommitAttributeSet(std::string_view attrSetName, const RNTupleLink &attrAnchorInfo) — confirm.
// Plain forward to the inner sink.
// NOTE(review): unlike the other Commit* forwards in this file, no RSinkGuard or critical-section timer is
// taken here — confirm this is intended.
341 fInnerSink->CommitAttributeSet(attrSetName, attrAnchorInfo);
342}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define f(i)
Definition RSha256.hxx:104
#define g(i)
Definition RSha256.hxx:105
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
char name[80]
Definition TGX11.cxx:148
A thread-safe integral performance counter.
A collection of Counter objects with a name, a unit, and a description.
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non thread-safe integral performance counter.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:37
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that element iterators are never invalidated by appends to the end of the it...
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
std::unique_ptr< RPageSink > CloneAsHidden(std::string_view name, const RNTupleWriteOptions &opts) const final
Creates a new sink with the same underlying storage as this but writing to a different RNTuple named ...
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
std::unique_ptr< RCounters > fCounters
std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
void FlushClusterImpl(std::function< void(void)> FlushClusterFn)
void UpdateSchema(const RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
void CommitAttributeSet(std::string_view attrSetName, const RNTupleLink &attrAnchorInfo) final
Adds the given anchor information (name + locator) into the main RNTuple's descriptor as an attribute...
RNTupleLink CommitDatasetImpl() final
const ROOT::RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void InitImpl(ROOT::RNTupleModel &model) final
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void ConnectFields(const std::vector< ROOT::RFieldBase * > &fields, ROOT::NTupleSize_t firstEntry)
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final
Register a new column.
An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
Abstract interface to write data into an ntuple.
const ROOT::RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
const std::string & GetNTupleName() const
Returns the NTuple name.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:43
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Field-specific extra type information from the header / extension header.
A field translates read and write calls from/to underlying columns to/from tree values.
The container field for an ntuple model, which itself has no physical representation.
Definition RField.hxx:58
The on-storage metadata of an RNTuple.
The RNTupleModel encapsulates the schema of an RNTuple.
std::unique_ptr< RNTupleModel > Clone() const
Common user-tunable settings for storing RNTuples.
const_iterator begin() const
The field for an untyped record.
ROOT::RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
void AddItemToRecord(RRecordField &record, std::unique_ptr< RFieldBase > newItem)
Definition RField.cxx:641
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
void CallConnectPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &, ROOT::NTupleSize_t firstEntry=0)
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The incremental changes to a RNTupleModel
I/O performance counters that get registered in fMetrics.
Parameters for the SealPage() method.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
Cluster that was staged, but not yet logically appended to the RNTuple.
A sealed page contains the bytes of a page as written to storage (packed & compressed).