EM_Task/UnrealEd/Private/Cooker/MPCookMessage.cpp
Boshuang Zhao 5144a49c9b add
2026-02-13 16:18:33 +08:00

634 lines
20 KiB
C++

#include "Cooker/MPCookMessage.h"
#include "Cooker/MPCookSideEffect.h"
#include "Cooker/CookWorkerServer.h"
#include "Cooker/CookPackageData.h"
#include "Cooker/CookPlatformManager.h"
#include "CookOnTheSide/CookLog.h"
#include "Sockets.h"
#include "SocketTypes.h"
#include "SocketSubsystem.h"
// Size in bytes of the FPacketHeader that precedes every packet payload.
// Expands to a sizeof expression, so its type is size_t (unsigned) wherever it is mixed with int32 values.
#define HEADER_SIZE sizeof(FPacketHeader)
/** Serializes the disk size, package guid and cooked hash of an FAssetPackageData, in that order. */
FArchive& operator<<(FArchive& Ar, FAssetPackageData& Value)
{
    Ar << Value.DiskSize << Value.PackageGuid << Value.CookedHash;
    return Ar;
}
namespace UE
{
namespace Cook
{
// Definition of the FCookSocket static; presumably the per-send batch limit used with MessageSliceTo -- confirm against callers.
int32 FCookSocket::MaxSendPackageNum = 500;
// Type guid reported by FMarshalledMessage::GetMessageType and assigned by FMarshalledMessage::Clear.
FGuid MarshalledMessageType(TEXT("5E56C5D96F3B455E9452C15ADA601A71"));
// One guid per concrete message class. WriteMarshalledPacket writes the guid in front of the message body
// and ReadMarshalledPacket compares it before unserializing.
FGuid FInitialConfigMessage::MessageType(TEXT("340CDCB927304CEB9C0A66B5F707FC2B"));
FGuid FWorkerConnectMessage::MessageType(TEXT("302096E887DA48F7B079FAFAD0EE5695"));
FGuid FAssignPackagesMessage::MessageType(TEXT("B7B1542B73254B679319D73F753DB6F8"));
FGuid FDiscoveredPackagesMessage::MessageType(TEXT("C9F5BC5C11484B06B346B411F1ED3090"));
FGuid FPackageResultsMessage::MessageType(TEXT("4631C6C0F6DC4CEFB2B09D3FB0B524DB"));
FGuid FCookSideEffectMessage::MessageType(TEXT("58A03A38FEE045B08331BB8457AEBE35"));
FGuid FSyncFenceMessage::MessageType(TEXT("CBFB840A4FB94903A757C490514A4B86"));
FGuid FShutdownRequestMessage::MessageType(TEXT("DA952DA8D58245429E56424194A5462D"));
FGuid FShutdownAcknowledgeMessage::MessageType(TEXT("A563814699634422A93437596B07B372"));
/**
 * Serializes the subset of FCookByTheBookStartupOptions that is exchanged between cook processes.
 * The field order is the wire format: reader and writer must agree on it.
 */
FArchive& operator<<(FArchive& Ar, FCookByTheBookStartupOptions& Value)
{
    // Process count and initialization flags.
    Ar << Value.CookProcessCount;
    Ar << Value.CookInitializationFlags;

    // Output location and release version the cook is based on.
    Ar << Value.OutputDirectoryOverride;
    Ar << Value.BasedOnReleaseVersion;

    // Collections.
    Ar << Value.NeverCookDirectories;
    Ar << Value.CookCultures;

    return Ar;
}
/**
 * Serializes the FCookByTheBookOptions fields exchanged between cook processes.
 * The field order is the wire format: reader and writer must agree on it.
 */
FArchive& operator<<(FArchive& Ar, FCookByTheBookOptions& Value)
{
    // 1. Plain boolean flags.
    Ar << Value.bGenerateStreamingInstallManifests
       << Value.bGenerateDependenciesForMaps
       << Value.bRunning
       << Value.bCancel
       << Value.bErrorOnEngineContentUse
       << Value.bSkipHardReferences
       << Value.bSkipSoftReferences
       << Value.bFullLoadAndSave
       << Value.bPackageStore
       << Value.bCookAgainstFixedBase
       << Value.bDlcLoadMainAssetRegistry;

    // 2. String values.
    Ar << Value.DlcName
       << Value.CreateReleaseVersion;

    // 3. Timing information (doubles).
    Ar << Value.CookTime
       << Value.CookStartTime;

    return Ar;
}
/** Serializes an FAssignPackageData: package name first, then its normalized file name. */
FArchive& operator<<(FArchive& Ar, FAssignPackageData& Value)
{
    Ar << Value.PackageName << Value.NormalizedFileName;
    return Ar;
}
/** Serializes an FDiscoveredPackageData: package name first, then its normalized file name. */
FArchive& operator<<(FArchive& Ar, FDiscoveredPackageData& Value)
{
    Ar << Value.PackageName << Value.NormalizedFileName;
    return Ar;
}
/**
 * Serializes one package cook result.
 * The field order is the wire format: reader and writer must agree on it.
 */
FArchive& operator<<(FArchive& Ar, FPackageCookResultData& Value)
{
    // Identity of the package and its result.
    Ar << Value.PackageName;
    Ar << Value.PackageResult;

    // Asset packages recorded for this result, followed by the cook result value.
    Ar << Value.AssetPackages;
    Ar << Value.CookResult;

    return Ar;
}
/** Builds a header that carries the expected magic value and the given payload size. */
FPacketHeader::FPacketHeader(uint32 InSize)
    : Magic(MessageHeaderExpectedMagic)
    , Size(InSize)
{
}
/**
 * Copies the raw bytes of this header into SerializedData at StartIndex.
 * The array is grown (uninitialized) when it is too short to hold the header at that position.
 */
void FPacketHeader::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
    // Number of elements the array must have for the header to fit.
    const auto RequiredNum = StartIndex + HEADER_SIZE;
    if (RequiredNum > SerializedData.Num())
    {
        SerializedData.AddUninitialized(RequiredNum - SerializedData.Num());
    }

