#include "CookWorkerClient.h"
#include "CookDirector.h"
#include "CookWorkerServer.h"
#include "CookPackageData.h"
#include "IWorkerRequests.h"
#include "CookOnTheSide/CookOnTheFlyServer.h"
#include "SocketSubsystem.h"
#include "Sockets.h"
#include "SocketTypes.h"
#include "Misc/DefaultValueHelper.h"
#include "Commandlets/AssetRegistryGenerator.h"

namespace UE
{
namespace Cook
{

/** Returns true when -CookIgnoreTimeouts is on the command line. The command line is parsed once and the result cached. */
bool IsCookIgnoreTimeouts()
{
	static bool bIsIgnoreCookTimeouts = FParse::Param(FCommandLine::Get(), TEXT("CookIgnoreTimeouts"));
	return bIsIgnoreCookTimeouts;
}

/** Registers this client with FMPCookSideEffectCollector for the lifetime of the object. */
FCookWorkerClient::FCookWorkerClient(UCookOnTheFlyServer& InCOTFS)
	: COTFS(InCOTFS)
{
	FMPCookSideEffectCollector::SetCookWorkerClient(this);
}

/** Unregisters this client from FMPCookSideEffectCollector. */
FCookWorkerClient::~FCookWorkerClient()
{
	FMPCookSideEffectCollector::SetCookWorkerClient(nullptr);
}

/**
 * Repeatedly polls PollTryConnect, sleeping 10 ms between polls, until it reports something other than Incomplete.
 *
 * @param ConnectInfo Connection parameters forwarded to PollTryConnect.
 * @return true if the final poll status was Success.
 */
bool FCookWorkerClient::TryConnect(FDirectorConnectionInfo&& ConnectInfo)
{
	EPollStatus Status;
	for (;;)
	{
		Status = PollTryConnect(ConnectInfo);
		if (Status != EPollStatus::Incomplete)
		{
			break;
		}
		constexpr float SleepTime = 0.01f; // 10 ms
		FPlatformProcess::Sleep(SleepTime);
	}
	return Status == EPollStatus::Success;
}

/**
 * Advances the connection state machine as far as it can go in one call.
 *
 * @return Success once Connected, Incomplete while still waiting for the initial config,
 *         Error for LostConnection or any unexpected state.
 */
EPollStatus FCookWorkerClient::PollTryConnect(const FDirectorConnectionInfo& ConnectInfo)
{
	for (;;)
	{
		switch (ConnectStatus)
		{
		case EConnectStatus::Connected:
			return EPollStatus::Success;
		case EConnectStatus::Uninitialized:
			CreateServerSocket(ConnectInfo);
			break;
		case EConnectStatus::WaitInitialConfig:
			WaitInitialConfigMessage();
			if (ConnectStatus == EConnectStatus::WaitInitialConfig)
			{
				return EPollStatus::Incomplete;
			}
			break;
		case EConnectStatus::LostConnection:
			return EPollStatus::Error;
		default:
			return EPollStatus::Error;
		}
	}
}

/**
 * Creates a stream socket, connects it to the director named by ConnectInfo.HostURI, sends the
 * FWorkerConnectMessage and moves to WaitInitialConfig. Every failure path moves to LostConnection.
 *
 * HostURI may be "host" or "host:port"; when no parsable port is present ConnectInfo.DirectorPort is used.
 */
void FCookWorkerClient::CreateServerSocket(const FDirectorConnectionInfo& ConnectInfo)
{
	DirectorURI = ConnectInfo.HostURI;

	ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);
	if (!SocketSubsystem)
	{
		UE_LOG(LogCook, Error, TEXT("CookWorker initialization failure: platform does not support network sockets, cannot connect to CookDirector."));
		SendToState(EConnectStatus::LostConnection);
		return;
	}

	UE_LOG(LogCook, Display, TEXT("Connecting to CookDirector at %s..."), *DirectorURI);

