Skip to content

Commit e85af16

Browse files
committed Oct 17, 2018
[XRay][compiler-rt] Generational Buffer Management
Summary: This change updates the buffer queue implementation to support using a generation number to identify the lifetime of buffers. This first part introduces the notion of the generation number, without changing the way we handle the buffers yet. What's missing here is the cleanup of the buffers. Ideally we'll keep the two most recent generations. We need to ensure that, before we do any writes to the buffers, we check the generation number(s) first. Those changes will follow on from this change. Depends on D52588. Reviewers: mboerger, eizan Subscribers: llvm-commits, jfb Differential Revision: https://reviews.llvm.org/D52974 llvm-svn: 344670
1 parent c5f1d21 commit e85af16

File tree

4 files changed

+233
-62
lines changed

4 files changed

+233
-62
lines changed
 

‎compiler-rt/lib/xray/tests/unit/buffer_queue_test.cc

+114-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
#include "xray_buffer_queue.h"
1414
#include "gtest/gtest.h"
1515

16+
#include <atomic>
1617
#include <future>
18+
#include <thread>
1719
#include <unistd.h>
1820

1921
namespace __xray {
@@ -55,6 +57,7 @@ TEST(BufferQueueTest, ReleaseUnknown) {
5557
BufferQueue::Buffer Buf;
5658
Buf.Data = reinterpret_cast<void *>(0xdeadbeef);
5759
Buf.Size = kSize;
60+
Buf.Generation = Buffers.generation();
5861
EXPECT_EQ(BufferQueue::ErrorCode::UnrecognizedBuffer,
5962
Buffers.releaseBuffer(Buf));
6063
}
@@ -70,8 +73,7 @@ TEST(BufferQueueTest, ErrorsWhenFinalising) {
7073
BufferQueue::Buffer OtherBuf;
7174
ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing,
7275
Buffers.getBuffer(OtherBuf));
73-
ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing,
74-
Buffers.finalize());
76+
ASSERT_EQ(BufferQueue::ErrorCode::QueueFinalizing, Buffers.finalize());
7577
ASSERT_EQ(Buffers.releaseBuffer(Buf), BufferQueue::ErrorCode::Ok);
7678
}
7779

@@ -111,4 +113,114 @@ TEST(BufferQueueTest, Apply) {
111113
ASSERT_EQ(Count, 10);
112114
}
113115