    uint8* const Destination = SerializedData.GetData() + StartIndex;
    FMemory::Memcpy(Destination, this, HEADER_SIZE);
}
bool FPacketHeader::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
if (StartIndex + HEADER_SIZE > SerializedData.Num())
{
return false;
}
FMemory::Memcpy(this, SerializedData.GetData() + StartIndex, HEADER_SIZE);
if (Magic != MessageHeaderExpectedMagic)
{
return false;
}
return true;
}
/**
 * Validates the header.
 * @param MaxPacketSize Exclusive upper bound for Size; 0 disables the size check.
 * @return true when the magic matches and the size check passes (or is disabled).
 */
bool FPacketHeader::Check(uint32 MaxPacketSize)
{
    if (Magic != MessageHeaderExpectedMagic)
    {
        return false;
    }

    // MaxPacketSize is unsigned, so "no limit" can only be expressed as 0.
    const bool bNoLimit = (MaxPacketSize == 0);
    return bNoLimit || Size < MaxPacketSize;
}
/** Closes and destroys the socket still held by this object, if any. */
FCookSocket::~FCookSocket()
{
    CloseSocket();
}
/**
 * Hands the held socket to the caller without closing it.
 * @return The previously held socket (possibly nullptr); this object no longer references it.
 */
FSocket* FCookSocket::DetachSocket()
{
    FSocket* const Detached = Socket;
    Socket = nullptr;
    return Detached;
}
/** Replaces the held socket with InSocket (closing any previous one) and switches it to blocking mode. */
void FCookSocket::InitSocket(FSocket* InSocket)
{
    // Drop whatever socket we were holding before adopting the new one.
    CloseSocket();

    Socket = InSocket;

    const bool bBlocking = true;
    BlockSocket(bBlocking);
}
/** Closes the held socket and destroys it through the platform socket subsystem; no-op when none is held. */
void FCookSocket::CloseSocket()
{
    if (!Socket)
    {
        return;
    }

    Socket->Close();
    ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM)->DestroySocket(Socket);
    Socket = nullptr;
}
/** Switches the held socket between blocking (true) and non-blocking (false) mode; no-op when none is held. */
void FCookSocket::BlockSocket(bool IsBlocking)
{
    if (!Socket)
    {
        return;
    }

    const bool bNonBlocking = !IsBlocking;
    Socket->SetNonBlocking(bNonBlocking);
}
/**
 * Sends Buffer[StartIndex .. Num) over the socket, looping until every byte has been accepted.
 * @param Buffer     Bytes to send.
 * @param StartIndex Index of the first byte to send.
 * @return true when all bytes were sent; false (after logging the socket error) when Send fails.
 */
