#include "Cooker/MPCookMessage.h"
#include "Cooker/MPCookSideEffect.h"
#include "Cooker/CookWorkerServer.h"
#include "Cooker/CookPackageData.h"
#include "Cooker/CookPlatformManager.h"
#include "CookOnTheSide/CookLog.h"
#include "Sockets.h"
#include "SocketTypes.h"
#include "SocketSubsystem.h"

// NOTE(review): the reviewed copy of this file had its angle-bracket template arguments
// stripped (e.g. "TArray&", "static_cast(...)"). They are restored below from how each value
// is used; confirm the restored types against the declarations in the corresponding headers.

// Size in bytes of the fixed packet header. Cast to int32 so that comparisons against int32
// byte counts and array sizes do not silently mix signed and unsigned operands.
#define HEADER_SIZE static_cast<int32>(sizeof(FPacketHeader))

/** Serializes the disk size, package guid and cooked hash of an FAssetPackageData. */
FArchive& operator<<(FArchive& Ar, FAssetPackageData& Value)
{
	Ar << Value.DiskSize;
	Ar << Value.PackageGuid;
	Ar << Value.CookedHash;
	return Ar;
}

namespace UE
{
namespace Cook
{

namespace
{

/**
 * Moves the last BatchCount elements of Source into OutBatch (all of Source when it holds
 * no more than BatchCount elements). OutBatch is always emptied first.
 * A non-positive BatchCount leaves Source untouched and OutBatch empty.
 */
template <typename ArrayType>
void SliceTailInto(ArrayType& Source, int32 BatchCount, ArrayType& OutBatch)
{
	OutBatch.Reset();
	if (BatchCount <= 0)
	{
		return;
	}

	if (Source.Num() <= BatchCount)
	{
		OutBatch = MoveTemp(Source);
		return;
	}

	const int32 TailStart = Source.Num() - BatchCount;
	OutBatch.Append(Source.GetData() + TailStart, BatchCount);
	Source.SetNum(TailStart, false);
}

} // anonymous namespace

int32 FCookSocket::MaxSendPackageNum = 500;

// Message type identifiers: one guid per message class, written in front of every marshalled payload.
FGuid MarshalledMessageType(TEXT("5E56C5D96F3B455E9452C15ADA601A71"));
FGuid FInitialConfigMessage::MessageType(TEXT("340CDCB927304CEB9C0A66B5F707FC2B"));
FGuid FWorkerConnectMessage::MessageType(TEXT("302096E887DA48F7B079FAFAD0EE5695"));
FGuid FAssignPackagesMessage::MessageType(TEXT("B7B1542B73254B679319D73F753DB6F8"));
FGuid FDiscoveredPackagesMessage::MessageType(TEXT("C9F5BC5C11484B06B346B411F1ED3090"));
FGuid FPackageResultsMessage::MessageType(TEXT("4631C6C0F6DC4CEFB2B09D3FB0B524DB"));
FGuid FCookSideEffectMessage::MessageType(TEXT("58A03A38FEE045B08331BB8457AEBE35"));
FGuid FSyncFenceMessage::MessageType(TEXT("CBFB840A4FB94903A757C490514A4B86"));
FGuid FShutdownRequestMessage::MessageType(TEXT("DA952DA8D58245429E56424194A5462D"));
FGuid FShutdownAcknowledgeMessage::MessageType(TEXT("A563814699634422A93437596B07B372"));

/** Serializes the cook-by-the-book startup options that are sent to a worker. */
FArchive& operator<<(FArchive& Ar, FCookByTheBookStartupOptions& Value)
{
	Ar << Value.CookProcessCount;
	Ar << Value.CookInitializationFlags;
	Ar << Value.OutputDirectoryOverride;
	Ar << Value.BasedOnReleaseVersion;
	Ar << Value.NeverCookDirectories;
	Ar << Value.CookCultures;
	return Ar;
}

/** Serializes the subset of FCookByTheBookOptions listed below, in this exact order. */
FArchive& operator<<(FArchive& Ar, FCookByTheBookOptions& Value)
{
	// --- 1. Primitive booleans ---
	Ar << Value.bGenerateStreamingInstallManifests;
	Ar << Value.bGenerateDependenciesForMaps;
	Ar << Value.bRunning;
	Ar << Value.bCancel;
	Ar << Value.bErrorOnEngineContentUse;
	Ar << Value.bSkipHardReferences;
	Ar << Value.bSkipSoftReferences;
	Ar << Value.bFullLoadAndSave;
	Ar << Value.bPackageStore;
	Ar << Value.bCookAgainstFixedBase;
	Ar << Value.bDlcLoadMainAssetRegistry;

	// --- 2. Strings ---
	Ar << Value.DlcName;
	Ar << Value.CreateReleaseVersion;

	// --- 3. Doubles (timing information) ---
	Ar << Value.CookTime;
	Ar << Value.CookStartTime;
	return Ar;
}

/** Serializes one package assignment (package name and normalized file name). */
FArchive& operator<<(FArchive& Ar, FAssignPackageData& Value)
{
	Ar << Value.PackageName;
	Ar << Value.NormalizedFileName;
	return Ar;
}

/** Serializes one discovered package (package name and normalized file name). */
FArchive& operator<<(FArchive& Ar, FDiscoveredPackageData& Value)
{
	Ar << Value.PackageName;
	Ar << Value.NormalizedFileName;
	return Ar;
}

/** Serializes the result record of one cooked package. */
FArchive& operator<<(FArchive& Ar, FPackageCookResultData& Value)
{
	Ar << Value.PackageName;
	Ar << Value.PackageResult;
	Ar << Value.AssetPackages;
	Ar << Value.CookResult;
	return Ar;
}

/** Builds a header carrying the expected magic value and the given payload size. */
FPacketHeader::FPacketHeader(uint32 InSize)
	: Magic(MessageHeaderExpectedMagic)
	, Size(InSize)
{
}

/**
 * Copies the raw bytes of this header into SerializedData at StartIndex,
 * growing the array first when it is too short to hold the header there.
 */
void FPacketHeader::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
	const int32 RequiredSize = StartIndex + HEADER_SIZE;
	if (RequiredSize > SerializedData.Num())
	{
		SerializedData.AddUninitialized(RequiredSize - SerializedData.Num());
	}
	FMemory::Memcpy(SerializedData.GetData() + StartIndex, this, HEADER_SIZE);
}

/**
 * Copies the raw header bytes found at StartIndex into this object.
 * @return false when SerializedData is too short or the magic value does not match.
 */
bool FPacketHeader::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
	if (StartIndex + HEADER_SIZE > SerializedData.Num())
	{
		return false;
	}

