From 693eaf206ff13be781333308ea7c1d6c724c1db5 Mon Sep 17 00:00:00 2001 From: syumai Date: Sat, 9 Nov 2024 17:59:56 +0900 Subject: [PATCH 01/10] split batchmessage.go --- cloudflare/queues/batchmessage.go | 39 +++++++++++++++++++++++++++++++ cloudflare/queues/producer.go | 31 ------------------------ 2 files changed, 39 insertions(+), 31 deletions(-) create mode 100644 cloudflare/queues/batchmessage.go diff --git a/cloudflare/queues/batchmessage.go b/cloudflare/queues/batchmessage.go new file mode 100644 index 0000000..df165d0 --- /dev/null +++ b/cloudflare/queues/batchmessage.go @@ -0,0 +1,39 @@ +package queues + +import ( + "errors" + "syscall/js" + + "github.com/syumai/workers/internal/jsutil" +) + +type BatchMessage struct { + body any + options *sendOptions +} + +// NewBatchMessage creates a single message to be batched before sending to a queue. +func NewBatchMessage(body any, opts ...SendOption) *BatchMessage { + options := defaultSendOptions() + for _, opt := range opts { + opt(options) + } + return &BatchMessage{body: body, options: options} +} + +func (m *BatchMessage) toJS() (js.Value, error) { + if m == nil { + return js.Undefined(), errors.New("message is nil") + } + + jsValue, err := m.options.ContentType.mapValue(m.body) + if err != nil { + return js.Undefined(), err + } + + obj := jsutil.NewObject() + obj.Set("body", jsValue) + obj.Set("options", m.options.toJS()) + + return obj, nil +} diff --git a/cloudflare/queues/producer.go b/cloudflare/queues/producer.go index 6fa47eb..bdecfaa 100644 --- a/cloudflare/queues/producer.go +++ b/cloudflare/queues/producer.go @@ -9,37 +9,6 @@ import ( "github.com/syumai/workers/internal/jsutil" ) -type BatchMessage struct { - body any - options *sendOptions -} - -// NewBatchMessage creates a single message to be batched before sending to a queue. -func NewBatchMessage(body any, opts ...SendOption) *BatchMessage { - options := defaultSendOptions() - for _, opt := range opts { - opt(options) - } - return &BatchMessage{body: body, options: options} -} - -func (m *BatchMessage) toJS() (js.Value, error) { - if m == nil { - return js.Undefined(), errors.New("message is nil") - } - - jsValue, err := m.options.ContentType.mapValue(m.body) - if err != nil { - return js.Undefined(), err - } - - obj := jsutil.NewObject() - obj.Set("body", jsValue) - obj.Set("options", m.options.toJS()) - - return obj, nil -} - type Producer struct { // queue - Objects that Queue API belongs to. Default is Global queue js.Value From 9cfa0111ef0bdab26dbd1245f0b64937264184c5 Mon Sep 17 00:00:00 2001 From: syumai Date: Sat, 9 Nov 2024 23:24:30 +0900 Subject: [PATCH 02/10] split queue.Producer's send methods --- cloudflare/queues/content_type.go | 76 ++------- cloudflare/queues/content_type_test.go | 205 ------------------------- cloudflare/queues/producer.go | 39 +++-- cloudflare/queues/producer_opts.go | 15 +- 4 files changed, 37 insertions(+), 298 deletions(-) delete mode 100644 cloudflare/queues/content_type_test.go diff --git a/cloudflare/queues/content_type.go b/cloudflare/queues/content_type.go index b0db97a..8c6f255 100644 --- a/cloudflare/queues/content_type.go +++ b/cloudflare/queues/content_type.go @@ -1,82 +1,26 @@ package queues -import ( - "fmt" - "io" - "syscall/js" - - "github.com/syumai/workers/internal/jsutil" -) - -// QueueContentType represents the content type of a message produced to a queue. +// contentType represents the content type of a message produced to a queue. // This information mostly affects how the message body is represented in the Cloudflare UI and is NOT // propagated to the consumer side. // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuescontenttype -type QueueContentType string +type contentType string const ( - // QueueContentTypeJSON is the default content type for the produced queue message. + // contentTypeJSON is the default content type for the produced queue message. // The message body is NOT being marshaled before sending and is passed to js.ValueOf directly. // Make sure the body is serializable to JSON. // - https://pkg.go.dev/syscall/js#ValueOf - QueueContentTypeJSON QueueContentType = "json" + contentTypeJSON contentType = "json" - // QueueContentTypeV8 is currently treated the same as QueueContentTypeJSON. - QueueContentTypeV8 QueueContentType = "v8" + // contentTypeV8 is currently treated the same as QueueContentTypeJSON. + contentTypeV8 contentType = "v8" - // QueueContentTypeText is used to send a message as a string. + // contentTypeText is used to send a message as a string. // Supported body types are string, []byte and io.Reader. - QueueContentTypeText QueueContentType = "text" + contentTypeText contentType = "text" - // QueueContentTypeBytes is used to send a message as a byte array. + // contentTypeBytes is used to send a message as a byte array. // Supported body types are string, []byte, and io.Reader. - QueueContentTypeBytes QueueContentType = "bytes" + contentTypeBytes contentType = "bytes" ) - -func (o QueueContentType) mapValue(val any) (js.Value, error) { - switch o { - case QueueContentTypeText: - switch v := val.(type) { - case string: - return js.ValueOf(v), nil - case []byte: - return js.ValueOf(string(v)), nil - case io.Reader: - b, err := io.ReadAll(v) - if err != nil { - return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err) - } - return js.ValueOf(string(b)), nil - default: - return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val) - } - - case QueueContentTypeBytes: - var b []byte - switch v := val.(type) { - case string: - b = []byte(v) - case []byte: - b = v - case io.Reader: - var err error - b, err = io.ReadAll(v) - if err != nil { - return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err) - } - default: - return js.Undefined(), fmt.Errorf("invalid value type for bytes content type: %T", val) - } - - ua := jsutil.NewUint8Array(len(b)) - js.CopyBytesToJS(ua, b) - // accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should - // be used instead and with Uint8Array.buffer as a value, the send simply fails - return ua, nil - - case QueueContentTypeJSON, QueueContentTypeV8: - return js.ValueOf(val), nil - } - - return js.Undefined(), fmt.Errorf("unknown content type: %s", o) -} diff --git a/cloudflare/queues/content_type_test.go b/cloudflare/queues/content_type_test.go deleted file mode 100644 index 5305c97..0000000 --- a/cloudflare/queues/content_type_test.go +++ /dev/null @@ -1,205 +0,0 @@ -package queues - -import ( - "bytes" - "syscall/js" - "testing" - - "github.com/syumai/workers/internal/jsutil" -) - -func TestContentType_mapValue(t *testing.T) { - tests := []struct { - name string - contentType QueueContentType - val any - want js.Value - wantErr bool - }{ - { - name: "string as text", - contentType: QueueContentTypeText, - val: "hello", - want: js.ValueOf("hello"), - }, - { - name: "[]byte as text", - contentType: QueueContentTypeText, - val: []byte("hello"), - want: js.ValueOf("hello"), - }, - { - name: "io.Reader as text", - contentType: QueueContentTypeText, - val: bytes.NewBufferString("hello"), - want: js.ValueOf("hello"), - }, - { - name: "number as text", - contentType: QueueContentTypeText, - val: 42, - want: js.Undefined(), - wantErr: true, - }, - { - name: "function as text", - contentType: QueueContentTypeText, - val: func() {}, - want: js.Undefined(), - wantErr: true, - }, - - { - name: "string as json", - contentType: QueueContentTypeJSON, - val: "hello", - want: js.ValueOf("hello"), - }, - { - name: "number as json", - contentType: QueueContentTypeJSON, - val: 42, - want: js.ValueOf(42), - }, - { - name: "bool as json", - contentType: QueueContentTypeJSON, - val: true, - want: js.ValueOf(true), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := tt.contentType.mapValue(tt.val) - if (err != nil) != tt.wantErr { - t.Fatalf("%s.mapValue() error = %v, wantErr %v", tt.contentType, err, tt.wantErr) - } - if got.String() != tt.want.String() { - t.Errorf("%s.mapValue() = %v, want %v", tt.contentType, got, tt.want) - } - }) - } -} - -func TestContentType_mapValue_bytes(t *testing.T) { - jsOf := func(b []byte) js.Value { - ua := jsutil.NewUint8Array(len(b)) - js.CopyBytesToJS(ua, b) - return ua - } - - tests := []struct { - name string - val any - want js.Value - }{ - { - name: "[]byte as bytes", - val: []byte("hello"), - want: jsOf([]byte("hello")), - }, - { - name: "string as bytes", - val: "hello", - want: jsOf([]byte("hello"))}, - { - name: "io.Reader as bytes", - val: bytes.NewBufferString("hello"), - want: jsOf([]byte("hello")), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := QueueContentTypeBytes.mapValue(tt.val) - if err != nil { - t.Fatalf("%s.mapValue() got error = %v", QueueContentTypeBytes, err) - } - if got.Type() != tt.want.Type() { - t.Errorf("%s.mapValue() = type %v, want type %v", QueueContentTypeBytes, got, tt.want) - } - if got.String() != tt.want.String() { - t.Errorf("%s.mapValue() = %v, want %v", QueueContentTypeBytes, got, tt.want) - } - }) - } -} - -func TestContentType_mapValue_map(t *testing.T) { - val := map[string]interface{}{ - "Name": "Alice", - "Age": 42, - } - - tests := []struct { - name string - contentType QueueContentType - }{ - { - name: "json", - contentType: QueueContentTypeJSON, - }, - { - name: "v8", - contentType: QueueContentTypeV8, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - - got, err := tt.contentType.mapValue(val) - if err != nil { - t.Fatalf("QueueContentTypeJSON.mapValue() got error = %v", err) - } - if got.Type() != js.TypeObject { - t.Errorf("QueueContentTypeJSON.mapValue() = type %v, want type %v", got, js.TypeObject) - } - if got.Get("Name").String() != "Alice" { - t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Name").String(), "Alice") - } - if got.Get("Age").Int() != 42 { - t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Age").Int(), 42) - } - }) - } -} - -type User struct { - Name string -} - -func TestContentType_mapValue_unsupported_types(t *testing.T) { - t.Run("struct as json", func(t *testing.T) { - defer func() { - if p := recover(); p == nil { - t.Fatalf("QueueContentTypeJSON.mapValue() did not panic") - } - }() - - val := User{Name: "Alice"} - _, _ = QueueContentTypeJSON.mapValue(val) - }) - - t.Run("slice of structs as json", func(t *testing.T) { - defer func() { - if p := recover(); p == nil { - t.Fatalf("QueueContentTypeJSON.mapValue() did not panic") - } - }() - - val := User{Name: "Alice"} - _, _ = QueueContentTypeJSON.mapValue([]User{val}) - }) - - t.Run("slice of bytes as json", func(t *testing.T) { - defer func() { - if p := recover(); p == nil { - t.Fatalf("QueueContentTypeJSON.mapValue() did not panic") - } - }() - - _, _ = QueueContentTypeJSON.mapValue([]byte("hello")) - }) -} diff --git a/cloudflare/queues/producer.go b/cloudflare/queues/producer.go index bdecfaa..ef30bea 100644 --- a/cloudflare/queues/producer.go +++ b/cloudflare/queues/producer.go @@ -26,28 +26,41 @@ func NewProducer(queueName string) (*Producer, error) { return &Producer{queue: inst}, nil } -// Send sends a single message to a queue. This function allows setting send options for the message. +func (p *Producer) SendText(content string, opts ...SendOption) error { + return p.send(js.ValueOf(content), contentTypeText, opts...) +} + +func (p *Producer) SendBytes(content []byte, opts ...SendOption) error { + ua := jsutil.NewUint8Array(len(content)) + js.CopyBytesToJS(ua, content) + // accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should + // be used instead and with Uint8Array.buffer as a value, the send simply fails + return p.send(ua, contentTypeBytes, opts...) +} + +func (p *Producer) SendJSON(content any, opts ...SendOption) error { + return p.send(js.ValueOf(content), contentTypeJSON, opts...) +} + +func (p *Producer) SendV8(content js.Value, opts ...SendOption) error { + return p.send(content, contentTypeV8, opts...) +} + +// send sends a single message to a queue. This function allows setting send options for the message. // If no options are provided, the default options are used (QueueContentTypeJSON and no delay). // // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuesendoptions -func (p *Producer) Send(content any, opts ...SendOption) error { - if p.queue.IsUndefined() { - return errors.New("queue object not found") +func (p *Producer) send(body js.Value, contentType contentType, opts ...SendOption) error { + options := &sendOptions{ + ContentType: contentType, } - - options := defaultSendOptions() for _, opt := range opts { opt(options) } - jsValue, err := options.ContentType.mapValue(content) - if err != nil { - return err - } - - prom := p.queue.Call("send", jsValue, options.toJS()) - _, err = jsutil.AwaitPromise(prom) + prom := p.queue.Call("send", body, options.toJS()) + _, err := jsutil.AwaitPromise(prom) return err } diff --git a/cloudflare/queues/producer_opts.go b/cloudflare/queues/producer_opts.go index 5d345ae..cf2c5db 100644 --- a/cloudflare/queues/producer_opts.go +++ b/cloudflare/queues/producer_opts.go @@ -10,19 +10,13 @@ import ( type sendOptions struct { // ContentType - Content type of the message // Default is "json" - ContentType QueueContentType + ContentType contentType // DelaySeconds - The number of seconds to delay the message. // Default is 0 DelaySeconds int } -func defaultSendOptions() *sendOptions { - return &sendOptions{ - ContentType: QueueContentTypeJSON, - } -} - func (o *sendOptions) toJS() js.Value { obj := jsutil.NewObject() obj.Set("contentType", string(o.ContentType)) @@ -36,13 +30,6 @@ func (o *sendOptions) toJS() js.Value { type SendOption func(*sendOptions) -// WithContentType changes the content type of the message. -func WithContentType(contentType QueueContentType) SendOption { - return func(o *sendOptions) { - o.ContentType = contentType - } -} - // WithDelay changes the number of seconds to delay the message. func WithDelay(d time.Duration) SendOption { return func(o *sendOptions) { From 488aec95850c287d7cabe12e50866162ce1449ca Mon Sep 17 00:00:00 2001 From: syumai Date: Sat, 9 Nov 2024 23:49:32 +0900 Subject: [PATCH 03/10] split queue.NewBatchMessage funcs --- cloudflare/queues/batchmessage.go | 33 +++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/cloudflare/queues/batchmessage.go b/cloudflare/queues/batchmessage.go index df165d0..f31462d 100644 --- a/cloudflare/queues/batchmessage.go +++ b/cloudflare/queues/batchmessage.go @@ -8,13 +8,31 @@ import ( ) type BatchMessage struct { - body any + body js.Value options *sendOptions } -// NewBatchMessage creates a single message to be batched before sending to a queue. -func NewBatchMessage(body any, opts ...SendOption) *BatchMessage { - options := defaultSendOptions() +func NewTextBatchMessage(content string, opts ...SendOption) *BatchMessage { + return newBatchMessage(js.ValueOf(content), contentTypeText, opts...) +} + +func NewBytesBatchMessage(content []byte, opts ...SendOption) *BatchMessage { + return newBatchMessage(js.ValueOf(content), contentTypeBytes, opts...) +} + +func NewJSONBatchMessage(content any, opts ...SendOption) *BatchMessage { + return newBatchMessage(js.ValueOf(content), contentTypeJSON, opts...) +} + +func NewV8BatchMessage(content js.Value, opts ...SendOption) *BatchMessage { + return newBatchMessage(content, contentTypeV8, opts...) +} + +// newBatchMessage creates a single message to be batched before sending to a queue. +func newBatchMessage(body js.Value, contentType contentType, opts ...SendOption) *BatchMessage { + options := &sendOptions{ + ContentType: contentType, + } for _, opt := range opts { opt(options) } @@ -26,13 +44,8 @@ func (m *BatchMessage) toJS() (js.Value, error) { return js.Undefined(), errors.New("message is nil") } - jsValue, err := m.options.ContentType.mapValue(m.body) - if err != nil { - return js.Undefined(), err - } - obj := jsutil.NewObject() - obj.Set("body", jsValue) + obj.Set("body", m.body) obj.Set("options", m.options.toJS()) return obj, nil From b54a5edc0d753cee82bbebeea3a98a8155fddf39 Mon Sep 17 00:00:00 2001 From: syumai Date: Sat, 9 Nov 2024 23:51:23 +0900 Subject: [PATCH 04/10] split queues.batchSendOptions --- cloudflare/queues/batchsendoptions.go | 36 +++++++++++++++++++++++++++ cloudflare/queues/producer_opts.go | 28 --------------------- 2 files changed, 36 insertions(+), 28 deletions(-) create mode 100644 cloudflare/queues/batchsendoptions.go diff --git a/cloudflare/queues/batchsendoptions.go b/cloudflare/queues/batchsendoptions.go new file mode 100644 index 0000000..97abd6d --- /dev/null +++ b/cloudflare/queues/batchsendoptions.go @@ -0,0 +1,36 @@ +package queues + +import ( + "syscall/js" + "time" + + "github.com/syumai/workers/internal/jsutil" +) + +type batchSendOptions struct { + // DelaySeconds - The number of seconds to delay the message. + // Default is 0 + DelaySeconds int +} + +func (o *batchSendOptions) toJS() js.Value { + if o == nil { + return js.Undefined() + } + + obj := jsutil.NewObject() + if o.DelaySeconds != 0 { + obj.Set("delaySeconds", o.DelaySeconds) + } + + return obj +} + +type BatchSendOption func(*batchSendOptions) + +// WithBatchDelay changes the number of seconds to delay the message. +func WithBatchDelay(d time.Duration) BatchSendOption { + return func(o *batchSendOptions) { + o.DelaySeconds = int(d.Seconds()) + } +} diff --git a/cloudflare/queues/producer_opts.go b/cloudflare/queues/producer_opts.go index cf2c5db..c6501e9 100644 --- a/cloudflare/queues/producer_opts.go +++ b/cloudflare/queues/producer_opts.go @@ -36,31 +36,3 @@ func WithDelay(d time.Duration) SendOption { o.DelaySeconds = int(d.Seconds()) } } - -type batchSendOptions struct { - // DelaySeconds - The number of seconds to delay the message. - // Default is 0 - DelaySeconds int -} - -func (o *batchSendOptions) toJS() js.Value { - if o == nil { - return js.Undefined() - } - - obj := jsutil.NewObject() - if o.DelaySeconds != 0 { - obj.Set("delaySeconds", o.DelaySeconds) - } - - return obj -} - -type BatchSendOption func(*batchSendOptions) - -// WithBatchDelay changes the number of seconds to delay the message. -func WithBatchDelay(d time.Duration) BatchSendOption { - return func(o *batchSendOptions) { - o.DelaySeconds = int(d.Seconds()) - } -} From 4fa8e20a64f4a89a096e7a1bec21784941a8f10c Mon Sep 17 00:00:00 2001 From: syumai Date: Sat, 9 Nov 2024 23:52:26 +0900 Subject: [PATCH 05/10] rename producer_opts.go to sendoptions.go --- cloudflare/queues/{producer_opts.go => sendoptions.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename cloudflare/queues/{producer_opts.go => sendoptions.go} (100%) diff --git a/cloudflare/queues/producer_opts.go b/cloudflare/queues/sendoptions.go similarity index 100% rename from cloudflare/queues/producer_opts.go rename to cloudflare/queues/sendoptions.go From c4e9274e7393feea3b2e618c0378b94ac5fde74d Mon Sep 17 00:00:00 2001 From: syumai Date: Sat, 9 Nov 2024 23:59:48 +0900 Subject: [PATCH 06/10] update SendBatch impl --- cloudflare/queues/batchmessage.go | 16 ++++------- .../{content_type.go => contenttype.go} | 0 cloudflare/queues/producer.go | 28 ++++--------------- 3 files changed, 11 insertions(+), 33 deletions(-) rename cloudflare/queues/{content_type.go => contenttype.go} (100%) diff --git a/cloudflare/queues/batchmessage.go b/cloudflare/queues/batchmessage.go index f31462d..7258129 100644 --- a/cloudflare/queues/batchmessage.go +++ b/cloudflare/queues/batchmessage.go @@ -1,7 +1,6 @@ package queues import ( - "errors" "syscall/js" "github.com/syumai/workers/internal/jsutil" @@ -30,23 +29,18 @@ func NewV8BatchMessage(content js.Value, opts ...SendOption) *BatchMessage { // newBatchMessage creates a single message to be batched before sending to a queue. func newBatchMessage(body js.Value, contentType contentType, opts ...SendOption) *BatchMessage { - options := &sendOptions{ + options := sendOptions{ ContentType: contentType, } for _, opt := range opts { - opt(options) + opt(&options) } - return &BatchMessage{body: body, options: options} + return &BatchMessage{body: body, options: &options} } -func (m *BatchMessage) toJS() (js.Value, error) { - if m == nil { - return js.Undefined(), errors.New("message is nil") - } - +func (m *BatchMessage) toJS() js.Value { obj := jsutil.NewObject() obj.Set("body", m.body) obj.Set("options", m.options.toJS()) - - return obj, nil + return obj } diff --git a/cloudflare/queues/content_type.go b/cloudflare/queues/contenttype.go similarity index 100% rename from cloudflare/queues/content_type.go rename to cloudflare/queues/contenttype.go diff --git a/cloudflare/queues/producer.go b/cloudflare/queues/producer.go index ef30bea..3c0dd6c 100644 --- a/cloudflare/queues/producer.go +++ b/cloudflare/queues/producer.go @@ -1,7 +1,6 @@ package queues import ( - "errors" "fmt" "syscall/js" @@ -52,11 +51,11 @@ func (p *Producer) SendV8(content js.Value, opts ...SendOption) error { // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuesendoptions func (p *Producer) send(body js.Value, contentType contentType, opts ...SendOption) error { - options := &sendOptions{ + options := sendOptions{ ContentType: contentType, } for _, opt := range opts { - opt(options) + opt(&options) } prom := p.queue.Call("send", body, options.toJS()) @@ -66,29 +65,14 @@ func (p *Producer) send(body js.Value, contentType contentType, opts ...SendOpti // SendBatch sends multiple messages to a queue. This function allows setting options for each message. func (p *Producer) SendBatch(messages []*BatchMessage, opts ...BatchSendOption) error { - if p.queue.IsUndefined() { - return errors.New("queue object not found") - } - - if len(messages) == 0 { - return nil - } - - var options *batchSendOptions - if len(opts) > 0 { - options = &batchSendOptions{} - for _, opt := range opts { - opt(options) - } + var options batchSendOptions + for _, opt := range opts { + opt(&options) } jsArray := jsutil.NewArray(len(messages)) for i, message := range messages { - jsValue, err := message.toJS() - if err != nil { - return fmt.Errorf("failed to convert message %d to JS: %w", i, err) - } - jsArray.SetIndex(i, jsValue) + jsArray.SetIndex(i, message.toJS()) } prom := p.queue.Call("sendBatch", jsArray, options.toJS()) From 374d3a8461e26bab71f0abda47326ffefe19abe8 Mon Sep 17 00:00:00 2001 From: syumai Date: Sun, 10 Nov 2024 00:07:29 +0900 Subject: [PATCH 07/10] rename queues.WithDelay to queues.WithDelaySeconds --- cloudflare/queues/batchsendoptions.go | 4 ++-- cloudflare/queues/producer_test.go | 6 +++--- cloudflare/queues/sendoptions.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cloudflare/queues/batchsendoptions.go b/cloudflare/queues/batchsendoptions.go index 97abd6d..54815bc 100644 --- a/cloudflare/queues/batchsendoptions.go +++ b/cloudflare/queues/batchsendoptions.go @@ -28,8 +28,8 @@ func (o *batchSendOptions) toJS() js.Value { type BatchSendOption func(*batchSendOptions) -// WithBatchDelay changes the number of seconds to delay the message. -func WithBatchDelay(d time.Duration) BatchSendOption { +// WithBatchDelaySeconds changes the number of seconds to delay the message. +func WithBatchDelaySeconds(d time.Duration) BatchSendOption { return func(o *batchSendOptions) { o.DelaySeconds = int(d.Seconds()) } diff --git a/cloudflare/queues/producer_test.go b/cloudflare/queues/producer_test.go index fc00a93..2903f9d 100644 --- a/cloudflare/queues/producer_test.go +++ b/cloudflare/queues/producer_test.go @@ -117,7 +117,7 @@ func TestSend_ContentTypeOption(t *testing.T) { { name: "delay", - options: []SendOption{WithDelay(5 * time.Second)}, + options: []SendOption{WithDelaySeconds(5 * time.Second)}, expectedDelaySec: 5, expectedContentType: "json", }, @@ -200,11 +200,11 @@ func TestSendBatch_Options(t *testing.T) { } var batch []*BatchMessage = []*BatchMessage{ - NewBatchMessage("hello"), + NewTextBatchMessage("hello"), } producer := validatingProducer(t, validation) - err := producer.SendBatch(batch, WithBatchDelay(5*time.Second)) + err := producer.SendBatch(batch, WithBatchDelaySeconds(5*time.Second)) if err != nil { t.Fatalf("SendBatch failed: %v", err) } diff --git a/cloudflare/queues/sendoptions.go b/cloudflare/queues/sendoptions.go index c6501e9..bc3bd85 100644 --- a/cloudflare/queues/sendoptions.go +++ b/cloudflare/queues/sendoptions.go @@ -30,8 +30,8 @@ func (o *sendOptions) toJS() js.Value { type SendOption func(*sendOptions) -// WithDelay changes the number of seconds to delay the message. -func WithDelay(d time.Duration) SendOption { +// WithDelaySeconds changes the number of seconds to delay the message. +func WithDelaySeconds(d time.Duration) SendOption { return func(o *sendOptions) { o.DelaySeconds = int(d.Seconds()) } From c55bf16a3ed97ef3934c145279cbb9ae5e279a3a Mon Sep 17 00:00:00 2001 From: syumai Date: Sun, 10 Nov 2024 00:11:41 +0900 Subject: [PATCH 08/10] update queues test impl --- cloudflare/queues/producer_test.go | 85 +++--------------------------- 1 file changed, 7 insertions(+), 78 deletions(-) diff --git a/cloudflare/queues/producer_test.go b/cloudflare/queues/producer_test.go index 2903f9d..1a9aafa 100644 --- a/cloudflare/queues/producer_test.go +++ b/cloudflare/queues/producer_test.go @@ -53,7 +53,7 @@ func TestSend(t *testing.T) { } producer := validatingProducer(t, validation) - err := producer.Send("hello", WithContentType(QueueContentTypeText)) + err := producer.SendText("hello") if err != nil { t.Fatalf("Send failed: %v", err) } @@ -74,85 +74,14 @@ func TestSend(t *testing.T) { } producer := validatingProducer(t, validation) - err := producer.Send("hello", WithContentType(QueueContentTypeJSON)) + err := producer.SendJSON("hello") if err != nil { t.Fatalf("Send failed: %v", err) } }) } -func TestSend_ContentTypeOption(t *testing.T) { - tests := []struct { - name string - options []SendOption - expectedContentType string - expectedDelaySec int - wantErr bool - }{ - { - name: "text", - options: []SendOption{WithContentType(QueueContentTypeText)}, - expectedContentType: "text", - }, - { - name: "json", - options: []SendOption{WithContentType(QueueContentTypeJSON)}, - expectedContentType: "json", - }, - { - name: "default", - options: nil, - expectedContentType: "json", - }, - { - name: "v8", - options: []SendOption{WithContentType(QueueContentTypeV8)}, - expectedContentType: "v8", - }, - { - name: "bytes", - options: []SendOption{WithContentType(QueueContentTypeBytes)}, - expectedContentType: "bytes", - }, - - { - name: "delay", - options: []SendOption{WithDelaySeconds(5 * time.Second)}, - expectedDelaySec: 5, - expectedContentType: "json", - }, - - { - name: "invalid content type", - options: []SendOption{WithContentType("invalid")}, - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - validation := func(message js.Value, options js.Value) error { - gotCT := options.Get("contentType").String() - if gotCT != string(tt.expectedContentType) { - return fmt.Errorf("expected content type %q, got %q", tt.expectedContentType, gotCT) - } - gotDelaySec := jsutil.MaybeInt(options.Get("delaySeconds")) - if gotDelaySec != tt.expectedDelaySec { - return fmt.Errorf("expected delay %d, got %d", tt.expectedDelaySec, gotDelaySec) - } - return nil - } - - producer := validatingProducer(t, validation) - err := producer.Send("hello", tt.options...) - if (err != nil) != tt.wantErr { - t.Fatalf("expected error: %t, got %v", tt.wantErr, err) - } - }) - } -} - -func TestSendBatch_Defaults(t *testing.T) { +func TestSendBatch(t *testing.T) { validation := func(batch js.Value, options js.Value) error { if batch.Type() != js.TypeObject { return errors.New("message batch must be an object (array)") @@ -179,9 +108,9 @@ func TestSendBatch_Defaults(t *testing.T) { return nil } - var batch []*BatchMessage = []*BatchMessage{ - NewBatchMessage("hello"), - NewBatchMessage("world", WithContentType(QueueContentTypeText)), + batch := []*BatchMessage{ + NewJSONBatchMessage("hello"), + NewTextBatchMessage("world"), } producer := validatingProducer(t, validation) @@ -199,7 +128,7 @@ func TestSendBatch_Options(t *testing.T) { return nil } - var batch []*BatchMessage = []*BatchMessage{ + batch := []*BatchMessage{ NewTextBatchMessage("hello"), } From d0919c3abf21087332a1886bfdc32865b0ae5bf6 Mon Sep 17 00:00:00 2001 From: syumai Date: Sun, 10 Nov 2024 00:30:01 +0900 Subject: [PATCH 09/10] add comments to queues public funcs --- cloudflare/queues/batchmessage.go | 4 ++++ cloudflare/queues/producer.go | 29 ++++++++++++++++------------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/cloudflare/queues/batchmessage.go b/cloudflare/queues/batchmessage.go index 7258129..c54ba3c 100644 --- a/cloudflare/queues/batchmessage.go +++ b/cloudflare/queues/batchmessage.go @@ -11,18 +11,22 @@ type BatchMessage struct { options *sendOptions } +// NewTextBatchMessage creates a single text message to be batched before sending to a queue. func NewTextBatchMessage(content string, opts ...SendOption) *BatchMessage { return newBatchMessage(js.ValueOf(content), contentTypeText, opts...) } +// NewBytesBatchMessage creates a single byte array message to be batched before sending to a queue. func NewBytesBatchMessage(content []byte, opts ...SendOption) *BatchMessage { return newBatchMessage(js.ValueOf(content), contentTypeBytes, opts...) } +// NewJSONBatchMessage creates a single JSON message to be batched before sending to a queue. func NewJSONBatchMessage(content any, opts ...SendOption) *BatchMessage { return newBatchMessage(js.ValueOf(content), contentTypeJSON, opts...) } +// NewV8BatchMessage creates a single raw JS value message to be batched before sending to a queue. func NewV8BatchMessage(content js.Value, opts ...SendOption) *BatchMessage { return newBatchMessage(content, contentTypeV8, opts...) } diff --git a/cloudflare/queues/producer.go b/cloudflare/queues/producer.go index 3c0dd6c..6ebf3b6 100644 --- a/cloudflare/queues/producer.go +++ b/cloudflare/queues/producer.go @@ -25,29 +25,32 @@ func NewProducer(queueName string) (*Producer, error) { return &Producer{queue: inst}, nil } -func (p *Producer) SendText(content string, opts ...SendOption) error { - return p.send(js.ValueOf(content), contentTypeText, opts...) +// SendText sends a single text message to a queue. +func (p *Producer) SendText(body string, opts ...SendOption) error { + return p.send(js.ValueOf(body), contentTypeText, opts...) } -func (p *Producer) SendBytes(content []byte, opts ...SendOption) error { - ua := jsutil.NewUint8Array(len(content)) - js.CopyBytesToJS(ua, content) +// SendBytes sends a single byte array message to a queue. +func (p *Producer) SendBytes(body []byte, opts ...SendOption) error { + ua := jsutil.NewUint8Array(len(body)) + js.CopyBytesToJS(ua, body) // accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should // be used instead and with Uint8Array.buffer as a value, the send simply fails return p.send(ua, contentTypeBytes, opts...) } -func (p *Producer) SendJSON(content any, opts ...SendOption) error { - return p.send(js.ValueOf(content), contentTypeJSON, opts...) +// SendJSON sends a single JSON message to a queue. +func (p *Producer) SendJSON(body any, opts ...SendOption) error { + return p.send(js.ValueOf(body), contentTypeJSON, opts...) } -func (p *Producer) SendV8(content js.Value, opts ...SendOption) error { - return p.send(content, contentTypeV8, opts...) +// SendV8 sends a single raw JS value message to a queue. +func (p *Producer) SendV8(body js.Value, opts ...SendOption) error { + return p.send(body, contentTypeV8, opts...) } -// send sends a single message to a queue. This function allows setting send options for the message. -// If no options are provided, the default options are used (QueueContentTypeJSON and no delay). -// +// send sends a single message to a queue. +// This function allows setting send options for the message. // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuesendoptions func (p *Producer) send(body js.Value, contentType contentType, opts ...SendOption) error { @@ -63,7 +66,7 @@ func (p *Producer) send(body js.Value, contentType contentType, opts ...SendOpti return err } -// SendBatch sends multiple messages to a queue. This function allows setting options for each message. +// SendBatch sends multiple messages to a queue. This function allows setting options for each message. func (p *Producer) SendBatch(messages []*BatchMessage, opts ...BatchSendOption) error { var options batchSendOptions for _, opt := range opts { From 15f2b81fcf3b613baa0cca39880d3616bae68c8a Mon Sep 17 00:00:00 2001 From: syumai Date: Sun, 10 Nov 2024 00:27:12 +0900 Subject: [PATCH 10/10] update queues example --- _examples/queues/Makefile | 4 ++-- _examples/queues/main.go | 28 ++++++++++++---------------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/_examples/queues/Makefile b/_examples/queues/Makefile index db68197..019492c 100644 --- a/_examples/queues/Makefile +++ b/_examples/queues/Makefile @@ -1,6 +1,6 @@ .PHONY: dev dev: - npx wrangler dev --port 8787 + wrangler dev .PHONY: build build: @@ -9,4 +9,4 @@ build: .PHONY: deploy deploy: - npx wrangler deploy + wrangler deploy diff --git a/_examples/queues/main.go b/_examples/queues/main.go index c899b15..12cd84d 100644 --- a/_examples/queues/main.go +++ b/_examples/queues/main.go @@ -23,6 +23,7 @@ func main() { http.HandleFunc("/", handleProduce) workers.Serve(nil) } + func handleProduce(w http.ResponseWriter, req *http.Request) { if req.URL.Path != "/" { w.WriteHeader(http.StatusNotFound) @@ -48,7 +49,7 @@ func handleProduce(w http.ResponseWriter, req *http.Request) { err = produceText(q, req) case "application/json": log.Println("Handling json content type") - err = produceJson(q, req) + err = produceJSON(q, req) default: log.Println("Handling bytes content type") err = produceBytes(q, req) @@ -68,38 +69,33 @@ func produceText(q *queues.Producer, req *http.Request) error { if err != nil { return fmt.Errorf("failed to read request body: %w", err) } - if len(content) == 0 { - return fmt.Errorf("empty request body") - } - - // text content type supports string and []byte messages - if err := q.Send(content, queues.WithContentType(queues.QueueContentTypeText)); err != nil { + // text content type supports string + if err := q.SendText(string(content)); err != nil { return fmt.Errorf("failed to send message: %w", err) } - return nil } -func produceJson(q *queues.Producer, req *http.Request) error { +func produceJSON(q *queues.Producer, req *http.Request) error { var data any if err := json.NewDecoder(req.Body).Decode(&data); err != nil { return fmt.Errorf("failed to read request body: %w", err) } - - // json content type is default and therefore can be omitted // json content type supports messages of types that can be serialized to json - if err := q.Send(data); err != nil { + if err := q.SendJSON(data); err != nil { return fmt.Errorf("failed to send message: %w", err) } - return nil } func produceBytes(q *queues.Producer, req *http.Request) error { - // bytes content type support messages of type []byte, string, and io.Reader - if err := q.Send(req.Body, queues.WithContentType(queues.QueueContentTypeBytes)); err != nil { + content, err := io.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("failed to read request body: %w", err) + } + // bytes content type support messages of type []byte + if err := q.SendBytes(content); err != nil { return fmt.Errorf("failed to send message: %w", err) } - return nil }