116+
TEST(BufferQueueTest, GenerationalSupport) {
117+
bool Success = false;
118+
BufferQueue Buffers(kSize, 10, Success);
119+
ASSERT_TRUE(Success);
120+
BufferQueue::Buffer B0;
121+
ASSERT_EQ(Buffers.getBuffer(B0), BufferQueue::ErrorCode::Ok);
122+
ASSERT_EQ(Buffers.finalize(),
123+
BufferQueue::ErrorCode::Ok); // No more new buffers.
124+
125+
// Re-initialise the queue, which starts a new generation.
126+
ASSERT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
127+
128+
BufferQueue::Buffer B1;
129+
ASSERT_EQ(Buffers.getBuffer(B1), BufferQueue::ErrorCode::Ok);
130+
131+
// Validate that the buffers come from different generations.
132+
ASSERT_NE(B0.Generation, B1.Generation);
133+
134+
// We stash the current generation, for use later.
135+
auto PrevGen = B1.Generation;
136+
137+
// At this point, we want to ensure that returning the buffer from the
138+
// first "generation" is still accepted in the new generation...
139+
EXPECT_EQ(Buffers.releaseBuffer(B0), BufferQueue::ErrorCode::Ok);
140+
141+
// ... and that the new buffer is also accepted.
142+
EXPECT_EQ(Buffers.releaseBuffer(B1), BufferQueue::ErrorCode::Ok);
143+
144+
// A next round will do the same, ensure that we are able to do multiple
145+
// rounds in this case.
146+
ASSERT_EQ(Buffers.finalize(), BufferQueue::ErrorCode::Ok);
147+
ASSERT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
148+
EXPECT_EQ(Buffers.getBuffer(B0), BufferQueue::ErrorCode::Ok);
149+
EXPECT_EQ(Buffers.getBuffer(B1), BufferQueue::ErrorCode::Ok);
150+
151+
// Here we ensure that both buffers share one generation, and that it is
152+
// different from the previous generation.
153+
EXPECT_NE(B0.Generation, PrevGen);
154+
EXPECT_EQ(B0.Generation, B1.Generation);
155+
ASSERT_EQ(Buffers.finalize(), BufferQueue::ErrorCode::Ok);
156+
EXPECT_EQ(Buffers.releaseBuffer(B0), BufferQueue::ErrorCode::Ok);
157+
EXPECT_EQ(Buffers.releaseBuffer(B1), BufferQueue::ErrorCode::Ok);
158+
}
159+
160+
TEST(BufferQueueTest, GenerationalSupportAcrossThreads) {
161+
bool Success = false;
162+
BufferQueue Buffers(kSize, 10, Success);
163+
ASSERT_TRUE(Success);
164+
165+
std::atomic<int> Counter{0};
166+
167+
// This function allows us to use thread-local storage to isolate the
168+
// instances of the buffers to be used. It also allows us to signal the
169+
// threads of a new generation, and allows those to get new buffers. This is
170+
// representative of how we expect the buffer queue to be used by the XRay
171+
// runtime.
172+
auto Process = [&] {
173+
thread_local BufferQueue::Buffer B;
174+
ASSERT_EQ(Buffers.getBuffer(B), BufferQueue::ErrorCode::Ok);
175+
auto FirstGen = B.Generation;
176+
177+
// Signal that we've gotten a buffer in the thread.
178+
Counter.fetch_add(1, std::memory_order_acq_rel);
179+
while (!Buffers.finalizing()) {
180+
Buffers.releaseBuffer(B);
181+
Buffers.getBuffer(B);
182+
}
183+
184+
// Signal that we've exited the get/release buffer loop.
185+
Counter.fetch_sub(1, std::memory_order_acq_rel);
186+
if (B.Data != nullptr)
187+
Buffers.releaseBuffer(B);
188+
189+
// Spin until getBuffer succeeds again, i.e. the queue is no longer finalizing.
190+
while (Buffers.getBuffer(B) != BufferQueue::ErrorCode::Ok)
191+
;
192+
193+
// Signal that we've successfully gotten a buffer in the thread.
194+
Counter.fetch_add(1, std::memory_order_acq_rel);
195+
196+
EXPECT_NE(FirstGen, B.Generation);
197+
EXPECT_EQ(Buffers.releaseBuffer(B), BufferQueue::ErrorCode::Ok);
198+
199+
// Signal that we've successfully exited.
200+
Counter.fetch_sub(1, std::memory_order_acq_rel);
201+
};
202+
203+
// Spawn two threads running Process.
204+
std::thread T0(Process), T1(Process);
205+
206+
// Spin until we find the counter is up to 2.
207+
while (Counter.load(std::memory_order_acquire) != 2)
208+
;
209+
210+
// Then we finalize; re-initialization only happens further down.
211+
Buffers.finalize();
212+
213+
// Spin until we find the counter is down to 0.
214+
while (Counter.load(std::memory_order_acquire) != 0)
215+
;
216+
217+
// Then we re-initialize.
218+
EXPECT_EQ(Buffers.init(kSize, 10), BufferQueue::ErrorCode::Ok);
219+
220+
T0.join();
221+
T1.join();
222+
223+
ASSERT_EQ(Counter.load(std::memory_order_acquire), 0);
224+
}
225+
114226
} // namespace __xray

‎compiler-rt/lib/xray/xray_buffer_queue.cc

+95-55
Original file line numberDiff line numberDiff line change
@@ -24,58 +24,85 @@
2424
using namespace __xray;
2525
using namespace __sanitizer;
2626