bool FCookSocket::WriteBuffer(TArray<uint8>& Buffer, int32 StartIndex)
{
    const int32 BytesLength = Buffer.Num() - StartIndex;
    int32 CurrentBytesSent = 0;
    while (CurrentBytesSent < BytesLength)
    {
        int32 BytesSent = 0;
        // The send cursor must be offset by StartIndex as well as by the bytes already sent;
        // otherwise a non-zero StartIndex would transmit the wrong region of the buffer.
        const uint8* const SendData = Buffer.GetData() + StartIndex + CurrentBytesSent;
        if (!Socket->Send(SendData, BytesLength - CurrentBytesSent, BytesSent))
        {
            ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);
            ESocketErrors LastError = SocketSubsystem->GetLastErrorCode();
            UE_LOG(LogCook, Error, TEXT("Socket send failed with error: %d"), LastError);
            return false;
        }
        CurrentBytesSent += BytesSent;
    }
    return true;
}
/**
 * Serializes Message behind an FPacketHeader in SendBuffer and sends the packet.
 * @return The result of the send, so callers can detect a failed write
 *         (matches WriteMarshalledPacket).
 */
bool FCookSocket::WritePacket(ICookMessage& Message)
{
    // Reserve room for the header; its size field is only known once the body is serialized.
    SendBuffer.Empty();
    SendBuffer.AddUninitialized(HEADER_SIZE);
    Message.Serialize(SendBuffer, HEADER_SIZE);

    FPacketHeader MessageHeader(SendBuffer.Num() - HEADER_SIZE);
    MessageHeader.Serialize(SendBuffer);

    // Propagate send failures instead of unconditionally reporting success.
    return WriteBuffer(SendBuffer);
}
/**
 * Reads one complete packet (header, then payload) from the socket and unserializes it into Message.
 * Loops on Recv until the full header and the full payload have been received.
 * @return false when a Recv fails, the header fails FPacketHeader::Unserialize, or the advertised
 *         payload size cannot be represented as an int32; otherwise the result of Message.Unserialize.
 */
