mirror of
https://github.com/syumai/workers.git
synced 2025-03-11 01:39:11 +00:00
split queue.Producer's send methods
This commit is contained in:
parent
693eaf206f
commit
9cfa0111ef
@ -1,82 +1,26 @@
|
||||
package queues
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"syscall/js"
|
||||
|
||||
"github.com/syumai/workers/internal/jsutil"
|
||||
)
|
||||
|
||||
// QueueContentType represents the content type of a message produced to a queue.
|
||||
// contentType represents the content type of a message produced to a queue.
|
||||
// This information mostly affects how the message body is represented in the Cloudflare UI and is NOT
|
||||
// propagated to the consumer side.
|
||||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuescontenttype
|
||||
type QueueContentType string
|
||||
type contentType string
|
||||
|
||||
const (
|
||||
// QueueContentTypeJSON is the default content type for the produced queue message.
|
||||
// contentTypeJSON is the default content type for the produced queue message.
|
||||
// The message body is NOT being marshaled before sending and is passed to js.ValueOf directly.
|
||||
// Make sure the body is serializable to JSON.
|
||||
// - https://pkg.go.dev/syscall/js#ValueOf
|
||||
QueueContentTypeJSON QueueContentType = "json"
|
||||
contentTypeJSON contentType = "json"
|
||||
|
||||
// QueueContentTypeV8 is currently treated the same as QueueContentTypeJSON.
|
||||
QueueContentTypeV8 QueueContentType = "v8"
|
||||
// contentTypeV8 is currently treated the same as QueueContentTypeJSON.
|
||||
contentTypeV8 contentType = "v8"
|
||||
|
||||
// QueueContentTypeText is used to send a message as a string.
|
||||
// contentTypeText is used to send a message as a string.
|
||||
// Supported body types are string, []byte and io.Reader.
|
||||
QueueContentTypeText QueueContentType = "text"
|
||||
contentTypeText contentType = "text"
|
||||
|
||||
// QueueContentTypeBytes is used to send a message as a byte array.
|
||||
// contentTypeBytes is used to send a message as a byte array.
|
||||
// Supported body types are string, []byte, and io.Reader.
|
||||
QueueContentTypeBytes QueueContentType = "bytes"
|
||||
contentTypeBytes contentType = "bytes"
|
||||
)
|
||||
|
||||
func (o QueueContentType) mapValue(val any) (js.Value, error) {
|
||||
switch o {
|
||||
case QueueContentTypeText:
|
||||
switch v := val.(type) {
|
||||
case string:
|
||||
return js.ValueOf(v), nil
|
||||
case []byte:
|
||||
return js.ValueOf(string(v)), nil
|
||||
case io.Reader:
|
||||
b, err := io.ReadAll(v)
|
||||
if err != nil {
|
||||
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
|
||||
}
|
||||
return js.ValueOf(string(b)), nil
|
||||
default:
|
||||
return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val)
|
||||
}
|
||||
|
||||
case QueueContentTypeBytes:
|
||||
var b []byte
|
||||
switch v := val.(type) {
|
||||
case string:
|
||||
b = []byte(v)
|
||||
case []byte:
|
||||
b = v
|
||||
case io.Reader:
|
||||
var err error
|
||||
b, err = io.ReadAll(v)
|
||||
if err != nil {
|
||||
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
|
||||
}
|
||||
default:
|
||||
return js.Undefined(), fmt.Errorf("invalid value type for bytes content type: %T", val)
|
||||
}
|
||||
|
||||
ua := jsutil.NewUint8Array(len(b))
|
||||
js.CopyBytesToJS(ua, b)
|
||||
// accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should
|
||||
// be used instead and with Uint8Array.buffer as a value, the send simply fails
|
||||
return ua, nil
|
||||
|
||||
case QueueContentTypeJSON, QueueContentTypeV8:
|
||||
return js.ValueOf(val), nil
|
||||
}
|
||||
|
||||
return js.Undefined(), fmt.Errorf("unknown content type: %s", o)
|
||||
}
|
||||
|
@ -1,205 +0,0 @@
|
||||
package queues
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"syscall/js"
|
||||
"testing"
|
||||
|
||||
"github.com/syumai/workers/internal/jsutil"
|
||||
)
|
||||
|
||||
func TestContentType_mapValue(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
contentType QueueContentType
|
||||
val any
|
||||
want js.Value
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "string as text",
|
||||
contentType: QueueContentTypeText,
|
||||
val: "hello",
|
||||
want: js.ValueOf("hello"),
|
||||
},
|
||||
{
|
||||
name: "[]byte as text",
|
||||
contentType: QueueContentTypeText,
|
||||
val: []byte("hello"),
|
||||
want: js.ValueOf("hello"),
|
||||
},
|
||||
{
|
||||
name: "io.Reader as text",
|
||||
contentType: QueueContentTypeText,
|
||||
val: bytes.NewBufferString("hello"),
|
||||
want: js.ValueOf("hello"),
|
||||
},
|
||||
{
|
||||
name: "number as text",
|
||||
contentType: QueueContentTypeText,
|
||||
val: 42,
|
||||
want: js.Undefined(),
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "function as text",
|
||||
contentType: QueueContentTypeText,
|
||||
val: func() {},
|
||||
want: js.Undefined(),
|
||||
wantErr: true,
|
||||
},
|
||||
|
||||
{
|
||||
name: "string as json",
|
||||
contentType: QueueContentTypeJSON,
|
||||
val: "hello",
|
||||
want: js.ValueOf("hello"),
|
||||
},
|
||||
{
|
||||
name: "number as json",
|
||||
contentType: QueueContentTypeJSON,
|
||||
val: 42,
|
||||
want: js.ValueOf(42),
|
||||
},
|
||||
{
|
||||
name: "bool as json",
|
||||
contentType: QueueContentTypeJSON,
|
||||
val: true,
|
||||
want: js.ValueOf(true),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := tt.contentType.mapValue(tt.val)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Fatalf("%s.mapValue() error = %v, wantErr %v", tt.contentType, err, tt.wantErr)
|
||||
}
|
||||
if got.String() != tt.want.String() {
|
||||
t.Errorf("%s.mapValue() = %v, want %v", tt.contentType, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestContentType_mapValue_bytes(t *testing.T) {
|
||||
jsOf := func(b []byte) js.Value {
|
||||
ua := jsutil.NewUint8Array(len(b))
|
||||
js.CopyBytesToJS(ua, b)
|
||||
return ua
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
val any
|
||||
want js.Value
|
||||
}{
|
||||
{
|
||||
name: "[]byte as bytes",
|
||||
val: []byte("hello"),
|
||||
want: jsOf([]byte("hello")),
|
||||
},
|
||||
{
|
||||
name: "string as bytes",
|
||||
val: "hello",
|
||||
want: jsOf([]byte("hello"))},
|
||||
{
|
||||
name: "io.Reader as bytes",
|
||||
val: bytes.NewBufferString("hello"),
|
||||
want: jsOf([]byte("hello")),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := QueueContentTypeBytes.mapValue(tt.val)
|
||||
if err != nil {
|
||||
t.Fatalf("%s.mapValue() got error = %v", QueueContentTypeBytes, err)
|
||||
}
|
||||
if got.Type() != tt.want.Type() {
|
||||
t.Errorf("%s.mapValue() = type %v, want type %v", QueueContentTypeBytes, got, tt.want)
|
||||
}
|
||||
if got.String() != tt.want.String() {
|
||||
t.Errorf("%s.mapValue() = %v, want %v", QueueContentTypeBytes, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestContentType_mapValue_map(t *testing.T) {
|
||||
val := map[string]interface{}{
|
||||
"Name": "Alice",
|
||||
"Age": 42,
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
contentType QueueContentType
|
||||
}{
|
||||
{
|
||||
name: "json",
|
||||
contentType: QueueContentTypeJSON,
|
||||
},
|
||||
{
|
||||
name: "v8",
|
||||
contentType: QueueContentTypeV8,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
||||
got, err := tt.contentType.mapValue(val)
|
||||
if err != nil {
|
||||
t.Fatalf("QueueContentTypeJSON.mapValue() got error = %v", err)
|
||||
}
|
||||
if got.Type() != js.TypeObject {
|
||||
t.Errorf("QueueContentTypeJSON.mapValue() = type %v, want type %v", got, js.TypeObject)
|
||||
}
|
||||
if got.Get("Name").String() != "Alice" {
|
||||
t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Name").String(), "Alice")
|
||||
}
|
||||
if got.Get("Age").Int() != 42 {
|
||||
t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Age").Int(), 42)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type User struct {
|
||||
Name string
|
||||
}
|
||||
|
||||
func TestContentType_mapValue_unsupported_types(t *testing.T) {
|
||||
t.Run("struct as json", func(t *testing.T) {
|
||||
defer func() {
|
||||
if p := recover(); p == nil {
|
||||
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
|
||||
}
|
||||
}()
|
||||
|
||||
val := User{Name: "Alice"}
|
||||
_, _ = QueueContentTypeJSON.mapValue(val)
|
||||
})
|
||||
|
||||
t.Run("slice of structs as json", func(t *testing.T) {
|
||||
defer func() {
|
||||
if p := recover(); p == nil {
|
||||
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
|
||||
}
|
||||
}()
|
||||
|
||||
val := User{Name: "Alice"}
|
||||
_, _ = QueueContentTypeJSON.mapValue([]User{val})
|
||||
})
|
||||
|
||||
t.Run("slice of bytes as json", func(t *testing.T) {
|
||||
defer func() {
|
||||
if p := recover(); p == nil {
|
||||
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
|
||||
}
|
||||
}()
|
||||
|
||||
_, _ = QueueContentTypeJSON.mapValue([]byte("hello"))
|
||||
})
|
||||
}
|
@ -26,28 +26,41 @@ func NewProducer(queueName string) (*Producer, error) {
|
||||
return &Producer{queue: inst}, nil
|
||||
}
|
||||
|
||||
// Send sends a single message to a queue. This function allows setting send options for the message.
|
||||
func (p *Producer) SendText(content string, opts ...SendOption) error {
|
||||
return p.send(js.ValueOf(content), contentTypeText, opts...)
|
||||
}
|
||||
|
||||
func (p *Producer) SendBytes(content []byte, opts ...SendOption) error {
|
||||
ua := jsutil.NewUint8Array(len(content))
|
||||
js.CopyBytesToJS(ua, content)
|
||||
// accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should
|
||||
// be used instead and with Uint8Array.buffer as a value, the send simply fails
|
||||
return p.send(ua, contentTypeBytes, opts...)
|
||||
}
|
||||
|
||||
func (p *Producer) SendJSON(content any, opts ...SendOption) error {
|
||||
return p.send(js.ValueOf(content), contentTypeJSON, opts...)
|
||||
}
|
||||
|
||||
func (p *Producer) SendV8(content js.Value, opts ...SendOption) error {
|
||||
return p.send(content, contentTypeV8, opts...)
|
||||
}
|
||||
|
||||
// send sends a single message to a queue. This function allows setting send options for the message.
|
||||
// If no options are provided, the default options are used (QueueContentTypeJSON and no delay).
|
||||
//
|
||||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer
|
||||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuesendoptions
|
||||
func (p *Producer) Send(content any, opts ...SendOption) error {
|
||||
if p.queue.IsUndefined() {
|
||||
return errors.New("queue object not found")
|
||||
func (p *Producer) send(body js.Value, contentType contentType, opts ...SendOption) error {
|
||||
options := &sendOptions{
|
||||
ContentType: contentType,
|
||||
}
|
||||
|
||||
options := defaultSendOptions()
|
||||
for _, opt := range opts {
|
||||
opt(options)
|
||||
}
|
||||
|
||||
jsValue, err := options.ContentType.mapValue(content)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
prom := p.queue.Call("send", jsValue, options.toJS())
|
||||
_, err = jsutil.AwaitPromise(prom)
|
||||
prom := p.queue.Call("send", body, options.toJS())
|
||||
_, err := jsutil.AwaitPromise(prom)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -10,19 +10,13 @@ import (
|
||||
type sendOptions struct {
|
||||
// ContentType - Content type of the message
|
||||
// Default is "json"
|
||||
ContentType QueueContentType
|
||||
ContentType contentType
|
||||
|
||||
// DelaySeconds - The number of seconds to delay the message.
|
||||
// Default is 0
|
||||
DelaySeconds int
|
||||
}
|
||||
|
||||
func defaultSendOptions() *sendOptions {
|
||||
return &sendOptions{
|
||||
ContentType: QueueContentTypeJSON,
|
||||
}
|
||||
}
|
||||
|
||||
func (o *sendOptions) toJS() js.Value {
|
||||
obj := jsutil.NewObject()
|
||||
obj.Set("contentType", string(o.ContentType))
|
||||
@ -36,13 +30,6 @@ func (o *sendOptions) toJS() js.Value {
|
||||
|
||||
type SendOption func(*sendOptions)
|
||||
|
||||
// WithContentType changes the content type of the message.
|
||||
func WithContentType(contentType QueueContentType) SendOption {
|
||||
return func(o *sendOptions) {
|
||||
o.ContentType = contentType
|
||||
}
|
||||
}
|
||||
|
||||
// WithDelay changes the number of seconds to delay the message.
|
||||
func WithDelay(d time.Duration) SendOption {
|
||||
return func(o *sendOptions) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user