	FMemory::Memcpy(this, SerializedData.GetData() + StartIndex, HEADER_SIZE);
	return Magic == MessageHeaderExpectedMagic;
}

/**
 * Validates the magic value and, when MaxPacketSize is non-zero, the payload size.
 * As written the limit is exclusive: a Size equal to MaxPacketSize is rejected.
 */
bool FPacketHeader::Check(uint32 MaxPacketSize)
{
	if (Magic != MessageHeaderExpectedMagic)
	{
		return false;
	}
	// MaxPacketSize is unsigned, so zero is the only "no limit" value.
	return MaxPacketSize == 0 || Size < MaxPacketSize;
}

/** Closes and destroys the owned socket, if any. */
FCookSocket::~FCookSocket()
{
	CloseSocket();
}

/** Hands the socket to the caller; this object no longer references it afterwards. */
FSocket* FCookSocket::DetachSocket()
{
	FSocket* Result = Socket;
	Socket = nullptr;
	return Result;
}

/** Replaces any current socket (closing it first) with InSocket and switches it to blocking mode. */
void FCookSocket::InitSocket(FSocket* InSocket)
{
	CloseSocket();
	Socket = InSocket;
	BlockSocket(true);
}

/** Closes the socket and returns it to the platform socket subsystem. No-op when there is none. */
void FCookSocket::CloseSocket()
{
	if (Socket)
	{
		Socket->Close();
		ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->DestroySocket(Socket);
		Socket = nullptr;
	}
}

