Skip to content

[SYCL] Store stream buffers in the scheduler #2416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sycl/source/detail/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ void Scheduler::enqueueLeavesOfReqUnlocked(const Requirement *const Req) {
EnqueueLeaves(Record->MWriteLeaves);
}

void Scheduler::allocateStreamBuffers(stream_impl *Impl,
size_t StreamBufferSize,
size_t FlushBufferSize) {
std::lock_guard<std::mutex> lock(StreamBuffersPoolMutex);
StreamBuffersPool.insert(
{Impl, StreamBuffers(StreamBufferSize, FlushBufferSize)});
}

Scheduler::Scheduler() {
sycl::device HostDevice;
DefaultHostQueue = QueueImplPtr(
Expand Down
32 changes: 32 additions & 0 deletions sycl/source/detail/scheduler/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,38 @@ class Scheduler {

friend class Command;
friend class DispatchHostTask;

class StreamBuffers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need at least some info on this class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added info in the style of this file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's all public. Why is it class then? Suggest switching to struct here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, switched to structure

public:
StreamBuffers(size_t StreamBufferSize, size_t FlushBufferSize)
// Initialize stream buffer with zeros, this is needed for two reasons:
// 1. We don't need to care about end of line when printing out
// streamed data.
// 2. Offset is properly initialized.
: Data(StreamBufferSize, 0),
Buf(Data.data(), range<1>(StreamBufferSize),
{property::buffer::use_host_ptr()}),
FlushBuf(range<1>(FlushBufferSize)) {}

// Vector on the host side which is used to initialize the stream
// buffer
std::vector<char> Data;

// Stream buffer
buffer<char, 1> Buf;

// Global flush buffer
buffer<char, 1> FlushBuf;
};

friend class stream_impl;

// Protects stream buffers pool
std::mutex StreamBuffersPoolMutex;
std::map<stream_impl *, StreamBuffers> StreamBuffersPool;

// Allocate buffers in the pool for a provided stream
void allocateStreamBuffers(stream_impl *, size_t, size_t);
};

} // namespace detail
Expand Down
54 changes: 41 additions & 13 deletions sycl/source/detail/stream_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,55 @@ namespace detail {

stream_impl::stream_impl(size_t BufferSize, size_t MaxStatementSize,
handler &CGH)
: BufferSize_(BufferSize), MaxStatementSize_(MaxStatementSize),
// Allocate additional place for the offset variable and the end of line
// symbol. Initialize buffer with zeros, this is needed for two reasons:
// 1. We don't need to care about end of line when printing out streamed
// data.
// 2. Offset is properly initialized.
Data(BufferSize + OffsetSize + 1, 0),
Buf(Data.data(), range<1>(BufferSize + OffsetSize + 1),
{property::buffer::use_host_ptr()}),

FlushBuf(range<1>(MaxStatementSize)) {}
: BufferSize_(BufferSize), MaxStatementSize_(MaxStatementSize) {
// We need to store stream buffers in the scheduler because they need to be
// alive after submitting the kernel. They cannot be stored in the stream
// object because it causes loop dependency between objects and results in
// memory leak.
// Allocate additional place in the stream buffer for the offset variable and
// the end of line symbol.
detail::Scheduler::getInstance().allocateStreamBuffers(
this, BufferSize + OffsetSize + 1 /* size of the stream buffer */,
MaxStatementSize /* size of the flush buffer */);
}

// Method to provide an access to the global stream buffer
GlobalBufAccessorT stream_impl::accessGlobalBuf(handler &CGH) {
return detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
->second.Buf.get_access<cl::sycl::access::mode::read_write>(
CGH, range<1>(BufferSize_), id<1>(OffsetSize));
}

// Method to provide an accessor to the global flush buffer
GlobalBufAccessorT stream_impl::accessGlobalFlushBuf(handler &CGH) {
return detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
->second.FlushBuf.get_access<cl::sycl::access::mode::read_write>(
CGH, range<1>(MaxStatementSize_), id<1>(0));
}

// Method to provide an atomic access to the offset in the global stream
// buffer and offset in the flush buffer
GlobalOffsetAccessorT stream_impl::accessGlobalOffset(handler &CGH) {
auto OffsetSubBuf = buffer<char, 1>(
detail::Scheduler::getInstance().StreamBuffersPool.find(this)->second.Buf,
id<1>(0), range<1>(OffsetSize));
auto ReinterpretedBuf = OffsetSubBuf.reinterpret<unsigned, 1>(range<1>(2));
return ReinterpretedBuf.get_access<cl::sycl::access::mode::atomic>(
CGH, range<1>(2), id<1>(0));
}
size_t stream_impl::get_size() const { return BufferSize_; }

size_t stream_impl::get_max_statement_size() const { return MaxStatementSize_; }

void stream_impl::flush() {
// Access the stream buffer on the host. This access guarantees that kernel is
// executed and buffer contains streamed data.
auto HostAcc = Buf.get_access<cl::sycl::access::mode::read>(
range<1>(BufferSize_), id<1>(OffsetSize));
auto HostAcc = detail::Scheduler::getInstance()
.StreamBuffersPool.find(this)
->second.Buf.get_access<cl::sycl::access::mode::read>(
range<1>(BufferSize_), id<1>(OffsetSize));

printf("%s", HostAcc.get_pointer());
fflush(stdout);
Expand Down
25 changes: 3 additions & 22 deletions sycl/source/detail/stream_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,14 @@ class __SYCL_EXPORT stream_impl {
stream_impl(size_t BufferSize, size_t MaxStatementSize, handler &CGH);

// Method to provide an access to the global stream buffer
GlobalBufAccessorT accessGlobalBuf(handler &CGH) {
return Buf.get_access<cl::sycl::access::mode::read_write>(
CGH, range<1>(BufferSize_), id<1>(OffsetSize));
}
GlobalBufAccessorT accessGlobalBuf(handler &CGH);

// Method to provide an accessor to the global flush buffer
GlobalBufAccessorT accessGlobalFlushBuf(handler &CGH) {
return FlushBuf.get_access<cl::sycl::access::mode::read_write>(
CGH, range<1>(MaxStatementSize_), id<1>(0));
}
GlobalBufAccessorT accessGlobalFlushBuf(handler &CGH);

// Method to provide an atomic access to the offset in the global stream
// buffer and offset in the flush buffer
GlobalOffsetAccessorT accessGlobalOffset(handler &CGH) {
auto OffsetSubBuf = buffer<char, 1>(Buf, id<1>(0), range<1>(OffsetSize));
auto ReinterpretedBuf = OffsetSubBuf.reinterpret<unsigned, 1>(range<1>(2));
return ReinterpretedBuf.get_access<cl::sycl::access::mode::atomic>(
CGH, range<1>(2), id<1>(0));
}
GlobalOffsetAccessorT accessGlobalOffset(handler &CGH);

// Copy stream buffer to the host and print the contents
void flush();
Expand All @@ -65,14 +54,6 @@ class __SYCL_EXPORT stream_impl {
// 2 variables: offset in the stream buffer and offset in the flush buffer.
static const size_t OffsetSize = 2 * sizeof(unsigned);

// Vector on the host side which is used to initialize the stream buffer
std::vector<char> Data;

// Stream buffer
buffer<char, 1> Buf;

// Global flush buffer
buffer<char, 1> FlushBuf;
};

} // namespace detail
Expand Down
3 changes: 3 additions & 0 deletions sycl/test/abi/sycl_symbols_linux.dump
Original file line number Diff line number Diff line change
Expand Up @@ -4156,3 +4156,6 @@ _ZNK2cl4sycl9exception11has_contextEv
_ZNK2cl4sycl9exception4whatEv
__sycl_register_lib
__sycl_unregister_lib
_ZN2cl4sycl6detail11stream_impl15accessGlobalBufERNS0_7handlerE
_ZN2cl4sycl6detail11stream_impl20accessGlobalFlushBufERNS0_7handlerE
_ZN2cl4sycl6detail11stream_impl18accessGlobalOffsetERNS0_7handlerE
34 changes: 34 additions & 0 deletions sycl/test/basic_tests/stream/release_resources_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// RUN: %clangxx -fsycl -fsycl-targets=%sycl_triple %s -o %t.out
// RUN: env SYCL_PI_TRACE=2 %CPU_RUN_PLACEHOLDER %t.out %CPU_CHECK_PLACEHOLDER
// RUN: env SYCL_PI_TRACE=2 %GPU_RUN_PLACEHOLDER %t.out %GPU_CHECK_PLACEHOLDER
// RUN: env SYCL_PI_TRACE=2 %ACC_RUN_PLACEHOLDER %t.out %ACC_CHECK_PLACEHOLDER

//==----------------------- release_resources_test.cpp ---------------------==//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this part isn't needed here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed


// Check that buffer used by a stream object is released.

#include <CL/sycl.hpp>

using namespace cl::sycl;

int main() {
{
queue Queue;

// CHECK:---> piMemRelease
Queue.submit([&](handler &CGH) {
stream Out(1024, 80, CGH);
CGH.parallel_for<class test_cleanup1>(
range<1>(2), [=](id<1> i) { Out << "Hello, World!" << endl; });
});
Queue.wait();
}

return 0;
}