#include "Cooker/CookDirector.h"
#include "Cooker/CookWorkerServer.h"
#include "Cooker/CookPackageData.h"
#include "Cooker/CookPlatformManager.h"
#include "Cooker/IWorkerRequests.h"
#include "CookOnTheSide/CookLog.h"
#include "Sockets.h"
#include "SocketTypes.h"
#include "SocketSubsystem.h"
#include "Misc/ScopeExit.h"
#include "Misc/PathViews.h"
#include "GenericPlatform/GenericPlatformOutputDevices.h"
#include "String/Find.h"
#include "Cooker/MPCookTrace.h"

namespace UE
{
namespace Cook
{

// Returns the path of the currently running executable; CookWorker processes are launched with it
// (see InitializeWorkers / GetLaunchInfo).
FString GetProjectEditorBinaryPath()
{
#if PLATFORM_WINDOWS
	return FPlatformProcess::ExecutablePath();
#elif PLATFORM_MAC
	@autoreleasepool
	{
		return UTF8_TO_TCHAR([[[NSBundle mainBundle] executablePath] fileSystemRepresentation]);
	}
#elif PLATFORM_LINUX
	const TCHAR* PlatformConfig = FPlatformMisc::GetUBTPlatform();
	FString ExeFileName = FPaths::EngineDir() / TEXT("Binaries") / PlatformConfig / FString(FPlatformProcess::ExecutableName());
	return ExeFileName;
#else
#error "Unknown platform"
#endif
}

// CookProcessCount counts the director itself, so the number of remote CookWorkers is one less.
// bInCookProcessCountSetByCommandLine is currently unused by this implementation.
FCookDirector::FCookDirector(UCookOnTheFlyServer& InCOTFS, int32 CookProcessCount, bool bInCookProcessCountSetByCommandLine)
	: RunnableShunt(*this)
	, COTFS(InCOTFS)
{
	// Clamp so a CookProcessCount of 0 cannot produce a negative worker count (later passed to Reserve).
	RequestedCookWorkerCount = FMath::Max(CookProcessCount - 1, 0);
	WorkerConnectPort = Sockets::COOKDIRECTOR_DEFAULT_REQUEST_CONNECTION_PORT;
	FParse::Value(FCommandLine::Get(), TEXT("-CookDirectorListenPort="), WorkerConnectPort);
}

// Stops the communication thread first (it is the only other user of the listen socket), then releases
// the listen socket created by TryCreateWorkerConnectSocket, which is otherwise never destroyed.
FCookDirector::~FCookDirector()
{
	StopCommunicationThread();

	if (WorkerConnectSocket)
	{
		if (ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM))
		{
			SocketSubsystem->DestroySocket(WorkerConnectSocket);
		}
		WorkerConnectSocket = nullptr;
	}
}

// One-time setup of the remote worker servers, the listen socket, and the communication thread.
// Called from StartCook with CommunicationLock held.
void FCookDirector::InitializeWorkers()
{
	if (bWorkersInitialized)
	{
		return;
	}
	bWorkersInitialized = true;
	check(!bWorkersActive);
	check(!CommunicationThread);
	check(RemoteWorkers.Num() == 0);

	if (!TryCreateWorkerConnectSocket())
	{
		return;
	}

	RemoteWorkers.Reserve(RequestedCookWorkerCount);
	for (int32 RemoteIndex = 0; RemoteIndex < RequestedCookWorkerCount; ++RemoteIndex)
	{
		RemoteWorkers.Add(new FCookWorkerServer(*this, FWorkerId::FromRemoteIndex(RemoteIndex)));
	}

	// The local worker is always the first assignment entry; remote entries are added as workers connect.
	LocalWorkerId = FWorkerId::Local();
	WorkerAssignments.Add(FWorkerAssignment{LocalWorkerId});

	// Scale the queue length by the total number of cooking processes (remote workers + director).
	COTFS.WorkerRequests->DesiredWaitAssignQueueLength *= (RequestedCookWorkerCount + 1);
	CommandletExecutablePath = GetProjectEditorBinaryPath();

	auto Platforms = COTFS.PlatformManager->GetSessionPlatforms();
	InitialConfigMessage = MakeUnique<FInitialConfigMessage>();
	InitialConfigMessage->ReadFromLocal(COTFS, Platforms);

	ShutdownEvent->Reset();
	LaunchCommunicationThread();
	bWorkersActive = true;
}

bool FCookDirector::IsMultiprocessAvailable() const
{
	return RequestedCookWorkerCount > 0;
}

void FCookDirector::LaunchCommunicationThread()
{
	if (!CommunicationThread && FPlatformProcess::SupportsMultithreading())
	{
		CommunicationThread = FRunnableThread::Create(&RunnableShunt, TEXT("FCookDirector"), 0, TPri_Normal);
	}
}

// Signals the communication thread to exit, waits for it, and re-arms ShutdownEvent for a later launch.
void FCookDirector::StopCommunicationThread()
{
	ShutdownEvent->Trigger();
	if (CommunicationThread)
	{
		CommunicationThread->WaitForCompletion();
		delete CommunicationThread;
		CommunicationThread = nullptr;
	}
	ShutdownEvent->Reset();
}

// Body of the communication thread: ticks communication roughly once per TickPeriod until ShutdownEvent fires.
uint32 FCookDirector::RunCommunicationThread()
{
	constexpr double TickPeriod = 1.0;
	constexpr double MinSleepTime = 0.001;
	for (;;)
	{
		const double StartTime = FPlatformTime::Seconds();
		TickCommunication(ECookDirectorThread::CommunicateThread);
		const double RemainingDuration = StartTime + TickPeriod - FPlatformTime::Seconds();

		// Always poll the shutdown event, with a zero timeout if the tick overran its period; otherwise a
		// consistently slow tick would never observe the shutdown request and StopCommunicationThread would hang.
		const uint32 WaitTimeMilliseconds = RemainingDuration > MinSleepTime
			? static_cast<uint32>(RemainingDuration * 1000.0)
			: 0;
		if (ShutdownEvent->Wait(WaitTimeMilliseconds))
		{
			break;
		}
	}
	return 0;
}

// Accepts new worker connections and ticks every remote worker, collecting the ones that are shutting
// down or have lost their connection. bWorkersActive is derived from the previous tick's collection.
void FCookDirector::TickCommunication(ECookDirectorThread TickThread)
{
	{
		FScopeLock CommunicationScopeLock(&CommunicationLock);
		bWorkersActive = RemoteWorkers.Num() > ShuttingDownWorkers.Num();
	}

	TickWorkerConnects(TickThread);

	ShuttingDownWorkers.Reset();
	for (TRefCountPtr<FCookWorkerServer>& RemoteWorker : RemoteWorkers)
	{
		if (RemoteWorker->IsConnectionLost())
		{
			ShuttingDownWorkers.Add(RemoteWorker);
			continue;
		}

		RemoteWorker->TickCommunication(TickThread);
		if (RemoteWorker->IsShuttingDown())
		{
			RemoteWorker->ShutdownRemoteProcess();
			ShuttingDownWorkers.Add(RemoteWorker);
		}
	}
}

// Drains pending connections on the listen socket, validates each worker's connect message, and hands the
// socket to the matching FCookWorkerServer.
void FCookDirector::TickWorkerConnects(ECookDirectorThread TickThread)
{
	if (!WorkerConnectSocket)
	{
		return;
	}

	bool bHasPendingConnection;
	while (WorkerConnectSocket->HasPendingConnection(bHasPendingConnection) && bHasPendingConnection)
	{
		FSocket* WorkerSocket = WorkerConnectSocket->Accept(TEXT("Client Connection"));
		if (!WorkerSocket)
		{
			UE_LOG(LogCook, Warning, TEXT("Pending connection failed to create a ClientSocket."));
			continue;
		}

		FCookSocket CookSocket;
		CookSocket.InitSocket(WorkerSocket);
		FWorkerConnectMessage Message;
		if (!CookSocket.ReadPacket(Message))
		{
			UE_LOG(LogCook, Warning, TEXT("Pending connection sent an invalid Connection Message. Connection will be ignored."));
			continue;
		}
		// RemoteWorkers holds exactly RequestedCookWorkerCount entries (indices [0, Num)); an index equal to
		// the count would read past the end.
		if (Message.RemoteIndex < 0 || Message.RemoteIndex >= RemoteWorkers.Num())
		{
			UE_LOG(LogCook, Warning, TEXT("Pending connection sent a Connection Message with invalid RemoteIndex %d. RequestedCookWorkerCount = {%d}. Connection will be ignored."),
				Message.RemoteIndex, RequestedCookWorkerCount);
			continue;
		}
		FCookWorkerServer& RemoteWorker = *RemoteWorkers[Message.RemoteIndex];
		if (!RemoteWorker.TryHandleConnectMessage(Message, WorkerSocket))
		{
			UE_LOG(LogCook, Warning, TEXT("Pending connection sent a Connection Message with an already in-use RemoteIndex. Connection will be ignored."));
			continue;
		}

		{
			// WorkerAssignments is read by the scheduler thread in AssignRequests; guard the mutation.
			FScopeLock CommunicationScopeLock(&CommunicationLock);
			WorkerAssignments.Add(FWorkerAssignment{FWorkerId::FromRemoteIndex(Message.RemoteIndex)});
		}

		// The worker server now owns the socket; release it from the temporary FCookSocket wrapper.
		CookSocket.BlockSocket(false);
		CookSocket.DetachSocket();
	}
}

void FCookDirector::StartCook()
{
	// We launch the CookWorkers during StartCook, so that their startup time overlaps with
	// work the Director has to do during the first tick, for PumpRequests and AssignRequests.
	FScopeLock CommunicationScopeLock(&CommunicationLock);
	InitializeWorkers();
}

// Creates, binds (127.0.0.1:WorkerConnectPort) and starts listening on the socket that CookWorkers connect to.
// Returns false, leaving WorkerConnectSocket null, on any failure.
bool FCookDirector::TryCreateWorkerConnectSocket()
{
	if (WorkerConnectSocket)
	{
		return true;
	}

	ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);
	if (!SocketSubsystem)
	{
		return false;
	}

