Merge pull request #137 from meandnano/feature/consumer

Implement Queues Consumer
This commit is contained in:
syumai 2025-01-15 20:21:17 +09:00 committed by GitHub
commit 0c1f74cc73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 751 additions and 2 deletions

View File

@ -21,6 +21,12 @@ make deploy # deploy worker
### Interacting with the local queue
NOTE: Wrangler does not support running multiple workers interacting with the same _local_ queue. Therefore, for the demostrational purposes,
we use the same worker to both produce and consume messages from the queue. For a real-world scenario, please consider the differences
between [queues.Consume](https://github.com/syumai/workers/blob/main/cloudflare/queues/consumer.go#L65) and
(queues.ConsumeNonBlocking)(https://github.com/syumai/workers/blob/main/cloudflare/queues/consumer.go#L75) functions.
1. Start the dev server.
```sh
make dev

View File

@ -20,6 +20,11 @@ func handleErr(w http.ResponseWriter, msg string, err error) {
}
func main() {
// start Qeueue consumer.
// If we would not have an HTTP handler in this worker, we would use queues.Consume instead
queues.ConsumeNonBlocking(consumeBatch)
// start HTTP server
http.HandleFunc("/", handleProduce)
workers.Serve(nil)
}
@ -99,3 +104,12 @@ func produceBytes(q *queues.Producer, req *http.Request) error {
}
return nil
}
func consumeBatch(batch *queues.ConsumerMessageBatch) error {
for _, msg := range batch.Messages {
log.Printf("Received message: %v\n", msg.Body.Get("name").String())
}
batch.AckAll()
return nil
}

View File

@ -9,5 +9,12 @@ compatibility_flags = [
queue = "my-queue"
binding = "QUEUE"
[[queues.consumers]]
queue = "my-queue"
max_batch_size = 1
max_batch_timeout = 30
max_retries = 10
dead_letter_queue = "my-queue-dlq"
[build]
command = "make build"

View File

@ -0,0 +1,77 @@
package queues
import (
"fmt"
"syscall/js"
"github.com/syumai/workers/internal/jsutil"
)
// Consumer is a function that received a batch of messages from Cloudflare Queues.
// The function should be set using Consume or ConsumeNonBlocking.
// A returned error will cause the batch to be retried (unless the batch or individual messages are acked).
// NOTE: to do long-running message processing task within the Consumer, use cloudflare.WaitUntil, this will postpone the message
// acknowledgment until the task is completed witout blocking the queue consumption.
type Consumer func(batch *ConsumerMessageBatch) error
var consumer Consumer
func init() {
handleBatchCallback := js.FuncOf(func(this js.Value, args []js.Value) any {
batch := args[0]
var cb js.Func
cb = js.FuncOf(func(_ js.Value, pArgs []js.Value) any {
defer cb.Release()
resolve := pArgs[0]
reject := pArgs[1]
go func() {
if len(args) > 1 {
reject.Invoke(jsutil.Errorf("too many args given to handleQueueMessageBatch: %d", len(args)))
return
}
err := consumeBatch(batch)
if err != nil {
reject.Invoke(jsutil.Error(err.Error()))
return
}
resolve.Invoke(js.Undefined())
}()
return js.Undefined()
})
return jsutil.NewPromise(cb)
})
jsutil.Binding.Set("handleQueueMessageBatch", handleBatchCallback)
}
func consumeBatch(batch js.Value) error {
b, err := newConsumerMessageBatch(batch)
if err != nil {
return fmt.Errorf("failed to parse message batch: %v", err)
}
if err := consumer(b); err != nil {
return err
}
return nil
}
//go:wasmimport workers ready
func ready()
// Consume sets the Consumer function to receive batches of messages from Cloudflare Queues
// NOTE: This function will block the current goroutine and is intented to be used as long as the
// only worker's purpose is to be the consumer of a Cloudflare Queue.
// In case the worker has other purposes (e.g. handling HTTP requests), use ConsumeNonBlocking instead.
func Consume(f Consumer) {
consumer = f
ready()
select {}
}
// ConsumeNonBlocking sets the Consumer function to receive batches of messages from Cloudflare Queues.
// This function is intented to be used when the worker has other purposes (e.g. handling HTTP requests).
// The worker will not block receiving messages and will continue to execute other tasks.
// ConsumeNonBlocking should be called before setting other blocking handlers (e.g. workers.Serve).
func ConsumeNonBlocking(f Consumer) {
consumer = f
}

View File

@ -0,0 +1,98 @@
package queues
import (
"fmt"
"syscall/js"
"time"
"github.com/syumai/workers/internal/jsutil"
)
// ConsumerMessage represents a message of the batch received by the consumer.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message
type ConsumerMessage struct {
// instance - The underlying instance of the JS message object passed by the cloudflare
instance js.Value
// Id - The unique Cloudflare-generated identifier of the message
Id string
// Timestamp - The time when the message was enqueued
Timestamp time.Time
// Body - The message body. Could be accessed directly or using converting helpers as StringBody, BytesBody, IntBody, FloatBody.
Body js.Value
// Attempts - The number of times the message delivery has been retried.
Attempts int
}
func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) {
timestamp, err := jsutil.DateToTime(obj.Get("timestamp"))
if err != nil {
return nil, fmt.Errorf("failed to parse message timestamp: %v", err)
}
return &ConsumerMessage{
instance: obj,
Id: obj.Get("id").String(),
Body: obj.Get("body"),
Attempts: obj.Get("attempts").Int(),
Timestamp: timestamp,
}, nil
}
// Ack acknowledges the message as successfully delivered despite the result returned from the consuming function.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message
func (m *ConsumerMessage) Ack() {
m.instance.Call("ack")
}
// Retry marks the message to be re-delivered.
// The message will be retried after the optional delay configured with RetryOption.
func (m *ConsumerMessage) Retry(opts ...RetryOption) {
var o *retryOptions
if len(opts) > 0 {
o = &retryOptions{}
for _, opt := range opts {
opt(o)
}
}
m.instance.Call("retry", o.toJS())
}
func (m *ConsumerMessage) StringBody() (string, error) {
if m.Body.Type() != js.TypeString {
return "", fmt.Errorf("message body is not a string: %v", m.Body)
}
return m.Body.String(), nil
}
func (m *ConsumerMessage) BytesBody() ([]byte, error) {
switch m.Body.Type() {
case js.TypeString:
return []byte(m.Body.String()), nil
case js.TypeObject:
if m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass) {
b := make([]byte, m.Body.Get("byteLength").Int())
js.CopyBytesToGo(b, m.Body)
return b, nil
}
}
return nil, fmt.Errorf("message body is not a byte array: %v", m.Body)
}
func (m *ConsumerMessage) IntBody() (int, error) {
if m.Body.Type() == js.TypeNumber {
return m.Body.Int(), nil
}
return 0, fmt.Errorf("message body is not a number: %v", m.Body)
}
func (m *ConsumerMessage) FloatBody() (float64, error) {
if m.Body.Type() == js.TypeNumber {
return m.Body.Float(), nil
}
return 0, fmt.Errorf("message body is not a number: %v", m.Body)
}

View File

@ -0,0 +1,321 @@
package queues
import (
"bytes"
"syscall/js"
"testing"
"time"
"github.com/syumai/workers/internal/jsutil"
)
func TestNewConsumerMessage(t *testing.T) {
ts := time.Now()
jsTs := jsutil.TimeToDate(ts)
id := "some-message-id"
m := map[string]any{
"body": "hello",
"timestamp": jsTs,
"id": id,
"attempts": 1,
}
got, err := newConsumerMessage(js.ValueOf(m))
if err != nil {
t.Fatalf("newConsumerMessage failed: %v", err)
}
if body := got.Body.String(); body != "hello" {
t.Fatalf("Body() = %v, want %v", body, "hello")
}
if got.Id != id {
t.Fatalf("Id = %v, want %v", got.Id, id)
}
if got.Attempts != 1 {
t.Fatalf("Attempts = %v, want %v", got.Attempts, 1)
}
if got.Timestamp.UnixMilli() != ts.UnixMilli() {
t.Fatalf("Timestamp = %v, want %v", got.Timestamp, ts)
}
}
func TestConsumerMessage_Ack(t *testing.T) {
ackCalled := false
jsObj := jsutil.NewObject()
jsObj.Set("ack", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
ackCalled = true
return nil
}))
m := &ConsumerMessage{
instance: jsObj,
}
m.Ack()
if !ackCalled {
t.Fatalf("Ack() did not call ack")
}
}
func TestConsumerMessage_Retry(t *testing.T) {
retryCalled := false
jsObj := jsutil.NewObject()
jsObj.Set("retry", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
retryCalled = true
return nil
}))
m := &ConsumerMessage{
instance: jsObj,
}
m.Retry()
if !retryCalled {
t.Fatalf("Retry() did not call retry")
}
}
func TestConsumerMessage_RetryWithDelay(t *testing.T) {
retryCalled := false
jsObj := jsutil.NewObject()
jsObj.Set("retry", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
retryCalled = true
if len(args) != 1 {
t.Fatalf("retry() called with %d arguments, want 1", len(args))
}
opts := args[0]
if opts.Type() != js.TypeObject {
t.Fatalf("retry() called with argument of type %v, want object", opts.Type())
}
if delay := opts.Get("delaySeconds").Int(); delay != 10 {
t.Fatalf("delaySeconds = %v, want %v", delay, 10)
}
return nil
}))
m := &ConsumerMessage{
instance: jsObj,
}
m.Retry(WithRetryDelay(10 * time.Second))
if !retryCalled {
t.Fatalf("RetryAll() did not call retryAll")
}
}
func TestNewConsumerMessage_StringBody(t *testing.T) {
tests := []struct {
name string
body func() js.Value
want string
wantErr bool
}{
{
name: "string",
body: func() js.Value {
return js.ValueOf("hello")
},
want: "hello",
},
{
name: "uint8 array",
body: func() js.Value {
v := jsutil.Uint8ArrayClass.New(3)
js.CopyBytesToJS(v, []byte("foo"))
return v
},
wantErr: true,
},
{
name: "int",
body: func() js.Value {
return js.ValueOf(42)
},
wantErr: true,
},
{
name: "undefined",
body: func() js.Value {
return js.Undefined()
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{
Body: tt.body(),
}
got, err := m.StringBody()
if (err != nil) != tt.wantErr {
t.Fatalf("StringBody() error = %v, wantErr %v", err, tt.wantErr)
}
if got != tt.want {
t.Fatalf("StringBody() = %v, want %v", got, tt.want)
}
})
}
}
func TestConsumerMessage_BytesBody(t *testing.T) {
tests := []struct {
name string
body func() js.Value
want []byte
wantErr bool
}{
{
name: "string",
body: func() js.Value {
return js.ValueOf("hello")
},
want: []byte("hello"),
},
{
name: "uint8 array",
body: func() js.Value {
v := jsutil.Uint8ArrayClass.New(3)
js.CopyBytesToJS(v, []byte("foo"))
return v
},
want: []byte("foo"),
},
{
name: "uint8 clamped array",
body: func() js.Value {
v := jsutil.Uint8ClampedArrayClass.New(3)
js.CopyBytesToJS(v, []byte("bar"))
return v
},
want: []byte("bar"),
},
{
name: "incorrect type",
body: func() js.Value {
return js.ValueOf(42)
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{
Body: tt.body(),
}
got, err := m.BytesBody()
if (err != nil) != tt.wantErr {
t.Fatalf("BytesBody() error = %v, wantErr %v", err, tt.wantErr)
}
if !bytes.Equal(got, tt.want) {
t.Fatalf("BytesBody() = %v, want %v", got, tt.want)
}
})
}
}
func TestConsumerMessage_IntBody(t *testing.T) {
tests := []struct {
name string
body js.Value
want int
wantErr bool
}{
{
name: "int",
body: js.ValueOf(42),
want: 42,
},
{
name: "float",
body: js.ValueOf(42.5),
want: 42,
},
{
name: "string",
body: js.ValueOf("42"),
wantErr: true,
},
{
name: "undefined",
body: js.Undefined(),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{
Body: tt.body,
}
got, err := m.IntBody()
if (err != nil) != tt.wantErr {
t.Fatalf("IntBody() error = %v, wantErr %v", err, tt.wantErr)
}
if got != tt.want {
t.Fatalf("IntBody() = %v, want %v", got, tt.want)
}
})
}
}
func TestConsumerMessage_FloatBody(t *testing.T) {
tests := []struct {
name string
body js.Value
want float64
wantErr bool
}{
{
name: "int",
body: js.ValueOf(42),
want: 42.0,
},
{
name: "float",
body: js.ValueOf(42.5),
want: 42.5,
},
{
name: "string",
body: js.ValueOf("42"),
wantErr: true,
},
{
name: "undefined",
body: js.Undefined(),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{
Body: tt.body,
}
got, err := m.FloatBody()
if (err != nil) != tt.wantErr {
t.Fatalf("FloatBody() error = %v, wantErr %v", err, tt.wantErr)
}
if got != tt.want {
t.Fatalf("FloatBody() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -0,0 +1,60 @@
package queues
import (
"fmt"
"syscall/js"
)
// ConsumerMessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the
// worker configuration.
// - https://developers.cloudflare.com/queues/configuration/configure-queues/#consumer
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
type ConsumerMessageBatch struct {
// instance - The underlying instance of the JS message object passed by the cloudflare
instance js.Value
// Queue - The name of the queue from which the messages were received
Queue string
// Messages - The messages in the batch
Messages []*ConsumerMessage
}
func newConsumerMessageBatch(obj js.Value) (*ConsumerMessageBatch, error) {
msgArr := obj.Get("messages")
messages := make([]*ConsumerMessage, msgArr.Length())
for i := 0; i < msgArr.Length(); i++ {
m, err := newConsumerMessage(msgArr.Index(i))
if err != nil {
return nil, fmt.Errorf("failed to parse message %d: %v", i, err)
}
messages[i] = m
}
return &ConsumerMessageBatch{
instance: obj,
Queue: obj.Get("queue").String(),
Messages: messages,
}, nil
}
// AckAll acknowledges all messages in the batch as successfully delivered despite the result returned from the consuming function.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
func (b *ConsumerMessageBatch) AckAll() {
b.instance.Call("ackAll")
}
// RetryAll marks all messages in the batch to be re-delivered.
// The messages will be retried after the optional delay configured with RetryOption.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
func (b *ConsumerMessageBatch) RetryAll(opts ...RetryOption) {
var o *retryOptions
if len(opts) > 0 {
o = &retryOptions{}
for _, opt := range opts {
opt(o)
}
}
b.instance.Call("retryAll", o.toJS())
}

View File

@ -0,0 +1,124 @@
package queues
import (
"syscall/js"
"testing"
"time"
"github.com/syumai/workers/internal/jsutil"
)
func TestNewConsumerMessageBatch(t *testing.T) {
ts := time.Now()
jsTs := jsutil.TimeToDate(ts)
id := "some-message-id"
m := map[string]any{
"queue": "some-queue",
"messages": []any{
map[string]any{
"body": "hello",
"timestamp": jsTs,
"id": id,
"attempts": 1,
},
},
}
got, err := newConsumerMessageBatch(js.ValueOf(m))
if err != nil {
t.Fatalf("newConsumerMessageBatch failed: %v", err)
}
if got.Queue != "some-queue" {
t.Fatalf("Queue = %v, want %v", got.Queue, "some-queue")
}
if len(got.Messages) != 1 {
t.Fatalf("Messages = %v, want %v", len(got.Messages), 1)
}
msg := got.Messages[0]
if body := msg.Body.String(); body != "hello" {
t.Fatalf("Body() = %v, want %v", body, "hello")
}
if msg.Id != id {
t.Fatalf("Id = %v, want %v", msg.Id, id)
}
if msg.Attempts != 1 {
t.Fatalf("Attempts = %v, want %v", msg.Attempts, 1)
}
if msg.Timestamp.UnixMilli() != ts.UnixMilli() {
t.Fatalf("Timestamp = %v, want %v", msg.Timestamp, ts)
}
}
func TestConsumerMessageBatch_AckAll(t *testing.T) {
ackAllCalled := false
jsObj := jsutil.NewObject()
jsObj.Set("ackAll", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
ackAllCalled = true
return nil
}))
b := &ConsumerMessageBatch{
instance: jsObj,
}
b.AckAll()
if !ackAllCalled {
t.Fatalf("AckAll() did not call ackAll")
}
}
func TestConsumerMessageBatch_RetryAll(t *testing.T) {
retryAllCalled := false
jsObj := jsutil.NewObject()
jsObj.Set("retryAll", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
retryAllCalled = true
return nil
}))
b := &ConsumerMessageBatch{
instance: jsObj,
}
b.RetryAll()
if !retryAllCalled {
t.Fatalf("RetryAll() did not call retryAll")
}
}
func TestConsumerMessageBatch_RetryAllWithRetryOption(t *testing.T) {
retryAllCalled := false
jsObj := jsutil.NewObject()
jsObj.Set("retryAll", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
retryAllCalled = true
if len(args) != 1 {
t.Fatalf("retryAll() called with %d arguments, want 1", len(args))
}
opts := args[0]
if opts.Type() != js.TypeObject {
t.Fatalf("retryAll() called with argument of type %v, want object", opts.Type())
}
if delay := opts.Get("delaySeconds").Int(); delay != 10 {
t.Fatalf("delaySeconds = %v, want %v", delay, 10)
}
return nil
}))
b := &ConsumerMessageBatch{
instance: jsObj,
}
b.RetryAll(WithRetryDelay(10 * time.Second))
if !retryAllCalled {
t.Fatalf("RetryAll() did not call retryAll")
}
}

View File

@ -0,0 +1,35 @@
package queues
import (
"syscall/js"
"time"
"github.com/syumai/workers/internal/jsutil"
)
type retryOptions struct {
delaySeconds int
}
func (o *retryOptions) toJS() js.Value {
if o == nil {
return js.Undefined()
}
obj := jsutil.NewObject()
if o.delaySeconds != 0 {
obj.Set("delaySeconds", o.delaySeconds)
}
return obj
}
type RetryOption func(*retryOptions)
// WithRetryDelay sets the delay in seconds before the messages delivery is retried.
// Note that the delay should not be less than a second and is not more precise than a second.
func WithRetryDelay(d time.Duration) RetryOption {
return func(o *retryOptions) {
o.delaySeconds = int(d.Seconds())
}
}

View File

@ -63,4 +63,10 @@ export async function onRequest(ctx) {
const { request, env } = ctx;
await run(createRuntimeContext(env, ctx, binding));
return binding.handleRequest(request);
}
}
export async function queue(batch, env, ctx) {
const binding = {};
await run(createRuntimeContext(env, ctx, binding));
return binding.handleQueueMessageBatch(batch);
}

View File

@ -3,4 +3,4 @@ import mod from "./app.wasm";
imports.init(mod);
export default { fetch: imports.fetch, scheduled: imports.scheduled }
export default { fetch: imports.fetch, scheduled: imports.scheduled, queue: imports.queue };

View File

@ -16,6 +16,7 @@ var (
HeadersClass = js.Global().Get("Headers")
ArrayClass = js.Global().Get("Array")
Uint8ArrayClass = js.Global().Get("Uint8Array")
Uint8ClampedArrayClass = js.Global().Get("Uint8ClampedArray")
ErrorClass = js.Global().Get("Error")
ReadableStreamClass = js.Global().Get("ReadableStream")
FixedLengthStreamClass = js.Global().Get("FixedLengthStream")