/** Switches the socket between blocking and non-blocking mode. No-op when there is none. */
void FCookSocket::BlockSocket(bool IsBlocking)
{
	if (Socket)
	{
		Socket->SetNonBlocking(!IsBlocking);
	}
}

/**
 * Sends Buffer[StartIndex .. Num) over the socket, looping until every byte has been handed
 * to Send.
 * @return false when there is no socket or a Send call fails; true otherwise.
 */
bool FCookSocket::WriteBuffer(TArray<uint8>& Buffer, int32 StartIndex)
{
	if (!Socket)
	{
		return false;
	}

	// The bytes to send begin at StartIndex; the length below is measured from that same point.
	const uint8* SendStart = Buffer.GetData() + StartIndex;
	const int32 BytesLength = Buffer.Num() - StartIndex;

	int32 CurrentBytesSent = 0;
	while (CurrentBytesSent < BytesLength)
	{
		int32 BytesSent = 0;
		if (!Socket->Send(SendStart + CurrentBytesSent, BytesLength - CurrentBytesSent, BytesSent))
		{
			ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);
			const ESocketErrors LastError = SocketSubsystem->GetLastErrorCode();
			UE_LOG(LogCook, Error, TEXT("Socket send failed with error: %d"), static_cast<int32>(LastError));
			return false;
		}
		CurrentBytesSent += BytesSent;
	}
	return true;
}

/**
 * Serializes Message behind a packet header and sends the result.
 * @return the result of the send.
 */
bool FCookSocket::WritePacket(ICookMessage& Message)
{
	SendBuffer.Empty();
	// Reserve room for the header; it is filled in once the payload size is known.
	SendBuffer.AddUninitialized(HEADER_SIZE);
	Message.Serialize(SendBuffer, HEADER_SIZE);

	FPacketHeader MessageHeader(SendBuffer.Num() - HEADER_SIZE);
	MessageHeader.Serialize(SendBuffer);
	return WriteBuffer(SendBuffer);
}

/**
 * Reads one header followed by its payload from the socket and unserializes the payload
 * into Message.
 * @return false when there is no socket, a Recv call fails, the header is rejected,
 *         or Message fails to unserialize.
 */
bool FCookSocket::ReadPacket(ICookMessage& Message)
{
	if (!Socket)
	{
		return false;
	}

	// Read the fixed-size header.
	PendingBuffer.Empty();
	PendingBuffer.SetNumUninitialized(HEADER_SIZE);
	int32 CurrentBytesRead = 0;
	while (CurrentBytesRead < HEADER_SIZE)
	{
		int32 BytesRead = 0;
		if (!Socket->Recv(PendingBuffer.GetData() + CurrentBytesRead, HEADER_SIZE - CurrentBytesRead, BytesRead))
		{
			return false;
		}
		CurrentBytesRead += BytesRead;
	}

	FPacketHeader MessageHeader;
	if (!MessageHeader.Unserialize(PendingBuffer))
	{
		UE_LOG(LogCook, Error, TEXT("Received packet with invalid or zero payload size: %d"), MessageHeader.Size);
		return false;
	}

	// A size that turns negative as int32 cannot be used to size the buffer below.
	const int32 ExpectedPayloadSize = static_cast<int32>(MessageHeader.Size);
	if (ExpectedPayloadSize < 0)
	{
		UE_LOG(LogCook, Error, TEXT("Received packet with invalid or zero payload size: %d"), MessageHeader.Size);
		return false;
	}

	// Read the payload announced by the header.
	PendingBuffer.Empty();
	PendingBuffer.SetNumUninitialized(ExpectedPayloadSize);
	CurrentBytesRead = 0;
	while (CurrentBytesRead < ExpectedPayloadSize)
	{
		int32 BytesRead = 0;
		if (!Socket->Recv(PendingBuffer.GetData() + CurrentBytesRead, ExpectedPayloadSize - CurrentBytesRead, BytesRead))
		{
			UE_LOG(LogCook, Error, TEXT("Failed to read full payload (Expected: %d, Read: %d)"), ExpectedPayloadSize, CurrentBytesRead);
			return false;
		}
		CurrentBytesRead += BytesRead;
	}

	return Message.Unserialize(PendingBuffer);
}