	const FString SocketDescription = TEXT("FCookDirector-WorkerConnect");
	const int32 MaxBacklog = 50;
	WorkerConnectSocket = SocketSubsystem->CreateSocket(NAME_Stream, SocketDescription, false);
	if (!WorkerConnectSocket)
	{
		UE_LOG(LogCook, Error, TEXT("CookDirector could not create listen socket (%s). CookWorkers will be disabled."), *SocketDescription);
		return false;
	}

	TSharedPtr<FInternetAddr> ConnectAuthorityAddr = SocketSubsystem->GetAddressFromString(TEXT("127.0.0.1"));
	if (!ConnectAuthorityAddr.IsValid())
	{
		UE_LOG(LogCook, Error, TEXT("CookDirector could not create the listen address. CookWorkers will be disabled."));
		SocketSubsystem->DestroySocket(WorkerConnectSocket);
		WorkerConnectSocket = nullptr;
		return false;
	}
	ConnectAuthorityAddr->SetPort(WorkerConnectPort);

	if (!WorkerConnectSocket->Bind(*ConnectAuthorityAddr))
	{
		const ESocketErrors LastError = SocketSubsystem->GetLastErrorCode();
		UE_LOG(LogCook, Error, TEXT("CookDirector could not bind listen socket to port %d. CookWorkers will be disabled. Last Error Code: %d"),
			WorkerConnectPort, static_cast<int32>(LastError));
		SocketSubsystem->DestroySocket(WorkerConnectSocket);
		WorkerConnectSocket = nullptr;
		return false;
	}

