CelticCraft/Plugins/VoxelFree/Source/Voxel/Private/VoxelThreadPool.cpp

452 lines
12 KiB
C++

// Copyright 2020 Phyronnaz
#include "VoxelThreadPool.h"
#include "VoxelQueuedWork.h"
#include "VoxelMinimal.h"
#include "IVoxelPool.h"
#include "HAL/Event.h"
#include "HAL/Runnable.h"
#include "HAL/RunnableThread.h"
#include "Misc/ScopeLock.h"
#include "Misc/ScopeExit.h"
#include "Async/TaskGraphInterfaces.h"
DECLARE_DWORD_COUNTER_STAT(TEXT("VoxelThreadPoolDummyCounter"), STAT_VoxelThreadPoolDummyCounter, STATGROUP_ThreadPoolAsyncTasks);
DECLARE_DWORD_COUNTER_STAT(TEXT("Recomputed Voxel Tasks Priorities"), STAT_RecomputedVoxelTasksPriorities, STATGROUP_VoxelCounters);
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
FVoxelQueuedThreadPoolStats& FVoxelQueuedThreadPoolStats::Get()
{
static FVoxelQueuedThreadPoolStats Stats;
return Stats;
}
void FVoxelQueuedThreadPoolStats::Report(FName Name, double Time)
{
FScopeLock Lock(&Section);
Times.FindOrAdd(Name) += Time;
}
void FVoxelQueuedThreadPoolStats::LogTimes() const
{
FScopeLock Lock(&Section);
LOG_VOXEL(Log, TEXT("#############################################"));
LOG_VOXEL(Log, TEXT("########## Voxel Thread Pool Stats ##########"));
LOG_VOXEL(Log, TEXT("#############################################"));
for (const auto& It : Times)
{
LOG_VOXEL(Log, TEXT("%s: %fs"), *It.Key.ToString(), It.Value);
}
}
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
class FScopeLockWithStats
{
public:
FScopeLockWithStats(FCriticalSection& InSynchObject)
: SynchObject(InSynchObject)
{
VOXEL_ASYNC_SCOPE_COUNTER("Lock");
SynchObject.Lock();
}
~FScopeLockWithStats()
{
VOXEL_ASYNC_SCOPE_COUNTER("Unlock");
SynchObject.Unlock();
}
private:
FCriticalSection& SynchObject;
};
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
class VOXEL_API FVoxelQueuedThread : public FRunnable
{
public:
const FString ThreadName;
FVoxelQueuedThreadPool* const ThreadPool;
/** The event that tells the thread there is work to do. */
FEvent* const DoWorkEvent;
FVoxelQueuedThread(FVoxelQueuedThreadPool* Pool, const FString& ThreadName, uint32 StackSize, EThreadPriority ThreadPriority);
~FVoxelQueuedThread();
//~ Begin FRunnable Interface
virtual uint32 Run() override;
//~ End FRunnable Interface
private:
/** If true, the thread should exit. */
FThreadSafeBool TimeToDie;
/** The work this thread is doing. */
TAtomic<IVoxelQueuedWork*> QueuedWork;
const TUniquePtr<FRunnableThread> Thread;
};
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
FVoxelQueuedThread::FVoxelQueuedThread(FVoxelQueuedThreadPool* Pool, const FString& ThreadName, uint32 StackSize, EThreadPriority ThreadPriority)
: ThreadName(ThreadName)
, ThreadPool(Pool)
, DoWorkEvent(FPlatformProcess::GetSynchEventFromPool()) // Create event BEFORE thread
, TimeToDie(false) // BEFORE creating thread
, QueuedWork(nullptr)
, Thread(FRunnableThread::Create(this, *ThreadName, StackSize, ThreadPriority, FPlatformAffinity::GetPoolThreadMask()))
{
check(Thread.IsValid());
}
FVoxelQueuedThread::~FVoxelQueuedThread()
{
// Tell the thread it needs to die
TimeToDie = true;
// Trigger the thread so that it will come out of the wait state if
// it isn't actively doing work
DoWorkEvent->Trigger();
// If waiting was specified, wait the amount of time. If that fails,
// brute force kill that thread. Very bad as that might leak.
Thread->WaitForCompletion();
// Clean up the event
FPlatformProcess::ReturnSynchEventToPool(DoWorkEvent);
}
uint32 FVoxelQueuedThread::Run()
{
while (!TimeToDie)
{
// This will force sending the stats packet from the previous frame.
SET_DWORD_STAT(STAT_VoxelThreadPoolDummyCounter, 0);
// We need to wait for shorter amount of time
bool bContinueWaiting = true;
while (bContinueWaiting)
{
VOXEL_ASYNC_VERBOSE_SCOPE_COUNTER("FVoxelQueuedThread::Run.WaitForWork");
// Wait for some work to do
bContinueWaiting = !DoWorkEvent->Wait(10);
}
if (!TimeToDie)
{
IVoxelQueuedWork* LocalQueuedWork = ThreadPool->ReturnToPoolOrGetNextJob(this);
while (LocalQueuedWork)
{
const FName Name = LocalQueuedWork->Name;
const double StartTime = FPlatformTime::Seconds();
LocalQueuedWork->DoThreadedWork();
// IMPORTANT: LocalQueuedWork should be considered as deleted after this line
const double EndTime = FPlatformTime::Seconds();
FVoxelQueuedThreadPoolStats::Get().Report(Name, EndTime - StartTime);
LocalQueuedWork = ThreadPool->ReturnToPoolOrGetNextJob(this);
}
}
}
return 0;
}
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
FVoxelQueuedThreadPoolSettings::FVoxelQueuedThreadPoolSettings(
const FString& PoolName,
uint32 NumThreads,
uint32 StackSize,
EThreadPriority ThreadPriority,
bool bConstantPriorities)
: PoolName(PoolName)
, NumThreads(NumThreads)
, StackSize(StackSize)
, ThreadPriority(ThreadPriority)
, bConstantPriorities(bConstantPriorities)
{
}
inline TArray<TUniquePtr<FVoxelQueuedThread>> CreateThreads(FVoxelQueuedThreadPool* Pool)
{
UE::Trace::ThreadGroupBegin(TEXT("VoxelThreadPool"));
ON_SCOPE_EXIT
{
UE::Trace::ThreadGroupEnd();
};
auto& Settings = Pool->Settings;
const uint32 NumThreads = Settings.NumThreads;
TArray<TUniquePtr<FVoxelQueuedThread>> Threads;
Threads.Reserve(NumThreads);
for (uint32 ThreadIndex = 0; ThreadIndex < NumThreads; ThreadIndex++)
{
const FString Name = FString::Printf(TEXT("%s Thread %d"), *Settings.PoolName, ThreadIndex);
Threads.Add(MakeUnique<FVoxelQueuedThread>(Pool, Name, Settings.StackSize, Settings.ThreadPriority));
}
return Threads;
}
FVoxelQueuedThreadPool::FVoxelQueuedThreadPool(const FVoxelQueuedThreadPoolSettings& Settings)
: Settings(Settings)
, AllThreads(CreateThreads(this))
{
QueuedThreads.Reserve(Settings.NumThreads);
for (auto& Thread : AllThreads)
{
QueuedThreads.Add(Thread.Get());
}
}
TVoxelSharedRef<FVoxelQueuedThreadPool> FVoxelQueuedThreadPool::Create(const FVoxelQueuedThreadPoolSettings& Settings)
{
const auto Pool = TVoxelSharedRef<FVoxelQueuedThreadPool>(new FVoxelQueuedThreadPool(Settings));
TFunction<void()> ShutdownCallback = [WeakPool = MakeVoxelWeakPtr(Pool)]()
{
auto PoolPtr = WeakPool.Pin();
if (PoolPtr.IsValid())
{
PoolPtr->AbandonAllTasks();
}
};
FTaskGraphInterface::Get().AddShutdownCallback(ShutdownCallback);
return Pool;
}
FVoxelQueuedThreadPool::~FVoxelQueuedThreadPool()
{
if (!TimeToDie)
{
AbandonAllTasks();
}
}
inline uint32 AddPriorityOffset(uint32 Priority, int32 PriorityOffset)
{
return FMath::Clamp<int64>(int64(Priority) + PriorityOffset, MIN_uint32, MAX_uint32);
}
FORCEINLINE void FVoxelQueuedThreadPool::FQueuedWorkInfo::RecomputePriority(double Time)
{
Priority = AddPriorityOffset(Work->GetPriority(), PriorityOffset);
NextPriorityUpdateTime = Time + Work->PriorityDuration;
}
void FVoxelQueuedThreadPool::AddQueuedWork(IVoxelQueuedWork* InQueuedWork, uint32 PriorityCategory, int32 PriorityOffset)
{
VOXEL_FUNCTION_COUNTER();
check(IsInGameThread());
check(InQueuedWork);
if (TimeToDie)
{
InQueuedWork->Abandon();
return;
}
FQueuedWorkInfo WorkInfo;
{
VOXEL_SCOPE_COUNTER("Compute Priority");
WorkInfo = FQueuedWorkInfo(InQueuedWork, PriorityCategory, PriorityOffset);
}
{
VOXEL_SCOPE_COUNTER("Lock");
Section.Lock();
}
{
VOXEL_SCOPE_COUNTER("Add Work");
if (Settings.bConstantPriorities)
{
WorkInfo.RecomputePriority(FPlatformTime::Seconds());
StaticQueuedWorks.push(WorkInfo);
}
else
{
QueuedWorks.Add(WorkInfo);
}
}
{
VOXEL_SCOPE_COUNTER("Wake up threads");
for (auto* QueuedThread : QueuedThreads)
{
QueuedThread->DoWorkEvent->Trigger();
}
QueuedThreads.Reset();
}
{
VOXEL_SCOPE_COUNTER("Unlock");
Section.Unlock();
}
}
void FVoxelQueuedThreadPool::AddQueuedWorks(const TArray<IVoxelQueuedWork*>& InQueuedWorks, uint32 PriorityCategory, int32 PriorityOffset)
{
VOXEL_FUNCTION_COUNTER();
check(IsInGameThread());
if (TimeToDie)
{
for (auto* InQueuedWork : InQueuedWorks)
{
InQueuedWork->Abandon();
}
return;
}
{
VOXEL_SCOPE_COUNTER("Lock");
Section.Lock();
}
{
if (!Settings.bConstantPriorities)
{
VOXEL_SCOPE_COUNTER("Reserve");
QueuedWorks.Reserve(QueuedWorks.Num() + InQueuedWorks.Num());
}
VOXEL_SCOPE_COUNTER("Add Works");
for (auto* InQueuedWork : InQueuedWorks)
{
FQueuedWorkInfo WorkInfo(InQueuedWork, PriorityCategory, PriorityOffset);
if (Settings.bConstantPriorities)
{
WorkInfo.RecomputePriority(FPlatformTime::Seconds());
StaticQueuedWorks.push(WorkInfo);
}
else
{
QueuedWorks.Add(WorkInfo);
}
}
}
{
VOXEL_SCOPE_COUNTER("Wake up threads");
for (auto* QueuedThread : QueuedThreads)
{
QueuedThread->DoWorkEvent->Trigger();
}
QueuedThreads.Reset();
}
{
VOXEL_SCOPE_COUNTER("Unlock");
Section.Unlock();
}
}
IVoxelQueuedWork* FVoxelQueuedThreadPool::ReturnToPoolOrGetNextJob(FVoxelQueuedThread* InQueuedThread)
{
VOXEL_ASYNC_FUNCTION_COUNTER();
check(InQueuedThread);
FScopeLockWithStats Lock(Section);
if (QueuedWorks.Num() > 0)
{
check(!Settings.bConstantPriorities);
check(!TimeToDie);
VOXEL_ASYNC_SCOPE_COUNTER("Voxel Thread Pool Recompute Priorities");
// Find best work. We recompute every priorities as the priorities can change (eg, the camera might have moved)
int32 BestIndex = -1;
uint64 BestPriority = 0;
int32 NumRecomputed = 0;
const double Time = FPlatformTime::Seconds();
for (int32 Index = 0; Index < QueuedWorks.Num(); Index++)
{
auto& WorkInfo = QueuedWorks.GetData()[Index];
if (WorkInfo.NextPriorityUpdateTime < Time)
{
NumRecomputed++;
WorkInfo.RecomputePriority(Time);
}
const uint64 Priority = WorkInfo.GetPriority();
if (Priority >= BestPriority)
{
BestPriority = Priority;
BestIndex = Index;
}
}
INC_DWORD_STAT_BY(STAT_RecomputedVoxelTasksPriorities, NumRecomputed);
auto* Work = QueuedWorks[BestIndex].Work;
QueuedWorks.RemoveAtSwap(BestIndex);
check(Work);
return Work;
}
else if (!StaticQueuedWorks.empty())
{
check(Settings.bConstantPriorities);
auto* Work = StaticQueuedWorks.top().Work;
StaticQueuedWorks.pop();
check(Work);
return Work;
}
else
{
QueuedThreads.Add(InQueuedThread);
return nullptr;
}
}
void FVoxelQueuedThreadPool::AbandonAllTasks()
{
VOXEL_ASYNC_FUNCTION_COUNTER();
ensure(!TimeToDie);
{
FScopeLockWithStats Lock(Section);
TimeToDie = true;
// Clean up all queued objects
for (auto& WorkInfo : QueuedWorks)
{
WorkInfo.Work->Abandon();
}
QueuedWorks.Reset();
while (!StaticQueuedWorks.empty())
{
StaticQueuedWorks.top().Work->Abandon();
StaticQueuedWorks.pop();
}
}
// Wait for all threads to finish up
while (true)
{
{
FScopeLockWithStats Lock(Section);
if (AllThreads.Num() == QueuedThreads.Num())
{
break;
}
}
FPlatformProcess::Sleep(0.0f);
}
}