/**
 * Reads whatever part of the next packet is currently available. Progress is kept in
 * ReceiveBuffer between calls; once a payload is complete it is appended to Messages.
 * @return Okay when a message was completed or more data is still needed,
 *         Terminated when there is no socket or the connection ended,
 *         FormatError when the header or the payload is rejected.
 */
EConnectionStatus FCookSocket::TryReadPacket(TArray<FMarshalledMessage>& Messages, uint32 MaxPacketSize)
{
	if (!Socket)
	{
		return EConnectionStatus::Terminated;
	}

	int32 BytesRead = 0;
	if (!ReceiveBuffer.bParsedHeader)
	{
		// Peek first so that a partially arrived header stays queued in the socket.
		FPacketHeader MessageHeader;
		const bool bStillAlive = Socket->Recv(reinterpret_cast<uint8*>(&MessageHeader), HEADER_SIZE, BytesRead, ESocketReceiveFlags::Peek);
		if (BytesRead < HEADER_SIZE)
		{
			if (!bStillAlive)
			{
				return EConnectionStatus::Terminated;
			}
			if (BytesRead == 0)
			{
				ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);
				const ESocketErrors LastError = SocketSubsystem->GetLastErrorCode();
				if (LastError != SE_EWOULDBLOCK)
				{
					UE_LOG(LogCook, Warning, TEXT("Socket Connection Closed by Peer (Header Peek 0 bytes, Error: %s)"), SocketSubsystem->GetSocketError(LastError));
					return EConnectionStatus::Terminated;
				}
			}
			return EConnectionStatus::Okay;
		}

		// The whole header is available: consume it for real this time.
		Socket->Recv(reinterpret_cast<uint8*>(&MessageHeader), HEADER_SIZE, BytesRead, ESocketReceiveFlags::None);
		if (!MessageHeader.Check(MaxPacketSize))
		{
			return EConnectionStatus::FormatError;
		}

		ReceiveBuffer.Payload = FCookUniqueBuffer::Alloc(MessageHeader.Size);
		ReceiveBuffer.bParsedHeader = true;
		ReceiveBuffer.BytesRead = 0;
	}

	while (ReceiveBuffer.Payload.GetSize() > ReceiveBuffer.BytesRead)
	{
		// When reading the possibly large payload size that will block the remote socket
		// from continuing until we have read some of it, we read as much of the payload
		// as is available and store it in our dynamic and larger buffer in Buffer.Payload.
		uint32 RemainingSize = ReceiveBuffer.Payload.GetSize() - ReceiveBuffer.BytesRead;

		// Avoid possible OS restrictions on maximum read size by imposing our own moderate
		// size per call to Recv.
		constexpr uint32 MaxReadSize = 1000 * 1000 * 64;
		const int32 ReadSize = static_cast<int32>(FMath::Min(RemainingSize, MaxReadSize));
		uint8* ReadData = static_cast<uint8*>(ReceiveBuffer.Payload.GetData()) + ReceiveBuffer.BytesRead;

		const bool bConnectionAlive = Socket->Recv(ReadData, ReadSize, BytesRead);
		if (BytesRead <= 0)
		{
			// Nothing arrived this time: either the peer is gone or we try again on the next call.
			return bConnectionAlive ? EConnectionStatus::Okay : EConnectionStatus::Terminated;
		}
		check(BytesRead <= ReadSize);
		ReceiveBuffer.BytesRead += BytesRead;
	}

	// Payload complete: the next call starts with a fresh header.
	ReceiveBuffer.bParsedHeader = false;
	FMarshalledMessage& Message = Messages.Emplace_GetRef();
	if (!Message.MakeFrom(ReceiveBuffer.Payload))
	{
		return EConnectionStatus::FormatError;
	}
	return EConnectionStatus::Okay;
}