bool FCookSocket::ReadPacket(ICookMessage& Message)
{
    const int32 HeaderBytes = static_cast<int32>(HEADER_SIZE);

    // --- Header ---
    PendingBuffer.Empty();
    PendingBuffer.SetNumUninitialized(HeaderBytes);
    int32 CurrentBytesRead = 0;
    while (CurrentBytesRead < HeaderBytes)
    {
        int32 BytesRead = 0;
        if (!Socket->Recv(PendingBuffer.GetData() + CurrentBytesRead, HeaderBytes - CurrentBytesRead, BytesRead))
        {
            return false;
        }
        CurrentBytesRead += BytesRead;
    }

    // Parse the header exactly once. Unserialize fails when the magic value does not match.
    FPacketHeader MessageHeader;
    if (!MessageHeader.Unserialize(PendingBuffer))
    {
        UE_LOG(LogCook, Error, TEXT("Received packet with invalid or zero payload size: %d"), MessageHeader.Size);
        return false;
    }

    // The size arrives from the peer. A value above INT32_MAX would turn negative here and must not
    // reach SetNumUninitialized.
    const int32 ExpectedPayloadSize = static_cast<int32>(MessageHeader.Size);
    if (ExpectedPayloadSize < 0)
    {
        UE_LOG(LogCook, Error, TEXT("Received packet with unsupported payload size: %u"), MessageHeader.Size);
        return false;
    }

    // --- Payload ---
    PendingBuffer.Empty();
    PendingBuffer.SetNumUninitialized(ExpectedPayloadSize);
    CurrentBytesRead = 0;
    while (CurrentBytesRead < ExpectedPayloadSize)
    {
        int32 BytesRead = 0;
        if (!Socket->Recv(PendingBuffer.GetData() + CurrentBytesRead, ExpectedPayloadSize - CurrentBytesRead, BytesRead))
        {
            UE_LOG(LogCook, Error, TEXT("Failed to read full payload (Expected: %d, Read: %d)"), ExpectedPayloadSize, CurrentBytesRead);
            return false;
        }
        CurrentBytesRead += BytesRead;
    }

    return Message.Unserialize(PendingBuffer);
}
// Reads at most one packet from Socket, resuming across calls.
// Progress is stored in ReceiveBuffer (bParsedHeader, Payload, BytesRead): when a call runs out of
// available data it returns Okay without appending anything, and a later call continues from there.
// Messages: one FMarshalledMessage is appended once a payload has been received completely.
// MaxPacketSize: forwarded to FPacketHeader::Check when validating a freshly read header.
// Returns:
//   Okay        - no error; a message may or may not have been appended.
//   Terminated  - a Recv that produced too little data also reported failure, or the header peek
//                 saw 0 bytes and the last socket error was not SE_EWOULDBLOCK.
//   FormatError - the header failed Check, or FMarshalledMessage::MakeFrom rejected the payload.
EConnectionStatus FCookSocket::TryReadPacket(TArray<FMarshalledMessage>& Messages, uint32 MaxPacketSize)
{
int32 BytesRead;
if (!ReceiveBuffer.bParsedHeader)
{
FPacketHeader MessageHeader;
// Peek first so that a partially arrived header is left in the socket until all of it is available.
bool bStillAlive = Socket->Recv(reinterpret_cast<uint8*>(&MessageHeader), HEADER_SIZE, BytesRead, ESocketReceiveFlags::Peek);
if (BytesRead < HEADER_SIZE)
{
if (!bStillAlive)
{
return EConnectionStatus::Terminated;
}
if (BytesRead == 0)
{
// Zero bytes with any error other than SE_EWOULDBLOCK is treated as the peer having closed the connection.
ISocketSubsystem* SocketSubsystem = ISocketSubsystem::Get(PLATFORM_SOCKETSUBSYSTEM);
ESocketErrors LastError = SocketSubsystem->GetLastErrorCode();
if (LastError != SE_EWOULDBLOCK)
{
UE_LOG(LogCook, Warning, TEXT("Socket Connection Closed by Peer (Header Peek 0 bytes, Error: %s)"), SocketSubsystem->GetSocketError(LastError));
return EConnectionStatus::Terminated;
}
}
// Header not fully available yet; try again on a later call.
return EConnectionStatus::Okay;
}
// The peek saw a complete header, so consume it now. The result of this Recv is not checked.
Socket->Recv(reinterpret_cast<uint8*>(&MessageHeader), HEADER_SIZE, BytesRead, ESocketReceiveFlags::None);
if (!MessageHeader.Check(MaxPacketSize))
{
return EConnectionStatus::FormatError;
}
// Allocate the payload buffer and remember that the header has been handled.
ReceiveBuffer.Payload = FCookUniqueBuffer::Alloc(MessageHeader.Size);
ReceiveBuffer.bParsedHeader = true;
ReceiveBuffer.BytesRead = 0;
}
while (ReceiveBuffer.Payload.GetSize() > ReceiveBuffer.BytesRead)
{
// When reading the possibly large payload size that will block the remote socket
// from continuing until we have read some of it, we read as much of the payload
// as is available and store it in our dynamic and larger buffer in Buffer.Payload.
uint32 RemainingSize = ReceiveBuffer.Payload.GetSize() - ReceiveBuffer.BytesRead;
// Avoid possible OS restrictions on maximum read size by imposing our own moderate
// size per call to Recv.
constexpr uint32 MaxReadSize = 1000 * 1000 * 64;
int32 ReadSize = static_cast<int32>(FMath::Min(RemainingSize, MaxReadSize));
uint8* ReadData = static_cast<uint8*>(ReceiveBuffer.Payload.GetData()) + ReceiveBuffer.BytesRead;
bool bConnectionAlive = Socket->Recv(ReadData, ReadSize, BytesRead);
if (BytesRead <= 0)
{
if (!bConnectionAlive)
{
return EConnectionStatus::Terminated;
}
// Nothing available right now; bParsedHeader stays true so the next call resumes the payload read.
return EConnectionStatus::Okay;
}
check(BytesRead <= ReadSize);
ReceiveBuffer.BytesRead += BytesRead;
}
// Payload complete: reset the state so the next call starts with a new header.
ReceiveBuffer.bParsedHeader = false;
// The element is appended before MakeFrom runs, so it remains in Messages even when FormatError is returned.
FMarshalledMessage& Message = Messages.Emplace_GetRef();
if (!Message.MakeFrom(ReceiveBuffer.Payload))
{
return EConnectionStatus::FormatError;
}
return EConnectionStatus::Okay;
}
/**
 * Sends Message as a marshalled packet: [FPacketHeader][message type guid][message body].
 * @return The result of WriteBuffer for the assembled packet.
 */