	if (!WorkerConnectSocket->Listen(MaxBacklog))
	{
		UE_LOG(LogCook, Error, TEXT("CookDirector listen failed on socket. CookWorkers will be disabled."));
		SocketSubsystem->DestroySocket(WorkerConnectSocket);
		WorkerConnectSocket = nullptr;
		return false;
	}

	// Workers receive this as -CookDirectorHost=<address:port> (see GetWorkerCommandLine).
	WorkerConnectAuthority = ConnectAuthorityAddr->ToString(true /* bAppendPort */);
	return true;
}

FCookDirector::FLaunchInfo FCookDirector::GetLaunchInfo(FWorkerId WorkerId)
{
	FLaunchInfo Info;
	Info.ShowWorkerOption = GetShowWorkerOption();
	Info.CommandletExecutable = CommandletExecutablePath;
	Info.WorkerCommandLine = GetWorkerCommandLine(WorkerId);
	return Info;
}

// Builds a CookWorker command line from the director's own command line: director-only arguments are
// dropped, the trace file is renamed per worker, and the worker-specific arguments are added.
// Resulting layout: <project> -run=cook -cookworker -MultiprocessId=<n> <remaining args...> -CookDirectorHost=... -unattended -abslog=...
FString FCookDirector::GetWorkerCommandLine(FWorkerId WorkerId)
{
	const TCHAR* CommandLine = FCommandLine::Get();
	const TCHAR* ProjectName = FApp::GetProjectName();
	checkf(ProjectName && ProjectName[0], TEXT("Expected UnrealEditor to be running with a non-empty project name"));

	// Note that we need to handle quoted strings for e.g. a projectfile with spaces in it; FParse::Token
	// does handle them
	FString Token;
	TArray<FString> Tokens;
	while (FParse::Token(CommandLine, Token, false /* bUseEscape */))
	{
		if (Token.IsEmpty())
		{
			continue;
		}
		// Arguments that are either director-only or re-added below with worker-specific values.
		if (Token.StartsWith(TEXT("-run=")) ||
			Token == TEXT("-CookOnTheFly") ||
			Token == TEXT("-CookWorker") ||
			Token.StartsWith(TEXT("-CookCultures")) ||
			Token.StartsWith(TEXT("-CookDirectorHost=")) ||
			Token.StartsWith(TEXT("-MultiprocessId=")) ||
			Token.StartsWith(TEXT("-CookProfileId=")) ||
			Token.StartsWith(TEXT("-ShowCookWorker")) ||
			Token.StartsWith(TEXT("-CoreLimit")) ||
			Token.StartsWith(TEXT("-PhysicalCoreLimit")) ||
			Token.StartsWith(TEXT("-MPCookCoreSubscription")) ||
			Token.StartsWith(TEXT("-CookProcessCount=")) ||
			Token.StartsWith(TEXT("-abslog=")) ||
			Token.StartsWith(TEXT("-unattended")))
		{
			continue;
		}
		if (Token.StartsWith(TEXT("-tracefile=")))
		{
			// Give each worker its own trace file: <base>_Worker<RemoteIndex><ext>.
			FString TraceFile;
			if (FParse::Value(*Token, TEXT("-tracefile="), TraceFile) && !TraceFile.IsEmpty())
			{
				FStringView BaseFilenameWithPath = FPathViews::GetBaseFilenameWithPath(TraceFile);
				FStringView Extension = FPathViews::GetExtension(TraceFile, true /* bIncludeDot */);
				Tokens.Add(FString::Printf(TEXT("-tracefile=\"%.*s_Worker%d%.*s\""),
					BaseFilenameWithPath.Len(), BaseFilenameWithPath.GetData(),
					WorkerId.GetRemoteIndex(),
					Extension.Len(), Extension.GetData()));
				continue;
			}
		}
		Tokens.Add(MoveTemp(Token));
	}

	// Ensure the project is the first token; guard against an empty token list before indexing it.
	const bool bFirstTokenIsProject = Tokens.Num() > 0 &&
		(Tokens[0] == ProjectName || Tokens[0].EndsWith(TEXT(".uproject"), ESearchCase::IgnoreCase));
	if (!bFirstTokenIsProject)
	{
		FString ProjectFilePath = FPaths::GetProjectFilePath();
		if (Tokens.Num() == 0 || !FPaths::IsSamePath(Tokens[0], ProjectFilePath))
		{
			Tokens.Insert(MoveTemp(ProjectFilePath), 0);
		}
	}
	// Tokens.Num() >= 1 here, so these consecutive inserts are always in range.
	Tokens.Insert(TEXT("-run=cook"), 1);
	Tokens.Insert(TEXT("-cookworker"), 2);
	Tokens.Insert(FString::Printf(TEXT("-MultiprocessId=%d"), WorkerId.GetMultiprocessId()), 3);

	// This should have been constructed in TryCreateWorkerConnectSocket before any CookWorkerServers could exist to
	// call GetWorkerCommandLine
	check(!WorkerConnectAuthority.IsEmpty());
	Tokens.Add(FString::Printf(TEXT("-CookDirectorHost=%s"), *WorkerConnectAuthority));
	Tokens.Add(TEXT("-unattended"));
	Tokens.Add(FString::Printf(TEXT("-abslog=%s"), *GetWorkerLogFileName(WorkerId)));

	// We are joining the tokens back into a commandline string; wrap tokens with whitespace in quotes
	for (FString& IterToken : Tokens)
	{
		int32 IndexOfWhitespace = UE::String::FindFirstOfAnyChar(IterToken, {' ', '\r', '\n'});
		if (IndexOfWhitespace != INDEX_NONE)
		{
			int32 IndexOfQuote;
			if (!IterToken.FindChar('\"', IndexOfQuote))
			{
				IterToken = FString::Printf(TEXT("\"%s\""), *IterToken);
			}
		}
	}
	return FString::Join(Tokens, TEXT(" "));
}