/**
 * Sends Message as header + message-type guid + serialized message body.
 * @return the result of the send.
 */
bool FCookSocket::WriteMarshalledPacket(ICookMessage& Message)
{
	SendBuffer.Empty();
	// Reserve room for the header; it is filled in once the payload size is known.
	SendBuffer.AddUninitialized(HEADER_SIZE);

	const int32 BufferIndex = FMarshalledMessage::SerializeMessageType(SendBuffer, Message.GetMessageType(), HEADER_SIZE);
	Message.Serialize(SendBuffer, BufferIndex);

	FPacketHeader MessageHeader(SendBuffer.Num() - HEADER_SIZE);
	MessageHeader.Serialize(SendBuffer);
	return WriteBuffer(SendBuffer);
}

/**
 * Unserializes Message from a received marshalled message.
 * @return false when the message types differ or Message fails to unserialize.
 */
bool FCookSocket::ReadMarshalledPacket(FMarshalledMessage& Messages, ICookMessage& Message)
{
	if (Messages.MessageType != Message.GetMessageType())
	{
		return false;
	}
	return Message.Unserialize(Messages.MessageBuffer, Messages.BufferIndex);
}

/**
 * Writes InMessageType into SerializedData at StartIndex.
 * @return the index immediately after the written guid.
 */
int32 FMarshalledMessage::SerializeMessageType(TArray<uint8>& SerializedData, FGuid InMessageType, int32 StartIndex)
{
	FMemoryWriter Writer(SerializedData);
	Writer.Seek(StartIndex);
	Writer << InMessageType;
	return StartIndex + static_cast<int32>(sizeof(FGuid));
}

/** Writes the message type followed by the message buffer into SerializedData at StartIndex. */
void FMarshalledMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryWriter Writer(SerializedData);
	Writer.Seek(StartIndex);
	Writer << MessageType;
	Writer << MessageBuffer;
	Writer.FlushCache();
}

/**
 * Reads the message type and message buffer from SerializedData at StartIndex.
 * @return false when the reader reports an error or bytes remain unread.
 */
bool FMarshalledMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryReader Reader(SerializedData, /*bIsPersistent=*/ true);
	Reader.Seek(StartIndex);
	Reader << MessageType;
	Reader << MessageBuffer;

	if (Reader.IsError() || !Reader.AtEnd())
	{
		UE_LOG(LogCook, Error, TEXT("FMarshalledMessage failed. Payload size: %d, Read position: %lld"), SerializedData.Num(), Reader.Tell());
		return false;
	}
	return true;
}

/** Returns the guid that identifies the marshalled wrapper message itself. */
FGuid FMarshalledMessage::GetMessageType() const
{
	return MarshalledMessageType;
}

/** Resets the stored message type to the wrapper's own guid. */
void FMarshalledMessage::Clear()
{
	MessageType = MarshalledMessageType;
}

/**
 * Takes over the bytes of Payload and reads the leading message type from them.
 * On success BufferIndex points just past the guid.
 * @return false when the payload cannot be taken over, is shorter than a guid,
 *         or the guid cannot be read.
 */
bool FMarshalledMessage::MakeFrom(FCookUniqueBuffer& Payload)
{
	constexpr int32 GuidSize = static_cast<int32>(sizeof(FGuid));
	if (!Payload.MoveToShared(MessageBuffer) || MessageBuffer.Num() < GuidSize)
	{
		return false;
	}

	FMemoryReader Reader(MessageBuffer, /*bIsPersistent=*/ true);
	Reader << MessageType;
	if (Reader.IsError())
	{
		UE_LOG(LogCook, Error, TEXT("FMarshalledMessage failed. Payload size: %d, Read position: %lld"), MessageBuffer.Num(), Reader.Tell());
		return false;
	}

	BufferIndex = GuidSize;
	return true;
}

