Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
ntpl013_staged.C
Go to the documentation of this file.
1/// \file
2/// \ingroup tutorial_ntuple
3/// \notebook
4/// Example of staged cluster committing in multi-threaded writing using RNTupleParallelWriter.
5///
6/// \macro_code
7///
8/// \date September 2024
9/// \author The ROOT Team
10
11// NOTE: The RNTupleParallelWriter is experimental at this point.
12// Functionality and interface are still subject to changes.
13
#include <ROOT/RNTupleFillContext.hxx>
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleParallelWriter.hxx>
#include <ROOT/RNTupleWriteOptions.hxx>

#include <TRandom3.h>

#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <vector>

// Import classes from experimental namespace for the time being
using ROOT::Experimental::RNTupleParallelWriter;

// Name of the ROOT file that receives the ntuple written by this example
constexpr const char *kNTupleFileName{"ntpl013_staged.root"};

// How many threads fill the ntuple concurrently
constexpr int kNWriterThreads{4};

// Events generated by each thread; the total is kNEventsPerThread * kNWriterThreads
constexpr int kNEventsPerThread{25000};

// Events that make up one block
constexpr int kNEventsPerBlock{10000};
41
42// Thread function to generate and write events
43void FillData(int id, RNTupleParallelWriter *writer)
44{
45 // static variables that are shared between threads; this is done for simplicity in this tutorial, use proper data
46 // structures in real code!
47 static std::mutex g_Mutex;
49
50 using generator = std::mt19937;
52
53 // Create a fill context and turn on staged cluster committing.
54 auto fillContext = writer->CreateFillContext();
55 fillContext->EnableStagedClusterCommitting();
56 auto entry = fillContext->CreateEntry();
57
58 auto eventId = entry->GetPtr<std::uint32_t>("eventId");
60 auto rndm = entry->GetPtr<float>("rndm");
61
62 for (int i = 0; i < kNEventsPerThread; i++) {
63 // Prepare the entry with an id and a random number.
64 *eventId = eventIdStart + i;
65 auto d = static_cast<double>(gen()) / generator::max();
66 *rndm = static_cast<float>(d);
67
68 // Fill might auto-flush a cluster, which will be staged.
69 fillContext->Fill(*entry);
70 }
71
72 // It is important to first FlushCluster() so that a cluster with the remaining entries is staged.
73 fillContext->FlushCluster();
74 {
75 std::lock_guard g(g_Mutex);
76 fillContext->CommitStagedClusters();
77 std::cout << "Thread #" << id << " wrote events #" << eventIdStart << " - #"
78 << (eventIdStart + kNEventsPerThread - 1) << " as entries #" << g_WrittenEntries << " - #"
79 << (g_WrittenEntries + kNEventsPerThread - 1) << std::endl;
81 }
82}
83
84// Generate kNEvents with multiple threads in kNTupleFileName
85void Write()
86{
87 std::cout << " === Writing with staged cluster committing ===" << std::endl;
88
89 // Create the data model
90 auto model = ROOT::RNTupleModel::CreateBare();
91 model->MakeField<std::uint32_t>("eventId");
92 model->MakeField<float>("rndm");
93
94 // Create RNTupleWriteOptions to make the writing commit multiple clusters.
95 // This is for demonstration purposes only to have multiple clusters per
96 // thread that are implicitly flushed, and should not be copied into real
97 // code!
99 options.SetApproxZippedClusterSize(32'000);
100
101 // We hand over the data model to a newly created ntuple of name "NTuple", stored in kNTupleFileName
102 auto writer = RNTupleParallelWriter::Recreate(std::move(model), "NTuple", kNTupleFileName, options);
103
104 std::vector<std::thread> threads;
105 for (int i = 0; i < kNWriterThreads; ++i)
106 threads.emplace_back(FillData, i, writer.get());
107 for (int i = 0; i < kNWriterThreads; ++i)
108 threads[i].join();
109
110 // The writer unique pointer goes out of scope here. On destruction, the writer flushes unwritten data to disk
111 // and closes the attached ROOT file.
112}
113
114void FillDataInBlocks(int id, RNTupleParallelWriter *writer)
115{
116 // static variables that are shared between threads; this is done for simplicity in this tutorial, use proper data
117 // structures in real code!
118 static std::mutex g_Mutex;
120
121 using generator = std::mt19937;
123
124 // Create a fill context and turn on staged cluster committing.
125 auto fillContext = writer->CreateFillContext();
126 fillContext->EnableStagedClusterCommitting();
127 auto entry = fillContext->CreateEntry();
128
129 auto eventId = entry->GetPtr<std::uint32_t>("eventId");
131 int startOfBlock = 0;
132 auto rndm = entry->GetPtr<float>("rndm");
133
134 for (int i = 0; i < kNEventsPerThread; i++) {
135 // Prepare the entry with an id and a random number.
136 *eventId = eventIdStart + i;
137 auto d = static_cast<double>(gen()) / generator::max();
138 *rndm = static_cast<float>(d);
139
140 // Fill might auto-flush a cluster, which will be staged.
141 fillContext->Fill(*entry);
142
143 if ((i + 1) % kNEventsPerBlock == 0) {
144 // Decide to flush this cluster and logically append all staged clusters to the ntuple.
145 fillContext->FlushCluster();
146 {
147 std::lock_guard g(g_Mutex);
148 fillContext->CommitStagedClusters();
149 auto startOfBlock;
150 auto lastEvent = eventIdStart + i;
151 std::cout << "Thread #" << id << " wrote events #" << lastEvent << " as entries #"
152 << g_WrittenEntries << " - #" << (g_WrittenEntries + kNEventsPerBlock - 1) << std::endl;
155 }
156 }
157 }
158
159 // Flush the remaining data and commit staged clusters.
160 fillContext->FlushCluster();
161 {
162 std::lock_guard g(g_Mutex);
163 fillContext->CommitStagedClusters();
164 auto startOfBlock;
167 std::cout << "Thread #" << id << " wrote events #" << lastEvent << " as entries #"
168 << g_WrittenEntries << " - #" << (g_WrittenEntries + numEvents - 1) << std::endl;
170 }
171}
172
173// Generate kNEvents with multiple threads in kNTupleFileName, and sequence them in blocks of kNEventsPerBlock entries
174void WriteInBlocks()
175{
176 std::cout << "\n === ... with sequencing in blocks of " << kNEventsPerBlock << " events ===" << std::endl;
177
178 // Create the data model
179 auto model = ROOT::RNTupleModel::CreateBare();
180 model->MakeField<std::uint32_t>("eventId");
181 model->MakeField<float>("rndm");
182
183 // Create RNTupleWriteOptions to make the writing commit multiple clusters.
184 // This is for demonstration purposes only to have multiple clusters per
185 // thread and also per block that are implicitly flushed, and can be mixed
186 // with explicit calls to FlushCluster(). This should not be copied into real
187 // code!
189 options.SetApproxZippedClusterSize(32'000);
190
191 // We hand over the data model to a newly created ntuple of name "NTuple", stored in kNTupleFileName
192 auto writer = RNTupleParallelWriter::Recreate(std::move(model), "NTuple", kNTupleFileName, options);
193
194 std::vector<std::thread> threads;
195 for (int i = 0; i < kNWriterThreads; ++i)
196 threads.emplace_back(FillDataInBlocks, i, writer.get());
197 for (int i = 0; i < kNWriterThreads; ++i)
198 threads[i].join();
199
200 // The writer unique pointer goes out of scope here. On destruction, the writer flushes unwritten data to disk
201 // and closes the attached ROOT file.
202}
203
204void ntpl013_staged()
205{
206 Write();
208}
#define d(i)
Definition RSha256.hxx:102
#define g(i)
Definition RSha256.hxx:105
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
A writer to fill an RNTuple from multiple contexts.
static std::unique_ptr< RNTupleModel > CreateBare()
Creates a "bare model", i.e. an RNTupleModel with no default entry.
Common user-tunable settings for storing RNTuples.
void SetApproxZippedClusterSize(std::size_t val)
void FillData(BinData &dv, const TH1 *hist, TF1 *func=nullptr)
fill the data vector from a TH1.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.