#include "CookWorkerServer.h" #include "CookDirector.h" #include "IWorkerRequests.h" #include "CookPackageData.h" #include "Sockets.h" #include "SocketTypes.h" #include "AssetRegistryState.h" #include "Cooker/CookPlatformManager.h" #include "Commandlets/AssetRegistryGenerator.h" #include "MPCookTrace.h" namespace UE { namespace Cook { FCookWorkerServer::FCookWorkerServer(FCookDirector& InDirector, FWorkerId InWorkerId) : Director(InDirector), COTFS(InDirector.COTFS), WorkerId(InWorkerId) { } FCookWorkerServer::~FCookWorkerServer() { DetachFromRemoteProcess(EWorkerDetachType::StillRunning); } bool FCookWorkerServer::IsShuttingDown() const { FScopeLock CommunicationScopeLock(&CommunicationLock); return ConnectStatus == EConnectStatus::ShuttingDown || ConnectStatus == EConnectStatus::LostConnection; } bool FCookWorkerServer::IsConnectionLost() const { FScopeLock CommunicationScopeLock(&CommunicationLock); return ConnectStatus == EConnectStatus::LostConnection; } void FCookWorkerServer::TickCommunication(ECookDirectorThread TickThread) { for (;;) { switch (ConnectStatus) { case EConnectStatus::Uninitialized: LaunchProcess(); break; case EConnectStatus::WaitForConnect: return; // Try again later case EConnectStatus::Connected: PumpReceiveMessages(); if (ConnectStatus == EConnectStatus::Connected) { PumpSendMessages(); return; // Tick duties complete; yield the tick } break; case EConnectStatus::ShuttingDown: { PumpReceiveMessages(); if (ConnectStatus == EConnectStatus::ShuttingDown) { constexpr float WaitForShutdownTimeout = 60.f; if (FPlatformTime::Seconds() - ConnectStartTimeSeconds <= WaitForShutdownTimeout) { return; // Try again later } UE_LOG(LogCook, Error, TEXT("CookWorkerServer %d timed out waiting for shutdown acknowledge. Terminating connection."), WorkerId.GetRemoteIndex()); bNeedCrashDiagnostics = true; CrashDiagnosticsError = FString::Printf(TEXT("CookWorkerCrash: Timed out during shutdown for CookWorker %d."), WorkerId.GetRemoteIndex()); SendToState(EConnectStatus::LostConnection); } break; } case EConnectStatus::LostConnection: return; // Nothing further to do default: checkNoEntry(); return; } } } void FCookWorkerServer::LaunchProcess() { FCookDirector::FLaunchInfo LaunchInfo = Director.GetLaunchInfo(WorkerId); bool bShowCookWorkers = LaunchInfo.ShowWorkerOption == FCookDirector::EShowWorker::SeparateWindows; CookWorkerHandle = FPlatformProcess::CreateProc(*LaunchInfo.CommandletExecutable, *LaunchInfo.WorkerCommandLine, true /* bLaunchDetached */, !bShowCookWorkers /* bLaunchHidden */, !bShowCookWorkers /* bLaunchReallyHidden */, &CookWorkerProcessId, 0 /* PriorityModifier */, *FPaths::GetPath(LaunchInfo.CommandletExecutable), nullptr /* PipeWriteChild */); if (CookWorkerHandle.IsValid()) { UE_LOG(LogCook, Display, TEXT("CookWorkerServer launched CookWorker as WorkerId %d and PID %u with commandline \"%s\"."), WorkerId.GetRemoteIndex(), CookWorkerProcessId, *LaunchInfo.WorkerCommandLine); SendToState(EConnectStatus::WaitForConnect); } else { // GetLastError information was logged by CreateProc CrashDiagnosticsError = FString::Printf(TEXT("CookWorkerCrash: Failed to create process for CookWorker %d. Assigned packages will be returned to the director."), WorkerId.GetRemoteIndex()); bNeedCrashDiagnostics = true; SendToState(EConnectStatus::LostConnection); } } void FCookWorkerServer::SendToState(EConnectStatus TargetStatus) { switch (TargetStatus) { case EConnectStatus::WaitForConnect: ConnectStartTimeSeconds = FPlatformTime::Seconds(); break; case EConnectStatus::ShuttingDown: ConnectStartTimeSeconds = FPlatformTime::Seconds(); break; case EConnectStatus::LostConnection: if (AssignedPackages.Num() > 0) { UE_LOG(LogCook, Warning, TEXT("CookWorkerServer %d is disconnecting with %d outstanding packages. Returning them to the director."), WorkerId.GetRemoteIndex(), AssignedPackages.Num()); ReassignPackages(); } DetachFromRemoteProcess(bNeedCrashDiagnostics ? EWorkerDetachType::Crashed : EWorkerDetachType::Dismissed); break; default: break; } ConnectStatus = TargetStatus; } void FCookWorkerServer::DetachFromRemoteProcess(EWorkerDetachType DetachType) { CloseSocket(); CookWorkerProcessId = 0; if (bNeedCrashDiagnostics) { bNeedCrashDiagnostics = false; UE_LOG(LogCook, Display, TEXT("SendCrashDiagnostics %s"), *CrashDiagnosticsError); } } void FCookWorkerServer::ShutdownRemoteProcess() { if (CookWorkerHandle.IsValid()) { FScopeLock CommunicationScopeLock(&CommunicationLock); if (ConnectStatus == EConnectStatus::Connected || ConnectStatus == EConnectStatus::ShuttingDown) { PumpReceiveMessages(); } if (PendingMessages.Num() > 0) { HandleReceiveMessages(); } FPlatformProcess::TerminateProc(CookWorkerHandle, /* bKillTree */ true); CookWorkerHandle = FProcHandle(); DetachFromRemoteProcess(EWorkerDetachType::ForceTerminated); SendToState(EConnectStatus::LostConnection); } } bool FCookWorkerServer::TryHandleConnectMessage(FWorkerConnectMessage& Message, FSocket* InSocket) { if (ConnectStatus != EConnectStatus::WaitForConnect) { return false; } InitSocket(InSocket); SendToState(EConnectStatus::Connected); UE_LOG(LogCook, Display, TEXT("CookWorker %d connected after %.3fs."), WorkerId.GetRemoteIndex(), static_cast(FPlatformTime::Seconds() - ConnectStartTimeSeconds)); int32 ReserveNum = (COTFS.WorkerRequests->GetNumRequests() + COTFS.PackageDatas->GetMonitor().GetNumInProgress()) / (COTFS.CookDirector->GetWorkerCount()); AssignedPackages.Reserve(ReserveNum); FInitialConfigMessage* InitialConfigMessage = Director.GetInitialConfigMessage(); InitialConfigMessage->WriteToTargetPlatform(OrderedSessionPlatforms); return InitialConfigMessage ? WritePacket(*InitialConfigMessage) : false; } void FCookWorkerServer::AppendAssignments(TArrayView Assignments, ECookDirectorThread TickThread) { ++SyncFenceVersion; PackagesToAssign.Append(Assignments.GetData(), Assignments.Num()); } void FCookWorkerServer::TickFromSchedulerThread() { HandleSendMessages(); if (PendingMessages.Num() > 0) { HandleReceiveMessages(); } } void FCookWorkerServer::PumpSendMessages() { if (SendSyncFenceVersion) { FSyncFenceMessage SyncFenceMessage; { FScopeLock Lock(&MessagesLock); SyncFenceMessage.FenceVersion = SyncFenceVersion; SendSyncFenceVersion = 0; } WriteMarshalledPacket(SyncFenceMessage); } if (PendingAssignMessage.Packages.Num() > 0) { { FScopeLock Lock(&MessagesLock); PendingAssignMessage.MessageSliceTo(FCookSocket::MaxSendPackageNum, PendingSendAssignMessage); } UE_LOG(LogCook, Display, TEXT("PumpSendMessages>>>>>>>>>>>> FAssignPackagesMessage_%d %d::%d"), WorkerId.GetRemoteIndex(), PendingAssignMessage.Packages.Num(), PendingSendAssignMessage.Packages.Num()); WriteMarshalledPacket(PendingSendAssignMessage); } } void FCookWorkerServer::PumpReceiveMessages() { FScopeLock Lock(&MessagesLock); EConnectionStatus SocketStatus = TryReadPacket(PendingMessages); if (SocketStatus != EConnectionStatus::Okay && SocketStatus != EConnectionStatus::Incomplete) { UE_LOG(LogCook, Error, TEXT("CookWorker connection lost: failed to read from socket.")); SendToState(EConnectStatus::LostConnection); return; } } void FCookWorkerServer::HandleSendMessages() { if (PackagesToAssign.Num() == 0) { return; } int32 PackagesToAssignNum = FMath::Min(FCookSocket::MaxSendPackageNum, PackagesToAssign.Num()); FScopeLock Lock(&MessagesLock); PendingAssignMessage.FenceVersion = ++SyncFenceVersion; for (int32 Index = 0; Index < PackagesToAssignNum; Index++) { FPackageData* PackageData = PackagesToAssign.Pop(); PendingAssignMessage.Packages.Add(FAssignPackageData{PackageData->GetPackageName(), PackageData->GetFileName()}); AssignedPackages.Add(PackageData); } } void FCookWorkerServer::HandleReceiveMessages() { TArray MessagesToProcess; { FScopeLock Lock(&MessagesLock); MessagesToProcess = MoveTemp(PendingMessages); } FPackageResultsMessage ResultsMessage; FDiscoveredPackagesMessage DiscoveredMessage; for (auto& Message: MessagesToProcess) { if (Message.MessageType == FPackageResultsMessage::MessageType) { ResultsMessage.Results.Reset(); if (!ReadMarshalledPacket(Message, ResultsMessage)) { LogInvalidMessage(TEXT("FPackageResultsMessage"), Message.MessageType); } else { UE_LOG(LogCook, Display, TEXT("HandleReceiveMessages<<<<<<<<<<<< FPackageResultsMessage_%d %d"), WorkerId.GetRemoteIndex(), ResultsMessage.Results.Num()); RecordResults(ResultsMessage); } } else if (Message.MessageType == FCookSideEffectMessage::MessageType) { FCookSideEffectMessage SideEffectMessage; if (!ReadMarshalledPacket(Message, SideEffectMessage)) { LogInvalidMessage(TEXT("FCookSideEffectMessage"), Message.MessageType); } else { UE_LOG(LogCook, Display, TEXT("HandleReceiveMessages<<<<<<<<<<<< FCookSideEffectMessage_%d asset registry = %d , package = %d"), WorkerId.GetRemoteIndex(), SideEffectMessage.AssetRegistryDataMap.Num(), SideEffectMessage.AssetPackageDataMap.Num()); CookSideEffectPackage(SideEffectMessage); } } else if (Message.MessageType == FDiscoveredPackagesMessage::MessageType) { DiscoveredMessage.Packages.Reset(); if (!ReadMarshalledPacket(Message, DiscoveredMessage)) { LogInvalidMessage(TEXT("FDiscoveredPackagesMessage"), Message.MessageType); } else { UE_LOG(LogCook, Display, TEXT("HandleReceiveMessages<<<<<<<<<<<< FDiscoveredPackagesMessage_%d %d"), WorkerId.GetRemoteIndex(), DiscoveredMessage.Packages.Num()); QueueDiscoveredPackage(DiscoveredMessage); } } else if (Message.MessageType == FSyncFenceMessage::MessageType) { FSyncFenceMessage SyncFenceMessage; if (!ReadMarshalledPacket(Message, SyncFenceMessage)) { LogInvalidMessage(TEXT("SyncFenceMessage"), Message.MessageType); } else { UE_LOG(LogCook, Display, TEXT("TrySyncFenceMessage_%d<<<<<<<<<<<<<<< %d => %d"), WorkerId.GetRemoteIndex(), SyncFenceMessage.FenceVersion, SyncFenceVersion); if (SyncFenceMessage.FenceVersion == SyncFenceVersion && PackagesToAssign.Num() == 0 && PendingAssignMessage.Packages.Num() == 0) { ReassignPackages(); } else { SendSyncFenceVersion = SyncFenceVersion; } } } else if (Message.MessageType == FShutdownAcknowledgeMessage::MessageType) { if (ConnectStatus == EConnectStatus::ShuttingDown) { UE_LOG(LogCook, Display, TEXT("CookWorkerServer %d received shutdown acknowledge. Finalizing."), WorkerId.GetRemoteIndex()); } else { UE_LOG(LogCook, Warning, TEXT("CookWorkerServer %d received unexpected shutdown acknowledge. Terminating connection."), WorkerId.GetRemoteIndex()); } } } } void FCookWorkerServer::RecordResults(FPackageResultsMessage& ResultsMessage) { using namespace UE::Cook; const int32 NumPlatforms = OrderedSessionPlatforms.Num(); TArray SucceededFlags; for (auto& MessageData: ResultsMessage.Results) { const FName PackageName = MessageData.PackageName; FPackageData* PackageData = COTFS.PackageDatas->FindPackageDataByPackageName(PackageName); if (!PackageData) { UE_LOG(LogCook, Error, TEXT("CookWorkerServer received FPackageResultsMessage for invalid package %s. Ignoring it."), *PackageName.ToString()); continue; } if (AssignedPackages.Remove(PackageData) != 1) { UE_LOG(LogCook, Error, TEXT("CookWorkerServer received FPackageResultsMessage for package %s which is not a pending package. Ignoring it."), *PackageName.ToString()); continue; } if (MessageData.PackageResult == EMPPackageResult::Failed) { PackageData->SendToState(EPackageState::LoadPrepare, UE::Cook::ESendFlags::QueueAddAndRemove); Director.AssignRequestTo(PackageData, Director.GetLocalWorkerId()); continue; } SucceededFlags.Reset(NumPlatforms); int32 CookResultNum = MessageData.CookResult.Num(); for (int32 PlatformIndex = 0; PlatformIndex < NumPlatforms; ++PlatformIndex) { auto CookResult = ECookResult::Succeeded; if (PlatformIndex < CookResultNum) { CookResult = MessageData.CookResult[PlatformIndex]; } SucceededFlags.Add(CookResult == ECookResult::Succeeded); } PackageData->AddCookedPlatforms(OrderedSessionPlatforms, SucceededFlags); FCompletionCallback LocalCallback = MoveTemp(PackageData->GetCompletionCallback()); PackageData->SendToState(EPackageState::Idle, ESendFlags::QueueRemove); if (LocalCallback) { LocalCallback(); } if (MessageData.AssetPackages.Num() > 0) { for (int32 PlatformIndex = 0; PlatformIndex < NumPlatforms; ++PlatformIndex) { if (MessageData.AssetPackages[PlatformIndex].DiskSize > 0) { const ITargetPlatform* TargetPlatform = OrderedSessionPlatforms[PlatformIndex]; FAssetRegistryGenerator* Generator = COTFS.PlatformManager->GetPlatformData(TargetPlatform)->RegistryGenerator.Get(); FAssetPackageData* AssetPackageData = Generator->GetAssetPackageData(PackageName); if (!AssetPackageData) { UE_LOG(LogCook, Log, TEXT("Received AssetPackages is null %s"), *PackageName.ToString()); } else { *AssetPackageData = MessageData.AssetPackages[PlatformIndex]; } } } } } } void FCookWorkerServer::CookSideEffectPackage(FCookSideEffectMessage& Message) { // Merge AssetRegistry data if (Message.AssetRegistryDataMap.Num() > 0) { FScopeLock StateLock(&COTFS.AssetRegistryStateLock); for (auto& It: Message.AssetRegistryDataMap) { int32 TargetIndex = 0; for (auto& AssetData: It.Value) { const ITargetPlatform* TargetPlatform = OrderedSessionPlatforms[TargetIndex++]; FAssetRegistryGenerator* Generator = COTFS.PlatformManager->GetPlatformData(TargetPlatform)->RegistryGenerator.Get(); if (CookTrace::IsTargetPackage(It.Key)) { UE_LOG(LogCook, Log, TEXT("Received AssetRegistryDataMap %s %s Num = %d"), *It.Key.ToString(), *AssetData.AssetClass.ToString(), AssetData.Tags.Num()); } Generator->UpdateAssetDataFromWorker(It.Key, AssetData.AssetClass, AssetData.PackageFlags, AssetData.Tags); } } } for (auto& It: Message.AssetPackageDataMap) { int32 TargetIndex = 0; for (auto& AssetData: It.Value) { const ITargetPlatform* TargetPlatform = OrderedSessionPlatforms[TargetIndex++]; FAssetRegistryGenerator* Generator = COTFS.PlatformManager->GetPlatformData(TargetPlatform)->RegistryGenerator.Get(); FAssetPackageData* AssetPackageData = Generator->GetAssetPackageData(It.Key); if (!AssetPackageData) { UE_LOG(LogCook, Log, TEXT("Received AssetPackageDataMap is null %s"), *It.Key.ToString()); } else { *AssetPackageData = AssetData; } } } auto& SideEffects = Message.CookSideEffects; ICookSideEffectCollector::HandleCookSideEffects(SideEffects); UE_LOG(LogCook, Log, TEXT("Received SideEffects<<<<<<<<<<<<<< %d AssetRegistryNum = %d, EdlExportsNum = %d , EdlImportsNum = %d , EdlArcsNum = %d"), WorkerId.GetRemoteIndex(), Message.AssetRegistryDataMap.Num(), SideEffects.EdlExports.Num(), SideEffects.EdlImports.Num(), SideEffects.EdlArcs.Num()); } void FCookWorkerServer::QueueDiscoveredPackage(FDiscoveredPackagesMessage& Message) { for (auto& DiscoveredPackage: Message.Packages) { FPackageData& PackageData = COTFS.PackageDatas->FindOrAddPackageData(DiscoveredPackage.PackageName, DiscoveredPackage.NormalizedFileName); if (!PackageData.IsInProgress()) { PackageData.UpdateRequestData(OrderedSessionPlatforms, false, FCompletionCallback(), ESendFlags::QueueNone); if (DiscoveredPackage.TargetState == EPackageState::LoadReady) { PackagesToAssign.Add(&PackageData); Director.AssignRequestTo(&PackageData, WorkerId); PackageData.SendToState(EPackageState::WaitAssign, UE::Cook::ESendFlags::QueueAdd); } else { COTFS.WorkerRequests->AddToWaitAssign(PackageData, EPackageState::LoadPrepare); } } } } void FCookWorkerServer::LogInvalidMessage(const TCHAR* MessageTypeName, FGuid MessageType) { UE_LOG(LogCook, Error, TEXT("CookWorkerServer received invalidly formatted message for type %s::%s from CookWorker. Ignoring it."), MessageTypeName, *MessageType.ToString()); } void FCookWorkerServer::ReassignPackages() { FScopeLock CommunicationScopeLock(&CommunicationLock); for (auto PackageData: AssignedPackages) { PackageData->SendToState(EPackageState::LoadPrepare, UE::Cook::ESendFlags::QueueAddAndRemove); Director.AssignRequestTo(PackageData, Director.GetLocalWorkerId()); } AssignedPackages.Reset(); } int32 FCookWorkerServer::NumAssignments() const { FScopeLock CommunicationScopeLock(&CommunicationLock); return PackagesToAssign.Num() + AssignedPackages.Num(); } void FCookWorkerServer::SignalCookComplete(ECookDirectorThread TickThread) { switch (ConnectStatus) { case EConnectStatus::Uninitialized: // Fall through case EConnectStatus::WaitForConnect: SendToState(EConnectStatus::LostConnection); break; case EConnectStatus::Connected: WriteMarshalledPacket(FShutdownRequestMessage()); SendToState(EConnectStatus::ShuttingDown); break; default: break; // Already in a disconnecting state } } } // namespace Cook } // namespace UE