zworld/engine/renderapi/command/worker.go

159 lines
3.6 KiB
Go
Raw Normal View History

2024-01-14 22:56:06 +08:00
package command
import (
"fmt"
2024-01-20 18:44:07 +08:00
"log"
2024-01-14 22:56:06 +08:00
"runtime/debug"
"zworld/engine/renderapi/device"
"zworld/engine/renderapi/sync"
"zworld/engine/util"
"github.com/vkngwrapper/core/v2/core1_0"
"github.com/vkngwrapper/core/v2/driver"
)
type CommandFn func(Buffer)
// Workers manage a command pool thread
type Worker interface {
Ptr() core1_0.Queue
Queue(CommandFn)
Submit(SubmitInfo)
Destroy()
Flush()
Invoke(func())
}
type Workers []Worker
type worker struct {
device device.T
name string
queue core1_0.Queue
pool Pool
batch []Buffer
work *ThreadWorker
}
func NewWorker(device device.T, name string, queueFlags core1_0.QueueFlags, queueIndex int) Worker {
2024-01-20 18:44:07 +08:00
if idx := device.GetQueueFamilyIndex(queueFlags); idx != queueIndex {
log.Println("GetQueueFamilyIndex Warning %d:: %d => %d", queueFlags, queueIndex, idx)
queueIndex = idx
}
2024-01-14 22:56:06 +08:00
pool := NewPool(device, core1_0.CommandPoolCreateTransient, queueIndex)
queue := device.GetQueue(queueIndex, queueFlags)
name = fmt.Sprintf("%s:%d:%x", name, queueIndex, queue.Handle())
device.SetDebugObjectName(driver.VulkanHandle(queue.Handle()), core1_0.ObjectTypeQueue, name)
return &worker{
device: device,
name: name,
queue: queue,
pool: pool,
batch: make([]Buffer, 0, 128),
work: NewThreadWorker(name, 100, true),
}
}
func (w *worker) Ptr() core1_0.Queue {
return w.queue
}
// Invoke schedules a callback to be called from the worker thread
func (w *worker) Invoke(callback func()) {
w.work.Invoke(callback)
}
func (w *worker) Queue(batch CommandFn) {
w.work.Invoke(func() {
w.enqueue(batch)
})
}
func (w *worker) enqueue(batch CommandFn) {
// todo: dont make a command buffer for each call to Queue() !!
// instead, allocate and record everything that we've batched just prior to submission
// allocate a new buffer
buf := w.pool.Allocate(core1_0.CommandBufferLevelPrimary)
// record commands
buf.Begin()
defer buf.End()
batch(buf)
// append to the next batch
w.batch = append(w.batch, buf)
}
type SubmitInfo struct {
Marker string
Wait []Wait
Signal []sync.Semaphore
Callback func()
}
type Wait struct {
Semaphore sync.Semaphore
Mask core1_0.PipelineStageFlags
}
// Submit the current batch of command buffers
// Blocks until the queue submission is confirmed
func (w *worker) Submit(submit SubmitInfo) {
w.work.Invoke(func() {
w.submit(submit)
})
}
func (w *worker) submit(submit SubmitInfo) {
debug.SetPanicOnFault(true)
buffers := util.Map(w.batch, func(buf Buffer) core1_0.CommandBuffer { return buf.Ptr() })
// create a cleanup callback
// todo: reuse fences
fence := sync.NewFence(w.device, submit.Marker, false)
// submit buffers to the given queue
w.queue.Submit(fence.Ptr(), []core1_0.SubmitInfo{
{
CommandBuffers: buffers,
SignalSemaphores: util.Map(submit.Signal, func(sem sync.Semaphore) core1_0.Semaphore { return sem.Ptr() }),
WaitSemaphores: util.Map(submit.Wait, func(w Wait) core1_0.Semaphore { return w.Semaphore.Ptr() }),
WaitDstStageMask: util.Map(submit.Wait, func(w Wait) core1_0.PipelineStageFlags { return w.Mask }),
},
})
// clear batch slice but keep memory
w.batch = w.batch[:0]
// fire up a cleanup goroutine that will execute when the work fence is signalled
go func() {
fence.Wait()
fence.Destroy()
w.work.Invoke(func() {
// free buffers
if len(buffers) > 0 {
w.device.Ptr().FreeCommandBuffers(buffers)
}
// run callback (on the worker thead)
if submit.Callback != nil {
submit.Callback()
}
})
}()
}
func (w *worker) Destroy() {
w.work.Stop()
w.pool.Destroy()
}
func (w *worker) Flush() {
w.work.Flush()
}