/** Writes RemoteIndex into SerializedData at StartIndex. */
void FWorkerConnectMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryWriter Writer(SerializedData);
	Writer.Seek(StartIndex);
	Writer << RemoteIndex;
	Writer.FlushCache();
}

/**
 * Reads RemoteIndex from SerializedData at StartIndex.
 * @return false when the reader reports an error or bytes remain unread.
 */
bool FWorkerConnectMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryReader Reader(SerializedData, /*bIsPersistent=*/ true);
	Reader.Seek(StartIndex);
	Reader << RemoteIndex;

	if (Reader.IsError() || !Reader.AtEnd())
	{
		UE_LOG(LogCook, Error, TEXT("UnmarshalConnectMessage failed. Payload size: %d, Read position: %lld"), SerializedData.Num(), Reader.Tell());
		return false;
	}
	return true;
}

/** Writes the cook mode (as int32), platform names and both option structs at StartIndex. */
void FInitialConfigMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryWriter Writer(SerializedData);
	Writer.Seek(StartIndex);

	// The cook mode travels as a plain int32.
	int32 LocalCookMode = static_cast<int32>(DirectorCookMode);
	Writer << LocalCookMode;
	Writer << OrderedSessionPlatforms;
	Writer << CookByTheBookOptions;
	Writer << CookByTheBookStartupOptions;
	Writer.FlushCache();
}

/**
 * Reads the fields written by Serialize, in the same order.
 * @return false when the reader reports an error or bytes remain unread.
 */
bool FInitialConfigMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryReader Reader(SerializedData, /*bIsPersistent=*/ true);
	Reader.Seek(StartIndex);

	int32 LocalCookMode = 0;
	Reader << LocalCookMode;
	Reader << OrderedSessionPlatforms;
	Reader << CookByTheBookOptions;
	Reader << CookByTheBookStartupOptions;
	// decltype keeps the cast tied to whatever type the member is declared with.
	DirectorCookMode = static_cast<decltype(DirectorCookMode)>(LocalCookMode);

	if (Reader.IsError() || !Reader.AtEnd())
	{
		UE_LOG(LogCook, Error, TEXT("FInitialConfigMessage failed. Payload size: %d, Read position: %lld"), SerializedData.Num(), Reader.Tell());
		return false;
	}
	return true;
}

/**
 * Fills this message from the local cook server: cook mode, both option structs, and the
 * name of every platform in InOrderedSessionPlatforms (order preserved).
 */
// NOTE(review): element type of InOrderedSessionPlatforms restored as const ITargetPlatform* -- confirm against the header.
void FInitialConfigMessage::ReadFromLocal(const UCookOnTheFlyServer& COTFS, const TConstArrayView<const ITargetPlatform*>& InOrderedSessionPlatforms)
{
	DirectorCookMode = COTFS.GetCookMode();
	COTFS.WriteToCookByTheBookOptions(CookByTheBookOptions);
	COTFS.WriteToCookByTheBookStartupOptions(CookByTheBookStartupOptions);

	OrderedSessionPlatforms.Reset(InOrderedSessionPlatforms.Num());
	for (const ITargetPlatform* Platform : InOrderedSessionPlatforms)
	{
		OrderedSessionPlatforms.Add(Platform->PlatformName());
	}
}

/**
 * Resolves every stored platform name through the target platform manager and writes the
 * platforms that were found into InOrderedSessionPlatforms. Names that cannot be resolved
 * are logged as errors and skipped.
 */