// Derives a per-worker log file name from the director's log: <base>_Worker<RemoteIndex><ext>.
FString FCookDirector::GetWorkerLogFileName(FWorkerId WorkerId)
{
	FString DirectorLogFileName = FGenericPlatformOutputDevices::GetAbsoluteLogFilename();
	FStringView BaseFileName = FPathViews::GetBaseFilenameWithPath(DirectorLogFileName);
	FStringView Extension = FPathViews::GetExtension(DirectorLogFileName, true /* bIncludeDot */);
	// %.*s (precision) bounds each non-null-terminated view by its length; %*s would only set a field width.
	return FString::Printf(TEXT("%.*s_Worker%d%.*s"),
		BaseFileName.Len(), BaseFileName.GetData(),
		WorkerId.GetRemoteIndex(),
		Extension.Len(), Extension.GetData());
}

uint32 FCookDirector::FRunnableShunt::Run()
{
	return Director.RunCommunicationThread();
}

void FCookDirector::FRunnableShunt::Stop()
{
	Director.ShutdownEvent->Trigger();
}

FInitialConfigMessage* FCookDirector::GetInitialConfigMessage()
{
	return InitialConfigMessage.Get();
}

FWorkerId FCookDirector::GetLocalWorkerId()
{
	return LocalWorkerId;
}

