Skip to content

[ML] Ensure the performance critical data are 16 byte aligned for data frame analyses #1142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Apr 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
(See {ml-pull}1126[#1126], issue: {issue}54506[#54506].)
* Added a {ml} native code build for Linux on AArch64. (See {ml-pull}1132[#1132] and
{ml-pull}1135[#1135].)
* Improve data frame analysis runtime by optimising memory alignment for intrinsic
operations. (See {ml-pull}1142[#1142].)

== {es} version 7.7.1

Expand All @@ -66,7 +68,6 @@
* Fixed background persistence of categorizer state (See {ml-pull}1137[#1137],
issue: {ml-issue}1136[#1136].)


== {es} version 7.7.0

=== New Features
Expand Down
2 changes: 1 addition & 1 deletion include/api/CDataFrameTrainBoostedTreeRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class API_EXPORT CDataFrameTrainBoostedTreeRunner : public CDataFrameAnalysisRun
static const std::string NUM_TOP_FEATURE_IMPORTANCE_VALUES;
static const std::string TRAINING_PERCENT_FIELD_NAME;

//Output
// Output
static const std::string IS_TRAINING_FIELD_NAME;
static const std::string FEATURE_NAME_FIELD_NAME;
static const std::string IMPORTANCE_FIELD_NAME;
Expand Down
114 changes: 114 additions & 0 deletions include/core/CAlignment.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

#ifndef INCLUDED_ml_core_CAlignment_h
#define INCLUDED_ml_core_CAlignment_h

#include <core/ImportExport.h>

#include <Eigen/Core>

#include <array>
#include <cstddef>
#include <vector>

namespace ml {
namespace core {

class CORE_EXPORT CAlignment {
public:
//! Alignment types.
enum EType {
E_Unaligned = 1,
E_Aligned8 = 8,
E_Aligned16 = 16,
E_Aligned32 = 32
};

//! This is an ordering by inclusion, i.e. \p lhs < \p rhs if an address is
//! \p rhs aligned implies it is lhs aligned but not vice versa.
static bool less(EType lhs, EType rhs) { return bytes(lhs) < bytes(rhs); }

//! Get the alignment of \p address.
template<typename T>
static EType maxAlignment(const T* address) {
// clang-format off
return (isAligned(address, E_Aligned32) ? E_Aligned32 :
(isAligned(address, E_Aligned16) ? E_Aligned16 :
(isAligned(address, E_Aligned8) ? E_Aligned8 :
(E_Unaligned))));
// clang-format on
}

//! Check if \p address has \p alignment.
template<typename T>
static bool isAligned(const T* address, EType alignment) {
return offset(address, alignment) == 0;
}

//! Get the next index in \p buffer which is aligned to \p alignment.
template<typename T, std::size_t N>
static std::size_t
nextAligned(const std::array<T, N>& buffer, std::size_t index, EType alignment) {
std::size_t offset_{offset(&buffer[index], alignment)};
return offset_ == 0 ? index : index + (bytes(alignment) - offset_) / sizeof(T);
}

//! Get the next index in \p buffer which is aligned to \p alignment.
template<typename T>
static std::size_t
nextAligned(const std::vector<T>& buffer, std::size_t index, EType alignment) {
std::size_t offset_{offset(&buffer[index], alignment)};
return offset_ == 0 ? index : index + (bytes(alignment) - offset_) / sizeof(T);
}

//! Round up n items of T so they use a multiple of \p alignment size memory.
template<typename T>
static std::size_t roundup(EType alignment, std::size_t n) {
return roundupSizeof<T>(alignment, n) / sizeof(T);
}

//! Round up sizeof(T) up to multiple of \p alignment bytes.
template<typename T>
static std::size_t roundupSizeof(EType alignment, std::size_t n = 1) {
std::size_t bytes_{bytes(alignment)};
return ((n * sizeof(T) + bytes_ - 1) / bytes_) * bytes_;
}

//! Print the type.
static std::string print(EType type) {
switch (type) {
case E_Unaligned:
return "unaligned";
case E_Aligned8:
return "aligned 8";
case E_Aligned16:
return "aligned 16";
case E_Aligned32:
return "aligned 32";
}
return "";
}

private:
template<typename T>
static std::size_t offset(const T* address, EType alignment) {
return reinterpret_cast<std::size_t>(address) & mask(alignment);
}

static std::size_t mask(EType alignment) { return bytes(alignment) - 1; }

static std::size_t bytes(EType alignment) {
return static_cast<std::size_t>(alignment);
}
};

template<typename T>
using CAlignedAllocator = Eigen::aligned_allocator<T>;
}
}

#endif // INCLUDED_ml_core_CAlignment_h
68 changes: 49 additions & 19 deletions include/core/CDataFrame.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef INCLUDED_ml_core_CDataFrame_h
#define INCLUDED_ml_core_CDataFrame_h

#include <core/CAlignment.h>
#include <core/CFloatStorage.h>
#include <core/CPackedBitVector.h>
#include <core/CVectorRange.h>
Expand All @@ -32,7 +33,7 @@ class CTemporaryDirectory;

namespace data_frame_detail {

using TFloatVec = std::vector<CFloatStorage>;
using TFloatVec = std::vector<CFloatStorage, CAlignedAllocator<CFloatStorage>>;
using TFloatVecItr = TFloatVec::iterator;
using TInt32Vec = std::vector<std::int32_t>;
using TInt32VecCItr = TInt32Vec::const_iterator;
Expand Down Expand Up @@ -178,47 +179,54 @@ class CORE_EXPORT CRowIterator final
//! parallelized in which case each reader reads a disjoint subset of the data
//! frame's rows.
//!
//! Space can be reserved at any point to hold one or more additional columns.
//! These are not visible until they are written.
//! Space can be reserved for additional rows and the data frame can be resized
//! to hold one or more additional columns. Resizing is a heavyweight operation
//! and should be minimized.
//!
//! IMPLEMENTATION:\n
//! This is a fairly lightweight container which is essentially responsible
//! for managing the read and write process to some underlying store format.
//! The store format is determined by the user implementing functionality to
//! read and write state from the store. For example, these could copy to /
//! from main memory, "write to" / "read from" disk, etc. A factory function
//! must be provided to the constructor which effectively that determines the
//! type of storage used. It is assumed that copying this has no side effects.
//! for new chunks of storage must be provided to the constructor and this
//! effectively determines the type of storage used. It is assumed that copying
//! this function has no side effects.
//!
//! The data frame is divided into slices each of which represent a number of
//! contiguous rows. The idea is that they contain a reasonable amount of memory
//! so that, for example, they significantly reduce the number of "writes to" /
//! "reads from" disk (a whole slice being written or read in one go), mean we'll
//! get good locality of reference and mean there is minimal book keeping overhead
//! (such as state for vector sizes, pointers to starts of memory blocks, etc).
//! In addition, it is assumed that access to the individual slices is thread
//! safe. If they share state the implementation must ensure that access to this
//! is synchronized.
//! It is possible to choose an alignment for each row in which case the address
//! of the start of each row is 8, 16, etc byte aligned. This comes with a memory
//! overhead as row sizes are then rounded up to the nearest multiple of the
//! alignment size. Finally, note that it is assumed that access to the individual
//! slices is thread safe. If they share state the implementation must ensure that
//! access to this is synchronized.
//!
//! Reads and writes of a single row are also done via call backs supplied to the
//! Reads and writes of a single row are done via call backs supplied to the
//! readRows and writeRow functions. This is to achieve maximum decoupling from
//! the calling code for how the underlying values are used or where they come
//! from. It also means certain operations can be done very efficiently. For example,
//! a stream can be attached to a row writer function to copy the values directly
//! into the data frame storage.
//! into the data frame storage with no marshalling costs.
//!
//! Read and writes to storage can optionally happen in a separate thread to the
//! row reading and writing to deal with the case that these operations can by
//! time consuming.
//! Read from and writes to storage can optionally happen in a separate thread
//! to the row reading and writing to deal with the case that these operations
//! can by time consuming.
class CORE_EXPORT CDataFrame final {
public:
using TBoolVec = std::vector<bool>;
using TSizeVec = std::vector<std::size_t>;
using TStrVec = std::vector<std::string>;
using TStrVecVec = std::vector<TStrVec>;
using TStrCRng = CVectorRange<const TStrVec>;
using TFloatVec = std::vector<CFloatStorage>;
using TFloatVec = std::vector<CFloatStorage, CAlignedAllocator<CFloatStorage>>;
using TFloatVecItr = TFloatVec::iterator;
using TInt32Vec = std::vector<std::int32_t>;
using TSizeAlignmentPrVec = std::vector<std::pair<std::size_t, CAlignment::EType>>;
using TRowRef = data_frame_detail::CRowRef;
using TRowItr = data_frame_detail::CRowIterator;
using TRowFunc = std::function<void(TRowItr, TRowItr)>;
Expand All @@ -245,6 +253,7 @@ class CORE_EXPORT CDataFrame final {
public:
//! \param[in] inMainMemory True if the data frame is stored in main memory.
//! \param[in] numberColumns The number of columns in the data frame.
//! \param[in] rowAlignment The alignment to use for the start of each row.
//! \param[in] sliceCapacityInRows The capacity of a slice of the data frame
//! as a number of rows.
//! \param[in] readAndWriteToStoreSyncStrategy Controls whether reads and
Expand All @@ -256,13 +265,15 @@ class CORE_EXPORT CDataFrame final {
//! the implementers responsibility to ensure these conditions are satisfied.
CDataFrame(bool inMainMemory,
std::size_t numberColumns,
CAlignment::EType rowAlignment,
std::size_t sliceCapacityInRows,
EReadWriteToStorage readAndWriteToStoreSyncStrategy,
const TWriteSliceToStoreFunc& writeSliceToStore);

//! Overload which manages the setting of slice capacity to a sensible default.
CDataFrame(bool inMainMemory,
std::size_t numberColumns,
CAlignment::EType rowAlignment,
EReadWriteToStorage readAndWriteToStoreSyncStrategy,
const TWriteSliceToStoreFunc& writeSliceToStore);

Expand Down Expand Up @@ -297,6 +308,18 @@ class CORE_EXPORT CDataFrame final {
//! \param[in] numberColumns The desired number of columns.
void resizeColumns(std::size_t numberThreads, std::size_t numberColumns);

//! Resize to contain \p extraColumns columns.
//!
//! These are split up into blocks of columns with their required alignment.
//! Pads are automatically inserted for alignment and a vector of the start
//! position of each block of columns is returned.
//!
//! \param[in] numberThreads The target number of threads to use.
//! \param[in] extraColumns The desired additional columns.
//! \return The index of each (block of) columns in \p extraColumns.
//! \warning This only supports alignments less than or equal the row alignment.
TSizeVec resizeColumns(std::size_t numberThreads, const TSizeAlignmentPrVec& extraColumns);

//! This reads rows using one or more readers.
//!
//! One reader is bound to one thread. Each thread reads a disjoint subset
Expand Down Expand Up @@ -351,7 +374,7 @@ class CORE_EXPORT CDataFrame final {
std::vector<READER> readers;
readers.reserve(result.first.size());
for (auto& reader_ : result.first) {
readers.push_back(std::move(*reader_.target<READER>()));
readers.emplace_back(std::move(*reader_.target<READER>()));
}

return {std::move(readers), result.second};
Expand Down Expand Up @@ -412,7 +435,7 @@ class CORE_EXPORT CDataFrame final {
std::vector<WRITER> writers;
writers.reserve(result.first.size());
for (auto& writer_ : result.first) {
writers.push_back(std::move(*writer_.target<WRITER>()));
writers.emplace_back(std::move(*writer_.target<WRITER>()));
}

return {std::move(writers), result.second};
Expand Down Expand Up @@ -485,7 +508,8 @@ class CORE_EXPORT CDataFrame final {
//! \p numberColumns columns.
static std::size_t estimateMemoryUsage(bool inMainMemory,
std::size_t numberRows,
std::size_t numberColumns);
std::size_t numberColumns,
CAlignment::EType alignment);

//! Get the value to use for a missing element in a data frame.
static constexpr double valueOfMissing() {
Expand Down Expand Up @@ -568,6 +592,8 @@ class CORE_EXPORT CDataFrame final {
std::size_t m_RowCapacity;
//! The capacity of a slice of the data frame as a number of rows.
std::size_t m_SliceCapacityInRows;
//! The start of row memory alignment.
core::CAlignment::EType m_RowAlignment;

//! If true read and write asynchronously to storage.
EReadWriteToStorage m_ReadAndWriteToStoreSyncStrategy;
Expand Down Expand Up @@ -610,12 +636,14 @@ class CORE_EXPORT CDataFrame final {
//! capacity in rows.
//! \param[in] readWriteToStoreSyncStrategy Controls whether reads and writes
//! from slice storage are synchronous or asynchronous.
//! \param[in] alignment The alignment to use for the start of each row.
CORE_EXPORT
std::pair<std::unique_ptr<CDataFrame>, std::shared_ptr<CTemporaryDirectory>>
makeMainStorageDataFrame(std::size_t numberColumns,
boost::optional<std::size_t> sliceCapacity = boost::none,
CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy =
CDataFrame::EReadWriteToStorage::E_Sync);
CDataFrame::EReadWriteToStorage::E_Sync,
CAlignment::EType alignment = CAlignment::E_Aligned16);

//! Make a data frame which uses disk storage for its slices.
//!
Expand All @@ -627,14 +655,16 @@ makeMainStorageDataFrame(std::size_t numberColumns,
//! capacity in rows.
//! \param[in] readWriteToStoreSyncStrategy Controls whether reads and writes
//! from slice storage are synchronous or asynchronous.
//! \param[in] alignment The alignment to use for the start of each row.
CORE_EXPORT
std::pair<std::unique_ptr<CDataFrame>, std::shared_ptr<CTemporaryDirectory>>
makeDiskStorageDataFrame(const std::string& rootDirectory,
std::size_t numberColumns,
std::size_t numberRows,
boost::optional<std::size_t> sliceCapacity = boost::none,
CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy =
CDataFrame::EReadWriteToStorage::E_Async);
CDataFrame::EReadWriteToStorage::E_Async,
CAlignment::EType alignment = CAlignment::E_Aligned16);
}
}

Expand Down
7 changes: 4 additions & 3 deletions include/core/CDataFrameRowSlice.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef INCLUDED_ml_core_CDataFrameRowSlice_h
#define INCLUDED_ml_core_CDataFrameRowSlice_h

#include <core/CAlignment.h>
#include <core/CFloatStorage.h>
#include <core/CompressUtils.h>
#include <core/ImportExport.h>
Expand All @@ -24,7 +25,7 @@ namespace data_frame_row_slice_detail {
//! \brief The implementation backing a data frame row slice handle.
class CORE_EXPORT CDataFrameRowSliceHandleImpl {
public:
using TFloatVec = std::vector<CFloatStorage>;
using TFloatVec = std::vector<CFloatStorage, CAlignedAllocator<CFloatStorage>>;
using TInt32Vec = std::vector<std::int32_t>;
using TImplPtr = std::unique_ptr<CDataFrameRowSliceHandleImpl>;

Expand All @@ -42,7 +43,7 @@ class CORE_EXPORT CDataFrameRowSliceHandleImpl {
//! CDataFrame storage.
class CORE_EXPORT CDataFrameRowSliceHandle {
public:
using TFloatVec = std::vector<CFloatStorage>;
using TFloatVec = std::vector<CFloatStorage, CAlignedAllocator<CFloatStorage>>;
using TFloatVecItr = TFloatVec::iterator;
using TInt32Vec = std::vector<std::int32_t>;
using TInt32VecCItr = TInt32Vec::const_iterator;
Expand Down Expand Up @@ -83,7 +84,7 @@ class CORE_EXPORT CDataFrameRowSliceHandle {
//! \brief CDataFrame slice storage interface.
class CORE_EXPORT CDataFrameRowSlice {
public:
using TFloatVec = std::vector<CFloatStorage>;
using TFloatVec = std::vector<CFloatStorage, CAlignedAllocator<CFloatStorage>>;
using TInt32Vec = std::vector<std::int32_t>;

public:
Expand Down
6 changes: 3 additions & 3 deletions include/maths/CBoostedTreeImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class MATHS_EXPORT CBoostedTreeImpl final {
//! Get the column containing the dependent variable.
std::size_t columnHoldingDependentVariable() const;

//! Get the number of columns in the original data frame.
std::size_t numberInputColumns() const;
//! Get start indices of the extra columns.
const TSizeVec& extraColumns() const;

//! Get the weights to apply to each class's predicted probability when
//! assigning classes.
Expand Down Expand Up @@ -303,7 +303,7 @@ class MATHS_EXPORT CBoostedTreeImpl final {
mutable CPRNG::CXorOShiro128Plus m_Rng;
std::size_t m_NumberThreads;
std::size_t m_DependentVariable = std::numeric_limits<std::size_t>::max();
std::size_t m_NumberInputColumns = 0;
TSizeVec m_ExtraColumns;
TLossFunctionUPtr m_Loss;
CBoostedTree::EClassAssignmentObjective m_ClassAssignmentObjective =
CBoostedTree::E_MinimumRecall;
Expand Down
Loading