Index: lib/xray/CMakeLists.txt
===================================================================
--- lib/xray/CMakeLists.txt
+++ lib/xray/CMakeLists.txt
@@ -19,6 +19,7 @@
   xray_basic_logging.cc)
 
 set(XRAY_PROFILER_MODE_SOURCES
+  xray_profile_collector.cc
   xray_profiler_flags.cc)
 
 # Implementation files for all XRay architectures.
Index: lib/xray/tests/unit/CMakeLists.txt
===================================================================
--- lib/xray/tests/unit/CMakeLists.txt
+++ lib/xray/tests/unit/CMakeLists.txt
@@ -8,3 +8,5 @@
   segmented_array_test.cc xray_unit_test_main.cc)
 add_xray_unittest(XRayFunctionCallTrieTest SOURCES
   function_call_trie_test.cc xray_unit_test_main.cc)
+add_xray_unittest(XRayProfileCollectorTest SOURCES
+  profile_collector_test.cc xray_unit_test_main.cc)
Index: lib/xray/tests/unit/profile_collector_test.cc
===================================================================
--- lib/xray/tests/unit/profile_collector_test.cc
+++ lib/xray/tests/unit/profile_collector_test.cc
@@ -0,0 +1,179 @@
+//===-- profile_collector_test.cc -----------------------------------------===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+//
+// This file is a part of XRay, a function call tracing system.
+//
+//===----------------------------------------------------------------------===//
+#include "gtest/gtest.h"
+
+#include "xray_profile_collector.h"
+#include "xray_profiler_flags.h"
+#include <cstdint>
+#include <thread>
+#include <tuple>
+#include <utility>
+#include <vector>
+
+namespace __xray {
+namespace {
+
+static constexpr auto kHeaderSize = 16u;
+
+void ValidateBlock(XRayBuffer B) {
+  profilerFlags()->setDefaults();
+  ASSERT_NE(static_cast<const void *>(B.Data), nullptr);
+  ASSERT_NE(B.Size, 0u);
+  ASSERT_GE(B.Size, kHeaderSize);
+  // We look at the block size, the block number, and the thread ID to ensure
+  // that the header data is laid out as we expect. The block size and thread
+  // ID must be non-zero; the block number may legitimately be zero for the
+  // first block.
+  char LocalBuffer[kHeaderSize] = {};
+  internal_memcpy(LocalBuffer, B.Data, kHeaderSize);
+  u32 BlockSize = 0;
+  u32 BlockNumber = 0;
+  u64 ThreadId = 0;
+  internal_memcpy(&BlockSize, LocalBuffer, sizeof(u32));
+  internal_memcpy(&BlockNumber, LocalBuffer + sizeof(u32), sizeof(u32));
+  internal_memcpy(&ThreadId, LocalBuffer + (2 * sizeof(u32)), sizeof(u64));
+  ASSERT_NE(BlockSize, 0u);
+  ASSERT_GE(BlockNumber, 0u);
+  ASSERT_NE(ThreadId, 0u);
+}
+
+std::tuple<u32, u32, u64> ParseBlockHeader(XRayBuffer B) {
+  char LocalBuffer[kHeaderSize] = {};
+  internal_memcpy(LocalBuffer, B.Data, kHeaderSize);
+  u32 BlockSize = 0;
+  u32 BlockNumber = 0;
+  u64 ThreadId = 0;
+  internal_memcpy(&BlockSize, LocalBuffer, sizeof(u32));
+  internal_memcpy(&BlockNumber, LocalBuffer + sizeof(u32), sizeof(u32));
+  internal_memcpy(&ThreadId, LocalBuffer + (2 * sizeof(u32)), sizeof(u64));
+  return {BlockSize, BlockNumber, ThreadId};
+}
+
+struct Profile {
+  int64_t CallCount;
+  int64_t CumulativeLocalTime;
+  std::vector<int32_t> Path;
+};
+
+std::tuple<Profile, const char *> ParseProfile(const char *P) {
+  Profile Result;
+  // Read the path first, until we find the sentinel 0.
+  int32_t F;
+  do {
+    internal_memcpy(&F, P, sizeof(int32_t));
+    P += sizeof(int32_t);
+    Result.Path.push_back(F);
+  } while (F != 0);
+
+  // Then read the CallCount.
+  internal_memcpy(&Result.CallCount, P, sizeof(int64_t));
+  P += sizeof(int64_t);
+
+  // Then read the CumulativeLocalTime.
+  internal_memcpy(&Result.CumulativeLocalTime, P, sizeof(int64_t));
+  P += sizeof(int64_t);
+  return {std::move(Result), P};
+}
+
+TEST(profileCollectorServiceTest, PostSerializeCollect) {
+  profilerFlags()->setDefaults();
+  // The most basic use case (and the one we actually care about) is ensuring
+  // that we can post FunctionCallTrie instances which are then destroyed, and
+  // that their contents are still serialized properly.
+  //
+  // First, we initialise a set of allocators in the local scope. This ensures
+  // that the service is forced to copy the contents of the FunctionCallTrie
+  // that uses the local allocators.
+  auto Allocators = FunctionCallTrie::InitAllocators();
+  FunctionCallTrie T(Allocators);
+
+  // Then we populate the trie with some data.
+  T.enterFunction(1, 1);
+  T.enterFunction(2, 2);
+  T.exitFunction(2, 3);
+  T.exitFunction(1, 4);
+
+  // Then we post the data to the global profile collector service.
+  profileCollectorService::post(T, 1);
+
+  // Then we serialize the data.
+  profileCollectorService::serialize();
+
+  // Then we go through a single buffer to see whether we're getting the data
+  // we expect.
+  auto B = profileCollectorService::nextBuffer({nullptr, 0});
+  ValidateBlock(B);
+  u32 BlockSize;
+  u32 BlockNum;
+  u64 ThreadId;
+  std::tie(BlockSize, BlockNum, ThreadId) = ParseBlockHeader(B);
+
+  // We look at the serialized buffer to see whether the trie we're expecting
+  // to see is there. Note that BlockSize accounts for the header as well, so
+  // we only copy out the records that follow it.
+  auto DStart = static_cast<const char *>(B.Data) + kHeaderSize;
+  std::vector<char> D(DStart, DStart + (BlockSize - kHeaderSize));
+  B = profileCollectorService::nextBuffer(B);
+  ASSERT_EQ(B.Data, nullptr);
+  ASSERT_EQ(B.Size, 0u);
+
+  Profile Profile1, Profile2;
+  auto P = static_cast<const char *>(D.data());
+  std::tie(Profile1, P) = ParseProfile(P);
+  std::tie(Profile2, P) = ParseProfile(P);
+
+  ASSERT_NE(Profile1.Path.size(), Profile2.Path.size());
+  auto &P1 = Profile1.Path.size() < Profile2.Path.size() ? Profile2 : Profile1;
+  auto &P2 = Profile1.Path.size() < Profile2.Path.size() ? Profile1 : Profile2;
+  std::vector<int32_t> P1Expected = {2, 1, 0};
+  std::vector<int32_t> P2Expected = {1, 0};
+  ASSERT_EQ(P1.Path.size(), P1Expected.size());
+  ASSERT_EQ(P2.Path.size(), P2Expected.size());
+  ASSERT_EQ(P1.Path, P1Expected);
+  ASSERT_EQ(P2.Path, P2Expected);
+}
+
+// We break out a function that will be run in multiple threads, one that will
+// use a thread-local allocator and post its FunctionCallTrie to the
+// profileCollectorService. This simulates what the threads being profiled
+// would be doing anyway, but through the XRay logging implementation.
+void threadProcessing() {
+  thread_local auto Allocators = FunctionCallTrie::InitAllocators();
+  FunctionCallTrie T(Allocators);
+
+  T.enterFunction(1, 1);
+  T.enterFunction(2, 2);
+  T.exitFunction(2, 3);
+  T.exitFunction(1, 4);
+
+  profileCollectorService::post(T, __sanitizer::GetTid());
+}
+
+TEST(profileCollectorServiceTest, PostSerializeCollectMultipleThread) {
+  profilerFlags()->setDefaults();
+  std::thread t1(threadProcessing);
+  std::thread t2(threadProcessing);
+
+  t1.join();
+  t2.join();
+
+  // At this point, t1 and t2 are already done with what they were doing.
+  profileCollectorService::serialize();
+
+  // Ensure that we see two buffers.
+  auto B = profileCollectorService::nextBuffer({nullptr, 0});
+  ValidateBlock(B);
+
+  B = profileCollectorService::nextBuffer(B);
+  ValidateBlock(B);
+}
+
+} // namespace
+} // namespace __xray
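For concreteness, here is the single block that PostSerializeCollect above should observe, worked out by hand from the layout documented in xray_profile_collector.h and the size accounting in serialize() further down. The offsets, sizes, and paths follow directly from this patch; the CumulativeLocalTime values are an assumption on my part, namely that FunctionCallTrie (which this patch does not change) records the inclusive entry-to-exit TSC delta:

    offset  type   field                value
    ------  -----  -------------------  -------------------------------------
         0  u32    BlockSize            68 = 16 (header) + 24 + 28 (records)
         4  u32    BlockNum             0   (first and only block)
         8  u64    ThreadId             1   (from post(T, 1))
        16  i32[]  Path                 {1, 0}     (root record for FId 1,
                                                    0 sentinel included)
        24  i64    CallCount            1
        32  i64    CumulativeLocalTime  3   (assumed: exit TSC 4 - entry TSC 1)
        40  i32[]  Path                 {2, 1, 0}  (record for FId 2 called
                                                    from FId 1)
        52  i64    CallCount            1
        60  i64    CumulativeLocalTime  1   (assumed: exit TSC 3 - entry TSC 2)

The record sizes match the 20 + 4 * |Path| accounting in serialize(): 20 + 4 = 24 for the root record (one function id plus the sentinel), and 20 + 8 = 28 for the other.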
Index: lib/xray/xray_function_call_trie.h
===================================================================
--- lib/xray/xray_function_call_trie.h
+++ lib/xray/xray_function_call_trie.h
@@ -18,6 +18,7 @@
 #include "xray_profiler_flags.h"
 #include "xray_segmented_array.h"
 #include <utility>
+#include <memory> // For placement new.
 
 namespace __xray {
 
Index: lib/xray/xray_profile_collector.h
===================================================================
--- lib/xray/xray_profile_collector.h
+++ lib/xray/xray_profile_collector.h
@@ -0,0 +1,88 @@
+//===-- xray_profile_collector.h -------------------------------*- C++ -*-===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+//
+// This file is a part of XRay, a dynamic runtime instrumentation system.
+//
+// This file defines the interface for a data collection service, for XRay
+// profiling. What we implement here is an in-process service to which
+// FunctionCallTrie instances can be handed off by threads, to be
+// consolidated/collected.
+//
+//===----------------------------------------------------------------------===//
+#ifndef XRAY_XRAY_PROFILE_COLLECTOR_H
+#define XRAY_XRAY_PROFILE_COLLECTOR_H
+
+#include "xray_function_call_trie.h"
+
+#include "xray/xray_log_interface.h"
+
+namespace __xray {
+
+/// The ProfileCollectorService implements a centralised mechanism for
+/// collecting FunctionCallTrie instances, indexed by thread ID. On demand,
+/// the ProfileCollectorService can be queried for the most recent state of
+/// the data, in a form that allows traversal.
+namespace profileCollectorService {
+
+/// Posts the FunctionCallTrie associated with a specific Thread ID. This
+/// will:
+///
+///   - Make a copy of the FunctionCallTrie and store that against the Thread
+///     ID. This will use the global allocator for the service-managed
+///     FunctionCallTrie instances.
+///   - Queue up a pointer to the FunctionCallTrie.
+///   - If the queue is long enough (longer than some arbitrary threshold) we
+///     then pre-calculate a single FunctionCallTrie for the whole process.
+///
+/// We make a copy of the FunctionCallTrie because the intent is to have this
+/// function be called at thread exit, or soon after the profiling handler is
+/// finalized through the XRay APIs. By letting threads each process their
+/// own thread-local FunctionCallTrie instances, we remove the need for
+/// synchronisation across threads while we are profiling. Once we are done
+/// profiling, we can then collect copies of these FunctionCallTrie instances
+/// and pay the cost of the copy.
+///
+/// NOTE: In the future, if copying turns out to be more costly than "moving"
+/// the FunctionCallTrie instances from the owning thread to the collector
+/// service, then we can change the implementation to do it this way (moving)
+/// instead.
+void post(const FunctionCallTrie &T, tid_t TId);
+
+/// The serialize function will process all FunctionCallTrie instances in
+/// memory, and turn those into specifically formatted blocks, each one
+/// describing a function call trie's contents in a compact form. In memory,
+/// each block has the following layout:
+///
+///   - block size (32 bits)
+///   - block number (32 bits)
+///   - thread id (64 bits)
+///   - list of records:
+///     - function ids in leaf-to-root order, terminated by 0 (32 bits per
+///       function id)
+///     - call count (64 bits)
+///     - cumulative local time (64 bits)
+///
+/// Note that the block size stored in the header accounts for the header
+/// itself, in addition to the records that follow it.
+void serialize();
+
+/// The reset function will clear out any internal memory held by the
+/// service. The intent is for resetting to be done in calls to the
+/// initialization routine, or explicitly through the flush log API.
+void reset();
+
+/// The nextBuffer function implements the iterator functionality provided in
+/// the XRay API. Called with a null XRayBuffer, it returns the first
+/// serialized block; called with a previously returned buffer, it returns
+/// the next block, or a null buffer once all blocks have been handed out.
+XRayBuffer nextBuffer(XRayBuffer B);
+
+} // namespace profileCollectorService
+
+} // namespace __xray
+
+#endif // XRAY_XRAY_PROFILE_COLLECTOR_H
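To make the block format concrete, here is a standalone sketch of a decoder for a single serialized block, written against nothing but the layout documented above. The names DecodedRecord and decodeBlock are illustrative and not part of this patch, and it deliberately uses the plain C++ standard library rather than the sanitizer internals:

    #include <cstdint>
    #include <cstring>
    #include <utility>
    #include <vector>

    struct DecodedRecord {
      std::vector<int32_t> Path; // Leaf-to-root order, sentinel stripped.
      int64_t CallCount = 0;
      int64_t CumulativeLocalTime = 0;
    };

    // Decode one block laid out as documented above: a 16-byte header (block
    // size, block number, thread id) followed by the records. The block size
    // stored in the header includes the header itself.
    static std::vector<DecodedRecord> decodeBlock(const char *Data) {
      uint32_t BlockSize = 0;
      std::memcpy(&BlockSize, Data, sizeof(BlockSize));
      const char *P = Data + 16; // Skip the header.
      const char *End = Data + BlockSize;
      std::vector<DecodedRecord> Records;
      while (P < End) {
        DecodedRecord R;
        for (;;) { // Function ids until the 0 sentinel.
          int32_t FId = 0;
          std::memcpy(&FId, P, sizeof(FId));
          P += sizeof(FId);
          if (FId == 0)
            break;
          R.Path.push_back(FId);
        }
        std::memcpy(&R.CallCount, P, sizeof(int64_t));
        P += sizeof(int64_t);
        std::memcpy(&R.CumulativeLocalTime, P, sizeof(int64_t));
        P += sizeof(int64_t);
        Records.push_back(std::move(R));
      }
      return Records;
    }

Fed the 68-byte block from the worked example earlier, this would produce two records: {Path: {1}, CallCount: 1, CumulativeLocalTime: 3} and {Path: {2, 1}, CallCount: 1, CumulativeLocalTime: 1}.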
Index: lib/xray/xray_profile_collector.cc
===================================================================
--- lib/xray/xray_profile_collector.cc
+++ lib/xray/xray_profile_collector.cc
@@ -0,0 +1,285 @@
+//===-- xray_profile_collector.cc ------------------------------*- C++ -*-===//
+//
+// The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+//
+// This file is a part of XRay, a dynamic runtime instrumentation system.
+//
+// This implements the interface for the profileCollectorService.
+//
+//===----------------------------------------------------------------------===//
+#include "xray_profile_collector.h"
+#include "sanitizer_common/sanitizer_common.h"
+#include "sanitizer_common/sanitizer_vector.h"
+#include "xray_profiler_flags.h"
+#include <memory>
+#include <pthread.h>
+
+namespace __xray {
+namespace profileCollectorService {
+
+namespace {
+
+SpinMutex GlobalMutex;
+struct ThreadTrie {
+  tid_t TId;
+  FunctionCallTrie *Trie;
+};
+Vector<ThreadTrie> ThreadTries;
+
+struct ProfileBuffer {
+  void *Data;
+  size_t Size;
+};
+Vector<ProfileBuffer> ProfileBuffers;
+
+struct BlockHeader {
+  u32 BlockSize;
+  u32 BlockNum;
+  u64 ThreadId;
+};
+
+FunctionCallTrie::Allocators *GlobalAllocators = nullptr;
+
+} // namespace
+
+void post(const FunctionCallTrie &T, tid_t TId) {
+  static pthread_once_t Once = PTHREAD_ONCE_INIT;
+  pthread_once(&Once, +[] {
+    SpinMutexLock Lock(&GlobalMutex);
+    GlobalAllocators = reinterpret_cast<FunctionCallTrie::Allocators *>(
+        InternalAlloc(sizeof(FunctionCallTrie::Allocators)));
+    new (GlobalAllocators) FunctionCallTrie::Allocators();
+    *GlobalAllocators = FunctionCallTrie::InitAllocators();
+  });
+  DCHECK_NE(GlobalAllocators, nullptr);
+
+  ThreadTrie *Item = nullptr;
+  {
+    SpinMutexLock Lock(&GlobalMutex);
+    if (GlobalAllocators == nullptr)
+      return;
+
+    Item = ThreadTries.PushBack();
+    Item->TId = TId;
+
+    // Here we use the internal allocator instead of the managed allocator
+    // because:
+    //
+    // 1) We're not using the segmented array data structure to host
+    //    FunctionCallTrie objects; we're using a Vector (from
+    //    sanitizer_common) which works like a std::vector<...>, keeping
+    //    elements contiguous in memory. The segmented array data structure
+    //    assumes that elements are trivially destructible, and
+    //    FunctionCallTrie isn't.
+    //
+    // 2) Using a managed allocator means we need to manage its lifetime
+    //    separately, which complicates the nature of this code. To get
+    //    around that, we use the internal allocator instead, which has its
+    //    own global state, and is decoupled from the lifetime management
+    //    required by the managed allocators we have in XRay.
+    Item->Trie = reinterpret_cast<FunctionCallTrie *>(
+        InternalAlloc(sizeof(FunctionCallTrie)));
+    DCHECK_NE(Item->Trie, nullptr);
+    new (Item->Trie) FunctionCallTrie(*GlobalAllocators);
+  }
+  DCHECK_NE(Item, nullptr);
+
+  T.deepCopyInto(*Item->Trie);
+}
+
+// A PathArray represents a stack trace as a series of function ids. In this
+// context a path is almost always represented from the leaf function in a
+// call stack to a root of the call trie.
+using PathArray = Array<int32_t>;
+
+struct ProfileRecord {
+  using PathAllocator = typename PathArray::AllocatorType;
+
+  // The Path in this record is the sequence of function ids from the leaf to
+  // the root of the function call stack, as represented in a
+  // FunctionCallTrie.
+  PathArray *Path = nullptr;
+  const FunctionCallTrie::Node *Node = nullptr;
+
+  // Constructor for in-place construction.
+  ProfileRecord(PathAllocator &A, const FunctionCallTrie::Node *N)
+      : Path([&] {
+          auto P =
+              reinterpret_cast<PathArray *>(InternalAlloc(sizeof(PathArray)));
+          new (P) PathArray(A);
+          return P;
+        }()),
+        Node(N) {}
+};
+
+namespace {
+
+using ProfileRecordArray = Array<ProfileRecord>;
+
+// Walk a depth-first traversal of each root of the FunctionCallTrie to
+// generate the path(s) and the data associated with each path.
+static void populateRecords(ProfileRecordArray &PRs,
+                            ProfileRecord::PathAllocator &PA,
+                            const FunctionCallTrie &Trie) {
+  using StackArray = Array<const FunctionCallTrie::Node *>;
+  using StackAllocator = typename StackArray::AllocatorType;
+  StackAllocator StackAlloc(profilerFlags()->stack_allocator_max, 0);
+  StackArray DFSStack(StackAlloc);
+  for (const auto R : Trie.getRoots()) {
+    DFSStack.Append(R);
+    while (!DFSStack.empty()) {
+      auto Node = DFSStack.back();
+      DFSStack.trim(1);
+      auto Record = PRs.AppendEmplace(PA, Node);
+      DCHECK_NE(Record, nullptr);
+
+      // Traverse the Node's parents, gathering the FIds in the order they
+      // appear (from the leaf to the root).
+      for (auto N = Node; N != nullptr; N = N->Parent)
+        Record->Path->Append(N->FId);
+      DCHECK(!Record->Path->empty());
+
+      for (const auto C : Node->Callees)
+        DFSStack.Append(C.NodePtr);
+    }
+  }
+}
+
+static void serializeRecords(ProfileBuffer *Buffer, const BlockHeader &Header,
+                             const ProfileRecordArray &ProfileRecords) {
+  auto NextPtr = static_cast<char *>(
+                     internal_memcpy(Buffer->Data, &Header, sizeof(Header))) +
+                 sizeof(Header);
+  for (const auto &Record : ProfileRecords) {
+    // The list of function ids (from leaf to root) comes first.
+    for (const auto FId : *Record.Path)
+      NextPtr =
+          static_cast<char *>(internal_memcpy(NextPtr, &FId, sizeof(FId))) +
+          sizeof(FId);
+
+    // Add the sentinel here.
+    constexpr int32_t SentinelFId = 0;
+    NextPtr = static_cast<char *>(
+                  internal_memset(NextPtr, SentinelFId, sizeof(SentinelFId))) +
+              sizeof(SentinelFId);
+
+    // Add the node data here.
+    NextPtr =
+        static_cast<char *>(internal_memcpy(NextPtr, &Record.Node->CallCount,
+                                            sizeof(Record.Node->CallCount))) +
+        sizeof(Record.Node->CallCount);
+    NextPtr = static_cast<char *>(
+                  internal_memcpy(NextPtr, &Record.Node->CumulativeLocalTime,
+                                  sizeof(Record.Node->CumulativeLocalTime))) +
+              sizeof(Record.Node->CumulativeLocalTime);
+  }
+
+  DCHECK_EQ(NextPtr - static_cast<char *>(Buffer->Data), Buffer->Size);
+}
+
+} // namespace
+
+void serialize() {
+  SpinMutexLock Lock(&GlobalMutex);
+
+  // Clear out the global ProfileBuffers.
+  for (uptr I = 0; I < ProfileBuffers.Size(); ++I)
+    InternalFree(ProfileBuffers[I].Data);
+  ProfileBuffers.Reset();
+
+  if (ThreadTries.Size() == 0)
+    return;
+
+  // Then repopulate the global ProfileBuffers.
+  for (u32 I = 0; I < ThreadTries.Size(); ++I) {
+    using ProfileRecordAllocator = typename ProfileRecordArray::AllocatorType;
+    ProfileRecordAllocator PRAlloc(profilerFlags()->global_allocator_max, 0);
+    ProfileRecord::PathAllocator PathAlloc(
+        profilerFlags()->global_allocator_max, 0);
+    ProfileRecordArray ProfileRecords(PRAlloc);
+
+    // First, we compute the amount of space we're going to need. We use a
+    // local allocator and an __xray::Array<...> to store the intermediary
+    // data, computing the size as we go along. Then we allocate the
+    // contiguous space to contain the thread buffer data.
+    const auto &Trie = *ThreadTries[I].Trie;
+    if (Trie.getRoots().empty())
+      continue;
+    populateRecords(ProfileRecords, PathAlloc, Trie);
+    DCHECK(!Trie.getRoots().empty());
+    DCHECK(!ProfileRecords.empty());
+
+    // Go through each record to compute the sizes.
+    //
+    //   header size = block size (4 bytes)
+    //     + block number (4 bytes)
+    //     + thread id (8 bytes)
+    //   record size = path ids (4 bytes * number of ids + sentinel 4 bytes)
+    //     + call count (8 bytes)
+    //     + local time (8 bytes)
+    u32 CumulativeSizes = 0;
+    for (const auto &Record : ProfileRecords)
+      CumulativeSizes += 20 + (4 * Record.Path->size());
+
+    BlockHeader Header{16 + CumulativeSizes, I, ThreadTries[I].TId};
+    auto Buffer = ProfileBuffers.PushBack();
+    Buffer->Size = sizeof(Header) + CumulativeSizes;
+    Buffer->Data = InternalAlloc(Buffer->Size, nullptr, 64);
+    DCHECK_NE(Buffer->Data, nullptr);
+    serializeRecords(Buffer, Header, ProfileRecords);
+
+    // Now clean up the ProfileRecords array, one at a time.
+    for (auto &Record : ProfileRecords) {
+      Record.Path->~PathArray();
+      InternalFree(Record.Path);
+    }
+  }
+}
+
+void reset() {
+  SpinMutexLock Lock(&GlobalMutex);
+  // Clear out the profile buffers that have been serialized.
+  for (uptr I = 0; I < ProfileBuffers.Size(); ++I)
+    InternalFree(ProfileBuffers[I].Data);
+  ProfileBuffers.Reset();
+
+  // Clear out the function call tries, one per thread.
+  for (uptr I = 0; I < ThreadTries.Size(); ++I) {
+    auto &T = ThreadTries[I];
+    T.Trie->~FunctionCallTrie();
+    InternalFree(T.Trie);
+  }
+  ThreadTries.Reset();
+
+  // Reset the global allocators.
+  if (GlobalAllocators != nullptr) {
+    GlobalAllocators->~Allocators();
+    InternalFree(GlobalAllocators);
+    GlobalAllocators = nullptr;
+  }
+  GlobalAllocators = reinterpret_cast<FunctionCallTrie::Allocators *>(
+      InternalAlloc(sizeof(FunctionCallTrie::Allocators)));
+  new (GlobalAllocators) FunctionCallTrie::Allocators();
+  *GlobalAllocators = FunctionCallTrie::InitAllocators();
+}
+
+XRayBuffer nextBuffer(XRayBuffer B) {
+  SpinMutexLock Lock(&GlobalMutex);
+  if (B.Data == nullptr) {
+    // A null buffer seeds the iteration; hand out the first block if we have
+    // serialized anything, and a null buffer otherwise. This also avoids
+    // reading a block header through a null pointer below.
+    if (ProfileBuffers.Size())
+      return {ProfileBuffers[0].Data, ProfileBuffers[0].Size};
+    return {nullptr, 0};
+  }
+
+  BlockHeader Header;
+  internal_memcpy(&Header, B.Data, sizeof(BlockHeader));
+  auto NextBlock = Header.BlockNum + 1;
+  if (NextBlock < ProfileBuffers.Size())
+    return {ProfileBuffers[NextBlock].Data, ProfileBuffers[NextBlock].Size};
+  return {nullptr, 0};
+}
+
+} // namespace profileCollectorService
+} // namespace __xray
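To close the loop, here is a sketch of how a consumer would drive the whole service, mirroring what the unit tests earlier in this patch do. flushProfiles is a hypothetical caller, not part of this patch; in the real runtime this sequence would be driven by the profiling mode's flush path:

    #include "xray_profile_collector.h"

    namespace __xray {

    void flushProfiles() {
      // Turn all posted FunctionCallTrie instances into serialized blocks.
      profileCollectorService::serialize();

      // Walk the blocks with the XRayBuffer iterator protocol: seed the
      // iteration with a null buffer, and stop when a null buffer comes back.
      for (auto B = profileCollectorService::nextBuffer({nullptr, 0});
           B.Data != nullptr; B = profileCollectorService::nextBuffer(B)) {
        // Consume B.Data / B.Size here, e.g. append the block to a file.
      }

      // Release the tries, the serialized blocks, and the global allocators.
      profileCollectorService::reset();
    }

    } // namespace __xray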