bool FCookSocket::WriteMarshalledPacket(ICookMessage& Message)
{
    // Leave room for the header; its size field is written last, once the body length is known.
    SendBuffer.Empty();
    SendBuffer.AddUninitialized(HEADER_SIZE);

    // Type guid directly after the header, then the message body after the guid.
    const int32 BodyStart = FMarshalledMessage::SerializeMessageType(SendBuffer, Message.GetMessageType(), HEADER_SIZE);
    Message.Serialize(SendBuffer, BodyStart);

    // The header size covers everything after the header (guid + body).
    FPacketHeader Header(SendBuffer.Num() - HEADER_SIZE);
    Header.Serialize(SendBuffer);

    return WriteBuffer(SendBuffer);
}
/**
 * Unserializes a received marshalled message into Message when their type guids match.
 * @return false on a type mismatch; otherwise the result of Message.Unserialize on the
 *         marshalled buffer, starting at its BufferIndex.
 */
bool FCookSocket::ReadMarshalledPacket(FMarshalledMessage& Messages, ICookMessage& Message)
{
    const bool bTypeMatches = (Messages.MessageType == Message.GetMessageType());
    return bTypeMatches && Message.Unserialize(Messages.MessageBuffer, Messages.BufferIndex);
}
/**
 * Writes InMessageType into SerializedData at StartIndex.
 * @return StartIndex advanced by sizeof(FGuid), i.e. the index at which the caller continues writing.
 */
int32 FMarshalledMessage::SerializeMessageType(TArray<uint8>& SerializedData, FGuid InMessageType, int32 StartIndex)
{
    FMemoryWriter Ar(SerializedData);
    Ar.Seek(StartIndex);
    Ar << InMessageType;

    return static_cast<int32>(StartIndex + sizeof(FGuid));
}
/** Writes the message type guid followed by the message buffer into SerializedData at StartIndex. */
void FMarshalledMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
    FMemoryWriter Ar(SerializedData);
    Ar.Seek(StartIndex);

    Ar << MessageType << MessageBuffer;

    Ar.FlushCache();
}
bool FMarshalledMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
FMemoryReader Reader(SerializedData, true /*bIs]=true*/);
Reader.Seek(StartIndex);
Reader << MessageType;
Reader << MessageBuffer;
if (Reader.IsError() || !Reader.AtEnd())
{
UE_LOG(LogCook, Error, TEXT("FMarshalledMessage failed. Payload size: %d, Read position: %lld"), SerializedData.Num(), Reader.Tell());
return false;
}
return true;
}
/** @return The file-level MarshalledMessageType guid (not the wrapped message's own MessageType member). */
FGuid FMarshalledMessage::GetMessageType() const
{
    return MarshalledMessageType;
}
/** Resets MessageType to the MarshalledMessageType guid; no other member is touched here. */
void FMarshalledMessage::Clear()
{
    MessageType = MarshalledMessageType;
}
/**
 * Takes over a received payload and decodes the leading message type guid from it.
 * On success BufferIndex points just past the guid, where the message body starts.
 * @return false when the payload cannot be moved into MessageBuffer, is smaller than an FGuid,
 *         or the guid cannot be read.
 */