// Chooses a worker for each request in Batch and appends its id to OutAssignments.
// Score per worker = (number of the request's hard game dependencies that worker has in progress)
//                    - LambdaLoadPenalty * (worker's current load); ties go to the lower load.
// Returns false without assigning anything when Batch is empty or no remote worker has connected yet.
bool FCookDirector::AssignRequests(TArrayView<FPackageData*> Batch, TArray<FWorkerId>& OutAssignments)
{
	if (Batch.Num() == 0)
	{
		return false;
	}
	{
		FScopeLock CommunicationScopeLock(&CommunicationLock);
		if (WorkerAssignments.Num() <= 1)
		{
			return false;
		}
	}

	constexpr float LambdaLoadPenalty = 0.2f;

	TArray<FName> Dependencies;
	for (FPackageData* Request : Batch)
	{
		const FName PackageName = Request->GetPackageName();

		// Query the asset registry outside the lock so the communication thread is not blocked on it.
		Dependencies.Reset();
		const bool bHasDependencyInfo = COTFS.AssetRegistry->GetDependencies(PackageName, Dependencies,
			UE::AssetRegistry::EDependencyCategory::Package,
			UE::AssetRegistry::EDependencyQuery::Hard | UE::AssetRegistry::EDependencyQuery::Game);

		// WorkerAssignments can grow on the communication thread (TickWorkerConnects); BestWorker points into
		// it, so hold the lock until the assignment is recorded.
		FScopeLock CommunicationScopeLock(&CommunicationLock);

		FWorkerAssignment* BestWorker = nullptr;
		if (bHasDependencyInfo)
		{
			float MaxDecisionScore = -TNumericLimits<float>::Max();
			for (FWorkerAssignment& Worker : WorkerAssignments)
			{
				float AffinityScore = 0.0f;
				for (const FName& DependencyName : Dependencies)
				{
					if (Worker.PackagesInProgress.Contains(DependencyName))
					{
						AffinityScore += 1.0f;
					}
				}

				const float DecisionScore = AffinityScore - (LambdaLoadPenalty * Worker.CurrentLoad);
				if (DecisionScore > MaxDecisionScore)
				{
					MaxDecisionScore = DecisionScore;
					BestWorker = &Worker;
				}
				else if (DecisionScore == MaxDecisionScore && BestWorker && Worker.CurrentLoad < BestWorker->CurrentLoad)
				{
					BestWorker = &Worker;
				}
			}
		}

		// Trace target packages are forced onto a remote worker (the first connected one). BestWorker is null
		// when no dependency information was available, so check it before dereferencing.
		if (CookTrace::IsTargetPackage(PackageName) && BestWorker && BestWorker->Id.IsLocal())
		{
			BestWorker = &WorkerAssignments[1];
		}

		if (BestWorker)
		{
			OutAssignments.Add(BestWorker->Id);
			BestWorker->CurrentLoad++;
			BestWorker->PackagesInProgress.Add(PackageName);
		}
		else
		{
			OutAssignments.Add(LocalWorkerId);
			AssignRequestTo(Request, LocalWorkerId);
		}
	}
	return OutAssignments.Num() > 0;
}