// NOTE(review): element type of InOrderedSessionPlatforms restored as ITargetPlatform* -- confirm against the header.
void FInitialConfigMessage::WriteToTargetPlatform(TArray<ITargetPlatform*>& InOrderedSessionPlatforms)
{
	InOrderedSessionPlatforms.Reset(OrderedSessionPlatforms.Num());

	ITargetPlatformManagerModule& TPM(GetTargetPlatformManagerRef());
	for (const FString& PlatformName : OrderedSessionPlatforms)
	{
		ITargetPlatform* TargetPlatform = TPM.FindTargetPlatform(PlatformName);
		if (!TargetPlatform)
		{
			UE_LOG(LogCook, Error, TEXT("FInitialConfigMessage::WriteToTargetPlatform Could not find TargetPlatform \"%s\"."), *PlatformName);
			continue;
		}
		InOrderedSessionPlatforms.Add(TargetPlatform);
	}
}

/** Takes ownership of the given package list. */
// NOTE(review): element type restored as FAssignPackageData -- confirm against the header.
FAssignPackagesMessage::FAssignPackagesMessage(TArray<FAssignPackageData>&& InPackageDatas)
	// A named rvalue reference is an lvalue; MoveTemp is required to move instead of copy.
	: Packages(MoveTemp(InPackageDatas))
{
}

/** Writes FenceVersion followed by Packages into SerializedData at StartIndex. */
void FAssignPackagesMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryWriter Writer(SerializedData);
	Writer.Seek(StartIndex);
	Writer << FenceVersion;
	Writer << Packages;
	Writer.FlushCache();
}

/**
 * Reads FenceVersion followed by Packages from SerializedData at StartIndex.
 * @return false when the reader reports an error. Trailing bytes are not rejected.
 */
bool FAssignPackagesMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryReader Reader(SerializedData, /*bIsPersistent=*/ true);
	Reader.Seek(StartIndex);
	Reader << FenceVersion;
	Reader << Packages;

	if (Reader.IsError())
	{
		UE_LOG(LogCook, Error, TEXT("FAssignPackagesMessage failed. Payload size: %d, Read position: %lld"), SerializedData.Num(), Reader.Tell());
		return false;
	}
	return true;
}

/**
 * Moves up to BatchCount packages from the tail of this message into PendingMessage and
 * copies FenceVersion across. PendingMessage's previous packages are discarded.
 */
void FAssignPackagesMessage::MessageSliceTo(int32 BatchCount, FAssignPackagesMessage& PendingMessage)
{
	PendingMessage.FenceVersion = FenceVersion;
	SliceTailInto(Packages, BatchCount, PendingMessage.Packages);
}

/** Writes Packages into SerializedData at StartIndex. */
void FDiscoveredPackagesMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryWriter Writer(SerializedData);
	Writer.Seek(StartIndex);
	Writer << Packages;
	Writer.FlushCache();
}

/**
 * Reads Packages from SerializedData at StartIndex.
 * @return false when the reader reports an error. Trailing bytes are not rejected.
 */
bool FDiscoveredPackagesMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryReader Reader(SerializedData, /*bIsPersistent=*/ true);
	Reader.Seek(StartIndex);
	Reader << Packages;

	if (Reader.IsError())
	{
		UE_LOG(LogCook, Error, TEXT("FDiscoveredPackagesMessage failed. Payload size: %d, Read position: %lld"), SerializedData.Num(), Reader.Tell());
		return false;
	}
	return true;
}

/**
 * Moves up to BatchCount packages from the tail of this message into PendingMessage.
 * PendingMessage's previous packages are discarded.
 */
void FDiscoveredPackagesMessage::MessageSliceTo(int32 BatchCount, FDiscoveredPackagesMessage& PendingMessage)
{
	SliceTailInto(Packages, BatchCount, PendingMessage.Packages);
}

/**
 * Moves up to BatchCount results from the tail of this message into PendingMessage.
 * PendingMessage's previous results are discarded.
 */