	FSocket* ServerSocket = SocketSubsystem->CreateSocket(NAME_Stream, TEXT("FCookWorkerClient-WorkerConnect"));
	if (!ServerSocket)
	{
		UE_LOG(LogCook, Error, TEXT("CookWorker initialization failure: Could not create socket."));
		SendToState(EConnectStatus::LostConnection);
		return;
	}

	// Split on the last ':' so that only a trailing ":port" is treated as the port.
	FString IPString, PortString;
	int32 Port = 0;
	if (!DirectorURI.Split(TEXT(":"), &IPString, &PortString, ESearchCase::IgnoreCase, ESearchDir::FromEnd))
	{
		IPString = DirectorURI;
	}
	if (!FDefaultValueHelper::ParseInt(PortString, Port))
	{
		Port = ConnectInfo.DirectorPort;
	}

	bool bIsValidIP = false;
	auto DirectorAddr = SocketSubsystem->CreateInternetAddr();
	DirectorAddr->SetIp(*IPString, bIsValidIP);
	DirectorAddr->SetPort(Port);
	if (!bIsValidIP)
	{
		// The local socket has not been handed to InitSocket yet, so it must be released here.
		SocketSubsystem->DestroySocket(ServerSocket);
		ServerSocket = nullptr;
		UE_LOG(LogCook, Error, TEXT("CookWorker initialization failure: could not convert -CookDirectorHost=%s into an address (Invalid Format)."), *DirectorURI);
		SendToState(EConnectStatus::LostConnection);
		return;
	}

	constexpr float WaitForConnectTimeout = 60.f * 10; // 10 minutes
	// NOTE(review): FLT_MAX seconds may exceed the range FTimespan can represent - confirm FromSeconds accepts it.
	const float ConditionalTimeoutSeconds = IsCookIgnoreTimeouts() ? FLT_MAX : WaitForConnectTimeout;

	// The return value of Connect is not used; success is decided by the wait and state check below.
	ServerSocket->Connect(*DirectorAddr);
	const bool bServerSocketReady = ServerSocket->Wait(ESocketWaitConditions::WaitForWrite, FTimespan::FromSeconds(ConditionalTimeoutSeconds));

	// Read the state before any cleanup: the socket is destroyed on failure and must not be touched afterwards.
	const ESocketConnectionState ConnectionState = ServerSocket->GetConnectionState();
	if (!bServerSocketReady || ConnectionState != ESocketConnectionState::SCS_Connected)
	{
		SocketSubsystem->DestroySocket(ServerSocket);
		ServerSocket = nullptr;
		UE_LOG(LogCook, Error, TEXT("CookWorker initialization failure: Timed out after %.0f seconds or connection failed trying to connect to CookDirector (State: %d)."),
			ConditionalTimeoutSeconds, static_cast<int32>(ConnectionState));
		SendToState(EConnectStatus::LostConnection);
		return;
	}

	InitSocket(ServerSocket);

	FWorkerConnectMessage ConnectMessage;
	ConnectMessage.RemoteIndex = ConnectInfo.RemoteIndex;
	WritePacket(ConnectMessage);