// Records PackageData as in progress on the worker with WorkerId and bumps its load.
// Returns false if no assignment entry exists for WorkerId.
bool FCookDirector::AssignRequestTo(FPackageData* PackageData, FWorkerId WorkerId)
{
	FScopeLock CommunicationScopeLock(&CommunicationLock);
	for (FWorkerAssignment& Worker : WorkerAssignments)
	{
		if (Worker.Id == WorkerId)
		{
			Worker.PackagesInProgress.Add(PackageData->GetPackageName());
			Worker.CurrentLoad++;
			return true;
		}
	}
	return false;
}

// Dispatches Batch to WorkerId: local packages are sent to LoadReady (already fully loaded in memory) or
// LoadPrepare; remote packages are appended to the corresponding FCookWorkerServer.
bool FCookDirector::AssignRequests(TArrayView<FPackageData*> Batch, FWorkerId WorkerId)
{
	if (Batch.Num() == 0)
	{
		return false;
	}

	if (WorkerId.IsLocal())
	{
		for (FPackageData* PackageData : Batch)
		{
			UPackage* ExistingPackage = static_cast<UPackage*>(StaticFindObject(UPackage::StaticClass(), nullptr,
				*PackageData->GetPackageName().ToString()));
			if (ExistingPackage != nullptr && ExistingPackage->IsFullyLoaded())
			{
				PackageData->SendToState(EPackageState::LoadReady, ESendFlags::QueueAddAndRemove);
			}
			else
			{
				PackageData->SendToState(EPackageState::LoadPrepare, ESendFlags::QueueAddAndRemove);
			}
		}
	}
	else
	{
		for (const FPackageData* PackageData : Batch)
		{
			if (CookTrace::IsTargetPackage(PackageData->GetPackageName()))
			{
				UE_LOG(LogMPCookTrace, Display, TEXT("[Director] Assigning package %s to Worker #%d."),
					*PackageData->GetPackageName().ToString(), WorkerId.GetRemoteIndex());
			}
		}
		TRefCountPtr<FCookWorkerServer> RemoteWorker = RemoteWorkers[WorkerId.GetRemoteIndex()];
		RemoteWorker->AppendAssignments(Batch, ECookDirectorThread::SchedulerThread);
	}
	return true;
}

// Worker side: reads -CookDirectorHost= and -MultiprocessId= (valid range [1, 256]) from the command line.
// RemoteIndex is MultiprocessId - 1.
bool FDirectorConnectionInfo::TryParseCommandLine()
{
	if (!FParse::Value(FCommandLine::Get(), TEXT("-CookDirectorHost="), HostURI))
	{
		UE_LOG(LogCook, Error, TEXT("CookWorker startup failed: no CookDirector specified on commandline."));
		return false;
	}
	uint32 MultiprocessId = 0;
	if (!FParse::Value(FCommandLine::Get(), TEXT("-MultiprocessId="), MultiprocessId))
	{
		UE_LOG(LogCook, Error, TEXT("CookWorker startup failed: no MultiprocessId specified on commandline."));
		return false;
	}
	if (MultiprocessId < 1 || 257 <= MultiprocessId)
	{
		UE_LOG(LogCook, Error, TEXT("CookWorker startup failed: commandline had invalid -MultiprocessId=%u; MultiprocessId must be in the range [1, 256]."), MultiprocessId);
		return false;
	}
	RemoteIndex = static_cast<int32>(MultiprocessId - 1);
	return true;
}

void FCookDirector::TickFromSchedulerThread()
{
	for (auto& Worker : RemoteWorkers)
	{
		Worker->TickFromSchedulerThread();
	}
}

// Once every remote worker has no assignments left, signals cook-complete to all of them (once).
// bCompleted becomes true when no remote worker remains active.
void FCookDirector::PumpCookComplete(bool& bCompleted)
{
	{
		FScopeLock CommunicationScopeLock(&CommunicationLock);
		if (!bCookCompleteSent)
		{
			bool bAllIdle = true;
			for (auto& RemoteWorker : RemoteWorkers)
			{
				if (RemoteWorker->NumAssignments() > 0)
				{
					bAllIdle = false;
					break;
				}
			}
			if (bAllIdle)
			{
				for (auto& RemoteWorker : RemoteWorkers)
				{
					RemoteWorker->SignalCookComplete(ECookDirectorThread::SchedulerThread);
					check(RemoteWorker->IsShuttingDown());
				}
				bCookCompleteSent = true;
			}
		}
		bCompleted = !bWorkersActive;
	}
	TickFromSchedulerThread();
}

} // namespace Cook
} // namespace UE