bool FMarshalledMessage::MakeFrom(FCookUniqueBuffer& Payload)
{
    if (!Payload.MoveToShared(MessageBuffer))
    {
        return false;
    }
    if (MessageBuffer.Num() < sizeof(FGuid))
    {
        return false;
    }

    FMemoryReader Ar(MessageBuffer, true);
    Ar << MessageType;
    if (Ar.IsError())
    {
        UE_LOG(LogCook, Error, TEXT("FMarshalledMessage failed. Payload size: %d, Read position: %lld"), MessageBuffer.Num(), Ar.Tell());
        return false;
    }

    BufferIndex = sizeof(FGuid);
    return true;
}
/** Writes RemoteIndex into SerializedData at StartIndex. */
void FWorkerConnectMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
    FMemoryWriter Ar(SerializedData);
    Ar.Seek(StartIndex);

    Ar << RemoteIndex;

    Ar.FlushCache();
}
bool FWorkerConnectMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
FMemoryReader Reader(SerializedData, true /*bIs]=true*/);
Reader.Seek(StartIndex);
Reader << RemoteIndex;
if (Reader.IsError() || !Reader.AtEnd())
{
UE_LOG(LogCook, Error, TEXT("UnmarshalConnectMessage failed. Payload size: %d, Read position: %lld"), SerializedData.Num(), Reader.Tell());
return false;
}
return true;
}
/**
 * Writes the initial configuration into SerializedData at StartIndex: cook mode (as int32),
 * ordered session platform names, cook-by-the-book options, then startup options.
 */
void FInitialConfigMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
    FMemoryWriter Ar(SerializedData);
    Ar.Seek(StartIndex);

    // The enum travels as a plain int32.
    int32 CookModeAsInt = static_cast<int32>(DirectorCookMode);
    Ar << CookModeAsInt;

    Ar << OrderedSessionPlatforms;
    Ar << CookByTheBookOptions;
    Ar << CookByTheBookStartupOptions;

    Ar.FlushCache();
}
/**
 * Reads the initial configuration from SerializedData at StartIndex, in the order written by Serialize:
 * cook mode (as int32), ordered session platform names, cook-by-the-book options, startup options.
 * @return true only when the reader hit no error and consumed the data exactly to its end.
 */
bool FInitialConfigMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
    FMemoryReader Reader(SerializedData, true /*bIsPersistent=true*/);
    Reader.Seek(StartIndex);

    // Seed with the current mode: if the read fails and leaves the local untouched, the cast below
    // keeps DirectorCookMode unchanged instead of converting an uninitialized value.
    int32 LocalCookMode = static_cast<int32>(DirectorCookMode);
    Reader << LocalCookMode;
    Reader << OrderedSessionPlatforms;
    Reader << CookByTheBookOptions;
    Reader << CookByTheBookStartupOptions;
    DirectorCookMode = static_cast<ECookMode::Type>(LocalCookMode);

    if (Reader.IsError() || !Reader.AtEnd())
    {
        UE_LOG(LogCook, Error, TEXT("FInitialConfigMessage failed. Payload size: %d, Read position: %lld"), SerializedData.Num(), Reader.Tell());
        return false;
    }
    return true;
}
void FInitialConfigMessage::ReadFromLocal(const UCookOnTheFlyServer& COTFS, const TConstArrayView<const ITargetPlatform*>& InOrderedSessionPlatforms)
{
DirectorCookMode = COTFS.GetCookMode();
COTFS.WriteToCookByTheBookOptions(CookByTheBookOptions);
COTFS.WriteToCookByTheBookStartupOptions(CookByTheBookStartupOptions);
OrderedSessionPlatforms.Reset(InOrderedSessionPlatforms.Num());
for (auto& Platform: InOrderedSessionPlatforms)
{
OrderedSessionPlatforms.Add(Platform->PlatformName());
}
}
/**
 * Resolves the stored platform names through the target platform manager and writes the
 * platforms found into InOrderedSessionPlatforms, in stored order.
 * Names that cannot be resolved are logged as errors and skipped.
 */