	SendToState(EConnectStatus::WaitInitialConfig);
}

/**
 * Transitions the connection state machine to TargetStatus.
 * Entering LostConnection closes the socket and logs the state being left;
 * entering ShuttingDown records the current time in ConnectStartTimeSeconds (read by PumpDisconnect).
 */
void FCookWorkerClient::SendToState(EConnectStatus TargetStatus)
{
	if (TargetStatus == EConnectStatus::LostConnection)
	{
		CloseSocket();
		// Logs the state we are leaving, not the target.
		UE_LOG(LogCook, Error, TEXT("CookWorker LostConnection %d"), static_cast<int32>(ConnectStatus));
	}
	else if (TargetStatus == EConnectStatus::ShuttingDown)
	{
		ConnectStartTimeSeconds = FPlatformTime::Seconds();
	}
	ConnectStatus = TargetStatus;
}

/**
 * Reads the director's initial configuration message and, on success, stores the cook mode and
 * session platforms and moves to Connected. On a read failure moves to LostConnection.
 */
void FCookWorkerClient::WaitInitialConfigMessage()
{
	check(!InitialConfigMessage);
	// NOTE(review): the template argument of MakeUnique appears to be missing here - restore it from the
	// declared type of InitialConfigMessage in the header.
	InitialConfigMessage = MakeUnique();
	if (!ReadPacket(*InitialConfigMessage))
	{
		UE_LOG(LogCook, Warning, TEXT("CookWorker initialization failure: Director sent an invalid InitialConfigMessage."));
		SendToState(EConnectStatus::LostConnection);
		// Do not fall through: the message is invalid and the state must stay LostConnection.
		return;
	}

	DirectorCookMode = InitialConfigMessage->GetDirectorCookMode();
	InitialConfigMessage->WriteToTargetPlatform(OrderedSessionPlatforms);
	SendToState(EConnectStatus::Connected);
}

/** Releases the initial config message and calls BlockSocket(false). */
void FCookWorkerClient::DoneWithInitialSettings()
{
	InitialConfigMessage.Reset();
	BlockSocket(false);
}

/**
 * Per-tick pump: while Connected, receives then sends messages; while ShuttingDown or
 * LostConnection, runs the disconnect handling.
 */
void FCookWorkerClient::TickFromSchedulerThread(FTickStackData& StackData)
{
	if (ConnectStatus == EConnectStatus::Connected)
	{
		PumpReceiveMessages();
		// Receiving may have changed the state (shutdown request or read failure).
		if (ConnectStatus == EConnectStatus::Connected)
		{
			PumpSendMessages(bFlushPendingResults);
		}
	}

	if (ConnectStatus == EConnectStatus::ShuttingDown || ConnectStatus == EConnectStatus::LostConnection)
	{
		PumpDisconnect(StackData);
	}
}

/** Reads available packets into PendingMessages and dispatches them; a socket error moves to LostConnection. */
void FCookWorkerClient::PumpReceiveMessages()
{
	const EConnectionStatus SocketStatus = TryReadPacket(PendingMessages);
	if (SocketStatus != EConnectionStatus::Okay && SocketStatus != EConnectionStatus::Incomplete)
	{
		UE_LOG(LogCook, Error, TEXT("CookWorker initialization failure: failed to read from socket."));
		SendToState(EConnectStatus::LostConnection);
		return;
	}

	if (PendingMessages.Num() > 0)
	{
		HandleReceiveMessages();
	}
}

/**
 * Dispatches every message in PendingMessages by type, then empties the queue.
 * A shutdown request flushes pending sends, moves to ShuttingDown and stops processing the
 * remaining messages. OptionalPackageName is not used by this implementation.
 */
void FCookWorkerClient::HandleReceiveMessages(FName OptionalPackageName)
{
	FAssignPackagesMessage AssignPackagesMessage;
	for (auto& Message : PendingMessages)
	{
		if (ConnectStatus == EConnectStatus::ShuttingDown)
		{
			UE_LOG(LogCook, Log, TEXT("CookWorkerClient received message %s while shutting down. It will be ignored."), *Message.MessageType.ToString());
			continue;
		}

		if (Message.MessageType == FShutdownRequestMessage::MessageType)
		{
			PumpSendMessages(true);
			UE_LOG(LogCook, Display, TEXT("CookWorkerClient received ShutdownRequest from Director. Shutting down."));
			SendToState(EConnectStatus::ShuttingDown);
			break;
		}
		else if (Message.MessageType == FAssignPackagesMessage::MessageType)
		{
			// The message object is reused across iterations, so clear the previous payload first.
			AssignPackagesMessage.Packages.Reset();
			if (!ReadMarshalledPacket(Message, AssignPackagesMessage))
			{
				LogInvalidMessage(TEXT("FAssignPackagesMessage"), Message.MessageType);
			}
			else
			{
				UE_LOG(LogCook, Display, TEXT("HandleReceiveMessages<<<<<<<<<<<< FAssignPackagesMessage %d"), AssignPackagesMessage.Packages.Num());
				AssignPackages(AssignPackagesMessage);
			}
		}
		else if (Message.MessageType == FSyncFenceMessage::MessageType)
		{
			FSyncFenceMessage SyncFenceMessage;
			if (!ReadMarshalledPacket(Message, SyncFenceMessage))
			{
				LogInvalidMessage(TEXT("SyncFenceMessage"), Message.MessageType);
			}
			else
			{
				UE_LOG(LogCook, Display, TEXT("TrySyncFenceMessage<<<<<<<<<<<<<<< %d => %d"), SyncFenceMessage.FenceVersion, SyncFenceVersion);
				if (SyncFenceVersion <= SyncFenceMessage.FenceVersion)
				{
					SyncFenceVersion = SyncFenceMessage.FenceVersion;
					SendSyncFenceVersion = 0;
				}
			}
		}
	}
	PendingMessages.Empty();
}

/**
 * Queues every package in Message for loading on this worker.
 * Packages whose state is at most WaitAssign are sent to LoadReady (already fully loaded) or
 * LoadPrepare; packages in any later state are logged as an error and reported back immediately
 * as Success or Failed depending on whether all session platforms are cooked.
 * Also adopts the message's fence version and clears bFlushPendingResults.
 */
void FCookWorkerClient::AssignPackages(FAssignPackagesMessage& Message)
{
	for (FAssignPackageData& AssignData : Message.Packages)
	{
		const FName PackageName = AssignData.PackageName;
		FPackageData& PackageData = COTFS.PackageDatas->FindOrAddPackageData(PackageName, AssignData.NormalizedFileName);

		if (PackageData.GetState() <= EPackageState::WaitAssign)
		{
			const ESendFlags SendFlag = PackageData.GetState() == EPackageState::Idle
				? ESendFlags::QueueAdd
				: ESendFlags::QueueAddAndRemove;
			PackageData.UpdateRequestData(OrderedSessionPlatforms, false, FCompletionCallback(), ESendFlags::QueueNone);

			UPackage* ExistingPackage = static_cast<UPackage*>(StaticFindObject(UPackage::StaticClass(), nullptr, *PackageName.ToString()));
			if (ExistingPackage != nullptr && ExistingPackage->IsFullyLoaded())
			{
				PackageData.SendToState(EPackageState::LoadReady, SendFlag);
			}
			else
			{
				PackageData.SendToState(EPackageState::LoadPrepare, SendFlag);
			}
		}
		else
		{
			const bool bIsCooked = PackageData.HasAllCookedPlatforms(OrderedSessionPlatforms, true);
			UE_LOG(LogCook, Error, TEXT("AssignPackages With Error Package State. %s %d %d"),
				*PackageName.ToString(), static_cast<int32>(PackageData.GetState()), static_cast<int32>(bIsCooked));
			ReportPackageMessage(PackageData, bIsCooked ? EMPPackageResult::Success : EMPPackageResult::Failed);
		}
	}

	SyncFenceVersion = Message.FenceVersion;
	bFlushPendingResults = false;
}

/**
 * Appends each requested package to the pending discovered-packages message, with a target state of
 * LoadReady if it is in LoadedPackages and LoadPrepare otherwise, and moves it locally to WaitAssign.
 */
// NOTE(review): the template arguments of TArray/TSet appear to be missing from this signature
// (presumably FPackageData* elements, judging by how they are used) - restore them from the header.
void FCookWorkerClient::ReportDiscoveredPackage(TArray& Requests, TSet& LoadedPackages)
{
	for (auto PackageData : Requests)
	{
		const bool bIsLoaded = LoadedPackages.Contains(PackageData);

		auto& Discovered = PendingDiscoveredPackagesMessage.Packages.Emplace_GetRef();
		Discovered.PackageName = PackageData->GetPackageName();
		Discovered.NormalizedFileName = PackageData->GetFileName();
		Discovered.TargetState = bIsLoaded ? EPackageState::LoadReady : EPackageState::LoadPrepare;

		PackageData->SendToState(EPackageState::WaitAssign, ESendFlags::QueueAddAndRemove);
	}
}

/**
 * Appends a result entry for PackageData to the pending package-results message.
 * For Success, per-platform cook results are attached only when at least one platform did not
 * succeed. Any collected asset package data for the package is moved into the entry.
 */
void FCookWorkerClient::ReportPackageMessage(const FPackageData& PackageData, EMPPackageResult Reason)
{
	auto& PackageResult = PendingPackageResultsMessage.Results.Emplace_GetRef();
	const FName PackageName = PackageData.GetPackageName();
	PackageResult.PackageName = PackageName;
	PackageResult.PackageResult = Reason;

	if (Reason == EMPPackageResult::Success)
	{
		// Scratch buffer kept across calls to avoid reallocating.
		// NOTE(review): element type restored as ECookResult from the values added below - confirm it
		// matches the type of PackageResult.CookResult.
		static TArray<ECookResult> FailedPlatformResults;
		const int32 NumPlatforms = OrderedSessionPlatforms.Num();
		FailedPlatformResults.Reset(NumPlatforms);

		bool bExistsFailed = false;
		for (int32 PlatformIndex = 0; PlatformIndex < NumPlatforms; ++PlatformIndex)
		{
			const ITargetPlatform* TargetPlatform = OrderedSessionPlatforms[PlatformIndex];
			const ECookResult CookResult = PackageData.GetCookResults(TargetPlatform);
			bExistsFailed = bExistsFailed || CookResult != ECookResult::Succeeded;
			FailedPlatformResults.Add(CookResult);
		}
		if (bExistsFailed)
		{
			PackageResult.CookResult = FailedPlatformResults;
		}
	}

	{
		// The map is guarded by AssetPackageDataLock everywhere else it is touched, so take it here too.
		FScopeLock Lock(&CookCollectorData.AssetPackageDataLock);
		if (auto* FoundAssetPackages = CookCollectorData.AssetPackageDataMap.Find(PackageName))
		{
			PackageResult.AssetPackages = MoveTemp(*FoundAssetPackages);
			CookCollectorData.AssetPackageDataMap.Remove(PackageName);
		}
	}
}

/**
 * Records, for the generator's target platform, the package's FAssetPackageData and the tags of
 * each output asset into CookCollectorData, then marks the asset registry dirty.
 * Does nothing (with a warning) if the platform is not one of the session platforms or the
 * generator has no data for the package.
 */
void FCookWorkerClient::UpdateAssetRegistryPackageData(FAssetRegistryGenerator* Generator, const UPackage& Package, UCookOnTheFlyServer::FCreateOrFindArray& OutputAssets)
{
	const FName PackageName = Package.GetFName();
	FAssetPackageData* AssetPackageData = Generator->GetAssetPackageData(PackageName);
	auto TargetPlatform = Generator->GetTargetPlatform();
	const int32 NumPlatforms = OrderedSessionPlatforms.Num();
	const int32 TargetIndex = OrderedSessionPlatforms.IndexOfByKey(TargetPlatform);

	// IndexOfByKey returns INDEX_NONE when not found; indexing with it would be out of bounds.
	if (TargetIndex == INDEX_NONE || AssetPackageData == nullptr)
	{
		UE_LOG(LogCook, Warning, TEXT("CookWorkerClient cannot record asset registry data for %s: unknown target platform or missing package data."), *PackageName.ToString());
		return;
	}

	{
		FScopeLock Lock(&CookCollectorData.AssetPackageDataLock);
		auto& AssetPackageDatas = CookCollectorData.AssetPackageDataMap.FindOrAdd(PackageName);
		if (AssetPackageDatas.Num() == 0)
		{
			AssetPackageDatas.AddDefaulted(NumPlatforms);
		}
		AssetPackageDatas[TargetIndex] = *AssetPackageData;
	}

	{
		FScopeLock Lock(&CookCollectorData.AssetRegistryDataLock);
		for (auto AssetData : OutputAssets)
		{
			const FName ObjectPath = AssetData->ObjectPath;
			auto& AssetRegistryDatas = CookCollectorData.AssetRegistryDataMap.FindOrAdd(ObjectPath);
			if (AssetRegistryDatas.Num() == 0)
			{
				// First time this asset is seen: create one entry per session platform.
				AssetRegistryDatas.Reserve(NumPlatforms);
				for (int32 Index = 0; Index < NumPlatforms; ++Index)
				{
					auto& AssetRegistryData = AssetRegistryDatas.Emplace_GetRef();
					AssetRegistryData.AssetClass = AssetData->AssetClass;
					AssetRegistryData.PackageFlags = AssetData->PackageFlags;
				}
			}

			auto& Remote = AssetRegistryDatas[TargetIndex];
			for (const auto& Pair : AssetData->TagsAndValues)
			{
				Remote.Tags.Add(Pair.Key, Pair.Value.AsString());
			}
		}
		MarkAssetRegistryDirty();
	}
}

/**
 * Sends an FSyncFenceMessage carrying the current SyncFenceVersion once there is nothing left pending,
 * then collects garbage. Throttled to run at most once every 3 seconds.
 * While work is still pending it sets a flush flag instead of sending.
 */
void FCookWorkerClient::TrySyncFenceMessage()
{
	// Kept in double: FPlatformTime::Seconds() returns a double and float cannot hold it with sub-second precision.
	constexpr double WaitForSyncFenceTimeout = 3.0;
	static double LastWaitForSyncFenceTime = 0.0;

	const double Now = FPlatformTime::Seconds();
	if (Now - LastWaitForSyncFenceTime <= WaitForSyncFenceTimeout)
	{
		return;
	}
	LastWaitForSyncFenceTime = Now;

	if (SendSyncFenceVersion == SyncFenceVersion
		|| PendingMessages.Num() > 0
		|| PendingPackageResultsMessage.Results.Num() > 0
		|| PendingDiscoveredPackagesMessage.Packages.Num() > 0
		|| bAssetRegistryDirty)
	{
		if (PendingMessages.Num() == 0 && PendingDiscoveredPackagesMessage.Packages.Num() == 0)
		{
			bFlushPendingResults = true;
		}
		else
		{
			bFlushAssetRegistryMessage = true;
		}
		return;
	}

	UE_LOG(LogCook, Display, TEXT("TrySyncFenceMessage>>>>>>>>>>>>>>> %d => %d"), SendSyncFenceVersion, SyncFenceVersion);
	SendSyncFenceVersion = SyncFenceVersion;

	FSyncFenceMessage SyncFenceMessage;
	SyncFenceMessage.FenceVersion = SyncFenceVersion;
	WriteMarshalledPacket(SyncFenceMessage);

	CollectGarbage(RF_NoFlags, true);
}

/**
 * Sends queued outgoing messages to the director.
 *
 * @param bFlush When true, package results are sent even if fewer than FCookSocket::MaxSendPackageNum are queued.
 */
void FCookWorkerClient::PumpSendMessages(bool bFlush)
{
	const int32 ResultNum = PendingPackageResultsMessage.Results.Num();
	if (ResultNum >= FCookSocket::MaxSendPackageNum || (bFlush && ResultNum > 0))
	{
		PendingPackageResultsMessage.MessageSliceTo(ResultNum, SendPackageResultsMessage);
		UE_LOG(LogCook, Display, TEXT("PumpSendMessages>>>>>>>>>>>> FPackageResultsMessage %d %d::%d"),
			bFlushPendingResults, PendingPackageResultsMessage.Results.Num(), SendPackageResultsMessage.Results.Num());
		WriteMarshalledPacket(SendPackageResultsMessage);
		// Sending results also requests an asset registry flush in the step below.
		bFlushAssetRegistryMessage = true;
		bFlushPendingResults = false;
	}

	if (bAssetRegistryDirty && bFlushAssetRegistryMessage)
	{
		bFlushAssetRegistryMessage = false;
		{
			FScopeLock Lock(&CookCollectorData.AssetRegistryDataLock);
			SendAssetRegistryMessage.AssetRegistryDataMap = MoveTemp(CookCollectorData.AssetRegistryDataMap);
			bAssetRegistryDirty = false;
		}
		{
			FScopeLock Lock(&CookCollectorData.CookSideEffectsLock);
			CookCollectorData.UniqueToCookSideEffects(SendAssetRegistryMessage.CookSideEffects);
		}
		{
			FScopeLock Lock(&CookCollectorData.AssetPackageDataLock);
			SendAssetRegistryMessage.AssetPackageDataMap = MoveTemp(CookCollectorData.AssetPackageDataMap);
		}

		WriteMarshalledPacket(SendAssetRegistryMessage);

		// Clear every payload field once sent so stale data is never held until the next flush.
		SendAssetRegistryMessage.AssetRegistryDataMap.Reset();
		SendAssetRegistryMessage.CookSideEffects.Reset();
		SendAssetRegistryMessage.AssetPackageDataMap.Reset();
	}

	if (PendingDiscoveredPackagesMessage.Packages.Num() > 0)
	{
		PendingDiscoveredPackagesMessage.MessageSliceTo(FCookSocket::MaxSendPackageNum, SendDiscoveredPackagesMessage);
		UE_LOG(LogCook, Display, TEXT("PumpSendMessages>>>>>>>>>>>> FDiscoveredPackagesMessage %d::%d"),
			PendingDiscoveredPackagesMessage.Packages.Num(), SendDiscoveredPackagesMessage.Packages.Num());
		WriteMarshalledPacket(SendDiscoveredPackagesMessage);
	}
}

/**
 * Disconnect handling. While ShuttingDown: sends the shutdown acknowledge once, and moves to
 * LostConnection if that write fails or 60 seconds have passed since entering ShuttingDown.
 * While LostConnection: cancels the cook-by-the-book session.
 */
void FCookWorkerClient::PumpDisconnect(FTickStackData& StackData)
{
	if (ConnectStatus == EConnectStatus::ShuttingDown)
	{
		if (!bSendShutdownAcknowledgeMessage)
		{
			bSendShutdownAcknowledgeMessage = true;
			UE_LOG(LogCook, Display, TEXT("CookWorkerClient sending shutdown acknowledge and closing connection."));
			if (!WriteMarshalledPacket(FShutdownAcknowledgeMessage()))
			{
				SendToState(EConnectStatus::LostConnection);
			}
		}

		constexpr double WaitForShutdownTimeout = 60.0;
		// Re-check the state: a failed write above already moved us to LostConnection.
		if (ConnectStatus == EConnectStatus::ShuttingDown
			&& FPlatformTime::Seconds() - ConnectStartTimeSeconds > WaitForShutdownTimeout)
		{
			UE_LOG(LogCook, Error, TEXT("CookWorkerClient timed out waiting for shutdown acknowledge. Terminating connection."));
			SendToState(EConnectStatus::LostConnection);
		}
	}

	if (ConnectStatus == EConnectStatus::LostConnection)
	{
		COTFS.CancelCookByTheBook();
	}
}

/** Logs an error for a message that could not be unmarshalled as the named type. */
void FCookWorkerClient::LogInvalidMessage(const TCHAR* MessageTypeName, FGuid MessageType)
{
	UE_LOG(LogCook, Error, TEXT("CookWorkerClient received invalidly formatted message for type %s::%s from CookDirector. Ignoring it."),
		MessageTypeName, *MessageType.ToString());
}

/** Hands out the cook-by-the-book options from the initial config message. */
FCookByTheBookOptions&& FCookWorkerClient::ConsumeCookByTheBookOptions()
{
	check(InitialConfigMessage); // Should only be called after TryConnect and before DoneWithInitialSettings
	return InitialConfigMessage->ConsumeCookByTheBookOptions();
}

/** Hands out the cook-by-the-book startup options from the initial config message; requires the message to still be held. */
FCookByTheBookStartupOptions&& FCookWorkerClient::ConsumeCookByTheBookStartupOptions()
{
	check(InitialConfigMessage);
	return InitialConfigMessage->ConsumeCookByTheBookStartupOptions();
}

/** Returns the session platforms written by the initial config message. */
// NOTE(review): the template argument of the returned TArray appears to be missing - restore it from the header.
const TArray& FCookWorkerClient::GetTargetPlatforms() const
{
	return OrderedSessionPlatforms;
}

} // namespace Cook
} // namespace UE