562 lines
19 KiB
C++
562 lines
19 KiB
C++
|
|
#include "Cooker/CookDirector.h"
|
||
|
|
#include "Cooker/CookWorkerServer.h"
|
||
|
|
#include "Cooker/CookPackageData.h"
|
||
|
|
#include "Cooker/CookPlatformManager.h"
|
||
|
|
#include "Cooker/IWorkerRequests.h"
|
||
|
|
#include "CookOnTheSide/CookLog.h"
|
||
|
|
#include "Sockets.h"
|
||
|
|
#include "SocketTypes.h"
|
||
|
|
#include "SocketSubsystem.h"
|
||
|
|
#include "Misc/ScopeExit.h"
|
||
|
|
#include "Misc/PathViews.h"
|
||
|
|
#include "GenericPlatform/GenericPlatformOutputDevices.h"
|
||
|
|
#include "String/Find.h"
|
||
|
|
#include "Cooker/MPCookTrace.h"
|
||
|
|
|
||
|
|
namespace UE
|
||
|
|
{
|
||
|
|
namespace Cook
|
||
|
|
{
|
||
|
|
|
||
|
|
FString GetProjectEditorBinaryPath()
|
||
|
|
{
|
||
|
|
#if PLATFORM_WINDOWS
|
||
|
|
return FPlatformProcess::ExecutablePath();
|
||
|
|
#elif PLATFORM_MAC
|
||
|
|
@autoreleasepool
|
||
|
|
{
|
||
|
|
return UTF8_TO_TCHAR([[[NSBundle mainBundle] executablePath] fileSystemRepresentation]);
|
||
|
|
}
|
||
|
|
#elif PLATFORM_LINUX
|
||
|
|
const TCHAR* PlatformConfig = FPlatformMisc::GetUBTPlatform();
|
||
|
|
FString ExeFileName = FPaths::EngineDir() / TEXT("Binaries") / PlatformConfig / FString(FPlatformProcess::ExecutableName());
|
||
|
|
return ExeFileName;
|
||
|
|
#else
|
||
|
|
#error "Unknown platform"
|
||
|
|
#endif
|
||
|
|
}
|
||
|
|
|
||
|
|
FCookDirector::FCookDirector(UCookOnTheFlyServer& InCOTFS, int32 CookProcessCount, bool bInCookProcessCountSetByCommandLine)
|
||
|
|
: RunnableShunt(*this),
|
||
|
|
COTFS(InCOTFS)
|
||
|
|
{
|
||
|
|
// CookWorkerCount
|
||
|
|
RequestedCookWorkerCount = CookProcessCount - 1;
|
||
|
|
WorkerConnectPort = Sockets::COOKDIRECTOR_DEFAULT_REQUEST_CONNECTION_PORT;
|
||
|
|
const TCHAR* CommandLine = FCommandLine::Get();
|
||
|
|
FParse::Value(CommandLine, TEXT("-CookDirectorListenPort="), WorkerConnectPort);
|
||
|
|
}
|
||
|
|
|
||
|
|
FCookDirector::~FCookDirector()
|
||
|
|
{
|
||
|
|
StopCommunicationThread();
|
||
|
|
}
|
||
|
|
|
||
|
|
void FCookDirector::InitializeWorkers()
|
||
|
|
{
|
||
|
|
if (bWorkersInitialized)
|
||
|
|
{
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
bWorkersInitialized = true;
|
||
|
|
|
||
|
|
check(!bWorkersActive);
|
||
|
|
check(!CommunicationThread);
|
||
|
|
check(RemoteWorkers.Num() == 0);
|
||
|
|
if (!TryCreateWorkerConnectSocket())
|
||
|
|
{
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
|
||
|
|
RemoteWorkers.Reserve(RequestedCookWorkerCount);
|
||
|
|
for (int32 RemoteIndex = 0; RemoteIndex < RequestedCookWorkerCount; ++RemoteIndex)
|
||
|
|
{
|
||
|
|
RemoteWorkers.Add(new FCookWorkerServer(*this, FWorkerId::FromRemoteIndex(RemoteIndex)));
|
||
|
|
}
|
||
|
|
|
||
|
|
LocalWorkerId = FWorkerId::Local();
|
||
|
|
WorkerAssignments.Add(FWorkerAssignment{LocalWorkerId});
|
||
|
|
|
||
|
|
COTFS.WorkerRequests->DesiredWaitAssignQueueLength *= (RequestedCookWorkerCount + 1);
|
||
|
|
|
||
|
|
CommandletExecutablePath = GetProjectEditorBinaryPath();
|
||
|
|
|
||
|
|
auto Platforms = COTFS.PlatformManager->GetSessionPlatforms();
|
||
|
|
InitialConfigMessage = MakeUnique<FInitialConfigMessage>();
|
||
|
|
InitialConfigMessage->ReadFromLocal(COTFS, Platforms);
|
||
|
|
|
||
|
|
ShutdownEvent->Reset();
|
||
|
|
LaunchCommunicationThread();
|
||
|
|
bWorkersActive = true;
|
||
|
|
}
|
||
|
|
bool FCookDirector::IsMultiprocessAvailable() const
|
||
|
|
{
|
||
|
|
return RequestedCookWorkerCount > 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
void FCookDirector::LaunchCommunicationThread()
|
||
|
|
{
|
||
|
|
if (!CommunicationThread && FPlatformProcess::SupportsMultithreading())
|
||
|
|
{
|
||
|
|
CommunicationThread = FRunnableThread::Create(&RunnableShunt, TEXT("FCookDirector"), 0, TPri_Normal);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void FCookDirector::StopCommunicationThread()
|
||
|
|
{
|
||
|
|
ShutdownEvent->Trigger();
|
||
|
|
if (CommunicationThread)
|
||
|
|
{
|
||
|
|
CommunicationThread->WaitForCompletion();
|
||
|
|
delete CommunicationThread;
|
||
|
|
CommunicationThread = nullptr;
|
||
|
|
}
|
||
|
|
ShutdownEvent->Reset();
|
||
|
|
}
|
||
|
|
|
||
|
|
uint32 FCookDirector::RunCommunicationThread()
|
||
|
|
{
|
||
|
|
constexpr float TickPeriod = 1.f;
|
||
|
|
constexpr float MinSleepTime = 0.001f;
|
||
|
|
for (;;)
|
||
|
|
{
|
||
|
|
double StartTime = FPlatformTime::Seconds();
|
||
|
|
TickCommunication(ECookDirectorThread::CommunicateThread);
|
||
|
|
|
||
|
|
double CurrentTime = FPlatformTime::Seconds();
|
||
|
|
float RemainingDuration = StartTime + TickPeriod - CurrentTime;
|
||
|
|
if (RemainingDuration > .001f)
|
||
|
|
{
|
||
|
|
uint32 WaitTimeMilliseconds = static_cast<uint32>(RemainingDuration * 1000);
|
||
|
|
if (ShutdownEvent->Wait(WaitTimeMilliseconds))
|
||
|
|
{
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return 0;
|
||
|
|
}
|
||
|
|
void FCookDirector::TickCommunication(ECookDirectorThread TickThread)
|
||
|
|
{
|
||
|
|
{
|
||
|
|
FScopeLock CommunicationScopeLock(&CommunicationLock);
|
||
|
|
bWorkersActive = RemoteWorkers.Num() > ShuttingDownWorkers.Num();
|
||
|
|
}
|
||
|
|
|
||
|
|
TickWorkerConnects(TickThread);
|
||
|
|
ShuttingDownWorkers.Reset();
|
||
|
|
for (TRefCountPtr<FCookWorkerServer>& RemoteWorker: RemoteWorkers)
|
||
|
|
{
|
||
|
|
if (!RemoteWorker->IsConnectionLost())
|
||
|
|
{
|
||
|
|
RemoteWorker->TickCommunication(TickThread);
|
||
|
|
if (RemoteWorker->IsShuttingDown())
|
||
|
|
{
|
||
|
|
RemoteWorker->ShutdownRemoteProcess();
|
||
|
|
ShuttingDownWorkers.Add(RemoteWorker);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
else
|
||
|
|
{
|
||
|
|
ShuttingDownWorkers.Add(RemoteWorker);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void FCookDirector::TickWorkerConnects(ECookDirectorThread TickThread)
|
||
|
|
{
|
||
|
|
if (!WorkerConnectSocket)
|
||
|
|
{
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool bHasPendingConnection;
|
||
|
|
while (WorkerConnectSocket->HasPendingConnection(bHasPendingConnection) && bHasPendingConnection)
|
||
|
|
{
|
||
|
|
FSocket* WorkerSocket = WorkerConnectSocket->Accept(TEXT("Client Connection"));
|
||
|
|
if (!WorkerSocket)
|
||
|
|
{
|
||
|
|
UE_LOG(LogCook, Warning, TEXT("Pending connection failed to create a ClientSocket."));
|
||
|
|
}
|
||
|
|
else
|
||
|
|
{
|
||
|
|
FCookSocket CookSocket;
|
||
|
|
CookSocket.InitSocket(WorkerSocket);
|
||
|
|
FWorkerConnectMessage Message;
|
||
|
|
|
||
|
|
if (!CookSocket.ReadPacket(Message))
|
||
|
|
{
|
||
|
|
UE_LOG(LogCook, Warning, TEXT("Pending connection sent an invalid Connection Message. Connection will be ignored."));
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
|
||
|
|
if (Message.RemoteIndex < 0 || Message.RemoteIndex > RequestedCookWorkerCount)
|
||
|
|
{
|
||
|
|
UE_LOG(LogCook, Warning, TEXT("Pending connection sent a Connection Message with invalid RemoteIndex %d. RequestedCookWorkerCount = {%d}. Connection will be ignored."), Message.RemoteIndex, RequestedCookWorkerCount);
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
|
||
|
|
TRefCountPtr<FCookWorkerServer> RemoteWorkerPtr = RemoteWorkers[Message.RemoteIndex];
|
||
|
|
if (!RemoteWorkerPtr->TryHandleConnectMessage(Message, WorkerSocket))
|
||
|
|
{
|
||
|
|
UE_LOG(LogCook, Warning, TEXT("Pending connection sent a Connection Message with an already in-use RemoteIndex. Connection will be ignored."));
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
FWorkerAssignment RemoteAssignment;
|
||
|
|
RemoteAssignment.Id = FWorkerId::FromRemoteIndex(Message.RemoteIndex);
|
||
|
|
WorkerAssignments.Add(RemoteAssignment);
|
||
|
|
CookSocket.BlockSocket(false);
|
||
|
|
CookSocket.DetachSocket();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void FCookDirector::StartCook()
|
||
|
|
{
|
||
|
|
// We launch the CookWorkers during StartCook, so that their startup time overlaps with
|
||
|
|
// work the Director has to do during the first tick, for PumpRequests and AssignRequests.
|
||
|
|
FScopeLock CommunicationScopeLock(&CommunicationLock);
|
||
|
|
InitializeWorkers();
|
||
|
|
}
|
||
|
|
|
||
|
|
bool FCookDirector::TryCreateWorkerConnectSocket()
|
||
|
|
{
|
||
|
|
if (WorkerConnectSocket)
|
||
|
|
{
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);
|
||
|
|
if (!SocketSubsystem)
|
||
|
|
{
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
const FString SocketDescription = TEXT("FCookDirector-WorkerConnect");
|
||
|
|
const int32 MaxBacklog = 50;
|
||
|
|
WorkerConnectSocket = SocketSubsystem->CreateSocket(NAME_Stream, SocketDescription, false);
|
||
|
|
|
||
|
|
if (!WorkerConnectSocket)
|
||
|
|
{
|
||
|
|
UE_LOG(LogCook, Error, TEXT("CookDirector could not create listen socket (%s). CookWorkers will be disabled."), *SocketDescription);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
|
||
|
|
TSharedPtr<FInternetAddr> ConnectAuthorityAddr = SocketSubsystem->GetAddressFromString(TEXT("127.0.0.1"));
|
||
|
|
ConnectAuthorityAddr->SetPort(WorkerConnectPort);
|
||
|
|
|
||
|
|
if (!WorkerConnectSocket->Bind(*ConnectAuthorityAddr))
|
||
|
|
{
|
||
|
|
ESocketErrors LastError = SocketSubsystem->GetLastErrorCode();
|
||
|
|
|
||
|
|
UE_LOG(LogCook, Error, TEXT("CookDirector could not bind listen socket to port %d. CookWorkers will be disabled. Last Error Code: %d"), WorkerConnectPort, LastError);
|
||
|
|
SocketSubsystem->DestroySocket(WorkerConnectSocket);
|
||
|
|
WorkerConnectSocket = nullptr;
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
|
||
|
|
if (!WorkerConnectSocket->Listen(MaxBacklog))
|
||
|
|
{
|
||
|
|
UE_LOG(LogCook, Error, TEXT("CookDirector listen failed on socket. CookWorkers will be disabled."));
|
||
|
|
SocketSubsystem->DestroySocket(WorkerConnectSocket);
|
||
|
|
WorkerConnectSocket = nullptr;
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
WorkerConnectAuthority = ConnectAuthorityAddr->ToString(true /* bAppendPort */);
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
|
||
|
|
FCookDirector::FLaunchInfo FCookDirector::GetLaunchInfo(FWorkerId WorkerId)
|
||
|
|
{
|
||
|
|
FLaunchInfo Info;
|
||
|
|
Info.ShowWorkerOption = GetShowWorkerOption();
|
||
|
|
Info.CommandletExecutable = CommandletExecutablePath;
|
||
|
|
Info.WorkerCommandLine = GetWorkerCommandLine(WorkerId);
|
||
|
|
return Info;
|
||
|
|
}
|
||
|
|
|
||
|
|
FString FCookDirector::GetWorkerCommandLine(FWorkerId WorkerId)
|
||
|
|
{
|
||
|
|
const TCHAR* CommandLine = FCommandLine::Get();
|
||
|
|
|
||
|
|
const TCHAR* ProjectName = FApp::GetProjectName();
|
||
|
|
checkf(ProjectName && ProjectName[0], TEXT("Expected UnrealEditor to be running with a non-empty project name"));
|
||
|
|
|
||
|
|
// Note that we need to handle quoted strings for e.g. a projectfile with spaces in it; FParse::Token
|
||
|
|
// does handle them
|
||
|
|
FString Token;
|
||
|
|
TArray<FString> Tokens;
|
||
|
|
while (FParse::Token(CommandLine, Token, false /* bUseEscape */))
|
||
|
|
{
|
||
|
|
if (Token.IsEmpty())
|
||
|
|
{
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
if (Token.StartsWith(TEXT("-run=")) ||
|
||
|
|
Token == TEXT("-CookOnTheFly") ||
|
||
|
|
Token == TEXT("-CookWorker") ||
|
||
|
|
Token.StartsWith(TEXT("-CookCultures")) ||
|
||
|
|
Token.StartsWith(TEXT("-CookDirectorHost=")) ||
|
||
|
|
Token.StartsWith(TEXT("-MultiprocessId=")) ||
|
||
|
|
Token.StartsWith(TEXT("-CookProfileId=")) ||
|
||
|
|
Token.StartsWith(TEXT("-ShowCookWorker")) ||
|
||
|
|
Token.StartsWith(TEXT("-CoreLimit")) ||
|
||
|
|
Token.StartsWith(TEXT("-PhysicalCoreLimit")) ||
|
||
|
|
Token.StartsWith(TEXT("-MPCookCoreSubscription")) ||
|
||
|
|
Token.StartsWith(TEXT("-CookProcessCount=")) ||
|
||
|
|
Token.StartsWith(TEXT("-abslog=")) ||
|
||
|
|
Token.StartsWith(TEXT("-unattended")))
|
||
|
|
{
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
else if (Token.StartsWith(TEXT("-tracefile=")))
|
||
|
|
{
|
||
|
|
FString TraceFile;
|
||
|
|
FString TokenString(Token);
|
||
|
|
if (FParse::Value(*TokenString, TEXT("-tracefile="), TraceFile) && !TraceFile.IsEmpty())
|
||
|
|
{
|
||
|
|
FStringView BaseFilenameWithPath = FPathViews::GetBaseFilenameWithPath(TraceFile);
|
||
|
|
FStringView Extension = FPathViews::GetExtension(TraceFile, true /* bIncludeDot */);
|
||
|
|
Tokens.Add(FString::Printf(TEXT("-tracefile=\"%.*s_Worker%d%.*s\""), BaseFilenameWithPath.Len(), BaseFilenameWithPath.GetData(), WorkerId.GetRemoteIndex(), Extension.Len(), Extension.GetData()));
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
Tokens.Add(MoveTemp(Token));
|
||
|
|
}
|
||
|
|
|
||
|
|
if (Tokens[0] != ProjectName && !Tokens[0].EndsWith(TEXT(".uproject"), ESearchCase::IgnoreCase))
|
||
|
|
{
|
||
|
|
FString ProjectFilePath = FPaths::GetProjectFilePath();
|
||
|
|
if (!FPaths::IsSamePath(Tokens[0], ProjectFilePath))
|
||
|
|
{
|
||
|
|
Tokens.Insert(ProjectFilePath, 0);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
Tokens.Insert(TEXT("-run=cook"), 1);
|
||
|
|
Tokens.Insert(TEXT("-cookworker"), 2);
|
||
|
|
Tokens.Insert(FString::Printf(TEXT("-MultiprocessId=%d"), WorkerId.GetMultiprocessId()), 4);
|
||
|
|
// This should have been constructed in TryCreateWorkerConnectSocket before any CookWorkerServers could exist to
|
||
|
|
// call GetWorkerCommandLine
|
||
|
|
check(!WorkerConnectAuthority.IsEmpty());
|
||
|
|
Tokens.Add(FString::Printf(TEXT("-CookDirectorHost=%s"), *WorkerConnectAuthority));
|
||
|
|
Tokens.Add(TEXT("-unattended"));
|
||
|
|
Tokens.Add(FString::Printf(TEXT("-abslog=%s"), *GetWorkerLogFileName(WorkerId)));
|
||
|
|
|
||
|
|
// We are joining the tokens back into a commandline string; wrap tokens with whitespace in quotes
|
||
|
|
for (FString& IterToken: Tokens)
|
||
|
|
{
|
||
|
|
int32 IndexOfWhitespace = UE::String::FindFirstOfAnyChar(IterToken, {' ', '\r', '\n'});
|
||
|
|
if (IndexOfWhitespace != INDEX_NONE)
|
||
|
|
{
|
||
|
|
int32 IndexOfQuote;
|
||
|
|
if (!IterToken.FindChar('\"', IndexOfQuote))
|
||
|
|
{
|
||
|
|
IterToken = FString::Printf(TEXT("\"%s\""), *IterToken);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return FString::Join(Tokens, TEXT(" "));
|
||
|
|
}
|
||
|
|
|
||
|
|
FString FCookDirector::GetWorkerLogFileName(FWorkerId WorkerId)
|
||
|
|
{
|
||
|
|
FString DirectorLogFileName = FGenericPlatformOutputDevices::GetAbsoluteLogFilename();
|
||
|
|
FStringView BaseFileName = FPathViews::GetBaseFilenameWithPath(DirectorLogFileName);
|
||
|
|
FStringView Extension = FPathViews::GetExtension(DirectorLogFileName, true /* bIncludeDot */);
|
||
|
|
return FString::Printf(TEXT("%.*s_Worker%d%*s"), BaseFileName.Len(), BaseFileName.GetData(), WorkerId.GetRemoteIndex(), Extension.Len(), Extension.GetData());
|
||
|
|
}
|
||
|
|
|
||
|
|
uint32 FCookDirector::FRunnableShunt::Run()
|
||
|
|
{
|
||
|
|
return Director.RunCommunicationThread();
|
||
|
|
}
|
||
|
|
|
||
|
|
void FCookDirector::FRunnableShunt::Stop()
|
||
|
|
{
|
||
|
|
Director.ShutdownEvent->Trigger();
|
||
|
|
}
|
||
|
|
|
||
|
|
FInitialConfigMessage* FCookDirector::GetInitialConfigMessage()
|
||
|
|
{
|
||
|
|
return InitialConfigMessage.Get();
|
||
|
|
}
|
||
|
|
|
||
|
|
FWorkerId UE::Cook::FCookDirector::GetLocalWorkerId()
|
||
|
|
{
|
||
|
|
return LocalWorkerId;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool FCookDirector::AssignRequests(TArrayView<FPackageData*> Batch, TArray<FWorkerId>& OutAssignments)
|
||
|
|
{
|
||
|
|
if (Batch.Num() == 0 || WorkerAssignments.Num() <= 1)
|
||
|
|
{
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
const float LambdaLoadPenalty = 0.2f;
|
||
|
|
|
||
|
|
for (FPackageData* Request: Batch)
|
||
|
|
{
|
||
|
|
FWorkerAssignment* BestWorker = nullptr;
|
||
|
|
float MaxDecisionScore = -TNumericLimits<float>::Max();
|
||
|
|
TArray<FName> Dependencies;
|
||
|
|
if (COTFS.AssetRegistry->GetDependencies(Request->GetPackageName(), Dependencies,
|
||
|
|
UE::AssetRegistry::EDependencyCategory::Package,
|
||
|
|
UE::AssetRegistry::EDependencyQuery::Hard | UE::AssetRegistry::EDependencyQuery::Game))
|
||
|
|
{
|
||
|
|
for (FWorkerAssignment& Worker: WorkerAssignments)
|
||
|
|
{
|
||
|
|
float AffinityScore = 0.0f;
|
||
|
|
int32 CurrentLoad = Worker.CurrentLoad;
|
||
|
|
for (const FName& DependencyName: Dependencies)
|
||
|
|
{
|
||
|
|
if (Worker.PackagesInProgress.Contains(DependencyName))
|
||
|
|
{
|
||
|
|
AffinityScore += 1.0f;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
float DecisionScore = AffinityScore - (LambdaLoadPenalty * CurrentLoad);
|
||
|
|
if (DecisionScore > MaxDecisionScore)
|
||
|
|
{
|
||
|
|
MaxDecisionScore = DecisionScore;
|
||
|
|
BestWorker = &Worker;
|
||
|
|
}
|
||
|
|
else if (DecisionScore == MaxDecisionScore && BestWorker && CurrentLoad < BestWorker->CurrentLoad)
|
||
|
|
{
|
||
|
|
BestWorker = &Worker;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if (CookTrace::IsTargetPackage(Request->GetPackageName()))
|
||
|
|
{
|
||
|
|
if (BestWorker->Id.IsLocal())
|
||
|
|
{
|
||
|
|
BestWorker = &WorkerAssignments[1];
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if (BestWorker)
|
||
|
|
{
|
||
|
|
OutAssignments.Add(BestWorker->Id);
|
||
|
|
BestWorker->CurrentLoad++;
|
||
|
|
BestWorker->PackagesInProgress.Add(Request->GetPackageName());
|
||
|
|
}
|
||
|
|
else
|
||
|
|
{
|
||
|
|
OutAssignments.Add(LocalWorkerId);
|
||
|
|
AssignRequestTo(Request, LocalWorkerId);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return OutAssignments.Num() > 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool FCookDirector::AssignRequestTo(FPackageData* PackageData, FWorkerId WorkerId)
|
||
|
|
{
|
||
|
|
for (FWorkerAssignment& Worker: WorkerAssignments)
|
||
|
|
{
|
||
|
|
if (Worker.Id == WorkerId)
|
||
|
|
{
|
||
|
|
Worker.PackagesInProgress.Add(PackageData->GetPackageName());
|
||
|
|
Worker.CurrentLoad++;
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool FCookDirector::AssignRequests(TArrayView<FPackageData*> Batch, FWorkerId WorkerId)
|
||
|
|
{
|
||
|
|
if (Batch.Num() == 0)
|
||
|
|
{
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
if (WorkerId.IsLocal())
|
||
|
|
{
|
||
|
|
for (auto PackageData: Batch)
|
||
|
|
{
|
||
|
|
UPackage* ExistingPackage = (UPackage*)StaticFindObject(UPackage::StaticClass(), nullptr, *PackageData->GetPackageName().ToString());
|
||
|
|
if (ExistingPackage != nullptr && ExistingPackage->IsFullyLoaded())
|
||
|
|
{
|
||
|
|
PackageData->SendToState(EPackageState::LoadReady, ESendFlags::QueueAddAndRemove);
|
||
|
|
}
|
||
|
|
else
|
||
|
|
{
|
||
|
|
PackageData->SendToState(EPackageState::LoadPrepare, ESendFlags::QueueAddAndRemove);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
else
|
||
|
|
{
|
||
|
|
for (const FPackageData* PackageData: Batch)
|
||
|
|
{
|
||
|
|
if (CookTrace::IsTargetPackage(PackageData->GetPackageName()))
|
||
|
|
{
|
||
|
|
UE_LOG(LogMPCookTrace, Display, TEXT("[Director] Assigning package %s to Worker #%d."), *PackageData->GetPackageName().ToString(), WorkerId.GetRemoteIndex());
|
||
|
|
}
|
||
|
|
}
|
||
|
|
TRefCountPtr<FCookWorkerServer> RemoteWorker = RemoteWorkers[WorkerId.GetRemoteIndex()];
|
||
|
|
RemoteWorker->AppendAssignments(Batch, ECookDirectorThread::SchedulerThread);
|
||
|
|
}
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool FDirectorConnectionInfo::TryParseCommandLine()
|
||
|
|
{
|
||
|
|
if (!FParse::Value(FCommandLine::Get(), TEXT("-CookDirectorHost="), HostURI))
|
||
|
|
{
|
||
|
|
UE_LOG(LogCook, Error, TEXT("CookWorker startup failed: no CookDirector specified on commandline."));
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
uint32 MultiprocessId = 0;
|
||
|
|
if (!FParse::Value(FCommandLine::Get(), TEXT("-MultiprocessId="), MultiprocessId))
|
||
|
|
{
|
||
|
|
UE_LOG(LogCook, Error, TEXT("CookWorker startup failed: no MultiprocessId specified on commandline."));
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
if (MultiprocessId < 1 || 257 <= MultiprocessId)
|
||
|
|
{
|
||
|
|
UE_LOG(LogCook, Error,
|
||
|
|
TEXT("CookWorker startup failed: commandline had invalid -MultiprocessId=%d; MultiprocessId must be in the range [1, 256]."),
|
||
|
|
MultiprocessId);
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
RemoteIndex = static_cast<int32>(MultiprocessId - 1);
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
|
||
|
|
void FCookDirector::TickFromSchedulerThread()
|
||
|
|
{
|
||
|
|
for (auto& Worker: RemoteWorkers)
|
||
|
|
{
|
||
|
|
Worker->TickFromSchedulerThread();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void FCookDirector::PumpCookComplete(bool& bCompleted)
|
||
|
|
{
|
||
|
|
{
|
||
|
|
FScopeLock CommunicationScopeLock(&CommunicationLock);
|
||
|
|
if (!bCookCompleteSent)
|
||
|
|
{
|
||
|
|
bool bAllIdle = true;
|
||
|
|
for (auto& RemoteWorker: RemoteWorkers)
|
||
|
|
{
|
||
|
|
if (RemoteWorker->NumAssignments() > 0)
|
||
|
|
{
|
||
|
|
bAllIdle = false;
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if (bAllIdle)
|
||
|
|
{
|
||
|
|
for (auto& RemoteWorker: RemoteWorkers)
|
||
|
|
{
|
||
|
|
RemoteWorker->SignalCookComplete(ECookDirectorThread::SchedulerThread);
|
||
|
|
check(RemoteWorker->IsShuttingDown());
|
||
|
|
}
|
||
|
|
bCookCompleteSent = true;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
bCompleted = !bWorkersActive;
|
||
|
|
}
|
||
|
|
TickFromSchedulerThread();
|
||
|
|
}
|
||
|
|
} // namespace Cook
|
||
|
|
} // namespace UE
|