Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RDFHelpers.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 02/2018
2
3/*************************************************************************
4 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11// This header contains helper free functions that slim down RDataFrame's programming model
12
13#ifndef ROOT_RDF_HELPERS
14#define ROOT_RDF_HELPERS
15
19#include <ROOT/RResultHandle.hxx> // users of RunGraphs might rely on this transitive include
20#include <ROOT/TypeTraits.hxx>
21
22#include <array>
23#include <chrono>
24#include <fstream>
25#include <functional>
26#include <map>
27#include <memory>
28#include <mutex>
29#include <type_traits>
30#include <utility> // std::index_sequence
31#include <vector>
32
33namespace ROOT {
34namespace Internal {
35namespace RDF {
36template <typename... ArgTypes, typename F>
38{
39 return std::function<bool(ArgTypes...)>([=](ArgTypes... args) mutable { return !f(args...); });
40}
41
42template <typename... ArgTypes, typename Ret, typename... Args>
44{
45 return std::function<bool(ArgTypes...)>([=](ArgTypes... args) mutable { return !f(args...); });
46}
47
48template <typename I, typename T, typename F>
50
51template <std::size_t... N, typename T, typename F>
52class PassAsVecHelper<std::index_sequence<N...>, T, F> {
53 template <std::size_t Idx>
54 using AlwaysT = T;
55 std::decay_t<F> fFunc;
56
57public:
58 PassAsVecHelper(F &&f) : fFunc(std::forward<F>(f)) {}
59 auto operator()(AlwaysT<N>... args) -> decltype(fFunc({args...})) { return fFunc({args...}); }
60};
61
62template <std::size_t N, typename T, typename F>
64{
65 return PassAsVecHelper<std::make_index_sequence<N>, T, F>(std::forward<F>(f));
66}
67
68} // namespace RDF
69} // namespace Internal
70
71namespace RDF {
73
74// clang-format off
75/// Given a callable with signature bool(T1, T2, ...) return a callable with same signature that returns the negated result
76///
77/// The callable must have one single non-template definition of operator(). This is a limitation with respect to
78/// std::not_fn, required for interoperability with RDataFrame.
79// clang-format on
80template <typename F,
81 typename Args = typename ROOT::TypeTraits::CallableTraits<std::decay_t<F>>::arg_types_nodecay,
82 typename Ret = typename ROOT::TypeTraits::CallableTraits<std::decay_t<F>>::ret_type>
83auto Not(F &&f) -> decltype(RDFInternal::NotHelper(Args(), std::forward<F>(f)))
84{
85 static_assert(std::is_same<Ret, bool>::value, "RDF::Not requires a callable that returns a bool.");
86 return RDFInternal::NotHelper(Args(), std::forward<F>(f));
87}
88
89// clang-format off
90/// PassAsVec is a callable generator that allows passing N variables of type T to a function as a single collection.
91///
92/// PassAsVec<N, T>(func) returns a callable that takes N arguments of type T, passes them down to function `func` as
93/// an initializer list `{t1, t2, t3,..., tN}` and returns whatever f({t1, t2, t3, ..., tN}) returns.
94///
95/// Note that for this to work with RDataFrame the type of all columns that the callable is applied to must be exactly T.
96/// Example usage together with RDataFrame ("varX" columns must all be `float` variables):
97/// \code
98/// bool myVecFunc(std::vector<float> args);
99/// df.Filter(PassAsVec<3, float>(myVecFunc), {"var1", "var2", "var3"});
100/// \endcode
101// clang-format on
102template <std::size_t N, typename T, typename F>
104{
105 return RDFInternal::PassAsVecHelper<std::make_index_sequence<N>, T, F>(std::forward<F>(f));
106}
107
108// clang-format off
109/// Create a graphviz representation of the dataframe computation graph, return it as a string.
110/// \param[in] node any node of the graph. Called on the head (first) node, it prints the entire graph. Otherwise, only the branch the node belongs to.
111///
112/// The output can be displayed with a command akin to `dot -Tpng output.dot > output.png && open output.png`.
113///
114/// Note that "hanging" Defines, i.e. Defines without downstream nodes, will not be displayed by SaveGraph as they are
115/// effectively optimized away from the computation graph.
116///
117/// Note that SaveGraph is not thread-safe and must not be called concurrently from different threads.
118// clang-format on
119template <typename NodeType>
120std::string SaveGraph(NodeType node)
121{
123 return helper.RepresentGraph(node);
124}
125
126// clang-format off
127/// Create a graphviz representation of the dataframe computation graph, write it to the specified file.
128/// \param[in] node any node of the graph. Called on the head (first) node, it prints the entire graph. Otherwise, only the branch the node belongs to.
129/// \param[in] outputFile file where to save the representation.
130///
131/// The output can be displayed with a command akin to `dot -Tpng output.dot > output.png && open output.png`.
132///
133/// Note that "hanging" Defines, i.e. Defines without downstream nodes, will not be displayed by SaveGraph as they are
134/// effectively optimized away from the computation graph.
135///
136/// Note that SaveGraph is not thread-safe and must not be called concurrently from different threads.
137// clang-format on
138template <typename NodeType>
139void SaveGraph(NodeType node, const std::string &outputFile)
140{
142 std::string dotGraph = helper.RepresentGraph(node);
143
144 std::ofstream out(outputFile);
145 if (!out.is_open()) {
146 throw std::runtime_error("Could not open output file \"" + outputFile + "\"for reading");
147 }
148
149 out << dotGraph;
150 out.close();
151}
152
153// clang-format off
154/// Cast a RDataFrame node to the common type ROOT::RDF::RNode
155/// \param[in] node Any node of a RDataFrame graph
156// clang-format on
157template <typename NodeType>
159{
160 return node;
161}
162
163// clang-format off
164/// Run the event loops of multiple RDataFrames concurrently.
165/// \param[in] handles A vector of RResultHandles whose event loops should be run.
166/// \return The number of distinct computation graphs that have been processed.
167///
168/// This function triggers the event loop of all computation graphs which relate to the
169/// given RResultHandles. The advantage compared to running the event loop implicitly by accessing the
170/// RResultPtr is that the event loops will run concurrently. Therefore, the overall
171/// computation of all results can be scheduled more efficiently.
172/// It should be noted that user-defined operations (e.g., Filters and Defines) of the different RDataFrame graphs are assumed to be safe to call concurrently.
173/// RDataFrame will pass slot numbers in the range [0, NThread-1] to all helpers used in nodes such as DefineSlot. NThread is the number of threads ROOT was
174/// configured with in EnableImplicitMT().
175/// Slot numbers are unique across all graphs, so no two tasks with the same slot number will run concurrently. Note that it is not guaranteed that each slot
176/// number will be reached in every graph.
177///
178/// ~~~{.cpp}
179/// ROOT::RDataFrame df1("tree1", "file1.root");
180/// auto r1 = df1.Histo1D("var1");
181///
182/// ROOT::RDataFrame df2("tree2", "file2.root");
183/// auto r2 = df2.Sum("var2");
184///
185/// // RResultPtr -> RResultHandle conversion is automatic
186/// ROOT::RDF::RunGraphs({r1, r2});
187/// ~~~
188// clang-format on
189unsigned int RunGraphs(std::vector<RResultHandle> handles);
190
191namespace Experimental {
192
193/// \brief Produce all required systematic variations for the given result.
194/// \param[in] resPtr The result for which variations should be produced.
195/// \return A \ref ROOT::RDF::Experimental::RResultMap "RResultMap" object with full variation names as strings
196/// (e.g. "pt:down") and the corresponding varied results as values.
197///
198/// A given input RResultPtr<T> produces a corresponding RResultMap<T> with a "nominal"
199/// key that will return a value identical to the one contained in the original RResultPtr.
200/// Other keys correspond to the varied values of this result, one for each variation
201/// that the result depends on.
202/// VariationsFor does not trigger the event loop. The event loop is only triggered
203/// upon first access to a valid key, similarly to what happens with RResultPtr.
204///
205/// If the result does not depend, directly or indirectly, from any registered systematic variation, the
206/// returned RResultMap will contain only the "nominal" key.
207///
208/// See RDataFrame's \ref ROOT::RDF::RInterface::Vary() "Vary" method for more information and example usages.
209///
210/// \note Currently, producing variations for the results of \ref ROOT::RDF::RInterface::Display() "Display",
211/// \ref ROOT::RDF::RInterface::Report() "Report" and \ref ROOT::RDF::RInterface::Snapshot() "Snapshot"
212/// actions is not supported.
213//
214// An overview of how systematic variations work internally. Given N variations (including the nominal):
215//
216// RResultMap owns RVariedAction
217// N results N action helpers
218// N previous filters
219// N*#input_cols column readers
220//
221// ...and each RFilter and RDefine knows for what universe it needs to construct column readers ("nominal" by default).
222template <typename T>
224{
225 R__ASSERT(resPtr != nullptr && "Calling VariationsFor on an empty RResultPtr");
226
227 // populate parts of the computation graph for which we only have "empty shells", e.g. RJittedActions and
228 // RJittedFilters
229 resPtr.fLoopManager->Jit();
230
231 std::unique_ptr<RDFInternal::RActionBase> variedAction;
232 std::vector<std::shared_ptr<T>> variedResults;
233
234 std::shared_ptr<RDFInternal::RActionBase> nominalAction = resPtr.fActionPtr;
235 std::vector<std::string> variations = nominalAction->GetVariations();
236 const auto nVariations = variations.size();
237
238 if (nVariations > 0) {
239 // clone the result once for each variation
240 variedResults.reserve(nVariations);
241 for (auto i = 0u; i < nVariations; ++i){
242 // implicitly assuming that T is copiable: this should be the case
243 // for all result types in use, as they are copied for each slot
244 variedResults.emplace_back(new T{*resPtr.fObjPtr});
245
246 // Check if the result's type T inherits from TNamed
247 if constexpr (std::is_base_of<TNamed, T>::value) {
248 // Get the current variation name
249 std::string variationName = variations[i];
250 // Replace the colon with an underscore
251 std::replace(variationName.begin(), variationName.end(), ':', '_');
252 // Get a pointer to the corresponding varied result
253 auto &variedResult = variedResults.back();
254 // Set the varied result's name to NOMINALNAME_VARIATIONAME
255 variedResult->SetName((std::string(variedResult->GetName()) + "_" + variationName).c_str());
256 }
257 }
258
259 std::vector<void *> typeErasedResults;
260 typeErasedResults.reserve(variedResults.size());
261 for (auto &res : variedResults)
262 typeErasedResults.emplace_back(&res);
263
264 // Create the RVariedAction and inject it in the computation graph.
265 // This recursively creates all the required varied column readers and upstream nodes of the computation graph.
266 variedAction = nominalAction->MakeVariedAction(std::move(typeErasedResults));
267 }
268
269 return RDFInternal::MakeResultMap<T>(resPtr.fObjPtr, std::move(variedResults), std::move(variations),
270 *resPtr.fLoopManager, std::move(nominalAction), std::move(variedAction));
271}
272
275
276/// \brief Add ProgressBar to a ROOT::RDF::RNode
277/// \param[in] df RDataFrame node at which ProgressBar is called.
278///
279/// The ProgressBar can be added not only at the RDataFrame head node, but also at any any computational node,
280/// such as Filter or Define.
281/// ###Example usage:
282/// ~~~{.cpp}
283/// ROOT::RDataFrame df("tree", "file.root");
284/// auto df_1 = ROOT::RDF::RNode(df.Filter("x>1"));
285/// ROOT::RDF::Experimental::AddProgressBar(df_1);
286/// ~~~
288
289/// \brief Add ProgressBar to an RDataFrame
290/// \param[in] df RDataFrame for which ProgressBar is called.
291///
292/// This function adds a ProgressBar to display the event statistics in the terminal every
293/// \b m events and every \b n seconds, including elapsed time, currently processed file,
294/// currently processed events, the rate of event processing
295/// and an estimated remaining time (per file being processed).
296/// ProgressBar should be added after the dataframe object (df) is created first:
297/// ~~~{.cpp}
298/// ROOT::RDataFrame df("tree", "file.root");
299/// ROOT::RDF::Experimental::AddProgressBar(df);
300/// ~~~
301/// For more details see ROOT::RDF::Experimental::ProgressHelper Class.
303
305
306/// RDF progress helper.
307/// This class provides callback functions to the RDataFrame. The event statistics
308/// (including elapsed time, currently processed file, currently processed events, the rate of event processing
309/// and an estimated remaining time (per file being processed))
310/// are recorded and printed in the terminal every m events and every n seconds.
311/// ProgressHelper::operator()(unsigned int, T&) is thread safe, and can be used as a callback in MT mode.
312/// ProgressBar should be added after creating the dataframe object (df):
313/// ~~~{.cpp}
314/// ROOT::RDataFrame df("tree", "file.root");
315/// ROOT::RDF::Experimental::AddProgressBar(df);
316/// ~~~
317/// alternatively RDataFrame can be cast to an RNode first giving it more flexibility.
318/// For example, it can be called at any computational node, such as Filter or Define, not only the head node,
319/// with no change to the ProgressBar function itself:
320/// ~~~{.cpp}
321/// ROOT::RDataFrame df("tree", "file.root");
322/// auto df_1 = ROOT::RDF::RNode(df.Filter("x>1"));
323/// ROOT::RDF::Experimental::AddProgressBar(df_1);
324/// ~~~
326private:
327 double EvtPerSec() const;
328 std::pair<std::size_t, std::chrono::seconds> RecordEvtCountAndTime();
329 void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const;
330 void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const;
331 void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const;
332
333 std::chrono::time_point<std::chrono::system_clock> fBeginTime = std::chrono::system_clock::now();
334 std::chrono::time_point<std::chrono::system_clock> fLastPrintTime = fBeginTime;
335 std::chrono::seconds fPrintInterval{1};
336
337 std::atomic<std::size_t> fProcessedEvents{0};
338 std::size_t fLastProcessedEvents{0};
339 std::size_t fIncrement;
340
342 std::map<std::string, ULong64_t> fSampleNameToEventEntries; // Filename, events in the file
343
344 std::array<double, 20> fEventsPerSecondStatistics;
346
347 unsigned int fBarWidth;
348 unsigned int fTotalFiles;
349
350 std::mutex fPrintMutex;
351 bool fIsTTY;
353
354 std::shared_ptr<TTree> fTree{nullptr};
355
356public:
357 /// Create a progress helper.
358 /// \param increment RDF callbacks are called every `n` events. Pass this `n` here.
359 /// \param totalFiles read total number of files in the RDF.
360 /// \param progressBarWidth Number of characters the progress bar will occupy.
361 /// \param printInterval Update every stats every `n` seconds.
362 /// \param useColors Use shell colour codes to colour the output. Automatically disabled when
363 /// we are not writing to a tty.
364 ProgressHelper(std::size_t increment, unsigned int totalFiles = 1, unsigned int progressBarWidth = 40,
365 unsigned int printInterval = 1, bool useColors = true);
366
367 ~ProgressHelper() = default;
368
369 friend class ProgressBarAction;
370
371 /// Register a new sample for completion statistics.
372 /// \see ROOT::RDF::RInterface::DefinePerSample().
373 /// The *id.AsString()* refers to the name of the currently processed file.
374 /// The idea is to populate the event entries in the *fSampleNameToEventEntries* map
375 /// by selecting the greater of the two values:
376 /// *id.EntryRange().second* which is the upper event entry range of the processed sample
377 /// and the current value of the event entries in the *fSampleNameToEventEntries* map.
378 /// In the single threaded case, the two numbers are the same as the entry range corresponds
379 /// to the number of events in an individual file (each sample is simply a single file).
380 /// In the multithreaded case, the idea is to accumulate the higher event entry value until
381 /// the total number of events in a given file is reached.
382 void registerNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id)
383 {
384 std::lock_guard<std::mutex> lock(fSampleNameToEventEntriesMutex);
385 fSampleNameToEventEntries[id.AsString()] =
386 std::max(id.EntryRange().second, fSampleNameToEventEntries[id.AsString()]);
387 }
388
389 /// Thread-safe callback for RDataFrame.
390 /// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the
391 /// fPrintInterval). \param slot Ignored. \param value Ignored.
392 template <typename T>
393 void operator()(unsigned int /*slot*/, T &value)
394 {
396 }
397 // clang-format off
398 /// Thread-safe callback for RDataFrame.
399 /// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the fPrintInterval).
400 /// \param value Ignored.
401 // clang-format on
402 template <typename T>
403 void operator()(T & /*value*/)
404 {
405 using namespace std::chrono;
406 // ***************************************************
407 // Warning: Here, everything needs to be thread safe:
408 // ***************************************************
410
411 // We only print every n seconds.
412 if (duration_cast<seconds>(system_clock::now() - fLastPrintTime) < fPrintInterval) {
413 return;
414 }
415
416 // ***************************************************
417 // Protected by lock from here:
418 // ***************************************************
419 if (!fPrintMutex.try_lock())
420 return;
421 std::lock_guard<std::mutex> lockGuard(fPrintMutex, std::adopt_lock);
422
423 std::size_t eventCount;
424 seconds elapsedSeconds;
426
427 if (fIsTTY)
428 std::cout << "\r";
429
430 PrintProgressBar(std::cout, eventCount);
432
433 if (fIsTTY)
434 std::cout << std::flush;
435 else
436 std::cout << std::endl;
437 }
438
439 std::size_t ComputeNEventsSoFar() const
440 {
441 std::unique_lock<std::mutex> lock(fSampleNameToEventEntriesMutex);
442 std::size_t result = 0;
443 for (const auto &item : fSampleNameToEventEntries)
444 result += item.second;
445 return result;
446 }
447
448 unsigned int ComputeCurrentFileIdx() const
449 {
450 std::unique_lock<std::mutex> lock(fSampleNameToEventEntriesMutex);
451 return fSampleNameToEventEntries.size();
452 }
453};
454} // namespace Experimental
455} // namespace RDF
456} // namespace ROOT
457#endif
#define f(i)
Definition RSha256.hxx:104
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
#define N
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
TRObject operator()(const T1 &t1) const
std::pair< std::size_t, std::chrono::seconds > RecordEvtCountAndTime()
Record current event counts and time stamp, populate evts/s statistics array.
void registerNewSample(unsigned int, const ROOT::RDF::RSampleInfo &id)
Register a new sample for completion statistics.
void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const
Print a progress bar of width ProgressHelper::fBarWidth if fGetNEventsOfCurrentFile is known.
void operator()(unsigned int, T &value)
Thread-safe callback for RDataFrame.
void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const
Print event and time statistics.
std::map< std::string, ULong64_t > fSampleNameToEventEntries
std::array< double, 20 > fEventsPerSecondStatistics
double EvtPerSec() const
Compute a running mean of events/s.
std::atomic< std::size_t > fProcessedEvents
ProgressHelper(std::size_t increment, unsigned int totalFiles=1, unsigned int progressBarWidth=40, unsigned int printInterval=1, bool useColors=true)
Create a progress helper.
std::chrono::time_point< std::chrono::system_clock > fLastPrintTime
std::chrono::time_point< std::chrono::system_clock > fBeginTime
void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const
void operator()(T &)
Thread-safe callback for RDataFrame.
The public interface to the RDataFrame federation of classes.
Smart pointer for the return type of actions.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...
const_iterator begin() const
const_iterator end() const
#define F(x, y, z)
std::function< bool(ArgTypes...)> NotHelper(ROOT::TypeTraits::TypeList< ArgTypes... >, F &&f)
auto PassAsVec(F &&f) -> PassAsVecHelper< std::make_index_sequence< N >, T, F >
RResultMap< T > VariationsFor(RResultPtr< T > resPtr)
Produce all required systematic variations for the given result.
void AddProgressBar(ROOT::RDF::RNode df)
Add ProgressBar to a ROOT::RDF::RNode.
auto Not(F &&f) -> decltype(RDFInternal::NotHelper(Args(), std::forward< F >(f)))
Given a callable with signature bool(T1, T2, ...) return a callable with same signature that returns ...
std::string SaveGraph(NodeType node)
Create a graphviz representation of the dataframe computation graph, return it as a string.
RNode AsRNode(NodeType node)
Cast a RDataFrame node to the common type ROOT::RDF::RNode.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Lightweight storage for a collection of types.