EM_Task/UnrealEd/Private/Cooker/CookWorkerServer.cpp

501 lines
20 KiB
C++
Raw Permalink Normal View History

2026-02-13 16:18:33 +08:00
#include "CookWorkerServer.h"
#include "CookDirector.h"
#include "IWorkerRequests.h"
#include "CookPackageData.h"
#include "Sockets.h"
#include "SocketTypes.h"
#include "AssetRegistryState.h"
#include "Cooker/CookPlatformManager.h"
#include "Commandlets/AssetRegistryGenerator.h"
#include "MPCookTrace.h"
namespace UE
{
namespace Cook
{
/**
 * Creates the director-side representative of one remote CookWorker.
 *
 * @param InDirector  Director that owns this server; its COTFS reference is cached as well.
 * @param InWorkerId  Identifier of the remote worker this server talks to.
 */
FCookWorkerServer::FCookWorkerServer(FCookDirector& InDirector, FWorkerId InWorkerId)
    : Director(InDirector)
    , COTFS(InDirector.COTFS)
    , WorkerId(InWorkerId)
{
    // No process is launched here; TickCommunication launches it while the status is Uninitialized.
}
/** Releases the connection to the remote process without terminating it. */
FCookWorkerServer::~FCookWorkerServer()
{
    const EWorkerDetachType DetachReason = EWorkerDetachType::StillRunning;
    DetachFromRemoteProcess(DetachReason);
}
/**
 * Reports whether this server is on its way out.
 *
 * @return true when the connection status is ShuttingDown or LostConnection, false otherwise.
 */
bool FCookWorkerServer::IsShuttingDown() const
{
    FScopeLock CommunicationScopeLock(&CommunicationLock);
    switch (ConnectStatus)
    {
    case EConnectStatus::ShuttingDown: // Fall through
    case EConnectStatus::LostConnection:
        return true;
    default:
        return false;
    }
}
/**
 * Reports whether the connection has reached its terminal state.
 *
 * @return true only when the connection status is LostConnection.
 */
bool FCookWorkerServer::IsConnectionLost() const
{
    FScopeLock CommunicationScopeLock(&CommunicationLock);
    const bool bLost = (ConnectStatus == EConnectStatus::LostConnection);
    return bLost;
}
/**
 * Advances the connection state machine by one tick.
 *
 * The loop re-dispatches on ConnectStatus whenever a step changed the state, and returns as soon
 * as the current state has nothing more to do this tick.
 *
 * @param TickThread  Thread the tick is issued from; not used by this function.
 */
void FCookWorkerServer::TickCommunication(ECookDirectorThread TickThread)
{
    for (;;)
    {
        switch (ConnectStatus)
        {
        case EConnectStatus::Uninitialized:
            // LaunchProcess moves us to WaitForConnect or LostConnection; loop to handle the new state.
            LaunchProcess();
            break;

        case EConnectStatus::WaitForConnect:
            return; // Try again later

        case EConnectStatus::Connected:
            PumpReceiveMessages();
            if (ConnectStatus != EConnectStatus::Connected)
            {
                break; // Receiving changed the state; re-dispatch
            }
            PumpSendMessages();
            return; // Tick duties complete; yield the tick

        case EConnectStatus::ShuttingDown: {
            PumpReceiveMessages();
            if (ConnectStatus != EConnectStatus::ShuttingDown)
            {
                break; // Receiving changed the state; re-dispatch
            }
            constexpr float WaitForShutdownTimeout = 60.f;
            const double SecondsInShutdown = FPlatformTime::Seconds() - ConnectStartTimeSeconds;
            if (SecondsInShutdown <= WaitForShutdownTimeout)
            {
                return; // Try again later
            }
            // The timeout elapsed while still in ShuttingDown: flag crash diagnostics and drop the connection.
            UE_LOG(LogCook, Error, TEXT("CookWorkerServer %d timed out waiting for shutdown acknowledge. Terminating connection."), WorkerId.GetRemoteIndex());
            bNeedCrashDiagnostics = true;
            CrashDiagnosticsError = FString::Printf(TEXT("CookWorkerCrash: Timed out during shutdown for CookWorker %d."), WorkerId.GetRemoteIndex());
            SendToState(EConnectStatus::LostConnection);
            break;
        }

        case EConnectStatus::LostConnection:
            return; // Nothing further to do

        default:
            checkNoEntry();
            return;
        }
    }
}
void FCookWorkerServer::LaunchProcess()
{
FCookDirector::FLaunchInfo LaunchInfo = Director.GetLaunchInfo(WorkerId);
bool bShowCookWorkers = LaunchInfo.ShowWorkerOption == FCookDirector::EShowWorker::SeparateWindows;
CookWorkerHandle = FPlatformProcess::CreateProc(*LaunchInfo.CommandletExecutable, *LaunchInfo.WorkerCommandLine,
true /* bLaunchDetached */, !bShowCookWorkers /* bLaunchHidden */, !bShowCookWorkers /* bLaunchReallyHidden */,
&CookWorkerProcessId, 0 /* PriorityModifier */, *FPaths::GetPath(LaunchInfo.CommandletExecutable),
nullptr /* PipeWriteChild */);
if (CookWorkerHandle.IsValid())
{
UE_LOG(LogCook, Display, TEXT("CookWorkerServer launched CookWorker as WorkerId %d and PID %u with commandline \"%s\"."), WorkerId.GetRemoteIndex(), CookWorkerProcessId, *LaunchInfo.WorkerCommandLine);
SendToState(EConnectStatus::WaitForConnect);
}
else
{
// GetLastError information was logged by CreateProc
CrashDiagnosticsError = FString::Printf(TEXT("CookWorkerCrash: Failed to create process for CookWorker %d. Assigned packages will be returned to the director."), WorkerId.GetRemoteIndex());
bNeedCrashDiagnostics = true;
SendToState(EConnectStatus::LostConnection);
}
}
/**
 * Performs the entry actions for TargetStatus and then records it as the current status.
 *
 * - WaitForConnect / ShuttingDown: restart the timer stored in ConnectStartTimeSeconds.
 * - LostConnection: return outstanding packages to the director and detach from the process.
 * - Any other status: no entry action.
 *
 * @param TargetStatus  Status to transition to.
 */
void FCookWorkerServer::SendToState(EConnectStatus TargetStatus)
{
    switch (TargetStatus)
    {
    case EConnectStatus::WaitForConnect: // Fall through
    case EConnectStatus::ShuttingDown:
        ConnectStartTimeSeconds = FPlatformTime::Seconds();
        break;

    case EConnectStatus::LostConnection: {
        const int32 OutstandingNum = AssignedPackages.Num();
        if (OutstandingNum > 0)
        {
            UE_LOG(LogCook, Warning, TEXT("CookWorkerServer %d is disconnecting with %d outstanding packages. Returning them to the director."), WorkerId.GetRemoteIndex(), OutstandingNum);
            ReassignPackages();
        }
        EWorkerDetachType DetachType = EWorkerDetachType::Dismissed;
        if (bNeedCrashDiagnostics)
        {
            DetachType = EWorkerDetachType::Crashed;
        }
        DetachFromRemoteProcess(DetachType);
        break;
    }

    default:
        break;
    }

    // The status is written last, so the entry actions above still observe the previous status.
    ConnectStatus = TargetStatus;
}
/**
 * Drops the socket and forgets the remote process id, then emits any pending crash diagnostics.
 *
 * @param DetachType  Reason for the detach; not consulted by this implementation.
 */
void FCookWorkerServer::DetachFromRemoteProcess(EWorkerDetachType DetachType)
{
    CloseSocket();
    CookWorkerProcessId = 0;

    if (!bNeedCrashDiagnostics)
    {
        return;
    }

    // Clear the flag so the diagnostics are reported only once.
    bNeedCrashDiagnostics = false;
    UE_LOG(LogCook, Display, TEXT("SendCrashDiagnostics %s"), *CrashDiagnosticsError);
}
/**
 * Forcibly terminates the remote CookWorker process, if one was launched.
 *
 * Before killing the process, any messages still readable from the socket are pumped and any
 * queued messages are handled. Afterwards the server detaches and moves to LostConnection.
 */
void FCookWorkerServer::ShutdownRemoteProcess()
{
    if (!CookWorkerHandle.IsValid())
    {
        return;
    }

    FScopeLock CommunicationScopeLock(&CommunicationLock);

    // Only states that still have a live connection can receive anything further.
    const bool bMayStillReceive =
        ConnectStatus == EConnectStatus::Connected || ConnectStatus == EConnectStatus::ShuttingDown;
    if (bMayStillReceive)
    {
        PumpReceiveMessages();
    }
    if (PendingMessages.Num() > 0)
    {
        HandleReceiveMessages();
    }

    FPlatformProcess::TerminateProc(CookWorkerHandle, /* bKillTree */ true);
    CookWorkerHandle = FProcHandle();

    // NOTE(review): SendToState(LostConnection) also calls DetachFromRemoteProcess, so the detach
    // runs twice on this path. Kept as-is to preserve behavior.
    DetachFromRemoteProcess(EWorkerDetachType::ForceTerminated);
    SendToState(EConnectStatus::LostConnection);
}
/**
 * Accepts the connection from a freshly launched CookWorker and sends it the initial configuration.
 *
 * @param Message   Connect message received from the worker; not read by this function.
 * @param InSocket  Socket the worker connected on; handed to InitSocket.
 * @return false if this server is not in WaitForConnect, if the director has no initial config
 *         message, or if writing the packet fails; true otherwise.
 */
bool FCookWorkerServer::TryHandleConnectMessage(FWorkerConnectMessage& Message, FSocket* InSocket)
{
    if (ConnectStatus != EConnectStatus::WaitForConnect)
    {
        return false;
    }

    InitSocket(InSocket);
    SendToState(EConnectStatus::Connected);
    UE_LOG(LogCook, Display, TEXT("CookWorker %d connected after %.3fs."), WorkerId.GetRemoteIndex(), static_cast<float>(FPlatformTime::Seconds() - ConnectStartTimeSeconds));

    // Pre-size AssignedPackages with an even share of the known work. Skip the reservation if the
    // director reports no workers, rather than dividing by zero.
    const auto WorkerCount = COTFS.CookDirector->GetWorkerCount();
    if (WorkerCount > 0)
    {
        const int32 ReserveNum = (COTFS.WorkerRequests->GetNumRequests() + COTFS.PackageDatas->GetMonitor().GetNumInProgress()) / WorkerCount;
        AssignedPackages.Reserve(ReserveNum);
    }

    // Check for null BEFORE touching the message; the original dereferenced it first.
    FInitialConfigMessage* InitialConfigMessage = Director.GetInitialConfigMessage();
    if (!InitialConfigMessage)
    {
        UE_LOG(LogCook, Error, TEXT("CookWorkerServer %d has no initial config message to send to its CookWorker."), WorkerId.GetRemoteIndex());
        return false;
    }

    InitialConfigMessage->WriteToTargetPlatform(OrderedSessionPlatforms);
    return WritePacket(*InitialConfigMessage);
}
/**
 * Queues additional packages to be sent to the remote worker and advances the sync fence version.
 *
 * @param Assignments  Packages to add to PackagesToAssign.
 * @param TickThread   Thread the call is issued from; not used by this function.
 */
void FCookWorkerServer::AppendAssignments(TArrayView<FPackageData*> Assignments, ECookDirectorThread TickThread)
{
    // Bump the fence version before the new packages are queued.
    ++SyncFenceVersion;

    const auto NewAssignmentNum = Assignments.Num();
    PackagesToAssign.Append(Assignments.GetData(), NewAssignmentNum);
}
/**
 * Scheduler-side tick: stages outgoing assignments, then processes any messages that were
 * queued into PendingMessages.
 */
void FCookWorkerServer::TickFromSchedulerThread()
{
    HandleSendMessages();

    if (PendingMessages.Num() <= 0)
    {
        return; // Nothing was received since the last tick
    }
    HandleReceiveMessages();
}
void FCookWorkerServer::PumpSendMessages()
{
if (SendSyncFenceVersion)
{
FSyncFenceMessage SyncFenceMessage;
{
FScopeLock Lock(&MessagesLock);
SyncFenceMessage.FenceVersion = SyncFenceVersion;
SendSyncFenceVersion = 0;
}
WriteMarshalledPacket(SyncFenceMessage);
}
if (PendingAssignMessage.Packages.Num() > 0)
{
{
FScopeLock Lock(&MessagesLock);
PendingAssignMessage.MessageSliceTo(FCookSocket::MaxSendPackageNum, PendingSendAssignMessage);
}
UE_LOG(LogCook, Display, TEXT("PumpSendMessages>>>>>>>>>>>> FAssignPackagesMessage_%d %d::%d"), WorkerId.GetRemoteIndex(), PendingAssignMessage.Packages.Num(), PendingSendAssignMessage.Packages.Num());
WriteMarshalledPacket(PendingSendAssignMessage);
}
}
void FCookWorkerServer::PumpReceiveMessages()
{
FScopeLock Lock(&MessagesLock);
EConnectionStatus SocketStatus = TryReadPacket(PendingMessages);
if (SocketStatus != EConnectionStatus::Okay && SocketStatus != EConnectionStatus::Incomplete)
{
UE_LOG(LogCook, Error, TEXT("CookWorker connection lost: failed to read from socket."));
SendToState(EConnectStatus::LostConnection);
return;
}
}
void FCookWorkerServer::HandleSendMessages()
{
if (PackagesToAssign.Num() == 0)
{
return;
}
int32 PackagesToAssignNum = FMath::Min(FCookSocket::MaxSendPackageNum, PackagesToAssign.Num());
FScopeLock Lock(&MessagesLock);
PendingAssignMessage.FenceVersion = ++SyncFenceVersion;
for (int32 Index = 0; Index < PackagesToAssignNum; Index++)
{
FPackageData* PackageData = PackagesToAssign.Pop();
PendingAssignMessage.Packages.Add(FAssignPackageData{PackageData->GetPackageName(), PackageData->GetFileName()});
AssignedPackages.Add(PackageData);
}
}
void FCookWorkerServer::HandleReceiveMessages()
{
TArray<FMarshalledMessage> MessagesToProcess;
{
FScopeLock Lock(&MessagesLock);
MessagesToProcess = MoveTemp(PendingMessages);
}
FPackageResultsMessage ResultsMessage;
FDiscoveredPackagesMessage DiscoveredMessage;
for (auto& Message: MessagesToProcess)
{
if (Message.MessageType == FPackageResultsMessage::MessageType)
{
ResultsMessage.Results.Reset();
if (!ReadMarshalledPacket(Message, ResultsMessage))
{
LogInvalidMessage(TEXT("FPackageResultsMessage"), Message.MessageType);
}
else
{
UE_LOG(LogCook, Display, TEXT("HandleReceiveMessages<<<<<<<<<<<< FPackageResultsMessage_%d %d"), WorkerId.GetRemoteIndex(), ResultsMessage.Results.Num());
RecordResults(ResultsMessage);
}
}
else if (Message.MessageType == FCookSideEffectMessage::MessageType)
{
FCookSideEffectMessage SideEffectMessage;
if (!ReadMarshalledPacket(Message, SideEffectMessage))
{
LogInvalidMessage(TEXT("FCookSideEffectMessage"), Message.MessageType);
}
else
{
UE_LOG(LogCook, Display, TEXT("HandleReceiveMessages<<<<<<<<<<<< FCookSideEffectMessage_%d asset registry = %d , package = %d"), WorkerId.GetRemoteIndex(),
SideEffectMessage.AssetRegistryDataMap.Num(), SideEffectMessage.AssetPackageDataMap.Num());
CookSideEffectPackage(SideEffectMessage);
}
}
else if (Message.MessageType == FDiscoveredPackagesMessage::MessageType)
{
DiscoveredMessage.Packages.Reset();
if (!ReadMarshalledPacket(Message, DiscoveredMessage))
{
LogInvalidMessage(TEXT("FDiscoveredPackagesMessage"), Message.MessageType);
}
else
{
UE_LOG(LogCook, Display, TEXT("HandleReceiveMessages<<<<<<<<<<<< FDiscoveredPackagesMessage_%d %d"), WorkerId.GetRemoteIndex(), DiscoveredMessage.Packages.Num());
QueueDiscoveredPackage(DiscoveredMessage);
}
}
else if (Message.MessageType == FSyncFenceMessage::MessageType)
{
FSyncFenceMessage SyncFenceMessage;
if (!ReadMarshalledPacket(Message, SyncFenceMessage))
{
LogInvalidMessage(TEXT("SyncFenceMessage"), Message.MessageType);
}
else
{
UE_LOG(LogCook, Display, TEXT("TrySyncFenceMessage_%d<<<<<<<<<<<<<<< %d => %d"), WorkerId.GetRemoteIndex(), SyncFenceMessage.FenceVersion, SyncFenceVersion);
if (SyncFenceMessage.FenceVersion == SyncFenceVersion && PackagesToAssign.Num() == 0 && PendingAssignMessage.Packages.Num() == 0)
{
ReassignPackages();
}
else
{
SendSyncFenceVersion = SyncFenceVersion;
}
}
}
else if (Message.MessageType == FShutdownAcknowledgeMessage::MessageType)
{
if (ConnectStatus == EConnectStatus::ShuttingDown)
{
UE_LOG(LogCook, Display, TEXT("CookWorkerServer %d received shutdown acknowledge. Finalizing."), WorkerId.GetRemoteIndex());
}
else
{
UE_LOG(LogCook, Warning, TEXT("CookWorkerServer %d received unexpected shutdown acknowledge. Terminating connection."), WorkerId.GetRemoteIndex());
}
}
}
}
/**
 * Applies the per-package cook results reported by the remote worker.
 *
 * For each result:
 *  - unknown packages and packages not in AssignedPackages are logged and skipped;
 *  - failed packages are sent back to LoadPrepare and reassigned to the local worker;
 *  - otherwise the per-platform results are recorded, the package goes to Idle, its completion
 *    callback (if any) is invoked, and any reported asset package data is copied into the
 *    platform's registry generator.
 *
 * @param ResultsMessage  Unmarshalled results message from the worker.
 */
void FCookWorkerServer::RecordResults(FPackageResultsMessage& ResultsMessage)
{
    using namespace UE::Cook;
    const int32 NumPlatforms = OrderedSessionPlatforms.Num();
    TArray<bool> SucceededFlags;
    for (auto& MessageData: ResultsMessage.Results)
    {
        const FName PackageName = MessageData.PackageName;
        FPackageData* PackageData = COTFS.PackageDatas->FindPackageDataByPackageName(PackageName);
        if (!PackageData)
        {
            UE_LOG(LogCook, Error, TEXT("CookWorkerServer received FPackageResultsMessage for invalid package %s. Ignoring it."), *PackageName.ToString());
            continue;
        }
        if (AssignedPackages.Remove(PackageData) != 1)
        {
            UE_LOG(LogCook, Error, TEXT("CookWorkerServer received FPackageResultsMessage for package %s which is not a pending package. Ignoring it."), *PackageName.ToString());
            continue;
        }
        if (MessageData.PackageResult == EMPPackageResult::Failed)
        {
            // The remote cook failed: hand the package to the local worker.
            PackageData->SendToState(EPackageState::LoadPrepare, UE::Cook::ESendFlags::QueueAddAndRemove);
            Director.AssignRequestTo(PackageData, Director.GetLocalWorkerId());
            continue;
        }

        // Platforms the worker did not report a result for are treated as succeeded.
        SucceededFlags.Reset(NumPlatforms);
        const int32 CookResultNum = MessageData.CookResult.Num();
        for (int32 PlatformIndex = 0; PlatformIndex < NumPlatforms; ++PlatformIndex)
        {
            auto CookResult = ECookResult::Succeeded;
            if (PlatformIndex < CookResultNum)
            {
                CookResult = MessageData.CookResult[PlatformIndex];
            }
            SucceededFlags.Add(CookResult == ECookResult::Succeeded);
        }
        PackageData->AddCookedPlatforms(OrderedSessionPlatforms, SucceededFlags);

        // Move the callback out before the state change, then invoke it afterwards.
        FCompletionCallback LocalCallback = MoveTemp(PackageData->GetCompletionCallback());
        PackageData->SendToState(EPackageState::Idle, ESendFlags::QueueRemove);
        if (LocalCallback)
        {
            LocalCallback();
        }

        // Bound the loop by BOTH the platform count and the number of entries actually received,
        // so a message carrying fewer AssetPackages than session platforms cannot index out of
        // range (the same protection the CookResult loop above already has).
        const int32 AssetPackageNum = MessageData.AssetPackages.Num();
        for (int32 PlatformIndex = 0; PlatformIndex < NumPlatforms && PlatformIndex < AssetPackageNum; ++PlatformIndex)
        {
            if (MessageData.AssetPackages[PlatformIndex].DiskSize > 0)
            {
                const ITargetPlatform* TargetPlatform = OrderedSessionPlatforms[PlatformIndex];
                FAssetRegistryGenerator* Generator = COTFS.PlatformManager->GetPlatformData(TargetPlatform)->RegistryGenerator.Get();
                FAssetPackageData* AssetPackageData = Generator->GetAssetPackageData(PackageName);
                if (!AssetPackageData)
                {
                    UE_LOG(LogCook, Log, TEXT("Received AssetPackages is null %s"), *PackageName.ToString());
                }
                else
                {
                    *AssetPackageData = MessageData.AssetPackages[PlatformIndex];
                }
            }
        }
    }
}
/**
 * Merges the side effects reported by the remote worker into the director's state.
 *
 * Both maps in the message hold a list of entries per package; the N-th entry is applied to the
 * N-th platform in OrderedSessionPlatforms. Entries beyond the number of session platforms are
 * ignored with a warning instead of indexing out of range.
 *
 * @param Message  Unmarshalled side-effect message from the worker.
 */
void FCookWorkerServer::CookSideEffectPackage(FCookSideEffectMessage& Message)
{
    const int32 NumPlatforms = OrderedSessionPlatforms.Num();

    // Merge AssetRegistry data
    if (Message.AssetRegistryDataMap.Num() > 0)
    {
        FScopeLock StateLock(&COTFS.AssetRegistryStateLock);
        for (auto& It: Message.AssetRegistryDataMap)
        {
            int32 TargetIndex = 0;
            for (auto& AssetData: It.Value)
            {
                if (TargetIndex >= NumPlatforms)
                {
                    UE_LOG(LogCook, Warning, TEXT("CookWorkerServer %d received more AssetRegistryDataMap entries than the %d session platforms for %s. Ignoring the extras."), WorkerId.GetRemoteIndex(), NumPlatforms, *It.Key.ToString());
                    break;
                }
                const ITargetPlatform* TargetPlatform = OrderedSessionPlatforms[TargetIndex++];
                FAssetRegistryGenerator* Generator = COTFS.PlatformManager->GetPlatformData(TargetPlatform)->RegistryGenerator.Get();
                if (CookTrace::IsTargetPackage(It.Key))
                {
                    UE_LOG(LogCook, Log, TEXT("Received AssetRegistryDataMap %s %s Num = %d"), *It.Key.ToString(), *AssetData.AssetClass.ToString(), AssetData.Tags.Num());
                }
                Generator->UpdateAssetDataFromWorker(It.Key, AssetData.AssetClass, AssetData.PackageFlags, AssetData.Tags);
            }
        }
    }

    // Merge AssetPackage data
    for (auto& It: Message.AssetPackageDataMap)
    {
        int32 TargetIndex = 0;
        for (auto& AssetData: It.Value)
        {
            if (TargetIndex >= NumPlatforms)
            {
                UE_LOG(LogCook, Warning, TEXT("CookWorkerServer %d received more AssetPackageDataMap entries than the %d session platforms for %s. Ignoring the extras."), WorkerId.GetRemoteIndex(), NumPlatforms, *It.Key.ToString());
                break;
            }
            const ITargetPlatform* TargetPlatform = OrderedSessionPlatforms[TargetIndex++];
            FAssetRegistryGenerator* Generator = COTFS.PlatformManager->GetPlatformData(TargetPlatform)->RegistryGenerator.Get();
            FAssetPackageData* AssetPackageData = Generator->GetAssetPackageData(It.Key);
            if (!AssetPackageData)
            {
                UE_LOG(LogCook, Log, TEXT("Received AssetPackageDataMap is null %s"), *It.Key.ToString());
            }
            else
            {
                *AssetPackageData = AssetData;
            }
        }
    }

    // Forward the remaining side effects to the collector.
    auto& SideEffects = Message.CookSideEffects;
    ICookSideEffectCollector::HandleCookSideEffects(SideEffects);
    UE_LOG(LogCook, Log, TEXT("Received SideEffects<<<<<<<<<<<<<< %d AssetRegistryNum = %d, EdlExportsNum = %d , EdlImportsNum = %d , EdlArcsNum = %d"),
        WorkerId.GetRemoteIndex(), Message.AssetRegistryDataMap.Num(), SideEffects.EdlExports.Num(), SideEffects.EdlImports.Num(), SideEffects.EdlArcs.Num());
}
/**
 * Registers packages the remote worker discovered while cooking.
 *
 * Packages already in progress are left alone. A package whose target state is LoadReady is
 * queued for this same worker; any other package is added to the wait-assign requests.
 *
 * @param Message  Unmarshalled discovered-packages message from the worker.
 */
void FCookWorkerServer::QueueDiscoveredPackage(FDiscoveredPackagesMessage& Message)
{
    for (auto& Found: Message.Packages)
    {
        FPackageData& PackageData = COTFS.PackageDatas->FindOrAddPackageData(Found.PackageName, Found.NormalizedFileName);
        if (PackageData.IsInProgress())
        {
            continue; // Already being handled
        }

        PackageData.UpdateRequestData(OrderedSessionPlatforms, false, FCompletionCallback(), ESendFlags::QueueNone);

        if (Found.TargetState != EPackageState::LoadReady)
        {
            COTFS.WorkerRequests->AddToWaitAssign(PackageData, EPackageState::LoadPrepare);
            continue;
        }

        // LoadReady: queue it for this worker.
        PackagesToAssign.Add(&PackageData);
        Director.AssignRequestTo(&PackageData, WorkerId);
        PackageData.SendToState(EPackageState::WaitAssign, UE::Cook::ESendFlags::QueueAdd);
    }
}
void FCookWorkerServer::LogInvalidMessage(const TCHAR* MessageTypeName, FGuid MessageType)
{
UE_LOG(LogCook, Error, TEXT("CookWorkerServer received invalidly formatted message for type %s::%s from CookWorker. Ignoring it."), MessageTypeName, *MessageType.ToString());
}
void FCookWorkerServer::ReassignPackages()
{
FScopeLock CommunicationScopeLock(&CommunicationLock);
for (auto PackageData: AssignedPackages)
{
PackageData->SendToState(EPackageState::LoadPrepare, UE::Cook::ESendFlags::QueueAddAndRemove);
Director.AssignRequestTo(PackageData, Director.GetLocalWorkerId());
}
AssignedPackages.Reset();
}
/**
 * Counts the packages this worker is responsible for.
 *
 * @return Number of packages in PackagesToAssign plus the number in AssignedPackages.
 */
int32 FCookWorkerServer::NumAssignments() const
{
    FScopeLock CommunicationScopeLock(&CommunicationLock);
    const int32 QueuedNum = PackagesToAssign.Num();
    const int32 InFlightNum = AssignedPackages.Num();
    return QueuedNum + InFlightNum;
}
/**
 * Tells this server that the cook has finished.
 *
 * A worker that never connected is dropped straight to LostConnection. A connected worker is sent
 * a shutdown request and moved to ShuttingDown. Other states are left untouched.
 *
 * @param TickThread  Thread the call is issued from; not used by this function.
 */
void FCookWorkerServer::SignalCookComplete(ECookDirectorThread TickThread)
{
    // Read the status once, mirroring the single evaluation of the original switch.
    const EConnectStatus CurrentStatus = ConnectStatus;

    const bool bNeverConnected =
        CurrentStatus == EConnectStatus::Uninitialized || CurrentStatus == EConnectStatus::WaitForConnect;
    if (bNeverConnected)
    {
        SendToState(EConnectStatus::LostConnection);
    }
    else if (CurrentStatus == EConnectStatus::Connected)
    {
        WriteMarshalledPacket(FShutdownRequestMessage());
        SendToState(EConnectStatus::ShuttingDown);
    }
    // Otherwise: already in a disconnecting state
}
} // namespace Cook
} // namespace UE