summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--compiler-rt/lib/xray/tests/unit/buffer_queue_test.cc116
-rw-r--r--compiler-rt/lib/xray/xray_buffer_queue.cc150
-rw-r--r--compiler-rt/lib/xray/xray_buffer_queue.h21
-rw-r--r--compiler-rt/lib/xray/xray_fdr_logging.cc8
4 files changed, 233 insertions, 62 deletions
diff --git a/compiler-rt/lib/xray/tests/unit/buffer_queue_test.cc b/compiler-rt/lib/xray/tests/unit/buffer_queue_test.cc
index c0d4ccb268d..8aa366a20df 100644
--- a/compiler-rt/lib/xray/tests/unit/buffer_queue_test.cc
+++ b/compiler-rt/lib/xray/tests/unit/buffer_queue_test.cc
@@ -13,7 +13,9 @@
#include "xray_buffer_queue.h"
#include "gtest/gtest.h"
+#include <atomic>
#include <future>
+#include <thread>
#include <unistd.h>
namespace __xray {
@@ -55,6 +57,7 @@ TEST(BufferQueueTest, ReleaseUnknown) {
BufferQueue::Buffer Buf;
Buf.Data = reinterpret_cast<void *>(0xdeadbeef);
Buf.Size = kSize;
+ Buf.Generation = Buffers.generation();
EXPECT_EQ(BufferQueue::ErrorCode::UnrecognizedBuffer,
Buffers.releaseBuffer(Buf));
}
@@ -70,8 +73,7 @@ TEST(BufferQueueTest, ErrorsWhenFinalising) {
BufferQueue::Buffer OtherBuf;
ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing,
Buffers.getBuffer(OtherBuf));
- ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing,
- Buffers.finalize());
+ ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing, Buffers.finalize());
ASSERT_EQ(Buffers.releaseBuffer(Buf), BufferQueue::ErrorCode::Ok);
}
@@ -111,4 +113,114 @@ TEST(BufferQueueTest, Apply) {
ASSERT_EQ(Count, 10);
}
+TEST(BufferQueueTest, GenerationalSupport) {
+ bool Success = false;
+ BufferQueue Buffers(kSize, 10, Success);
+ ASSERT_TRUE(Success);
+ BufferQueue::Buffer B0;
+ ASSERT_EQ(Buffers.getBuffer(B0), BufferQueue::ErrorCode::Ok);
+ ASSERT_EQ(Buffers.finalize(),
+ BufferQueue::ErrorCode::Ok); // No more new buffers.
+
+ // Re-initialise the queue.
+ ASSERT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
+
+ BufferQueue::Buffer B1;
+ ASSERT_EQ(Buffers.getBuffer(B1), BufferQueue::ErrorCode::Ok);
+
+ // Validate that the buffers come from different generations.
+ ASSERT_NE(B0.Generation, B1.Generation);
+
+ // We stash the current generation, for use later.
+ auto PrevGen = B1.Generation;
+
+  // At this point, we want to ensure that the buffer obtained from the first
+  // "generation" is still accepted for release in the new generation...
+ EXPECT_EQ(Buffers.releaseBuffer(B0), BufferQueue::ErrorCode::Ok);
+
+ // ... and that the new buffer is also accepted.
+ EXPECT_EQ(Buffers.releaseBuffer(B1), BufferQueue::ErrorCode::Ok);
+
+ // A next round will do the same, ensure that we are able to do multiple
+ // rounds in this case.
+ ASSERT_EQ(Buffers.finalize(), BufferQueue::ErrorCode::Ok);
+ ASSERT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
+ EXPECT_EQ(Buffers.getBuffer(B0), BufferQueue::ErrorCode::Ok);
+ EXPECT_EQ(Buffers.getBuffer(B1), BufferQueue::ErrorCode::Ok);
+
+ // Here we ensure that the generation is different from the previous
+ // generation.
+ EXPECT_NE(B0.Generation, PrevGen);
+  EXPECT_NE(B1.Generation, PrevGen);
+ ASSERT_EQ(Buffers.finalize(), BufferQueue::ErrorCode::Ok);
+ EXPECT_EQ(Buffers.releaseBuffer(B0), BufferQueue::ErrorCode::Ok);
+ EXPECT_EQ(Buffers.releaseBuffer(B1), BufferQueue::ErrorCode::Ok);
+}
+
+TEST(BufferQueueTest, GenerationalSupportAcrossThreads) {
+ bool Success = false;
+ BufferQueue Buffers(kSize, 10, Success);
+ ASSERT_TRUE(Success);
+
+ std::atomic<int> Counter{0};
+
+ // This function allows us to use thread-local storage to isolate the
+  // instances of the buffers to be used. It also allows us to signal the
+  // threads of a new generation, and allows them to get new buffers. This is
+ // representative of how we expect the buffer queue to be used by the XRay
+ // runtime.
+ auto Process = [&] {
+ thread_local BufferQueue::Buffer B;
+ ASSERT_EQ(Buffers.getBuffer(B), BufferQueue::ErrorCode::Ok);
+ auto FirstGen = B.Generation;
+
+ // Signal that we've gotten a buffer in the thread.
+ Counter.fetch_add(1, std::memory_order_acq_rel);
+ while (!Buffers.finalizing()) {
+ Buffers.releaseBuffer(B);
+ Buffers.getBuffer(B);
+ }
+
+ // Signal that we've exited the get/release buffer loop.
+ Counter.fetch_sub(1, std::memory_order_acq_rel);
+ if (B.Data != nullptr)
+ Buffers.releaseBuffer(B);
+
+ // Spin until we find that the Buffer Queue is no longer finalizing.
+ while (Buffers.getBuffer(B) != BufferQueue::ErrorCode::Ok)
+ ;
+
+ // Signal that we've successfully gotten a buffer in the thread.
+ Counter.fetch_add(1, std::memory_order_acq_rel);
+
+ EXPECT_NE(FirstGen, B.Generation);
+ EXPECT_EQ(Buffers.releaseBuffer(B), BufferQueue::ErrorCode::Ok);
+
+ // Signal that we've successfully exited.
+ Counter.fetch_sub(1, std::memory_order_acq_rel);
+ };
+
+ // Spawn two threads running Process.
+ std::thread T0(Process), T1(Process);
+
+ // Spin until we find the counter is up to 2.
+ while (Counter.load(std::memory_order_acquire) != 2)
+ ;
+
+  // Then we finalize; re-initialisation happens further below, once the
+  // threads have drained out of the get/release loop.
+ Buffers.finalize();
+
+ // Spin until we find the counter is down to 0.
+ while (Counter.load(std::memory_order_acquire) != 0)
+ ;
+
+ // Then we re-initialize.
+ EXPECT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
+
+ T0.join();
+ T1.join();
+
+ ASSERT_EQ(Counter.load(std::memory_order_acquire), 0);
+}
+
} // namespace __xray
diff --git a/compiler-rt/lib/xray/xray_buffer_queue.cc b/compiler-rt/lib/xray/xray_buffer_queue.cc
index 5a88ecd3399..c17138d9972 100644
--- a/compiler-rt/lib/xray/xray_buffer_queue.cc
+++ b/compiler-rt/lib/xray/xray_buffer_queue.cc
@@ -24,58 +24,85 @@
using namespace __xray;
using namespace __sanitizer;
-BufferQueue::BufferQueue(size_t B, size_t N,
- bool &Success) XRAY_NEVER_INSTRUMENT
- : BufferSize(B),
- BufferCount(N),
- Mutex(),
- Finalizing{0},
- BackingStore(allocateBuffer(B *N)),
- Buffers(initArray<BufferQueue::BufferRep>(N)),
- Next(Buffers),
- First(Buffers),
- LiveBuffers(0) {
- if (BackingStore == nullptr) {
- Success = false;
- return;
- }
- if (Buffers == nullptr) {
+BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
+ SpinMutexLock Guard(&Mutex);
+
+ if (!finalizing())
+ return BufferQueue::ErrorCode::AlreadyInitialized;
+
+ bool Success = false;
+ BufferSize = BS;
+ BufferCount = BC;
+ BackingStore = allocateBuffer(BufferSize * BufferCount);
+ if (BackingStore == nullptr)
+ return BufferQueue::ErrorCode::NotEnoughMemory;
+
+ auto CleanupBackingStore = __sanitizer::at_scope_exit([&, this] {
+ if (Success)
+ return;
deallocateBuffer(BackingStore, BufferSize * BufferCount);
- Success = false;
- return;
- }
+ });
+
+ Buffers = initArray<BufferRep>(BufferCount);
+ if (Buffers == nullptr)
+ return BufferQueue::ErrorCode::NotEnoughMemory;
+
+  // At this point we increment the generation number to associate the buffers
+  // with the new generation.
+ atomic_fetch_add(&Generation, 1, memory_order_acq_rel);
- for (size_t i = 0; i < N; ++i) {
+ Success = true;
+ for (size_t i = 0; i < BufferCount; ++i) {
auto &T = Buffers[i];
auto &Buf = T.Buff;
- Buf.Data = reinterpret_cast<char *>(BackingStore) + (BufferSize * i);
- Buf.Size = B;
atomic_store(&Buf.Extents, 0, memory_order_release);
+ Buf.Generation = generation();
+ Buf.Data = reinterpret_cast<char *>(BackingStore) + (BufferSize * i);
+ Buf.Size = BufferSize;
T.Used = false;
}
- Success = true;
+
+ Next = Buffers;
+ First = Buffers;
+ LiveBuffers = 0;
+ atomic_store(&Finalizing, 0, memory_order_release);
+ return BufferQueue::ErrorCode::Ok;
+}
+
+BufferQueue::BufferQueue(size_t B, size_t N,
+ bool &Success) XRAY_NEVER_INSTRUMENT
+ : BufferSize(B),
+ BufferCount(N),
+ Mutex(),
+ Finalizing{1},
+ BackingStore(nullptr),
+ Buffers(nullptr),
+ Next(Buffers),
+ First(Buffers),
+ LiveBuffers(0),
+ Generation{0} {
+ Success = init(B, N) == BufferQueue::ErrorCode::Ok;
}
BufferQueue::ErrorCode BufferQueue::getBuffer(Buffer &Buf) {
if (atomic_load(&Finalizing, memory_order_acquire))
return ErrorCode::QueueFinalizing;
- SpinMutexLock Guard(&Mutex);
- if (LiveBuffers == BufferCount)
- return ErrorCode::NotEnoughMemory;
-
- auto &T = *Next;
- auto &B = T.Buff;
- auto Extents = atomic_load(&B.Extents, memory_order_acquire);
- atomic_store(&Buf.Extents, Extents, memory_order_release);
- Buf.Data = B.Data;
- Buf.Size = B.Size;
- T.Used = true;
- ++LiveBuffers;
-
- if (++Next == (Buffers + BufferCount))
- Next = Buffers;
+ BufferRep *B = nullptr;
+ {
+ SpinMutexLock Guard(&Mutex);
+ if (LiveBuffers == BufferCount)
+ return ErrorCode::NotEnoughMemory;
+ B = Next++;
+ if (Next == (Buffers + BufferCount))
+ Next = Buffers;
+ ++LiveBuffers;
+ }
+ Buf.Data = B->Buff.Data;
+ Buf.Generation = generation();
+ Buf.Size = B->Buff.Size;
+ B->Used = true;
return ErrorCode::Ok;
}
@@ -84,29 +111,42 @@ BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) {
// backing store's range.
if (Buf.Data < BackingStore ||
Buf.Data >
- reinterpret_cast<char *>(BackingStore) + (BufferCount * BufferSize))
- return ErrorCode::UnrecognizedBuffer;
+ reinterpret_cast<char *>(BackingStore) + (BufferCount * BufferSize)) {
+ if (Buf.Generation != generation()) {
+ Buf.Data = nullptr;
+ Buf.Size = 0;
+ Buf.Generation = 0;
+ return BufferQueue::ErrorCode::Ok;
+ }
+ return BufferQueue::ErrorCode::UnrecognizedBuffer;
+ }
- SpinMutexLock Guard(&Mutex);
+ BufferRep *B = nullptr;
+ {
+ SpinMutexLock Guard(&Mutex);
+
+ // This points to a semantic bug, we really ought to not be releasing more
+ // buffers than we actually get.
+ if (LiveBuffers == 0)
+ return ErrorCode::NotEnoughMemory;
- // This points to a semantic bug, we really ought to not be releasing more
- // buffers than we actually get.
- if (LiveBuffers == 0)
- return ErrorCode::NotEnoughMemory;
+ --LiveBuffers;
+ B = First++;
+ if (First == (Buffers + BufferCount))
+ First = Buffers;
+ }
// Now that the buffer has been released, we mark it as "used".
- auto Extents = atomic_load(&Buf.Extents, memory_order_acquire);
- atomic_store(&First->Buff.Extents, Extents, memory_order_release);
- First->Buff.Data = Buf.Data;
- First->Buff.Size = Buf.Size;
- First->Used = true;
+ B->Buff.Data = Buf.Data;
+ B->Buff.Size = Buf.Size;
+ B->Buff.Generation = Buf.Generation;
+ B->Used = true;
+ atomic_store(&B->Buff.Extents,
+ atomic_load(&Buf.Extents, memory_order_acquire),
+ memory_order_release);
Buf.Data = nullptr;
Buf.Size = 0;
- atomic_store(&Buf.Extents, 0, memory_order_release);
- --LiveBuffers;
- if (++First == (Buffers + BufferCount))
- First = Buffers;
-
+ Buf.Generation = 0;
return ErrorCode::Ok;
}
diff --git a/compiler-rt/lib/xray/xray_buffer_queue.h b/compiler-rt/lib/xray/xray_buffer_queue.h
index c1fa9fab768..cbd42835f8a 100644
--- a/compiler-rt/lib/xray/xray_buffer_queue.h
+++ b/compiler-rt/lib/xray/xray_buffer_queue.h
@@ -33,6 +33,7 @@ class BufferQueue {
public:
struct Buffer {
atomic_uint64_t Extents{0};
+ uint64_t Generation{0};
void *Data = nullptr;
size_t Size = 0;
};
@@ -130,6 +131,10 @@ private:
// Count of buffers that have been handed out through 'getBuffer'.
size_t LiveBuffers;
+ // We use a generation number to identify buffers and which generation they're
+ // associated with.
+ atomic_uint64_t Generation;
+
public:
enum class ErrorCode : unsigned {
Ok,
@@ -137,6 +142,7 @@ public:
QueueFinalizing,
UnrecognizedBuffer,
AlreadyFinalized,
+ AlreadyInitialized,
};
static const char *getErrorString(ErrorCode E) {
@@ -151,6 +157,8 @@ public:
return "buffer being returned not owned by buffer queue";
case ErrorCode::AlreadyFinalized:
return "queue already finalized";
+ case ErrorCode::AlreadyInitialized:
+ return "queue already initialized";
}
return "unknown error";
}
@@ -181,10 +189,23 @@ public:
/// the buffer being released.
ErrorCode releaseBuffer(Buffer &Buf);
+  /// Initializes the buffer queue, starting a new generation. The buffer size
+  /// can be reset with |BS| and the buffer count with |BC|.
+ ///
+ /// Returns:
+ /// - ErrorCode::Ok when we successfully initialize the buffer. This
+ /// requires that the buffer queue is previously finalized.
+ /// - ErrorCode::AlreadyInitialized when the buffer queue is not finalized.
+ ErrorCode init(size_t BS, size_t BC);
+
bool finalizing() const {
return atomic_load(&Finalizing, memory_order_acquire);
}
+ uint64_t generation() const {
+ return atomic_load(&Generation, memory_order_acquire);
+ }
+
/// Returns the configured size of the buffers in the buffer queue.
size_t ConfiguredBufferSize() const { return BufferSize; }
diff --git a/compiler-rt/lib/xray/xray_fdr_logging.cc b/compiler-rt/lib/xray/xray_fdr_logging.cc
index 32188771cba..2479a0fa766 100644
--- a/compiler-rt/lib/xray/xray_fdr_logging.cc
+++ b/compiler-rt/lib/xray/xray_fdr_logging.cc
@@ -1056,8 +1056,7 @@ void fdrLoggingHandleTypedEvent(
endBufferIfFull();
}
-XRayLogInitStatus fdrLoggingInit(UNUSED size_t BufferSize,
- UNUSED size_t BufferMax, void *Options,
+XRayLogInitStatus fdrLoggingInit(size_t, size_t, void *Options,
size_t OptionsSize) XRAY_NEVER_INSTRUMENT {
if (Options == nullptr)
return XRayLogInitStatus::XRAY_LOG_UNINITIALIZED;
@@ -1104,9 +1103,8 @@ XRayLogInitStatus fdrLoggingInit(UNUSED size_t BufferSize,
// environment-variable defined options.
FDRParser.ParseString(static_cast<const char *>(Options));
*fdrFlags() = FDRFlags;
- BufferSize = FDRFlags.buffer_size;
- BufferMax = FDRFlags.buffer_max;
-
+ auto BufferSize = FDRFlags.buffer_size;
+ auto BufferMax = FDRFlags.buffer_max;
bool Success = false;
if (BQ != nullptr) {
OpenPOWER on IntegriCloud