void FInitialConfigMessage::WriteToTargetPlatform(TArray<ITargetPlatform*>& InOrderedSessionPlatforms)
{
    InOrderedSessionPlatforms.Reset(OrderedSessionPlatforms.Num());

    ITargetPlatformManagerModule& PlatformManager = GetTargetPlatformManagerRef();
    for (const FString& PlatformName : OrderedSessionPlatforms)
    {
        ITargetPlatform* const Resolved = PlatformManager.FindTargetPlatform(PlatformName);
        if (!Resolved)
        {
            UE_LOG(LogCook, Error, TEXT("FInitialConfigMessage::WriteToTargetPlatform Could not find TargetPlatform \"%s\"."), *PlatformName);
            continue;
        }
        InOrderedSessionPlatforms.Add(Resolved);
    }
}
/**
 * Builds the message by taking ownership of the given package list.
 * A named rvalue-reference parameter is an lvalue, so it must be moved explicitly; without
 * MoveTemp the whole array would be copied.
 */
FAssignPackagesMessage::FAssignPackagesMessage(TArray<FAssignPackageData>&& InPackageDatas)
    : Packages(MoveTemp(InPackageDatas))
{
}
/** Writes FenceVersion followed by the package list into SerializedData at StartIndex. */
void FAssignPackagesMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
    FMemoryWriter Ar(SerializedData);
    Ar.Seek(StartIndex);

    Ar << FenceVersion;
    Ar << Packages;

    Ar.FlushCache();
}
bool FAssignPackagesMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
FMemoryReader Reader(SerializedData, true /*bIs]=true*/);
Reader.Seek(StartIndex);
Reader << FenceVersion;
Reader << Packages;
return true;
}
/**
 * Moves up to BatchCount packages from this message into PendingMessage (whose previous packages
 * are discarded) and copies FenceVersion across.
 * When this message holds BatchCount packages or fewer, all of them are moved; otherwise the last
 * BatchCount entries are copied out and removed from this message.
 */
void FAssignPackagesMessage::MessageSliceTo(int32 BatchCount, FAssignPackagesMessage& PendingMessage)
{
    PendingMessage.Packages.Reset();
    PendingMessage.FenceVersion = FenceVersion;

    if (BatchCount >= Packages.Num())
    {
        PendingMessage.Packages = MoveTemp(Packages);
        return;
    }

    // Slice from the tail so the remaining entries do not need to be shifted.
    const int32 TailStart = Packages.Num() - BatchCount;
    PendingMessage.Packages.Append(Packages.GetData() + TailStart, BatchCount);
    Packages.SetNum(TailStart, false);
}
/** Writes the discovered package list into SerializedData at StartIndex. */
void FDiscoveredPackagesMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
    FMemoryWriter Ar(SerializedData);
    Ar.Seek(StartIndex);

    Ar << Packages;

    Ar.FlushCache();
}
bool FDiscoveredPackagesMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
FMemoryReader Reader(SerializedData, true /*bIs]=true*/);
Reader.Seek(StartIndex);
Reader << Packages;
return true;
}
/**
 * Moves up to BatchCount packages from this message into PendingMessage (whose previous packages
 * are discarded).
 * When this message holds BatchCount packages or fewer, all of them are moved; otherwise the last
 * BatchCount entries are copied out and removed from this message.
 */
