Merge pull request #150 from syumai/update-consumer-message-apis

update consumer message APIs
This commit is contained in:
syumai 2025-01-15 21:19:20 +09:00 committed by GitHub
commit 68377bb9b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 48 additions and 168 deletions

View File

@ -105,7 +105,7 @@ func produceBytes(q *queues.Producer, req *http.Request) error {
return nil return nil
} }
func consumeBatch(batch *queues.ConsumerMessageBatch) error { func consumeBatch(batch *queues.MessageBatch) error {
for _, msg := range batch.Messages { for _, msg := range batch.Messages {
log.Printf("Received message: %v\n", msg.Body.Get("name").String()) log.Printf("Received message: %v\n", msg.Body.Get("name").String())
} }

View File

@ -6,6 +6,8 @@ import (
"github.com/syumai/workers/internal/jsutil" "github.com/syumai/workers/internal/jsutil"
) )
// FIXME: rename to MessageSendRequest
// see: https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagesendrequest
type BatchMessage struct { type BatchMessage struct {
body js.Value body js.Value
options *sendOptions options *sendOptions

View File

@ -12,7 +12,7 @@ import (
// A returned error will cause the batch to be retried (unless the batch or individual messages are acked). // A returned error will cause the batch to be retried (unless the batch or individual messages are acked).
// NOTE: to do long-running message processing task within the Consumer, use cloudflare.WaitUntil, this will postpone the message // NOTE: to do long-running message processing task within the Consumer, use cloudflare.WaitUntil, this will postpone the message
// acknowledgment until the task is completed witout blocking the queue consumption. // acknowledgment until the task is completed witout blocking the queue consumption.
type Consumer func(batch *ConsumerMessageBatch) error type Consumer func(batch *MessageBatch) error
var consumer Consumer var consumer Consumer
@ -44,7 +44,7 @@ func init() {
} }
func consumeBatch(batch js.Value) error { func consumeBatch(batch js.Value) error {
b, err := newConsumerMessageBatch(batch) b, err := newMessageBatch(batch)
if err != nil { if err != nil {
return fmt.Errorf("failed to parse message batch: %v", err) return fmt.Errorf("failed to parse message batch: %v", err)
} }

View File

@ -8,14 +8,14 @@ import (
"github.com/syumai/workers/internal/jsutil" "github.com/syumai/workers/internal/jsutil"
) )
// ConsumerMessage represents a message of the batch received by the consumer. // Message represents a message of the batch received by the consumer.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message
type ConsumerMessage struct { type Message struct {
// instance - The underlying instance of the JS message object passed by the cloudflare // instance - The underlying instance of the JS message object passed by the cloudflare
instance js.Value instance js.Value
// Id - The unique Cloudflare-generated identifier of the message // ID - The unique Cloudflare-generated identifier of the message
Id string ID string
// Timestamp - The time when the message was enqueued // Timestamp - The time when the message was enqueued
Timestamp time.Time Timestamp time.Time
// Body - The message body. Could be accessed directly or using converting helpers as StringBody, BytesBody, IntBody, FloatBody. // Body - The message body. Could be accessed directly or using converting helpers as StringBody, BytesBody, IntBody, FloatBody.
@ -24,15 +24,15 @@ type ConsumerMessage struct {
Attempts int Attempts int
} }
func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) { func newMessage(obj js.Value) (*Message, error) {
timestamp, err := jsutil.DateToTime(obj.Get("timestamp")) timestamp, err := jsutil.DateToTime(obj.Get("timestamp"))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to parse message timestamp: %v", err) return nil, fmt.Errorf("failed to parse message timestamp: %v", err)
} }
return &ConsumerMessage{ return &Message{
instance: obj, instance: obj,
Id: obj.Get("id").String(), ID: obj.Get("id").String(),
Body: obj.Get("body"), Body: obj.Get("body"),
Attempts: obj.Get("attempts").Int(), Attempts: obj.Get("attempts").Int(),
Timestamp: timestamp, Timestamp: timestamp,
@ -41,13 +41,13 @@ func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) {
// Ack acknowledges the message as successfully delivered despite the result returned from the consuming function. // Ack acknowledges the message as successfully delivered despite the result returned from the consuming function.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message
func (m *ConsumerMessage) Ack() { func (m *Message) Ack() {
m.instance.Call("ack") m.instance.Call("ack")
} }
// Retry marks the message to be re-delivered. // Retry marks the message to be re-delivered.
// The message will be retried after the optional delay configured with RetryOption. // The message will be retried after the optional delay configured with RetryOption.
func (m *ConsumerMessage) Retry(opts ...RetryOption) { func (m *Message) Retry(opts ...RetryOption) {
var o *retryOptions var o *retryOptions
if len(opts) > 0 { if len(opts) > 0 {
o = &retryOptions{} o = &retryOptions{}
@ -59,40 +59,19 @@ func (m *ConsumerMessage) Retry(opts ...RetryOption) {
m.instance.Call("retry", o.toJS()) m.instance.Call("retry", o.toJS())
} }
func (m *ConsumerMessage) StringBody() (string, error) { func (m *Message) StringBody() (string, error) {
if m.Body.Type() != js.TypeString { if m.Body.Type() != js.TypeString {
return "", fmt.Errorf("message body is not a string: %v", m.Body) return "", fmt.Errorf("message body is not a string: %v", m.Body)
} }
return m.Body.String(), nil return m.Body.String(), nil
} }
func (m *ConsumerMessage) BytesBody() ([]byte, error) { func (m *Message) BytesBody() ([]byte, error) {
switch m.Body.Type() { if m.Body.Type() != js.TypeObject ||
case js.TypeString: !(m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass)) {
return []byte(m.Body.String()), nil return nil, fmt.Errorf("message body is not a byte array: %v", m.Body)
case js.TypeObject:
if m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass) {
b := make([]byte, m.Body.Get("byteLength").Int())
js.CopyBytesToGo(b, m.Body)
return b, nil
}
} }
b := make([]byte, m.Body.Get("byteLength").Int())
return nil, fmt.Errorf("message body is not a byte array: %v", m.Body) js.CopyBytesToGo(b, m.Body)
} return b, nil
func (m *ConsumerMessage) IntBody() (int, error) {
if m.Body.Type() == js.TypeNumber {
return m.Body.Int(), nil
}
return 0, fmt.Errorf("message body is not a number: %v", m.Body)
}
func (m *ConsumerMessage) FloatBody() (float64, error) {
if m.Body.Type() == js.TypeNumber {
return m.Body.Float(), nil
}
return 0, fmt.Errorf("message body is not a number: %v", m.Body)
} }

View File

@ -20,17 +20,17 @@ func TestNewConsumerMessage(t *testing.T) {
"attempts": 1, "attempts": 1,
} }
got, err := newConsumerMessage(js.ValueOf(m)) got, err := newMessage(js.ValueOf(m))
if err != nil { if err != nil {
t.Fatalf("newConsumerMessage failed: %v", err) t.Fatalf("newMessage failed: %v", err)
} }
if body := got.Body.String(); body != "hello" { if body := got.Body.String(); body != "hello" {
t.Fatalf("Body() = %v, want %v", body, "hello") t.Fatalf("Body() = %v, want %v", body, "hello")
} }
if got.Id != id { if got.ID != id {
t.Fatalf("Id = %v, want %v", got.Id, id) t.Fatalf("ID = %v, want %v", got.ID, id)
} }
if got.Attempts != 1 { if got.Attempts != 1 {
@ -49,7 +49,7 @@ func TestConsumerMessage_Ack(t *testing.T) {
ackCalled = true ackCalled = true
return nil return nil
})) }))
m := &ConsumerMessage{ m := &Message{
instance: jsObj, instance: jsObj,
} }
@ -67,7 +67,7 @@ func TestConsumerMessage_Retry(t *testing.T) {
retryCalled = true retryCalled = true
return nil return nil
})) }))
m := &ConsumerMessage{ m := &Message{
instance: jsObj, instance: jsObj,
} }
@ -99,7 +99,7 @@ func TestConsumerMessage_RetryWithDelay(t *testing.T) {
return nil return nil
})) }))
m := &ConsumerMessage{ m := &Message{
instance: jsObj, instance: jsObj,
} }
@ -151,7 +151,7 @@ func TestNewConsumerMessage_StringBody(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{ m := &Message{
Body: tt.body(), Body: tt.body(),
} }
@ -174,13 +174,6 @@ func TestConsumerMessage_BytesBody(t *testing.T) {
want []byte want []byte
wantErr bool wantErr bool
}{ }{
{
name: "string",
body: func() js.Value {
return js.ValueOf("hello")
},
want: []byte("hello"),
},
{ {
name: "uint8 array", name: "uint8 array",
body: func() js.Value { body: func() js.Value {
@ -202,7 +195,7 @@ func TestConsumerMessage_BytesBody(t *testing.T) {
{ {
name: "incorrect type", name: "incorrect type",
body: func() js.Value { body: func() js.Value {
return js.ValueOf(42) return js.ValueOf("hello")
}, },
wantErr: true, wantErr: true,
}, },
@ -210,7 +203,7 @@ func TestConsumerMessage_BytesBody(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{ m := &Message{
Body: tt.body(), Body: tt.body(),
} }
@ -225,97 +218,3 @@ func TestConsumerMessage_BytesBody(t *testing.T) {
}) })
} }
} }
func TestConsumerMessage_IntBody(t *testing.T) {
tests := []struct {
name string
body js.Value
want int
wantErr bool
}{
{
name: "int",
body: js.ValueOf(42),
want: 42,
},
{
name: "float",
body: js.ValueOf(42.5),
want: 42,
},
{
name: "string",
body: js.ValueOf("42"),
wantErr: true,
},
{
name: "undefined",
body: js.Undefined(),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{
Body: tt.body,
}
got, err := m.IntBody()
if (err != nil) != tt.wantErr {
t.Fatalf("IntBody() error = %v, wantErr %v", err, tt.wantErr)
}
if got != tt.want {
t.Fatalf("IntBody() = %v, want %v", got, tt.want)
}
})
}
}
func TestConsumerMessage_FloatBody(t *testing.T) {
tests := []struct {
name string
body js.Value
want float64
wantErr bool
}{
{
name: "int",
body: js.ValueOf(42),
want: 42.0,
},
{
name: "float",
body: js.ValueOf(42.5),
want: 42.5,
},
{
name: "string",
body: js.ValueOf("42"),
wantErr: true,
},
{
name: "undefined",
body: js.Undefined(),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{
Body: tt.body,
}
got, err := m.FloatBody()
if (err != nil) != tt.wantErr {
t.Fatalf("FloatBody() error = %v, wantErr %v", err, tt.wantErr)
}
if got != tt.want {
t.Fatalf("FloatBody() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -5,11 +5,11 @@ import (
"syscall/js" "syscall/js"
) )
// ConsumerMessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the // MessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the
// worker configuration. // worker configuration.
// - https://developers.cloudflare.com/queues/configuration/configure-queues/#consumer // - https://developers.cloudflare.com/queues/configuration/configure-queues/#consumer
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
type ConsumerMessageBatch struct { type MessageBatch struct {
// instance - The underlying instance of the JS message object passed by the cloudflare // instance - The underlying instance of the JS message object passed by the cloudflare
instance js.Value instance js.Value
@ -17,21 +17,21 @@ type ConsumerMessageBatch struct {
Queue string Queue string
// Messages - The messages in the batch // Messages - The messages in the batch
Messages []*ConsumerMessage Messages []*Message
} }
func newConsumerMessageBatch(obj js.Value) (*ConsumerMessageBatch, error) { func newMessageBatch(obj js.Value) (*MessageBatch, error) {
msgArr := obj.Get("messages") msgArr := obj.Get("messages")
messages := make([]*ConsumerMessage, msgArr.Length()) messages := make([]*Message, msgArr.Length())
for i := 0; i < msgArr.Length(); i++ { for i := 0; i < msgArr.Length(); i++ {
m, err := newConsumerMessage(msgArr.Index(i)) m, err := newMessage(msgArr.Index(i))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to parse message %d: %v", i, err) return nil, fmt.Errorf("failed to parse message %d: %v", i, err)
} }
messages[i] = m messages[i] = m
} }
return &ConsumerMessageBatch{ return &MessageBatch{
instance: obj, instance: obj,
Queue: obj.Get("queue").String(), Queue: obj.Get("queue").String(),
Messages: messages, Messages: messages,
@ -40,14 +40,14 @@ func newConsumerMessageBatch(obj js.Value) (*ConsumerMessageBatch, error) {
// AckAll acknowledges all messages in the batch as successfully delivered despite the result returned from the consuming function. // AckAll acknowledges all messages in the batch as successfully delivered despite the result returned from the consuming function.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
func (b *ConsumerMessageBatch) AckAll() { func (b *MessageBatch) AckAll() {
b.instance.Call("ackAll") b.instance.Call("ackAll")
} }
// RetryAll marks all messages in the batch to be re-delivered. // RetryAll marks all messages in the batch to be re-delivered.
// The messages will be retried after the optional delay configured with RetryOption. // The messages will be retried after the optional delay configured with RetryOption.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
func (b *ConsumerMessageBatch) RetryAll(opts ...RetryOption) { func (b *MessageBatch) RetryAll(opts ...RetryOption) {
var o *retryOptions var o *retryOptions
if len(opts) > 0 { if len(opts) > 0 {
o = &retryOptions{} o = &retryOptions{}

View File

@ -24,9 +24,9 @@ func TestNewConsumerMessageBatch(t *testing.T) {
}, },
} }
got, err := newConsumerMessageBatch(js.ValueOf(m)) got, err := newMessageBatch(js.ValueOf(m))
if err != nil { if err != nil {
t.Fatalf("newConsumerMessageBatch failed: %v", err) t.Fatalf("newMessageBatch failed: %v", err)
} }
if got.Queue != "some-queue" { if got.Queue != "some-queue" {
@ -42,8 +42,8 @@ func TestNewConsumerMessageBatch(t *testing.T) {
t.Fatalf("Body() = %v, want %v", body, "hello") t.Fatalf("Body() = %v, want %v", body, "hello")
} }
if msg.Id != id { if msg.ID != id {
t.Fatalf("Id = %v, want %v", msg.Id, id) t.Fatalf("ID = %v, want %v", msg.ID, id)
} }
if msg.Attempts != 1 { if msg.Attempts != 1 {
@ -62,7 +62,7 @@ func TestConsumerMessageBatch_AckAll(t *testing.T) {
ackAllCalled = true ackAllCalled = true
return nil return nil
})) }))
b := &ConsumerMessageBatch{ b := &MessageBatch{
instance: jsObj, instance: jsObj,
} }
@ -80,7 +80,7 @@ func TestConsumerMessageBatch_RetryAll(t *testing.T) {
retryAllCalled = true retryAllCalled = true
return nil return nil
})) }))
b := &ConsumerMessageBatch{ b := &MessageBatch{
instance: jsObj, instance: jsObj,
} }
@ -112,7 +112,7 @@ func TestConsumerMessageBatch_RetryAllWithRetryOption(t *testing.T) {
return nil return nil
})) }))
b := &ConsumerMessageBatch{ b := &MessageBatch{
instance: jsObj, instance: jsObj,
} }