27-
BufferQueue::BufferQueue(size_t B, size_t N,
28-
bool &Success) XRAY_NEVER_INSTRUMENT
29-
: BufferSize(B),
30-
BufferCount(N),
31-
Mutex(),
32-
Finalizing{0},
33-
BackingStore(allocateBuffer(B *N)),
34-
Buffers(initArray<BufferQueue::BufferRep>(N)),
35-
Next(Buffers),
36-
First(Buffers),
37-
LiveBuffers(0) {
38-
if (BackingStore == nullptr) {
39-
Success = false;
40-
return;
41-
}
42-
if (Buffers == nullptr) {
27+
BufferQueue::ErrorCode BufferQueue::init(size_t BS, size_t BC) {
28+
SpinMutexLock Guard(&Mutex);
29+
30+
if (!finalizing())
31+
return BufferQueue::ErrorCode::AlreadyInitialized;
32+
33+
bool Success = false;
34+
BufferSize = BS;
35+
BufferCount = BC;
36+
BackingStore = allocateBuffer(BufferSize * BufferCount);
37+
if (BackingStore == nullptr)
38+
return BufferQueue::ErrorCode::NotEnoughMemory;
39+
40+
auto CleanupBackingStore = __sanitizer::at_scope_exit([&, this] {
41+
if (Success)
42+
return;
4343
deallocateBuffer(BackingStore, BufferSize * BufferCount);
44-
Success = false;
45-
return;
46-
}
44+
});
45+
46+
Buffers = initArray<BufferRep>(BufferCount);
47+
if (Buffers == nullptr)
48+
return BufferQueue::ErrorCode::NotEnoughMemory;
49+
50+
// At this point we increment the generation number to associate the buffers
51+
// to the new generation.
52+
atomic_fetch_add(&Generation, 1, memory_order_acq_rel);
4753

48-
for (size_t i = 0; i < N; ++i) {
54+
Success = true;
55+
for (size_t i = 0; i < BufferCount; ++i) {
4956
auto &T = Buffers[i];
5057
auto &Buf = T.Buff;
51-
Buf.Data = reinterpret_cast<char *>(BackingStore) + (BufferSize * i);
52-
Buf.Size = B;
5358
atomic_store(&Buf.Extents, 0, memory_order_release);
59+
Buf.Generation = generation();
60+
Buf.Data = reinterpret_cast<char *>(BackingStore) + (BufferSize * i);
61+
Buf.Size = BufferSize;
5462
T.Used = false;
5563
}
56-
Success = true;
64+
65+
Next = Buffers;
66+
First = Buffers;
67+
LiveBuffers = 0;
68+
atomic_store(&Finalizing, 0, memory_order_release);
69+
return BufferQueue::ErrorCode::Ok;
70+
}
71+
72+
BufferQueue::BufferQueue(size_t B, size_t N,
73+
bool &Success) XRAY_NEVER_INSTRUMENT
74+
: BufferSize(B),
75+
BufferCount(N),
76+
Mutex(),
77+
Finalizing{1},
78+
BackingStore(nullptr),
79+
Buffers(nullptr),
80+
Next(Buffers),
81+
First(Buffers),
82+
LiveBuffers(0),
83+
Generation{0} {
84+
Success = init(B, N) == BufferQueue::ErrorCode::Ok;
5785
}
5886

5987
BufferQueue::ErrorCode BufferQueue::getBuffer(Buffer &Buf) {
6088
if (atomic_load(&Finalizing, memory_order_acquire))
6189
return ErrorCode::QueueFinalizing;
6290

63-
SpinMutexLock Guard(&Mutex);
64-
if (LiveBuffers == BufferCount)
65-
return ErrorCode::NotEnoughMemory;
66-
67-
auto &T = *Next;
68-
auto &B = T.Buff;
69-
auto Extents = atomic_load(&B.Extents, memory_order_acquire);
70-
atomic_store(&Buf.Extents, Extents, memory_order_release);
71-
Buf.Data = B.Data;
72-
Buf.Size = B.Size;
73-
T.Used = true;
74-
++LiveBuffers;
75-
76-
if (++Next == (Buffers + BufferCount))
77-
Next = Buffers;
91+
BufferRep *B = nullptr;
92+
{
93+
SpinMutexLock Guard(&Mutex);
94+
if (LiveBuffers == BufferCount)
95+
return ErrorCode::NotEnoughMemory;
96+
B = Next++;
97+
if (Next == (Buffers + BufferCount))
98+
Next = Buffers;
99+
++LiveBuffers;
100+
}
78101

102+
Buf.Data = B->Buff.Data;
103+
Buf.Generation = generation();
104+
Buf.Size = B->Buff.Size;
105+
B->Used = true;
79106
return ErrorCode::Ok;
80107
}
81108

@@ -84,29 +111,42 @@ BufferQueue::ErrorCode BufferQueue::releaseBuffer(Buffer &Buf) {
84111
// backing store's range.
85112
if (Buf.Data < BackingStore ||
86113
Buf.Data >
87-
reinterpret_cast<char *>(BackingStore) + (BufferCount * BufferSize))
88-
return ErrorCode::UnrecognizedBuffer;
114+
reinterpret_cast<char *>(BackingStore) + (BufferCount * BufferSize)) {
115+
if (Buf.Generation != generation()) {
116+
Buf.Data = nullptr;
117+
Buf.Size = 0;
118+
Buf.Generation = 0;
119+
return BufferQueue::ErrorCode::Ok;
120+
}
121+
return BufferQueue::ErrorCode::UnrecognizedBuffer;
122+
}
89123

90-
SpinMutexLock Guard(&Mutex);
124+
BufferRep *B = nullptr;
125+
{
126+
SpinMutexLock Guard(&Mutex);
127+
128+
// This points to a semantic bug, we really ought to not be releasing more
129+
// buffers than we actually get.
130+
if (LiveBuffers == 0)
131+
return ErrorCode::NotEnoughMemory;
91132

92-
// This points to a semantic bug, we really ought to not be releasing more
93-
// buffers than we actually get.
94-
if (LiveBuffers == 0)
95-
return ErrorCode::NotEnoughMemory;
133+
--LiveBuffers;
134+
B = First++;
135+
if (First == (Buffers + BufferCount))
136+
First = Buffers;
137+
}
96138

97139
// Now that the buffer has been released, we mark it as "used".
98-
auto Extents = atomic_load(&Buf.Extents, memory_order_acquire);
99-
atomic_store(&First->Buff.Extents, Extents, memory_order_release);
100-
First->Buff.Data = Buf.Data;
101-
First->Buff.Size = Buf.Size;
102-
First->Used = true;
140+
B->Buff.Data = Buf.Data;
141+
B->Buff.Size = Buf.Size;
142+
B->Buff.Generation = Buf.Generation;
143+
B->Used = true;
144+
atomic_store(&B->Buff.Extents,
145+
atomic_load(&Buf.Extents, memory_order_acquire),
146+
memory_order_release);
103147
Buf.Data = nullptr;
104148
Buf.Size = 0;
105-
atomic_store(&Buf.Extents, 0, memory_order_release);
106-
--LiveBuffers;
107-
if (++First == (Buffers + BufferCount))
108-
First = Buffers;
109-
149+
Buf.Generation = 0;
110150
return ErrorCode::Ok;
111151
}
112152

‎compiler-rt/lib/xray/xray_buffer_queue.h

+21
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class BufferQueue {
3333
public:
3434
struct Buffer {
3535
atomic_uint64_t Extents{0};
36+
uint64_t Generation{0};
3637
void *Data = nullptr;
3738
size_t Size = 0;
3839
};
@@ -130,13 +131,18 @@ class BufferQueue {
130131
// Count of buffers that have been handed out through 'getBuffer'.
131132
size_t LiveBuffers;
132133

134+
// We use a generation number to identify buffers and which generation they're
135+
// associated with.
136+
atomic_uint64_t Generation;
137+
133138
public:
134139
enum class ErrorCode : unsigned {
135140
Ok,
136141
NotEnoughMemory,
137142
QueueFinalizing,
138143
UnrecognizedBuffer,
139144
AlreadyFinalized,
145+
AlreadyInitialized,
140146
};
141147

142148
static const char *getErrorString(ErrorCode E) {
@@ -151,6 +157,8 @@ class BufferQueue {
151157
return "buffer being returned not owned by buffer queue";
152158
case ErrorCode::AlreadyFinalized:
153159
return "queue already finalized";
160+
case ErrorCode::AlreadyInitialized:
161+
return "queue already initialized";
154162
}
155163
return "unknown error";
156164
}
@@ -181,10 +189,23 @@ class BufferQueue {
181189
/// the buffer being released.
182190
ErrorCode releaseBuffer(Buffer &Buf);
183191

192+
/// Initializes the buffer queue, starting a new generation. We can re-set the
193+
/// size of buffers with |BS| along with the buffer count with |BC|.
194+
///
195+
/// Returns:
196+
/// - ErrorCode::Ok when we successfully initialize the buffer. This
197+
/// requires that the buffer queue is previously finalized.
198+
/// - ErrorCode::AlreadyInitialized when the buffer queue is not finalized.
199+
ErrorCode init(size_t BS, size_t BC);
200+
184201
bool finalizing() const {
185202
return atomic_load(&Finalizing, memory_order_acquire);
186203
}
187204

205+
uint64_t generation() const {
206+
return atomic_load(&Generation, memory_order_acquire);
207+
}
208+
188209
/// Returns the configured size of the buffers in the buffer queue.
189210
size_t ConfiguredBufferSize() const { return BufferSize; }
190211

‎compiler-rt/lib/xray/xray_fdr_logging.cc

+3-5
Original file line numberDiff line numberDiff line change
@@ -1056,8 +1056,7 @@ void fdrLoggingHandleTypedEvent(
10561056
endBufferIfFull();
10571057
}
10581058

1059-
XRayLogInitStatus fdrLoggingInit(UNUSED size_t BufferSize,
1060-
UNUSED size_t BufferMax, void *Options,
1059+
XRayLogInitStatus fdrLoggingInit(size_t, size_t, void *Options,
10611060
size_t OptionsSize) XRAY_NEVER_INSTRUMENT {
10621061
if (Options == nullptr)
10631062
return XRayLogInitStatus::XRAY_LOG_UNINITIALIZED;
@@ -1104,9 +1103,8 @@ XRayLogInitStatus fdrLoggingInit(UNUSED size_t BufferSize,
11041103
// environment-variable defined options.
11051104
FDRParser.ParseString(static_cast<const char *>(Options));
11061105
*fdrFlags() = FDRFlags;
1107-
BufferSize = FDRFlags.buffer_size;
1108-
BufferMax = FDRFlags.buffer_max;
1109-
1106+
auto BufferSize = FDRFlags.buffer_size;
1107+
auto BufferMax = FDRFlags.buffer_max;
11101108
bool Success = false;
11111109

11121110
if (BQ != nullptr) {

0 commit comments

Comments
 (0)
Please sign in to comment.