void FDiscoveredPackagesMessage::MessageSliceTo(int32 BatchCount, FDiscoveredPackagesMessage& PendingMessage)
{
    PendingMessage.Packages.Reset();

    if (BatchCount >= Packages.Num())
    {
        PendingMessage.Packages = MoveTemp(Packages);
        return;
    }

    // Slice from the tail so the remaining entries do not need to be shifted.
    const int32 TailStart = Packages.Num() - BatchCount;
    PendingMessage.Packages.Append(Packages.GetData() + TailStart, BatchCount);
    Packages.SetNum(TailStart, false);
}
/**
 * Moves up to BatchCount results from this message into PendingMessage (whose previous results
 * are discarded).
 * When this message holds BatchCount results or fewer, all of them are moved; otherwise the last
 * BatchCount entries are copied out and removed from this message.
 */
void FPackageResultsMessage::MessageSliceTo(int32 BatchCount, FPackageResultsMessage& PendingMessage)
{
    PendingMessage.Results.Reset();

    if (BatchCount >= Results.Num())
    {
        PendingMessage.Results = MoveTemp(Results);
        return;
    }

    // Slice from the tail so the remaining entries do not need to be shifted.
    const int32 TailStart = Results.Num() - BatchCount;
    PendingMessage.Results.Append(Results.GetData() + TailStart, BatchCount);
    Results.SetNum(TailStart, false);
}
/** Writes the package result list into SerializedData at StartIndex. */
void FPackageResultsMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
    FMemoryWriter Ar(SerializedData);
    Ar.Seek(StartIndex);

    Ar << Results;

    Ar.FlushCache();
}
bool FPackageResultsMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
FMemoryReader Reader(SerializedData, true /*bIs]=true*/);
Reader.Seek(StartIndex);
Reader << Results;
return true;
}
/**
 * Writes the asset registry data map, the asset package data map and the cook side effects,
 * in that order, into SerializedData at StartIndex.
 */
void FCookSideEffectMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
    FMemoryWriter Ar(SerializedData);
    Ar.Seek(StartIndex);

    Ar << AssetRegistryDataMap;
    Ar << AssetPackageDataMap;
    Ar << CookSideEffects;

    Ar.FlushCache();
}
bool FCookSideEffectMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
FMemoryReader Reader(SerializedData, true /*bIs]=true*/);
Reader.Seek(StartIndex);
Reader << AssetRegistryDataMap;
Reader << AssetPackageDataMap;
Reader << CookSideEffects;
return true;
}
/** Writes FenceVersion into SerializedData at StartIndex. */
void FSyncFenceMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
    FMemoryWriter Ar(SerializedData);
    Ar.Seek(StartIndex);

    Ar << FenceVersion;

    Ar.FlushCache();
}
bool FSyncFenceMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
FMemoryReader Reader(SerializedData, true /*bIs]=true*/);
Reader.Seek(StartIndex);
Reader << FenceVersion;
return true;
}
/**
 * Convenience overload so a temporary shutdown-acknowledge message can be sent directly.
 * The message is copied into a named local, which is then sent through the ICookMessage& overload.
 */
bool FCookSocket::WriteMarshalledPacket(FShutdownAcknowledgeMessage&& Message)
{
    FShutdownAcknowledgeMessage LocalCopy(Message);
    return WriteMarshalledPacket(LocalCopy);
}
/**
 * Convenience overload so a temporary shutdown-request message can be sent directly.
 * The message is copied into a named local, which is then sent through the ICookMessage& overload.
 */
bool FCookSocket::WriteMarshalledPacket(FShutdownRequestMessage&& Message)
{
    FShutdownRequestMessage LocalCopy(Message);
    return WriteMarshalledPacket(LocalCopy);
}
/** Writes nothing: this implementation leaves SerializedData untouched. */
void FShutdownRequestMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
    // Intentionally empty.
}
/** Reads nothing from SerializedData and always reports success. */
bool FShutdownRequestMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
    // Nothing to decode.
    return true;
}
/** Writes nothing: this implementation leaves SerializedData untouched. */
void FShutdownAcknowledgeMessage::Serialize(TArray<uint8>& SerializedData, int32 StartIndex)
{
    // Intentionally empty.
}
/** Reads nothing from SerializedData and always reports success. */
bool FShutdownAcknowledgeMessage::Unserialize(const TArray<uint8>& SerializedData, int32 StartIndex)
{
    // Nothing to decode.
    return true;
}
} // namespace Cook
} // namespace UE