61 lines
1.8 KiB
Go
Raw Permalink Normal View History

2024-11-20 11:41:31 +01:00
package queues
import (
"fmt"
"syscall/js"
)
// MessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the
2024-11-20 11:41:31 +01:00
// worker configuration.
// - https://developers.cloudflare.com/queues/configuration/configure-queues/#consumer
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
type MessageBatch struct {
2024-11-20 11:41:31 +01:00
// instance - The underlying instance of the JS message object passed by the cloudflare
instance js.Value
// Queue - The name of the queue from which the messages were received
Queue string
// Messages - The messages in the batch
Messages []*Message
2024-11-20 11:41:31 +01:00
}
func newMessageBatch(obj js.Value) (*MessageBatch, error) {
2024-11-20 11:41:31 +01:00
msgArr := obj.Get("messages")
messages := make([]*Message, msgArr.Length())
2024-11-20 11:41:31 +01:00
for i := 0; i < msgArr.Length(); i++ {
m, err := newMessage(msgArr.Index(i))
2024-11-20 11:41:31 +01:00
if err != nil {
return nil, fmt.Errorf("failed to parse message %d: %v", i, err)
}
messages[i] = m
}
return &MessageBatch{
2024-11-20 11:41:31 +01:00
instance: obj,
Queue: obj.Get("queue").String(),
Messages: messages,
}, nil
}
// AckAll acknowledges all messages in the batch as successfully delivered despite the result returned from the consuming function.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
func (b *MessageBatch) AckAll() {
2024-11-20 11:41:31 +01:00
b.instance.Call("ackAll")
}
// RetryAll marks all messages in the batch to be re-delivered.
// The messages will be retried after the optional delay configured with RetryOption.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
func (b *MessageBatch) RetryAll(opts ...RetryOption) {
2024-11-20 11:41:31 +01:00
var o *retryOptions
if len(opts) > 0 {
o = &retryOptions{}
for _, opt := range opts {
opt(o)
}
}
b.instance.Call("retryAll", o.toJS())
}