mirror of
https://github.com/syumai/workers.git
synced 2025-03-10 17:29:11 +00:00
Merge pull request #133 from syumai/split-producer-send-message-funcs
split queues.Producer's Send funcs and BatchMessage constructors
This commit is contained in:
commit
4beee48384
@ -1,6 +1,6 @@
|
|||||||
.PHONY: dev
|
.PHONY: dev
|
||||||
dev:
|
dev:
|
||||||
npx wrangler dev --port 8787
|
wrangler dev
|
||||||
|
|
||||||
.PHONY: build
|
.PHONY: build
|
||||||
build:
|
build:
|
||||||
@ -9,4 +9,4 @@ build:
|
|||||||
|
|
||||||
.PHONY: deploy
|
.PHONY: deploy
|
||||||
deploy:
|
deploy:
|
||||||
npx wrangler deploy
|
wrangler deploy
|
||||||
|
@ -23,6 +23,7 @@ func main() {
|
|||||||
http.HandleFunc("/", handleProduce)
|
http.HandleFunc("/", handleProduce)
|
||||||
workers.Serve(nil)
|
workers.Serve(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleProduce(w http.ResponseWriter, req *http.Request) {
|
func handleProduce(w http.ResponseWriter, req *http.Request) {
|
||||||
if req.URL.Path != "/" {
|
if req.URL.Path != "/" {
|
||||||
w.WriteHeader(http.StatusNotFound)
|
w.WriteHeader(http.StatusNotFound)
|
||||||
@ -48,7 +49,7 @@ func handleProduce(w http.ResponseWriter, req *http.Request) {
|
|||||||
err = produceText(q, req)
|
err = produceText(q, req)
|
||||||
case "application/json":
|
case "application/json":
|
||||||
log.Println("Handling json content type")
|
log.Println("Handling json content type")
|
||||||
err = produceJson(q, req)
|
err = produceJSON(q, req)
|
||||||
default:
|
default:
|
||||||
log.Println("Handling bytes content type")
|
log.Println("Handling bytes content type")
|
||||||
err = produceBytes(q, req)
|
err = produceBytes(q, req)
|
||||||
@ -68,38 +69,33 @@ func produceText(q *queues.Producer, req *http.Request) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to read request body: %w", err)
|
return fmt.Errorf("failed to read request body: %w", err)
|
||||||
}
|
}
|
||||||
if len(content) == 0 {
|
// text content type supports string
|
||||||
return fmt.Errorf("empty request body")
|
if err := q.SendText(string(content)); err != nil {
|
||||||
}
|
|
||||||
|
|
||||||
// text content type supports string and []byte messages
|
|
||||||
if err := q.Send(content, queues.WithContentType(queues.QueueContentTypeText)); err != nil {
|
|
||||||
return fmt.Errorf("failed to send message: %w", err)
|
return fmt.Errorf("failed to send message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func produceJson(q *queues.Producer, req *http.Request) error {
|
func produceJSON(q *queues.Producer, req *http.Request) error {
|
||||||
var data any
|
var data any
|
||||||
if err := json.NewDecoder(req.Body).Decode(&data); err != nil {
|
if err := json.NewDecoder(req.Body).Decode(&data); err != nil {
|
||||||
return fmt.Errorf("failed to read request body: %w", err)
|
return fmt.Errorf("failed to read request body: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// json content type is default and therefore can be omitted
|
|
||||||
// json content type supports messages of types that can be serialized to json
|
// json content type supports messages of types that can be serialized to json
|
||||||
if err := q.Send(data); err != nil {
|
if err := q.SendJSON(data); err != nil {
|
||||||
return fmt.Errorf("failed to send message: %w", err)
|
return fmt.Errorf("failed to send message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func produceBytes(q *queues.Producer, req *http.Request) error {
|
func produceBytes(q *queues.Producer, req *http.Request) error {
|
||||||
// bytes content type support messages of type []byte, string, and io.Reader
|
content, err := io.ReadAll(req.Body)
|
||||||
if err := q.Send(req.Body, queues.WithContentType(queues.QueueContentTypeBytes)); err != nil {
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to read request body: %w", err)
|
||||||
|
}
|
||||||
|
// bytes content type support messages of type []byte
|
||||||
|
if err := q.SendBytes(content); err != nil {
|
||||||
return fmt.Errorf("failed to send message: %w", err)
|
return fmt.Errorf("failed to send message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
50
cloudflare/queues/batchmessage.go
Normal file
50
cloudflare/queues/batchmessage.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
package queues
|
||||||
|
|
||||||
|
import (
|
||||||
|
"syscall/js"
|
||||||
|
|
||||||
|
"github.com/syumai/workers/internal/jsutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BatchMessage struct {
|
||||||
|
body js.Value
|
||||||
|
options *sendOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTextBatchMessage creates a single text message to be batched before sending to a queue.
|
||||||
|
func NewTextBatchMessage(content string, opts ...SendOption) *BatchMessage {
|
||||||
|
return newBatchMessage(js.ValueOf(content), contentTypeText, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBytesBatchMessage creates a single byte array message to be batched before sending to a queue.
|
||||||
|
func NewBytesBatchMessage(content []byte, opts ...SendOption) *BatchMessage {
|
||||||
|
return newBatchMessage(js.ValueOf(content), contentTypeBytes, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewJSONBatchMessage creates a single JSON message to be batched before sending to a queue.
|
||||||
|
func NewJSONBatchMessage(content any, opts ...SendOption) *BatchMessage {
|
||||||
|
return newBatchMessage(js.ValueOf(content), contentTypeJSON, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewV8BatchMessage creates a single raw JS value message to be batched before sending to a queue.
|
||||||
|
func NewV8BatchMessage(content js.Value, opts ...SendOption) *BatchMessage {
|
||||||
|
return newBatchMessage(content, contentTypeV8, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newBatchMessage creates a single message to be batched before sending to a queue.
|
||||||
|
func newBatchMessage(body js.Value, contentType contentType, opts ...SendOption) *BatchMessage {
|
||||||
|
options := sendOptions{
|
||||||
|
ContentType: contentType,
|
||||||
|
}
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(&options)
|
||||||
|
}
|
||||||
|
return &BatchMessage{body: body, options: &options}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *BatchMessage) toJS() js.Value {
|
||||||
|
obj := jsutil.NewObject()
|
||||||
|
obj.Set("body", m.body)
|
||||||
|
obj.Set("options", m.options.toJS())
|
||||||
|
return obj
|
||||||
|
}
|
36
cloudflare/queues/batchsendoptions.go
Normal file
36
cloudflare/queues/batchsendoptions.go
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
package queues
|
||||||
|
|
||||||
|
import (
|
||||||
|
"syscall/js"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/syumai/workers/internal/jsutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
type batchSendOptions struct {
|
||||||
|
// DelaySeconds - The number of seconds to delay the message.
|
||||||
|
// Default is 0
|
||||||
|
DelaySeconds int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *batchSendOptions) toJS() js.Value {
|
||||||
|
if o == nil {
|
||||||
|
return js.Undefined()
|
||||||
|
}
|
||||||
|
|
||||||
|
obj := jsutil.NewObject()
|
||||||
|
if o.DelaySeconds != 0 {
|
||||||
|
obj.Set("delaySeconds", o.DelaySeconds)
|
||||||
|
}
|
||||||
|
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
|
||||||
|
type BatchSendOption func(*batchSendOptions)
|
||||||
|
|
||||||
|
// WithBatchDelaySeconds changes the number of seconds to delay the message.
|
||||||
|
func WithBatchDelaySeconds(d time.Duration) BatchSendOption {
|
||||||
|
return func(o *batchSendOptions) {
|
||||||
|
o.DelaySeconds = int(d.Seconds())
|
||||||
|
}
|
||||||
|
}
|
@ -1,82 +0,0 @@
|
|||||||
package queues
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"syscall/js"
|
|
||||||
|
|
||||||
"github.com/syumai/workers/internal/jsutil"
|
|
||||||
)
|
|
||||||
|
|
||||||
// QueueContentType represents the content type of a message produced to a queue.
|
|
||||||
// This information mostly affects how the message body is represented in the Cloudflare UI and is NOT
|
|
||||||
// propagated to the consumer side.
|
|
||||||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuescontenttype
|
|
||||||
type QueueContentType string
|
|
||||||
|
|
||||||
const (
|
|
||||||
// QueueContentTypeJSON is the default content type for the produced queue message.
|
|
||||||
// The message body is NOT being marshaled before sending and is passed to js.ValueOf directly.
|
|
||||||
// Make sure the body is serializable to JSON.
|
|
||||||
// - https://pkg.go.dev/syscall/js#ValueOf
|
|
||||||
QueueContentTypeJSON QueueContentType = "json"
|
|
||||||
|
|
||||||
// QueueContentTypeV8 is currently treated the same as QueueContentTypeJSON.
|
|
||||||
QueueContentTypeV8 QueueContentType = "v8"
|
|
||||||
|
|
||||||
// QueueContentTypeText is used to send a message as a string.
|
|
||||||
// Supported body types are string, []byte and io.Reader.
|
|
||||||
QueueContentTypeText QueueContentType = "text"
|
|
||||||
|
|
||||||
// QueueContentTypeBytes is used to send a message as a byte array.
|
|
||||||
// Supported body types are string, []byte, and io.Reader.
|
|
||||||
QueueContentTypeBytes QueueContentType = "bytes"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (o QueueContentType) mapValue(val any) (js.Value, error) {
|
|
||||||
switch o {
|
|
||||||
case QueueContentTypeText:
|
|
||||||
switch v := val.(type) {
|
|
||||||
case string:
|
|
||||||
return js.ValueOf(v), nil
|
|
||||||
case []byte:
|
|
||||||
return js.ValueOf(string(v)), nil
|
|
||||||
case io.Reader:
|
|
||||||
b, err := io.ReadAll(v)
|
|
||||||
if err != nil {
|
|
||||||
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
|
|
||||||
}
|
|
||||||
return js.ValueOf(string(b)), nil
|
|
||||||
default:
|
|
||||||
return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val)
|
|
||||||
}
|
|
||||||
|
|
||||||
case QueueContentTypeBytes:
|
|
||||||
var b []byte
|
|
||||||
switch v := val.(type) {
|
|
||||||
case string:
|
|
||||||
b = []byte(v)
|
|
||||||
case []byte:
|
|
||||||
b = v
|
|
||||||
case io.Reader:
|
|
||||||
var err error
|
|
||||||
b, err = io.ReadAll(v)
|
|
||||||
if err != nil {
|
|
||||||
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
return js.Undefined(), fmt.Errorf("invalid value type for bytes content type: %T", val)
|
|
||||||
}
|
|
||||||
|
|
||||||
ua := jsutil.NewUint8Array(len(b))
|
|
||||||
js.CopyBytesToJS(ua, b)
|
|
||||||
// accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should
|
|
||||||
// be used instead and with Uint8Array.buffer as a value, the send simply fails
|
|
||||||
return ua, nil
|
|
||||||
|
|
||||||
case QueueContentTypeJSON, QueueContentTypeV8:
|
|
||||||
return js.ValueOf(val), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return js.Undefined(), fmt.Errorf("unknown content type: %s", o)
|
|
||||||
}
|
|
@ -1,205 +0,0 @@
|
|||||||
package queues
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"syscall/js"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/syumai/workers/internal/jsutil"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestContentType_mapValue(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
contentType QueueContentType
|
|
||||||
val any
|
|
||||||
want js.Value
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "string as text",
|
|
||||||
contentType: QueueContentTypeText,
|
|
||||||
val: "hello",
|
|
||||||
want: js.ValueOf("hello"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "[]byte as text",
|
|
||||||
contentType: QueueContentTypeText,
|
|
||||||
val: []byte("hello"),
|
|
||||||
want: js.ValueOf("hello"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "io.Reader as text",
|
|
||||||
contentType: QueueContentTypeText,
|
|
||||||
val: bytes.NewBufferString("hello"),
|
|
||||||
want: js.ValueOf("hello"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "number as text",
|
|
||||||
contentType: QueueContentTypeText,
|
|
||||||
val: 42,
|
|
||||||
want: js.Undefined(),
|
|
||||||
wantErr: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "function as text",
|
|
||||||
contentType: QueueContentTypeText,
|
|
||||||
val: func() {},
|
|
||||||
want: js.Undefined(),
|
|
||||||
wantErr: true,
|
|
||||||
},
|
|
||||||
|
|
||||||
{
|
|
||||||
name: "string as json",
|
|
||||||
contentType: QueueContentTypeJSON,
|
|
||||||
val: "hello",
|
|
||||||
want: js.ValueOf("hello"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "number as json",
|
|
||||||
contentType: QueueContentTypeJSON,
|
|
||||||
val: 42,
|
|
||||||
want: js.ValueOf(42),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "bool as json",
|
|
||||||
contentType: QueueContentTypeJSON,
|
|
||||||
val: true,
|
|
||||||
want: js.ValueOf(true),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
got, err := tt.contentType.mapValue(tt.val)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Fatalf("%s.mapValue() error = %v, wantErr %v", tt.contentType, err, tt.wantErr)
|
|
||||||
}
|
|
||||||
if got.String() != tt.want.String() {
|
|
||||||
t.Errorf("%s.mapValue() = %v, want %v", tt.contentType, got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestContentType_mapValue_bytes(t *testing.T) {
|
|
||||||
jsOf := func(b []byte) js.Value {
|
|
||||||
ua := jsutil.NewUint8Array(len(b))
|
|
||||||
js.CopyBytesToJS(ua, b)
|
|
||||||
return ua
|
|
||||||
}
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
val any
|
|
||||||
want js.Value
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "[]byte as bytes",
|
|
||||||
val: []byte("hello"),
|
|
||||||
want: jsOf([]byte("hello")),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "string as bytes",
|
|
||||||
val: "hello",
|
|
||||||
want: jsOf([]byte("hello"))},
|
|
||||||
{
|
|
||||||
name: "io.Reader as bytes",
|
|
||||||
val: bytes.NewBufferString("hello"),
|
|
||||||
want: jsOf([]byte("hello")),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
got, err := QueueContentTypeBytes.mapValue(tt.val)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("%s.mapValue() got error = %v", QueueContentTypeBytes, err)
|
|
||||||
}
|
|
||||||
if got.Type() != tt.want.Type() {
|
|
||||||
t.Errorf("%s.mapValue() = type %v, want type %v", QueueContentTypeBytes, got, tt.want)
|
|
||||||
}
|
|
||||||
if got.String() != tt.want.String() {
|
|
||||||
t.Errorf("%s.mapValue() = %v, want %v", QueueContentTypeBytes, got, tt.want)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestContentType_mapValue_map(t *testing.T) {
|
|
||||||
val := map[string]interface{}{
|
|
||||||
"Name": "Alice",
|
|
||||||
"Age": 42,
|
|
||||||
}
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
contentType QueueContentType
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "json",
|
|
||||||
contentType: QueueContentTypeJSON,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "v8",
|
|
||||||
contentType: QueueContentTypeV8,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
|
|
||||||
got, err := tt.contentType.mapValue(val)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("QueueContentTypeJSON.mapValue() got error = %v", err)
|
|
||||||
}
|
|
||||||
if got.Type() != js.TypeObject {
|
|
||||||
t.Errorf("QueueContentTypeJSON.mapValue() = type %v, want type %v", got, js.TypeObject)
|
|
||||||
}
|
|
||||||
if got.Get("Name").String() != "Alice" {
|
|
||||||
t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Name").String(), "Alice")
|
|
||||||
}
|
|
||||||
if got.Get("Age").Int() != 42 {
|
|
||||||
t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Age").Int(), 42)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type User struct {
|
|
||||||
Name string
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestContentType_mapValue_unsupported_types(t *testing.T) {
|
|
||||||
t.Run("struct as json", func(t *testing.T) {
|
|
||||||
defer func() {
|
|
||||||
if p := recover(); p == nil {
|
|
||||||
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
val := User{Name: "Alice"}
|
|
||||||
_, _ = QueueContentTypeJSON.mapValue(val)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("slice of structs as json", func(t *testing.T) {
|
|
||||||
defer func() {
|
|
||||||
if p := recover(); p == nil {
|
|
||||||
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
val := User{Name: "Alice"}
|
|
||||||
_, _ = QueueContentTypeJSON.mapValue([]User{val})
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("slice of bytes as json", func(t *testing.T) {
|
|
||||||
defer func() {
|
|
||||||
if p := recover(); p == nil {
|
|
||||||
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
_, _ = QueueContentTypeJSON.mapValue([]byte("hello"))
|
|
||||||
})
|
|
||||||
}
|
|
26
cloudflare/queues/contenttype.go
Normal file
26
cloudflare/queues/contenttype.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
package queues
|
||||||
|
|
||||||
|
// contentType represents the content type of a message produced to a queue.
|
||||||
|
// This information mostly affects how the message body is represented in the Cloudflare UI and is NOT
|
||||||
|
// propagated to the consumer side.
|
||||||
|
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuescontenttype
|
||||||
|
type contentType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
// contentTypeJSON is the default content type for the produced queue message.
|
||||||
|
// The message body is NOT being marshaled before sending and is passed to js.ValueOf directly.
|
||||||
|
// Make sure the body is serializable to JSON.
|
||||||
|
// - https://pkg.go.dev/syscall/js#ValueOf
|
||||||
|
contentTypeJSON contentType = "json"
|
||||||
|
|
||||||
|
// contentTypeV8 is currently treated the same as QueueContentTypeJSON.
|
||||||
|
contentTypeV8 contentType = "v8"
|
||||||
|
|
||||||
|
// contentTypeText is used to send a message as a string.
|
||||||
|
// Supported body types are string, []byte and io.Reader.
|
||||||
|
contentTypeText contentType = "text"
|
||||||
|
|
||||||
|
// contentTypeBytes is used to send a message as a byte array.
|
||||||
|
// Supported body types are string, []byte, and io.Reader.
|
||||||
|
contentTypeBytes contentType = "bytes"
|
||||||
|
)
|
@ -1,7 +1,6 @@
|
|||||||
package queues
|
package queues
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"syscall/js"
|
"syscall/js"
|
||||||
|
|
||||||
@ -9,37 +8,6 @@ import (
|
|||||||
"github.com/syumai/workers/internal/jsutil"
|
"github.com/syumai/workers/internal/jsutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BatchMessage struct {
|
|
||||||
body any
|
|
||||||
options *sendOptions
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewBatchMessage creates a single message to be batched before sending to a queue.
|
|
||||||
func NewBatchMessage(body any, opts ...SendOption) *BatchMessage {
|
|
||||||
options := defaultSendOptions()
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(options)
|
|
||||||
}
|
|
||||||
return &BatchMessage{body: body, options: options}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *BatchMessage) toJS() (js.Value, error) {
|
|
||||||
if m == nil {
|
|
||||||
return js.Undefined(), errors.New("message is nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
jsValue, err := m.options.ContentType.mapValue(m.body)
|
|
||||||
if err != nil {
|
|
||||||
return js.Undefined(), err
|
|
||||||
}
|
|
||||||
|
|
||||||
obj := jsutil.NewObject()
|
|
||||||
obj.Set("body", jsValue)
|
|
||||||
obj.Set("options", m.options.toJS())
|
|
||||||
|
|
||||||
return obj, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type Producer struct {
|
type Producer struct {
|
||||||
// queue - Objects that Queue API belongs to. Default is Global
|
// queue - Objects that Queue API belongs to. Default is Global
|
||||||
queue js.Value
|
queue js.Value
|
||||||
@ -57,56 +25,57 @@ func NewProducer(queueName string) (*Producer, error) {
|
|||||||
return &Producer{queue: inst}, nil
|
return &Producer{queue: inst}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send sends a single message to a queue. This function allows setting send options for the message.
|
// SendText sends a single text message to a queue.
|
||||||
// If no options are provided, the default options are used (QueueContentTypeJSON and no delay).
|
func (p *Producer) SendText(body string, opts ...SendOption) error {
|
||||||
//
|
return p.send(js.ValueOf(body), contentTypeText, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendBytes sends a single byte array message to a queue.
|
||||||
|
func (p *Producer) SendBytes(body []byte, opts ...SendOption) error {
|
||||||
|
ua := jsutil.NewUint8Array(len(body))
|
||||||
|
js.CopyBytesToJS(ua, body)
|
||||||
|
// accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should
|
||||||
|
// be used instead and with Uint8Array.buffer as a value, the send simply fails
|
||||||
|
return p.send(ua, contentTypeBytes, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendJSON sends a single JSON message to a queue.
|
||||||
|
func (p *Producer) SendJSON(body any, opts ...SendOption) error {
|
||||||
|
return p.send(js.ValueOf(body), contentTypeJSON, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendV8 sends a single raw JS value message to a queue.
|
||||||
|
func (p *Producer) SendV8(body js.Value, opts ...SendOption) error {
|
||||||
|
return p.send(body, contentTypeV8, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// send sends a single message to a queue.
|
||||||
|
// This function allows setting send options for the message.
|
||||||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer
|
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer
|
||||||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuesendoptions
|
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuesendoptions
|
||||||
func (p *Producer) Send(content any, opts ...SendOption) error {
|
func (p *Producer) send(body js.Value, contentType contentType, opts ...SendOption) error {
|
||||||
if p.queue.IsUndefined() {
|
options := sendOptions{
|
||||||
return errors.New("queue object not found")
|
ContentType: contentType,
|
||||||
}
|
}
|
||||||
|
|
||||||
options := defaultSendOptions()
|
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
opt(options)
|
opt(&options)
|
||||||
}
|
}
|
||||||
|
|
||||||
jsValue, err := options.ContentType.mapValue(content)
|
prom := p.queue.Call("send", body, options.toJS())
|
||||||
if err != nil {
|
_, err := jsutil.AwaitPromise(prom)
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
prom := p.queue.Call("send", jsValue, options.toJS())
|
|
||||||
_, err = jsutil.AwaitPromise(prom)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendBatch sends multiple messages to a queue. This function allows setting options for each message.
|
// SendBatch sends multiple messages to a queue. This function allows setting options for each message.
|
||||||
func (p *Producer) SendBatch(messages []*BatchMessage, opts ...BatchSendOption) error {
|
func (p *Producer) SendBatch(messages []*BatchMessage, opts ...BatchSendOption) error {
|
||||||
if p.queue.IsUndefined() {
|
var options batchSendOptions
|
||||||
return errors.New("queue object not found")
|
for _, opt := range opts {
|
||||||
}
|
opt(&options)
|
||||||
|
|
||||||
if len(messages) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var options *batchSendOptions
|
|
||||||
if len(opts) > 0 {
|
|
||||||
options = &batchSendOptions{}
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(options)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
jsArray := jsutil.NewArray(len(messages))
|
jsArray := jsutil.NewArray(len(messages))
|
||||||
for i, message := range messages {
|
for i, message := range messages {
|
||||||
jsValue, err := message.toJS()
|
jsArray.SetIndex(i, message.toJS())
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to convert message %d to JS: %w", i, err)
|
|
||||||
}
|
|
||||||
jsArray.SetIndex(i, jsValue)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
prom := p.queue.Call("sendBatch", jsArray, options.toJS())
|
prom := p.queue.Call("sendBatch", jsArray, options.toJS())
|
||||||
|
@ -1,79 +0,0 @@
|
|||||||
package queues
|
|
||||||
|
|
||||||
import (
|
|
||||||
"syscall/js"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/syumai/workers/internal/jsutil"
|
|
||||||
)
|
|
||||||
|
|
||||||
type sendOptions struct {
|
|
||||||
// ContentType - Content type of the message
|
|
||||||
// Default is "json"
|
|
||||||
ContentType QueueContentType
|
|
||||||
|
|
||||||
// DelaySeconds - The number of seconds to delay the message.
|
|
||||||
// Default is 0
|
|
||||||
DelaySeconds int
|
|
||||||
}
|
|
||||||
|
|
||||||
func defaultSendOptions() *sendOptions {
|
|
||||||
return &sendOptions{
|
|
||||||
ContentType: QueueContentTypeJSON,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (o *sendOptions) toJS() js.Value {
|
|
||||||
obj := jsutil.NewObject()
|
|
||||||
obj.Set("contentType", string(o.ContentType))
|
|
||||||
|
|
||||||
if o.DelaySeconds != 0 {
|
|
||||||
obj.Set("delaySeconds", o.DelaySeconds)
|
|
||||||
}
|
|
||||||
|
|
||||||
return obj
|
|
||||||
}
|
|
||||||
|
|
||||||
type SendOption func(*sendOptions)
|
|
||||||
|
|
||||||
// WithContentType changes the content type of the message.
|
|
||||||
func WithContentType(contentType QueueContentType) SendOption {
|
|
||||||
return func(o *sendOptions) {
|
|
||||||
o.ContentType = contentType
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithDelay changes the number of seconds to delay the message.
|
|
||||||
func WithDelay(d time.Duration) SendOption {
|
|
||||||
return func(o *sendOptions) {
|
|
||||||
o.DelaySeconds = int(d.Seconds())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type batchSendOptions struct {
|
|
||||||
// DelaySeconds - The number of seconds to delay the message.
|
|
||||||
// Default is 0
|
|
||||||
DelaySeconds int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (o *batchSendOptions) toJS() js.Value {
|
|
||||||
if o == nil {
|
|
||||||
return js.Undefined()
|
|
||||||
}
|
|
||||||
|
|
||||||
obj := jsutil.NewObject()
|
|
||||||
if o.DelaySeconds != 0 {
|
|
||||||
obj.Set("delaySeconds", o.DelaySeconds)
|
|
||||||
}
|
|
||||||
|
|
||||||
return obj
|
|
||||||
}
|
|
||||||
|
|
||||||
type BatchSendOption func(*batchSendOptions)
|
|
||||||
|
|
||||||
// WithBatchDelay changes the number of seconds to delay the message.
|
|
||||||
func WithBatchDelay(d time.Duration) BatchSendOption {
|
|
||||||
return func(o *batchSendOptions) {
|
|
||||||
o.DelaySeconds = int(d.Seconds())
|
|
||||||
}
|
|
||||||
}
|
|
@ -53,7 +53,7 @@ func TestSend(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
producer := validatingProducer(t, validation)
|
producer := validatingProducer(t, validation)
|
||||||
err := producer.Send("hello", WithContentType(QueueContentTypeText))
|
err := producer.SendText("hello")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Send failed: %v", err)
|
t.Fatalf("Send failed: %v", err)
|
||||||
}
|
}
|
||||||
@ -74,85 +74,14 @@ func TestSend(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
producer := validatingProducer(t, validation)
|
producer := validatingProducer(t, validation)
|
||||||
err := producer.Send("hello", WithContentType(QueueContentTypeJSON))
|
err := producer.SendJSON("hello")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Send failed: %v", err)
|
t.Fatalf("Send failed: %v", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSend_ContentTypeOption(t *testing.T) {
|
func TestSendBatch(t *testing.T) {
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
options []SendOption
|
|
||||||
expectedContentType string
|
|
||||||
expectedDelaySec int
|
|
||||||
wantErr bool
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "text",
|
|
||||||
options: []SendOption{WithContentType(QueueContentTypeText)},
|
|
||||||
expectedContentType: "text",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "json",
|
|
||||||
options: []SendOption{WithContentType(QueueContentTypeJSON)},
|
|
||||||
expectedContentType: "json",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "default",
|
|
||||||
options: nil,
|
|
||||||
expectedContentType: "json",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "v8",
|
|
||||||
options: []SendOption{WithContentType(QueueContentTypeV8)},
|
|
||||||
expectedContentType: "v8",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "bytes",
|
|
||||||
options: []SendOption{WithContentType(QueueContentTypeBytes)},
|
|
||||||
expectedContentType: "bytes",
|
|
||||||
},
|
|
||||||
|
|
||||||
{
|
|
||||||
name: "delay",
|
|
||||||
options: []SendOption{WithDelay(5 * time.Second)},
|
|
||||||
expectedDelaySec: 5,
|
|
||||||
expectedContentType: "json",
|
|
||||||
},
|
|
||||||
|
|
||||||
{
|
|
||||||
name: "invalid content type",
|
|
||||||
options: []SendOption{WithContentType("invalid")},
|
|
||||||
wantErr: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tt := range tests {
|
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
|
||||||
validation := func(message js.Value, options js.Value) error {
|
|
||||||
gotCT := options.Get("contentType").String()
|
|
||||||
if gotCT != string(tt.expectedContentType) {
|
|
||||||
return fmt.Errorf("expected content type %q, got %q", tt.expectedContentType, gotCT)
|
|
||||||
}
|
|
||||||
gotDelaySec := jsutil.MaybeInt(options.Get("delaySeconds"))
|
|
||||||
if gotDelaySec != tt.expectedDelaySec {
|
|
||||||
return fmt.Errorf("expected delay %d, got %d", tt.expectedDelaySec, gotDelaySec)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
producer := validatingProducer(t, validation)
|
|
||||||
err := producer.Send("hello", tt.options...)
|
|
||||||
if (err != nil) != tt.wantErr {
|
|
||||||
t.Fatalf("expected error: %t, got %v", tt.wantErr, err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSendBatch_Defaults(t *testing.T) {
|
|
||||||
validation := func(batch js.Value, options js.Value) error {
|
validation := func(batch js.Value, options js.Value) error {
|
||||||
if batch.Type() != js.TypeObject {
|
if batch.Type() != js.TypeObject {
|
||||||
return errors.New("message batch must be an object (array)")
|
return errors.New("message batch must be an object (array)")
|
||||||
@ -179,9 +108,9 @@ func TestSendBatch_Defaults(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var batch []*BatchMessage = []*BatchMessage{
|
batch := []*BatchMessage{
|
||||||
NewBatchMessage("hello"),
|
NewJSONBatchMessage("hello"),
|
||||||
NewBatchMessage("world", WithContentType(QueueContentTypeText)),
|
NewTextBatchMessage("world"),
|
||||||
}
|
}
|
||||||
|
|
||||||
producer := validatingProducer(t, validation)
|
producer := validatingProducer(t, validation)
|
||||||
@ -199,12 +128,12 @@ func TestSendBatch_Options(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var batch []*BatchMessage = []*BatchMessage{
|
batch := []*BatchMessage{
|
||||||
NewBatchMessage("hello"),
|
NewTextBatchMessage("hello"),
|
||||||
}
|
}
|
||||||
|
|
||||||
producer := validatingProducer(t, validation)
|
producer := validatingProducer(t, validation)
|
||||||
err := producer.SendBatch(batch, WithBatchDelay(5*time.Second))
|
err := producer.SendBatch(batch, WithBatchDelaySeconds(5*time.Second))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("SendBatch failed: %v", err)
|
t.Fatalf("SendBatch failed: %v", err)
|
||||||
}
|
}
|
||||||
|
38
cloudflare/queues/sendoptions.go
Normal file
38
cloudflare/queues/sendoptions.go
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
package queues
|
||||||
|
|
||||||
|
import (
|
||||||
|
"syscall/js"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/syumai/workers/internal/jsutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
type sendOptions struct {
|
||||||
|
// ContentType - Content type of the message
|
||||||
|
// Default is "json"
|
||||||
|
ContentType contentType
|
||||||
|
|
||||||
|
// DelaySeconds - The number of seconds to delay the message.
|
||||||
|
// Default is 0
|
||||||
|
DelaySeconds int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *sendOptions) toJS() js.Value {
|
||||||
|
obj := jsutil.NewObject()
|
||||||
|
obj.Set("contentType", string(o.ContentType))
|
||||||
|
|
||||||
|
if o.DelaySeconds != 0 {
|
||||||
|
obj.Set("delaySeconds", o.DelaySeconds)
|
||||||
|
}
|
||||||
|
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
|
||||||
|
type SendOption func(*sendOptions)
|
||||||
|
|
||||||
|
// WithDelaySeconds changes the number of seconds to delay the message.
|
||||||
|
func WithDelaySeconds(d time.Duration) SendOption {
|
||||||
|
return func(o *sendOptions) {
|
||||||
|
o.DelaySeconds = int(d.Seconds())
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user