void FPackageResultsMessage::MessageSliceTo(int32 BatchCount, FPackageResultsMessage& PendingMessage)
{
	SliceTailInto(Results, BatchCount, PendingMessage.Results);
}

/** Writes Results into SerializedData at StartIndex. */
void FPackageResultsMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryWriter Writer(SerializedData);
	Writer.Seek(StartIndex);
	Writer << Results;
	Writer.FlushCache();
}

/**
 * Reads Results from SerializedData at StartIndex.
 * @return false when the reader reports an error. Trailing bytes are not rejected.
 */
bool FPackageResultsMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryReader Reader(SerializedData, /*bIsPersistent=*/ true);
	Reader.Seek(StartIndex);
	Reader << Results;

	if (Reader.IsError())
	{
		UE_LOG(LogCook, Error, TEXT("FPackageResultsMessage failed. Payload size: %d, Read position: %lld"), SerializedData.Num(), Reader.Tell());
		return false;
	}
	return true;
}

/** Writes both data maps and the side-effect list into SerializedData at StartIndex. */
void FCookSideEffectMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryWriter Writer(SerializedData);
	Writer.Seek(StartIndex);
	Writer << AssetRegistryDataMap;
	Writer << AssetPackageDataMap;
	Writer << CookSideEffects;
	Writer.FlushCache();
}

/**
 * Reads both data maps and the side-effect list from SerializedData at StartIndex.
 * @return false when the reader reports an error. Trailing bytes are not rejected.
 */
bool FCookSideEffectMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryReader Reader(SerializedData, /*bIsPersistent=*/ true);
	Reader.Seek(StartIndex);
	Reader << AssetRegistryDataMap;
	Reader << AssetPackageDataMap;
	Reader << CookSideEffects;

	if (Reader.IsError())
	{
		UE_LOG(LogCook, Error, TEXT("FCookSideEffectMessage failed. Payload size: %d, Read position: %lld"), SerializedData.Num(), Reader.Tell());
		return false;
	}
	return true;
}

/** Writes FenceVersion into SerializedData at StartIndex. */
void FSyncFenceMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryWriter Writer(SerializedData);
	Writer.Seek(StartIndex);
	Writer << FenceVersion;
	Writer.FlushCache();
}

/**
 * Reads FenceVersion from SerializedData at StartIndex.
 * @return false when the reader reports an error. Trailing bytes are not rejected.
 */
bool FSyncFenceMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
	FMemoryReader Reader(SerializedData, /*bIsPersistent=*/ true);
	Reader.Seek(StartIndex);
	Reader << FenceVersion;

	if (Reader.IsError())
	{
		UE_LOG(LogCook, Error, TEXT("FSyncFenceMessage failed. Payload size: %d, Read position: %lld"), SerializedData.Num(), Reader.Tell());
		return false;
	}
	return true;
}

/** Convenience overload so a temporary shutdown acknowledgement can be sent directly. */
bool FCookSocket::WriteMarshalledPacket(FShutdownAcknowledgeMessage&& Message)
{
	// The named parameter is already an lvalue, so it can be passed on without a copy.
	return WriteMarshalledPacket(static_cast<ICookMessage&>(Message));
}

/** Convenience overload so a temporary shutdown request can be sent directly. */
bool FCookSocket::WriteMarshalledPacket(FShutdownRequestMessage&& Message)
{
	// The named parameter is already an lvalue, so it can be passed on without a copy.
	return WriteMarshalledPacket(static_cast<ICookMessage&>(Message));
}

/** The shutdown request has no body; only its message type is sent. */
void FShutdownRequestMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
}

/** Nothing to read: the shutdown request has no body. */
bool FShutdownRequestMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
	return true;
}

/** The shutdown acknowledgement has no body; only its message type is sent. */
void FShutdownAcknowledgeMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
}

/** Nothing to read: the shutdown acknowledgement has no body. */
bool FShutdownAcknowledgeMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
	return true;
}

} // namespace Cook
} // namespace UE