mirror of
https://github.com/syumai/workers.git
synced 2025-03-11 09:49:12 +00:00
78 lines
2.4 KiB
Go
78 lines
2.4 KiB
Go
package queues
|
|
|
|
import (
|
|
"fmt"
|
|
"syscall/js"
|
|
|
|
"github.com/syumai/workers/internal/jsutil"
|
|
)
|
|
|
|
// Consumer is a function that received a batch of messages from Cloudflare Queues.
|
|
// The function should be set using Consume or ConsumeNonBlock.
|
|
// A returned error will cause the batch to be retried (unless the batch or individual messages are acked).
|
|
// NOTE: to do long-running message processing task within the Consumer, use cloudflare.WaitUntil, this will postpone the message
|
|
// acknowledgment until the task is completed witout blocking the queue consumption.
|
|
type Consumer func(batch *MessageBatch) error
|
|
|
|
// consumer is the Consumer invoked for each incoming message batch. It is
// assigned by Consume and ConsumeNonBlock and remains nil until one of them
// has been called.
var consumer Consumer
|
|
|
|
func init() {
|
|
handleBatchCallback := js.FuncOf(func(this js.Value, args []js.Value) any {
|
|
batch := args[0]
|
|
var cb js.Func
|
|
cb = js.FuncOf(func(_ js.Value, pArgs []js.Value) any {
|
|
defer cb.Release()
|
|
resolve := pArgs[0]
|
|
reject := pArgs[1]
|
|
go func() {
|
|
if len(args) > 1 {
|
|
reject.Invoke(jsutil.Errorf("too many args given to handleQueueMessageBatch: %d", len(args)))
|
|
return
|
|
}
|
|
err := consumeBatch(batch)
|
|
if err != nil {
|
|
reject.Invoke(jsutil.Error(err.Error()))
|
|
return
|
|
}
|
|
resolve.Invoke(js.Undefined())
|
|
}()
|
|
return js.Undefined()
|
|
})
|
|
return jsutil.NewPromise(cb)
|
|
})
|
|
jsutil.Binding.Set("handleQueueMessageBatch", handleBatchCallback)
|
|
}
|
|
|
|
func consumeBatch(batch js.Value) error {
|
|
b, err := newMessageBatch(batch)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse message batch: %v", err)
|
|
}
|
|
|
|
if err := consumer(b); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ready is implemented by the WebAssembly host: the directive below binds it
// to the function "ready" of the host module "workers".
// NOTE(review): presumably it tells the host that the handlers are set up —
// confirm against the JavaScript side.
//
//go:wasmimport workers ready
func ready()
|
|
|
|
// Consume sets the Consumer function to receive batches of messages from Cloudflare Queues
|
|
// NOTE: This function will block the current goroutine and is intented to be used as long as the
|
|
// only worker's purpose is to be the consumer of a Cloudflare Queue.
|
|
// In case the worker has other purposes (e.g. handling HTTP requests), use ConsumeNonBlock instead.
|
|
func Consume(f Consumer) {
|
|
consumer = f
|
|
ready()
|
|
select {}
|
|
}
|
|
|
|
// ConsumeNonBlock sets the Consumer function to receive batches of messages from Cloudflare Queues.
|
|
// This function is intented to be used when the worker has other purposes (e.g. handling HTTP requests).
|
|
// The worker will not block receiving messages and will continue to execute other tasks.
|
|
// ConsumeNonBlock should be called before setting other blocking handlers (e.g. workers.Serve).
|
|
func ConsumeNonBlock(f Consumer) {
|
|
consumer = f
|
|
}
|