EM_Task/UnrealEd/Private/Cooker/CookDirector.cpp

562 lines
19 KiB
C++
Raw Permalink Normal View History

2026-02-13 16:18:33 +08:00
#include "Cooker/CookDirector.h"
#include "Cooker/CookWorkerServer.h"
#include "Cooker/CookPackageData.h"
#include "Cooker/CookPlatformManager.h"
#include "Cooker/IWorkerRequests.h"
#include "CookOnTheSide/CookLog.h"
#include "Sockets.h"
#include "SocketTypes.h"
#include "SocketSubsystem.h"
#include "Misc/ScopeExit.h"
#include "Misc/PathViews.h"
#include "GenericPlatform/GenericPlatformOutputDevices.h"
#include "String/Find.h"
#include "Cooker/MPCookTrace.h"
namespace UE
{
namespace Cook
{
/**
 * Resolves the path of the editor binary for the current platform.
 * Its result is stored as the commandlet executable used in the CookWorker launch info.
 * Platforms other than Windows, Mac and Linux fail at compile time.
 */
FString GetProjectEditorBinaryPath()
{
#if PLATFORM_WINDOWS
    return FPlatformProcess::ExecutablePath();
#elif PLATFORM_MAC
    @autoreleasepool
    {
        // Executable path of the main bundle, converted from its UTF-8 file system representation.
        return UTF8_TO_TCHAR([[[NSBundle mainBundle] executablePath] fileSystemRepresentation]);
    }
#elif PLATFORM_LINUX
    // <EngineDir>/Binaries/<UBTPlatform>/<ExecutableName>
    const FString PlatformBinariesDir = FPaths::EngineDir() / TEXT("Binaries") / FPlatformMisc::GetUBTPlatform();
    return PlatformBinariesDir / FString(FPlatformProcess::ExecutableName());
#else
#error "Unknown platform"
#endif
}
/**
 * Constructs the director and reads its listen-port configuration.
 *
 * @param InCOTFS            Cook server this director works with; stored by reference.
 * @param CookProcessCount   Total number of cook processes requested. The number of remote
 *                           CookWorkers is one less (presumably the director process counts as one).
 * @param bInCookProcessCountSetByCommandLine  Not used by this constructor.
 */
FCookDirector::FCookDirector(UCookOnTheFlyServer& InCOTFS, int32 CookProcessCount, bool bInCookProcessCountSetByCommandLine)
    : RunnableShunt(*this)
    , COTFS(InCOTFS)
{
    // Default listen port; -CookDirectorListenPort= on the command line overrides it when present.
    WorkerConnectPort = Sockets::COOKDIRECTOR_DEFAULT_REQUEST_CONNECTION_PORT;
    FParse::Value(FCommandLine::Get(), TEXT("-CookDirectorListenPort="), WorkerConnectPort);

    // Number of remote workers to create in InitializeWorkers.
    RequestedCookWorkerCount = CookProcessCount - 1;
}
// Destructor: signals the communication thread to stop and waits for it (if one was created)
// before the director goes away.
FCookDirector::~FCookDirector()
{
StopCommunicationThread();
}
/**
 * One-time setup of the multiprocess cook: opens the listen socket, creates one FCookWorkerServer
 * per requested remote worker, registers the local worker for assignment, captures the initial
 * config message and starts the communication thread.
 *
 * Subsequent calls are no-ops. If the listen socket cannot be created the function returns early
 * and no workers are created (bWorkersInitialized stays true, so the setup is not retried).
 */
void FCookDirector::InitializeWorkers()
{
    if (bWorkersInitialized)
    {
        return;
    }
    bWorkersInitialized = true;

    // Nothing may have been started before the first initialization.
    check(!bWorkersActive);
    check(!CommunicationThread);
    check(RemoteWorkers.Num() == 0);

    const bool bListenSocketReady = TryCreateWorkerConnectSocket();
    if (!bListenSocketReady)
    {
        return;
    }

    // One server-side object per remote worker, indexed by remote index.
    RemoteWorkers.Reserve(RequestedCookWorkerCount);
    int32 RemoteIndex = 0;
    while (RemoteIndex < RequestedCookWorkerCount)
    {
        RemoteWorkers.Add(new FCookWorkerServer(*this, FWorkerId::FromRemoteIndex(RemoteIndex)));
        ++RemoteIndex;
    }

    // The local worker is always the first assignment slot.
    LocalWorkerId = FWorkerId::Local();
    WorkerAssignments.Add(FWorkerAssignment{LocalWorkerId});

    // Scale the assign queue length by the total number of cook processes (remote workers + local).
    COTFS.WorkerRequests->DesiredWaitAssignQueueLength *= (RequestedCookWorkerCount + 1);

    CommandletExecutablePath = GetProjectEditorBinaryPath();

    // Capture the configuration the workers will be sent, based on the current session platforms.
    auto SessionPlatforms = COTFS.PlatformManager->GetSessionPlatforms();
    InitialConfigMessage = MakeUnique<FInitialConfigMessage>();
    InitialConfigMessage->ReadFromLocal(COTFS, SessionPlatforms);

    ShutdownEvent->Reset();
    LaunchCommunicationThread();
    bWorkersActive = true;
}
/** @return true when at least one remote CookWorker was requested. */
bool FCookDirector::IsMultiprocessAvailable() const
{
    const bool bHasRemoteWorkers = RequestedCookWorkerCount >= 1;
    return bHasRemoteWorkers;
}
/**
 * Creates the "FCookDirector" communication thread running RunnableShunt.
 * Does nothing if the thread already exists or the platform does not support multithreading.
 */
void FCookDirector::LaunchCommunicationThread()
{
    if (CommunicationThread || !FPlatformProcess::SupportsMultithreading())
    {
        return;
    }
    CommunicationThread = FRunnableThread::Create(&RunnableShunt, TEXT("FCookDirector"), 0, TPri_Normal);
}
/**
 * Signals the communication thread to exit, waits for it to finish and destroys it.
 * The shutdown event is reset afterwards so the thread can be launched again.
 */
void FCookDirector::StopCommunicationThread()
{
    ShutdownEvent->Trigger();
    if (auto* RunningThread = CommunicationThread)
    {
        // Block until RunCommunicationThread has returned, then release the thread object.
        RunningThread->WaitForCompletion();
        delete RunningThread;
        CommunicationThread = nullptr;
    }
    ShutdownEvent->Reset();
}
/**
 * Body of the communication thread: ticks communication roughly once per TickPeriod until
 * ShutdownEvent is triggered.
 *
 * The shutdown event is checked after every tick. When a tick leaves more than MinSleepTime of
 * the period unused, the thread waits on the event for the remainder; otherwise it polls the
 * event with a zero timeout, so a tick that overruns the period cannot keep the thread from
 * noticing a shutdown request.
 *
 * @return Always 0.
 */
uint32 FCookDirector::RunCommunicationThread()
{
    constexpr float TickPeriod = 1.f;
    constexpr float MinSleepTime = 0.001f;
    for (;;)
    {
        const double StartTime = FPlatformTime::Seconds();
        TickCommunication(ECookDirectorThread::CommunicateThread);
        const double CurrentTime = FPlatformTime::Seconds();

        // Time left in this tick period; negative or tiny when the tick overran.
        const float RemainingDuration = static_cast<float>(StartTime + TickPeriod - CurrentTime);
        uint32 WaitTimeMilliseconds = 0;
        if (RemainingDuration > MinSleepTime)
        {
            WaitTimeMilliseconds = static_cast<uint32>(RemainingDuration * 1000);
        }

        // With a zero wait time this is a non-blocking poll of the shutdown request.
        if (ShutdownEvent->Wait(WaitTimeMilliseconds))
        {
            break;
        }
    }
    return 0;
}
/**
 * Performs one communication tick: refreshes bWorkersActive, accepts pending worker connections,
 * ticks every connected worker and rebuilds the list of workers that are shutting down.
 *
 * @param TickThread  Identifies the thread the tick is executed on; forwarded to the workers.
 */
void FCookDirector::TickCommunication(ECookDirectorThread TickThread)
{
    {
        // Workers count as active while not all of them were recorded as shutting down by the
        // previous tick.
        FScopeLock CommunicationScopeLock(&CommunicationLock);
        bWorkersActive = ShuttingDownWorkers.Num() < RemoteWorkers.Num();
    }

    TickWorkerConnects(TickThread);

    // Rebuild the shutting-down list from scratch on every tick.
    ShuttingDownWorkers.Reset();
    for (TRefCountPtr<FCookWorkerServer>& RemoteWorker : RemoteWorkers)
    {
        if (RemoteWorker->IsConnectionLost())
        {
            ShuttingDownWorkers.Add(RemoteWorker);
            continue;
        }

        RemoteWorker->TickCommunication(TickThread);
        if (!RemoteWorker->IsShuttingDown())
        {
            continue;
        }
        RemoteWorker->ShutdownRemoteProcess();
        ShuttingDownWorkers.Add(RemoteWorker);
    }
}
/**
 * Accepts every pending connection on the listen socket and hands each one to the
 * FCookWorkerServer identified by the RemoteIndex in its connect message.
 *
 * Connections that fail to accept, send an unreadable connect message, name a RemoteIndex that
 * has no matching entry in RemoteWorkers, or name a RemoteIndex that is already connected are
 * logged and ignored. For accepted connections a worker assignment slot is registered.
 *
 * @param TickThread  Thread the tick is executed on (not used by this function).
 */
void FCookDirector::TickWorkerConnects(ECookDirectorThread TickThread)
{
    if (!WorkerConnectSocket)
    {
        return;
    }

    bool bHasPendingConnection;
    while (WorkerConnectSocket->HasPendingConnection(bHasPendingConnection) && bHasPendingConnection)
    {
        FSocket* WorkerSocket = WorkerConnectSocket->Accept(TEXT("Client Connection"));
        if (!WorkerSocket)
        {
            UE_LOG(LogCook, Warning, TEXT("Pending connection failed to create a ClientSocket."));
        }
        else
        {
            // NOTE(review): CookSocket presumably releases WorkerSocket on the 'continue' paths
            // below unless DetachSocket was called - confirm against FCookSocket.
            FCookSocket CookSocket;
            CookSocket.InitSocket(WorkerSocket);

            FWorkerConnectMessage Message;
            if (!CookSocket.ReadPacket(Message))
            {
                UE_LOG(LogCook, Warning, TEXT("Pending connection sent an invalid Connection Message. Connection will be ignored."));
                continue;
            }

            // Valid remote indices are [0, RemoteWorkers.Num()); an index equal to the worker
            // count must be rejected as well, otherwise the lookup below reads past the array.
            if (!RemoteWorkers.IsValidIndex(Message.RemoteIndex))
            {
                UE_LOG(LogCook, Warning, TEXT("Pending connection sent a Connection Message with invalid RemoteIndex %d. RequestedCookWorkerCount = {%d}. Connection will be ignored."), Message.RemoteIndex, RequestedCookWorkerCount);
                continue;
            }

            TRefCountPtr<FCookWorkerServer> RemoteWorkerPtr = RemoteWorkers[Message.RemoteIndex];
            if (!RemoteWorkerPtr->TryHandleConnectMessage(Message, WorkerSocket))
            {
                UE_LOG(LogCook, Warning, TEXT("Pending connection sent a Connection Message with an already in-use RemoteIndex. Connection will be ignored."));
                continue;
            }

            // The worker is connected: make it available to AssignRequests.
            FWorkerAssignment RemoteAssignment;
            RemoteAssignment.Id = FWorkerId::FromRemoteIndex(Message.RemoteIndex);
            WorkerAssignments.Add(RemoteAssignment);

            // The worker server now uses the socket; stop CookSocket from managing it.
            CookSocket.BlockSocket(false);
            CookSocket.DetachSocket();
        }
    }
}
// Called at the start of a cook. Runs the one-time worker initialization while holding
// CommunicationLock.
void FCookDirector::StartCook()
{
// We launch the CookWorkers during StartCook, so that their startup time overlaps with
// work the Director has to do during the first tick, for PumpRequests and AssignRequests.
FScopeLock CommunicationScopeLock(&CommunicationLock);
InitializeWorkers();
}
bool FCookDirector::TryCreateWorkerConnectSocket()
{
if (WorkerConnectSocket)
{
return true;
}
ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);
if (!SocketSubsystem)
{
return false;
}
const FString SocketDescription = TEXT("FCookDirector-WorkerConnect");
const int32 MaxBacklog = 50;
WorkerConnectSocket = SocketSubsystem->CreateSocket(NAME_Stream, SocketDescription, false);
if (!WorkerConnectSocket)
{
UE_LOG(LogCook, Error, TEXT("CookDirector could not create listen socket (%s). CookWorkers will be disabled."), *SocketDescription);
return false;
}
TSharedPtr<FInternetAddr> ConnectAuthorityAddr = SocketSubsystem->GetAddressFromString(TEXT("127.0.0.1"));
ConnectAuthorityAddr->SetPort(WorkerConnectPort);
if (!WorkerConnectSocket->Bind(*ConnectAuthorityAddr))
{
ESocketErrors LastError = SocketSubsystem->GetLastErrorCode();
UE_LOG(LogCook, Error, TEXT("CookDirector could not bind listen socket to port %d. CookWorkers will be disabled. Last Error Code: %d"), WorkerConnectPort, LastError);
SocketSubsystem->DestroySocket(WorkerConnectSocket);
WorkerConnectSocket = nullptr;
return false;
}
if (!WorkerConnectSocket->Listen(MaxBacklog))
{
UE_LOG(LogCook, Error, TEXT("CookDirector listen failed on socket. CookWorkers will be disabled."));
SocketSubsystem->DestroySocket(WorkerConnectSocket);
WorkerConnectSocket = nullptr;
return false;
}
WorkerConnectAuthority = ConnectAuthorityAddr->ToString(true /* bAppendPort */);
return true;
}
/**
 * Collects what is needed to launch the process of the given worker.
 *
 * @param WorkerId  Worker the launch info is built for.
 * @return The show-worker option, the commandlet executable path and the worker's command line.
 */
FCookDirector::FLaunchInfo FCookDirector::GetLaunchInfo(FWorkerId WorkerId)
{
    FLaunchInfo LaunchInfo;
    LaunchInfo.ShowWorkerOption = GetShowWorkerOption();
    // Path resolved once in InitializeWorkers.
    LaunchInfo.CommandletExecutable = CommandletExecutablePath;
    LaunchInfo.WorkerCommandLine = GetWorkerCommandLine(WorkerId);
    return LaunchInfo;
}
/**
 * Builds the command line for launching the given CookWorker from the director's own command line.
 *
 * Director-only switches are dropped, a -tracefile= argument gets a per-worker file name, the
 * project is ensured to be the first token, and the worker-specific switches (-run=cook,
 * -cookworker, -MultiprocessId=, -CookDirectorHost=, -unattended, -abslog=) are added.
 *
 * @param WorkerId  Worker the command line is built for.
 * @return The tokens joined with single spaces; tokens that contain whitespace and no quote are
 *         wrapped in quotes.
 */
FString FCookDirector::GetWorkerCommandLine(FWorkerId WorkerId)
{
    const TCHAR* CommandLine = FCommandLine::Get();
    const TCHAR* ProjectName = FApp::GetProjectName();
    checkf(ProjectName && ProjectName[0], TEXT("Expected UnrealEditor to be running with a non-empty project name"));

    // Note that we need to handle quoted strings for e.g. a projectfile with spaces in it; FParse::Token
    // does handle them
    FString Token;
    TArray<FString> Tokens;
    while (FParse::Token(CommandLine, Token, false /* bUseEscape */))
    {
        if (Token.IsEmpty())
        {
            continue;
        }
        // Switches that only apply to the director, or that are re-added below with
        // worker-specific values, are not forwarded.
        if (Token.StartsWith(TEXT("-run=")) ||
            Token == TEXT("-CookOnTheFly") ||
            Token == TEXT("-CookWorker") ||
            Token.StartsWith(TEXT("-CookCultures")) ||
            Token.StartsWith(TEXT("-CookDirectorHost=")) ||
            Token.StartsWith(TEXT("-MultiprocessId=")) ||
            Token.StartsWith(TEXT("-CookProfileId=")) ||
            Token.StartsWith(TEXT("-ShowCookWorker")) ||
            Token.StartsWith(TEXT("-CoreLimit")) ||
            Token.StartsWith(TEXT("-PhysicalCoreLimit")) ||
            Token.StartsWith(TEXT("-MPCookCoreSubscription")) ||
            Token.StartsWith(TEXT("-CookProcessCount=")) ||
            Token.StartsWith(TEXT("-abslog=")) ||
            Token.StartsWith(TEXT("-unattended")))
        {
            continue;
        }
        else if (Token.StartsWith(TEXT("-tracefile=")))
        {
            // Give each worker its own trace file: <base>_Worker<N><ext>.
            FString TraceFile;
            if (FParse::Value(*Token, TEXT("-tracefile="), TraceFile) && !TraceFile.IsEmpty())
            {
                FStringView BaseFilenameWithPath = FPathViews::GetBaseFilenameWithPath(TraceFile);
                FStringView Extension = FPathViews::GetExtension(TraceFile, true /* bIncludeDot */);
                Tokens.Add(FString::Printf(TEXT("-tracefile=\"%.*s_Worker%d%.*s\""), BaseFilenameWithPath.Len(), BaseFilenameWithPath.GetData(), WorkerId.GetRemoteIndex(), Extension.Len(), Extension.GetData()));
                continue;
            }
            // An unparsable or empty value falls through and is forwarded unchanged.
        }
        Tokens.Add(MoveTemp(Token));
    }

    // The project must be the first token. Tokens can be empty here (every token was filtered
    // out), in which case the project file path is inserted as well.
    const bool bStartsWithProject = Tokens.Num() > 0 &&
        (Tokens[0] == ProjectName || Tokens[0].EndsWith(TEXT(".uproject"), ESearchCase::IgnoreCase));
    if (!bStartsWithProject)
    {
        FString ProjectFilePath = FPaths::GetProjectFilePath();
        if (Tokens.Num() == 0 || !FPaths::IsSamePath(Tokens[0], ProjectFilePath))
        {
            Tokens.Insert(ProjectFilePath, 0);
        }
    }

    // Tokens holds at least the project now, so the three worker switches can be inserted
    // contiguously right after it (indices 1..3) without running past the end of the array.
    Tokens.Insert(TEXT("-run=cook"), 1);
    Tokens.Insert(TEXT("-cookworker"), 2);
    Tokens.Insert(FString::Printf(TEXT("-MultiprocessId=%d"), WorkerId.GetMultiprocessId()), 3);

    // This should have been constructed in TryCreateWorkerConnectSocket before any CookWorkerServers could exist to
    // call GetWorkerCommandLine
    check(!WorkerConnectAuthority.IsEmpty());
    Tokens.Add(FString::Printf(TEXT("-CookDirectorHost=%s"), *WorkerConnectAuthority));
    Tokens.Add(TEXT("-unattended"));
    Tokens.Add(FString::Printf(TEXT("-abslog=%s"), *GetWorkerLogFileName(WorkerId)));

    // We are joining the tokens back into a commandline string; wrap tokens with whitespace in quotes
    for (FString& IterToken : Tokens)
    {
        int32 IndexOfWhitespace = UE::String::FindFirstOfAnyChar(IterToken, {' ', '\r', '\n'});
        if (IndexOfWhitespace != INDEX_NONE)
        {
            // Tokens that already contain a quote are left as they are.
            int32 IndexOfQuote;
            if (!IterToken.FindChar('\"', IndexOfQuote))
            {
                IterToken = FString::Printf(TEXT("\"%s\""), *IterToken);
            }
        }
    }
    return FString::Join(Tokens, TEXT(" "));
}
/**
 * Derives the log file name for a worker from the director's absolute log file name by inserting
 * "_Worker<RemoteIndex>" between the base file name and the extension.
 *
 * @param WorkerId  Worker whose log file name is requested.
 * @return <director log path without extension>_Worker<RemoteIndex><extension incl. dot>.
 */
FString FCookDirector::GetWorkerLogFileName(FWorkerId WorkerId)
{
    FString DirectorLogFileName = FGenericPlatformOutputDevices::GetAbsoluteLogFilename();
    FStringView BaseFileName = FPathViews::GetBaseFilenameWithPath(DirectorLogFileName);
    FStringView Extension = FPathViews::GetExtension(DirectorLogFileName, true /* bIncludeDot */);
    // Both views are length-delimited, so both are printed with a precision ("%.*s"). A plain
    // width ("%*s") would only set a minimum field width and rely on the view being
    // null-terminated.
    return FString::Printf(TEXT("%.*s_Worker%d%.*s"), BaseFileName.Len(), BaseFileName.GetData(), WorkerId.GetRemoteIndex(), Extension.Len(), Extension.GetData());
}
// Thread entry point of the shunt: forwards to the director's communication loop and returns
// its result.
uint32 FCookDirector::FRunnableShunt::Run()
{
return Director.RunCommunicationThread();
}
// Stop request for the shunt: triggers the director's shutdown event, which makes
// RunCommunicationThread leave its loop.
void FCookDirector::FRunnableShunt::Stop()
{
Director.ShutdownEvent->Trigger();
}
// Returns the initial config message created in InitializeWorkers, or nullptr if it has not
// been created. The director keeps ownership of the message.
FInitialConfigMessage* FCookDirector::GetInitialConfigMessage()
{
return InitialConfigMessage.Get();
}
/** @return The worker id of the local worker, as set in InitializeWorkers. */
FWorkerId FCookDirector::GetLocalWorkerId()
{
    return LocalWorkerId;
}
/**
 * Chooses a worker for every package in Batch and appends the chosen worker ids to
 * OutAssignments, in Batch order.
 *
 * For each package every registered worker is scored as
 *     (number of the package's hard game dependencies already in progress on that worker)
 *     - LambdaLoadPenalty * (worker's current load)
 * and the highest score wins; equal scores are resolved in favour of the lower load. Packages
 * selected by CookTrace::IsTargetPackage are moved off the local worker to WorkerAssignments[1].
 * If the dependency query fails, the package is assigned to the local worker.
 *
 * @param Batch           Packages to assign.
 * @param OutAssignments  Receives one worker id per package.
 * @return false if Batch is empty or no worker besides the first is registered (nothing is
 *         appended in that case); otherwise whether OutAssignments is non-empty.
 */
bool FCookDirector::AssignRequests(TArrayView<FPackageData*> Batch, TArray<FWorkerId>& OutAssignments)
{
    if (Batch.Num() == 0 || WorkerAssignments.Num() <= 1)
    {
        return false;
    }

    const float LambdaLoadPenalty = 0.2f;
    for (FPackageData* Request : Batch)
    {
        FWorkerAssignment* BestWorker = nullptr;
        float MaxDecisionScore = -TNumericLimits<float>::Max();

        TArray<FName> Dependencies;
        if (COTFS.AssetRegistry->GetDependencies(Request->GetPackageName(), Dependencies,
                UE::AssetRegistry::EDependencyCategory::Package,
                UE::AssetRegistry::EDependencyQuery::Hard | UE::AssetRegistry::EDependencyQuery::Game))
        {
            for (FWorkerAssignment& Worker : WorkerAssignments)
            {
                // Affinity: how many dependencies of this package the worker already handles.
                float AffinityScore = 0.0f;
                int32 CurrentLoad = Worker.CurrentLoad;
                for (const FName& DependencyName : Dependencies)
                {
                    if (Worker.PackagesInProgress.Contains(DependencyName))
                    {
                        AffinityScore += 1.0f;
                    }
                }

                float DecisionScore = AffinityScore - (LambdaLoadPenalty * CurrentLoad);
                if (DecisionScore > MaxDecisionScore)
                {
                    MaxDecisionScore = DecisionScore;
                    BestWorker = &Worker;
                }
                else if (DecisionScore == MaxDecisionScore && BestWorker && CurrentLoad < BestWorker->CurrentLoad)
                {
                    // Same score: prefer the less loaded worker.
                    BestWorker = &Worker;
                }
            }
        }

        if (CookTrace::IsTargetPackage(Request->GetPackageName()))
        {
            // BestWorker is still null when the dependency query failed, so it must be checked
            // before it is dereferenced. WorkerAssignments[1] exists because of the Num() > 1
            // check at the top of the function.
            if (BestWorker && BestWorker->Id.IsLocal())
            {
                BestWorker = &WorkerAssignments[1];
            }
        }

        if (BestWorker)
        {
            OutAssignments.Add(BestWorker->Id);
            BestWorker->CurrentLoad++;
            BestWorker->PackagesInProgress.Add(Request->GetPackageName());
        }
        else
        {
            // No worker could be scored: fall back to the local worker.
            OutAssignments.Add(LocalWorkerId);
            AssignRequestTo(Request, LocalWorkerId);
        }
    }
    return OutAssignments.Num() > 0;
}
/**
 * Records a package as in progress on the given worker and increases that worker's load.
 *
 * @param PackageData  Package being assigned.
 * @param WorkerId     Worker the package is assigned to.
 * @return true if a registered worker with that id was found, false otherwise.
 */
bool FCookDirector::AssignRequestTo(FPackageData* PackageData, FWorkerId WorkerId)
{
    for (int32 SlotIndex = 0; SlotIndex < WorkerAssignments.Num(); ++SlotIndex)
    {
        FWorkerAssignment& Slot = WorkerAssignments[SlotIndex];
        if (!(Slot.Id == WorkerId))
        {
            continue;
        }
        // Only the first matching slot is updated.
        Slot.PackagesInProgress.Add(PackageData->GetPackageName());
        Slot.CurrentLoad++;
        return true;
    }
    return false;
}
/**
 * Dispatches a batch of packages to one worker.
 *
 * For a remote worker the batch is appended to that worker's assignments (packages selected by
 * CookTrace::IsTargetPackage are logged first). For the local worker each package is sent to
 * LoadReady when its UPackage already exists and is fully loaded, and to LoadPrepare otherwise.
 *
 * @param Batch     Packages to dispatch.
 * @param WorkerId  Receiving worker.
 * @return false if Batch is empty, true otherwise.
 */
bool FCookDirector::AssignRequests(TArrayView<FPackageData*> Batch, FWorkerId WorkerId)
{
    if (Batch.Num() == 0)
    {
        return false;
    }

    if (!WorkerId.IsLocal())
    {
        for (const FPackageData* PackageData : Batch)
        {
            if (CookTrace::IsTargetPackage(PackageData->GetPackageName()))
            {
                UE_LOG(LogMPCookTrace, Display, TEXT("[Director] Assigning package %s to Worker #%d."), *PackageData->GetPackageName().ToString(), WorkerId.GetRemoteIndex());
            }
        }
        TRefCountPtr<FCookWorkerServer> RemoteWorker = RemoteWorkers[WorkerId.GetRemoteIndex()];
        RemoteWorker->AppendAssignments(Batch, ECookDirectorThread::SchedulerThread);
        return true;
    }

    for (FPackageData* PackageData : Batch)
    {
        // Look for an already existing package object with this name.
        UObject* FoundObject = StaticFindObject(UPackage::StaticClass(), nullptr, *PackageData->GetPackageName().ToString());
        UPackage* ExistingPackage = static_cast<UPackage*>(FoundObject);
        const bool bAlreadyLoaded = ExistingPackage != nullptr && ExistingPackage->IsFullyLoaded();
        PackageData->SendToState(bAlreadyLoaded ? EPackageState::LoadReady : EPackageState::LoadPrepare, ESendFlags::QueueAddAndRemove);
    }
    return true;
}
bool FDirectorConnectionInfo::TryParseCommandLine()
{
if (!FParse::Value(FCommandLine::Get(), TEXT("-CookDirectorHost="), HostURI))
{
UE_LOG(LogCook, Error, TEXT("CookWorker startup failed: no CookDirector specified on commandline."));
return false;
}
uint32 MultiprocessId = 0;
if (!FParse::Value(FCommandLine::Get(), TEXT("-MultiprocessId="), MultiprocessId))
{
UE_LOG(LogCook, Error, TEXT("CookWorker startup failed: no MultiprocessId specified on commandline."));
return false;
}
if (MultiprocessId < 1 || 257 <= MultiprocessId)
{
UE_LOG(LogCook, Error,
TEXT("CookWorker startup failed: commandline had invalid -MultiprocessId=%d; MultiprocessId must be in the range [1, 256]."),
MultiprocessId);
return false;
}
RemoteIndex = static_cast<int32>(MultiprocessId - 1);
return true;
}
void FCookDirector::TickFromSchedulerThread()
{
for (auto& Worker: RemoteWorkers)
{
Worker->TickFromSchedulerThread();
}
}
/**
 * Drives the end-of-cook handshake with the remote workers.
 *
 * Once no remote worker has assignments left, every worker is signalled that the cook is
 * complete (this happens only once). The remote workers are then ticked from the scheduler
 * thread.
 *
 * @param bCompleted  Set to true when no workers are active any more (bWorkersActive is false).
 */
void FCookDirector::PumpCookComplete(bool& bCompleted)
{
    {
        FScopeLock CommunicationScopeLock(&CommunicationLock);
        if (!bCookCompleteSent)
        {
            // Is any worker still holding assignments?
            bool bAnyWorkerBusy = false;
            for (auto& RemoteWorker : RemoteWorkers)
            {
                if (RemoteWorker->NumAssignments() > 0)
                {
                    bAnyWorkerBusy = true;
                    break;
                }
            }

            if (!bAnyWorkerBusy)
            {
                for (auto& RemoteWorker : RemoteWorkers)
                {
                    RemoteWorker->SignalCookComplete(ECookDirectorThread::SchedulerThread);
                    // Signalling completion is expected to put the worker into shutdown.
                    check(RemoteWorker->IsShuttingDown());
                }
                bCookCompleteSent = true;
            }
        }
        bCompleted = !bWorkersActive;
    }
    TickFromSchedulerThread();
}
} // namespace Cook
} // namespace UE