diff --git a/cloudflare/queues/content_type.go b/cloudflare/queues/content_type.go index b0db97a..8c6f255 100644 --- a/cloudflare/queues/content_type.go +++ b/cloudflare/queues/content_type.go @@ -1,82 +1,26 @@ package queues -import ( - "fmt" - "io" - "syscall/js" - - "github.com/syumai/workers/internal/jsutil" -) - -// QueueContentType represents the content type of a message produced to a queue. +// contentType represents the content type of a message produced to a queue. // This information mostly affects how the message body is represented in the Cloudflare UI and is NOT // propagated to the consumer side. // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuescontenttype -type QueueContentType string +type contentType string const ( - // QueueContentTypeJSON is the default content type for the produced queue message. + // contentTypeJSON is the default content type for the produced queue message. // The message body is NOT being marshaled before sending and is passed to js.ValueOf directly. // Make sure the body is serializable to JSON. // - https://pkg.go.dev/syscall/js#ValueOf - QueueContentTypeJSON QueueContentType = "json" + contentTypeJSON contentType = "json" - // QueueContentTypeV8 is currently treated the same as QueueContentTypeJSON. - QueueContentTypeV8 QueueContentType = "v8" + // contentTypeV8 is currently treated the same as QueueContentTypeJSON. + contentTypeV8 contentType = "v8" - // QueueContentTypeText is used to send a message as a string. + // contentTypeText is used to send a message as a string. // Supported body types are string, []byte and io.Reader. - QueueContentTypeText QueueContentType = "text" + contentTypeText contentType = "text" - // QueueContentTypeBytes is used to send a message as a byte array. + // contentTypeBytes is used to send a message as a byte array. // Supported body types are string, []byte, and io.Reader. - QueueContentTypeBytes QueueContentType = "bytes" + contentTypeBytes contentType = "bytes" ) - -func (o QueueContentType) mapValue(val any) (js.Value, error) { - switch o { - case QueueContentTypeText: - switch v := val.(type) { - case string: - return js.ValueOf(v), nil - case []byte: - return js.ValueOf(string(v)), nil - case io.Reader: - b, err := io.ReadAll(v) - if err != nil { - return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err) - } - return js.ValueOf(string(b)), nil - default: - return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val) - } - - case QueueContentTypeBytes: - var b []byte - switch v := val.(type) { - case string: - b = []byte(v) - case []byte: - b = v - case io.Reader: - var err error - b, err = io.ReadAll(v) - if err != nil { - return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err) - } - default: - return js.Undefined(), fmt.Errorf("invalid value type for bytes content type: %T", val) - } - - ua := jsutil.NewUint8Array(len(b)) - js.CopyBytesToJS(ua, b) - // accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should - // be used instead and with Uint8Array.buffer as a value, the send simply fails - return ua, nil - - case QueueContentTypeJSON, QueueContentTypeV8: - return js.ValueOf(val), nil - } - - return js.Undefined(), fmt.Errorf("unknown content type: %s", o) -} diff --git a/cloudflare/queues/content_type_test.go b/cloudflare/queues/content_type_test.go deleted file mode 100644 index 5305c97..0000000 --- a/cloudflare/queues/content_type_test.go +++ /dev/null @@ -1,205 +0,0 @@ -package queues - -import ( - "bytes" - "syscall/js" - "testing" - - "github.com/syumai/workers/internal/jsutil" -) - -func TestContentType_mapValue(t *testing.T) { - tests := []struct { - name string - contentType QueueContentType - val any - want js.Value - wantErr bool - }{ - { - name: "string as text", - contentType: QueueContentTypeText, - val: "hello", - want: js.ValueOf("hello"), - }, - { - name: "[]byte as text", - contentType: QueueContentTypeText, - val: []byte("hello"), - want: js.ValueOf("hello"), - }, - { - name: "io.Reader as text", - contentType: QueueContentTypeText, - val: bytes.NewBufferString("hello"), - want: js.ValueOf("hello"), - }, - { - name: "number as text", - contentType: QueueContentTypeText, - val: 42, - want: js.Undefined(), - wantErr: true, - }, - { - name: "function as text", - contentType: QueueContentTypeText, - val: func() {}, - want: js.Undefined(), - wantErr: true, - }, - - { - name: "string as json", - contentType: QueueContentTypeJSON, - val: "hello", - want: js.ValueOf("hello"), - }, - { - name: "number as json", - contentType: QueueContentTypeJSON, - val: 42, - want: js.ValueOf(42), - }, - { - name: "bool as json", - contentType: QueueContentTypeJSON, - val: true, - want: js.ValueOf(true), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := tt.contentType.mapValue(tt.val) - if (err != nil) != tt.wantErr { - t.Fatalf("%s.mapValue() error = %v, wantErr %v", tt.contentType, err, tt.wantErr) - } - if got.String() != tt.want.String() { - t.Errorf("%s.mapValue() = %v, want %v", tt.contentType, got, tt.want) - } - }) - } -} - -func TestContentType_mapValue_bytes(t *testing.T) { - jsOf := func(b []byte) js.Value { - ua := jsutil.NewUint8Array(len(b)) - js.CopyBytesToJS(ua, b) - return ua - } - - tests := []struct { - name string - val any - want js.Value - }{ - { - name: "[]byte as bytes", - val: []byte("hello"), - want: jsOf([]byte("hello")), - }, - { - name: "string as bytes", - val: "hello", - want: jsOf([]byte("hello"))}, - { - name: "io.Reader as bytes", - val: bytes.NewBufferString("hello"), - want: jsOf([]byte("hello")), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := QueueContentTypeBytes.mapValue(tt.val) - if err != nil { - t.Fatalf("%s.mapValue() got error = %v", QueueContentTypeBytes, err) - } - if got.Type() != tt.want.Type() { - t.Errorf("%s.mapValue() = type %v, want type %v", QueueContentTypeBytes, got, tt.want) - } - if got.String() != tt.want.String() { - t.Errorf("%s.mapValue() = %v, want %v", QueueContentTypeBytes, got, tt.want) - } - }) - } -} - -func TestContentType_mapValue_map(t *testing.T) { - val := map[string]interface{}{ - "Name": "Alice", - "Age": 42, - } - - tests := []struct { - name string - contentType QueueContentType - }{ - { - name: "json", - contentType: QueueContentTypeJSON, - }, - { - name: "v8", - contentType: QueueContentTypeV8, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - - got, err := tt.contentType.mapValue(val) - if err != nil { - t.Fatalf("QueueContentTypeJSON.mapValue() got error = %v", err) - } - if got.Type() != js.TypeObject { - t.Errorf("QueueContentTypeJSON.mapValue() = type %v, want type %v", got, js.TypeObject) - } - if got.Get("Name").String() != "Alice" { - t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Name").String(), "Alice") - } - if got.Get("Age").Int() != 42 { - t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Age").Int(), 42) - } - }) - } -} - -type User struct { - Name string -} - -func TestContentType_mapValue_unsupported_types(t *testing.T) { - t.Run("struct as json", func(t *testing.T) { - defer func() { - if p := recover(); p == nil { - t.Fatalf("QueueContentTypeJSON.mapValue() did not panic") - } - }() - - val := User{Name: "Alice"} - _, _ = QueueContentTypeJSON.mapValue(val) - }) - - t.Run("slice of structs as json", func(t *testing.T) { - defer func() { - if p := recover(); p == nil { - t.Fatalf("QueueContentTypeJSON.mapValue() did not panic") - } - }() - - val := User{Name: "Alice"} - _, _ = QueueContentTypeJSON.mapValue([]User{val}) - }) - - t.Run("slice of bytes as json", func(t *testing.T) { - defer func() { - if p := recover(); p == nil { - t.Fatalf("QueueContentTypeJSON.mapValue() did not panic") - } - }() - - _, _ = QueueContentTypeJSON.mapValue([]byte("hello")) - }) -} diff --git a/cloudflare/queues/producer.go b/cloudflare/queues/producer.go index bdecfaa..ef30bea 100644 --- a/cloudflare/queues/producer.go +++ b/cloudflare/queues/producer.go @@ -26,28 +26,41 @@ func NewProducer(queueName string) (*Producer, error) { return &Producer{queue: inst}, nil } -// Send sends a single message to a queue. This function allows setting send options for the message. +func (p *Producer) SendText(content string, opts ...SendOption) error { + return p.send(js.ValueOf(content), contentTypeText, opts...) +} + +func (p *Producer) SendBytes(content []byte, opts ...SendOption) error { + ua := jsutil.NewUint8Array(len(content)) + js.CopyBytesToJS(ua, content) + // accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should + // be used instead and with Uint8Array.buffer as a value, the send simply fails + return p.send(ua, contentTypeBytes, opts...) +} + +func (p *Producer) SendJSON(content any, opts ...SendOption) error { + return p.send(js.ValueOf(content), contentTypeJSON, opts...) +} + +func (p *Producer) SendV8(content js.Value, opts ...SendOption) error { + return p.send(content, contentTypeV8, opts...) +} + +// send sends a single message to a queue. This function allows setting send options for the message. // If no options are provided, the default options are used (QueueContentTypeJSON and no delay). // // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer // - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuesendoptions -func (p *Producer) Send(content any, opts ...SendOption) error { - if p.queue.IsUndefined() { - return errors.New("queue object not found") +func (p *Producer) send(body js.Value, contentType contentType, opts ...SendOption) error { + options := &sendOptions{ + ContentType: contentType, } - - options := defaultSendOptions() for _, opt := range opts { opt(options) } - jsValue, err := options.ContentType.mapValue(content) - if err != nil { - return err - } - - prom := p.queue.Call("send", jsValue, options.toJS()) - _, err = jsutil.AwaitPromise(prom) + prom := p.queue.Call("send", body, options.toJS()) + _, err := jsutil.AwaitPromise(prom) return err } diff --git a/cloudflare/queues/producer_opts.go b/cloudflare/queues/producer_opts.go index 5d345ae..cf2c5db 100644 --- a/cloudflare/queues/producer_opts.go +++ b/cloudflare/queues/producer_opts.go @@ -10,19 +10,13 @@ import ( type sendOptions struct { // ContentType - Content type of the message // Default is "json" - ContentType QueueContentType + ContentType contentType // DelaySeconds - The number of seconds to delay the message. // Default is 0 DelaySeconds int } -func defaultSendOptions() *sendOptions { - return &sendOptions{ - ContentType: QueueContentTypeJSON, - } -} - func (o *sendOptions) toJS() js.Value { obj := jsutil.NewObject() obj.Set("contentType", string(o.ContentType)) @@ -36,13 +30,6 @@ func (o *sendOptions) toJS() js.Value { type SendOption func(*sendOptions) -// WithContentType changes the content type of the message. -func WithContentType(contentType QueueContentType) SendOption { - return func(o *sendOptions) { - o.ContentType = contentType - } -} - // WithDelay changes the number of seconds to delay the message. func WithDelay(d time.Duration) SendOption { return func(o *sendOptions) {