Add tests and docs

This commit is contained in:
mike.art 2024-10-14 22:52:44 +02:00
parent bdeff99c27
commit ae2bcf1937
6 changed files with 529 additions and 15 deletions

View File

@ -0,0 +1,38 @@
# queues
An example of using Cloudflare Workers that interact with [Cloudflare Queues](https://developers.cloudflare.com/queues/).
## Running
### Requirements
This project requires these tools to be installed globally.
* wrangler
* tinygo
### Supported commands
```
make dev # run dev server
make build # build Go Wasm binary
make deploy # deploy worker
```
### Interacting with the local queue
1. Start the dev server.
```sh
make dev
```
2. Send a message to the queue.
```sh
curl -v -X POST http://localhost:8787/ -d '{"message": "Hello, World!"}' -H "Content-Type: application/json"
```
3. Observe the response and server logs
4. You can pass `text/plain` content type to write queue message as the string or omit the `Content-Type` header to write queue message as
byte array.

View File

@ -8,13 +8,29 @@ import (
"github.com/syumai/workers/internal/jsutil"
)
// QueueContentType represents the content type of a message produced to a queue.
// This information mostly affects how the message body is represented in the Cloudflare UI and is NOT
// propagated to the consumer side.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuescontenttype
type QueueContentType string
const (
QueueContentTypeJSON QueueContentType = "json"
QueueContentTypeText QueueContentType = "text"
// QueueContentTypeJSON is the default content type for the produced queue message.
// The message body is NOT being marshaled before sending and is passed to js.ValueOf directly.
// Make sure the body is serializable to JSON.
// - https://pkg.go.dev/syscall/js#ValueOf
QueueContentTypeJSON QueueContentType = "json"
// QueueContentTypeV8 is currently treated the same as QueueContentTypeJSON.
QueueContentTypeV8 QueueContentType = "v8"
// QueueContentTypeText is used to send a message as a string.
// Supported body types are string, []byte and io.Reader.
QueueContentTypeText QueueContentType = "text"
// QueueContentTypeBytes is used to send a message as a byte array.
// Supported body types are string, []byte, and io.Reader.
QueueContentTypeBytes QueueContentType = "bytes"
QueueContentTypeV8 QueueContentType = "v8"
)
func (o QueueContentType) mapValue(val any) (js.Value, error) {
@ -25,6 +41,12 @@ func (o QueueContentType) mapValue(val any) (js.Value, error) {
return js.ValueOf(v), nil
case []byte:
return js.ValueOf(string(v)), nil
case io.Reader:
b, err := io.ReadAll(v)
if err != nil {
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
}
return js.ValueOf(string(b)), nil
default:
return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val)
}

View File

@ -0,0 +1,205 @@
package queues
import (
"bytes"
"syscall/js"
"testing"
"github.com/syumai/workers/internal/jsutil"
)
func TestContentType_mapValue(t *testing.T) {
tests := []struct {
name string
contentType QueueContentType
val any
want js.Value
wantErr bool
}{
{
name: "string as text",
contentType: QueueContentTypeText,
val: "hello",
want: js.ValueOf("hello"),
},
{
name: "[]byte as text",
contentType: QueueContentTypeText,
val: []byte("hello"),
want: js.ValueOf("hello"),
},
{
name: "io.Reader as text",
contentType: QueueContentTypeText,
val: bytes.NewBufferString("hello"),
want: js.ValueOf("hello"),
},
{
name: "number as text",
contentType: QueueContentTypeText,
val: 42,
want: js.Undefined(),
wantErr: true,
},
{
name: "function as text",
contentType: QueueContentTypeText,
val: func() {},
want: js.Undefined(),
wantErr: true,
},
{
name: "string as json",
contentType: QueueContentTypeJSON,
val: "hello",
want: js.ValueOf("hello"),
},
{
name: "number as json",
contentType: QueueContentTypeJSON,
val: 42,
want: js.ValueOf(42),
},
{
name: "bool as json",
contentType: QueueContentTypeJSON,
val: true,
want: js.ValueOf(true),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.contentType.mapValue(tt.val)
if (err != nil) != tt.wantErr {
t.Fatalf("%s.mapValue() error = %v, wantErr %v", tt.contentType, err, tt.wantErr)
}
if got.String() != tt.want.String() {
t.Errorf("%s.mapValue() = %v, want %v", tt.contentType, got, tt.want)
}
})
}
}
func TestContentType_mapValue_bytes(t *testing.T) {
jsOf := func(b []byte) js.Value {
ua := jsutil.NewUint8Array(len(b))
js.CopyBytesToJS(ua, b)
return ua
}
tests := []struct {
name string
val any
want js.Value
}{
{
name: "[]byte as bytes",
val: []byte("hello"),
want: jsOf([]byte("hello")),
},
{
name: "string as bytes",
val: "hello",
want: jsOf([]byte("hello"))},
{
name: "io.Reader as bytes",
val: bytes.NewBufferString("hello"),
want: jsOf([]byte("hello")),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := QueueContentTypeBytes.mapValue(tt.val)
if err != nil {
t.Fatalf("%s.mapValue() got error = %v", QueueContentTypeBytes, err)
}
if got.Type() != tt.want.Type() {
t.Errorf("%s.mapValue() = type %v, want type %v", QueueContentTypeBytes, got, tt.want)
}
if got.String() != tt.want.String() {
t.Errorf("%s.mapValue() = %v, want %v", QueueContentTypeBytes, got, tt.want)
}
})
}
}
func TestContentType_mapValue_map(t *testing.T) {
val := map[string]interface{}{
"Name": "Alice",
"Age": 42,
}
tests := []struct {
name string
contentType QueueContentType
}{
{
name: "json",
contentType: QueueContentTypeJSON,
},
{
name: "v8",
contentType: QueueContentTypeV8,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.contentType.mapValue(val)
if err != nil {
t.Fatalf("QueueContentTypeJSON.mapValue() got error = %v", err)
}
if got.Type() != js.TypeObject {
t.Errorf("QueueContentTypeJSON.mapValue() = type %v, want type %v", got, js.TypeObject)
}
if got.Get("Name").String() != "Alice" {
t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Name").String(), "Alice")
}
if got.Get("Age").Int() != 42 {
t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Age").Int(), 42)
}
})
}
}
type User struct {
Name string
}
func TestContentType_mapValue_unsupported_types(t *testing.T) {
t.Run("struct as json", func(t *testing.T) {
defer func() {
if p := recover(); p == nil {
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
}
}()
val := User{Name: "Alice"}
_, _ = QueueContentTypeJSON.mapValue(val)
})
t.Run("slice of structs as json", func(t *testing.T) {
defer func() {
if p := recover(); p == nil {
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
}
}()
val := User{Name: "Alice"}
_, _ = QueueContentTypeJSON.mapValue([]User{val})
})
t.Run("slice of bytes as json", func(t *testing.T) {
defer func() {
if p := recover(); p == nil {
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
}
}()
_, _ = QueueContentTypeJSON.mapValue([]byte("hello"))
})
}

View File

@ -14,6 +14,7 @@ type BatchMessage struct {
options *sendOptions
}
// NewBatchMessage creates a single message to be batched before sending to a queue.
func NewBatchMessage(body any, opts ...SendOption) *BatchMessage {
options := defaultSendOptions()
for _, opt := range opts {
@ -44,6 +45,10 @@ type Producer struct {
queue js.Value
}
// NewProducer creates a new Producer object to send messages to a queue.
// queueName is the name of the queue environment var to send messages to.
// In Cloudflare API documentation, this object represents the Queue.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer
func NewProducer(queueName string) (*Producer, error) {
inst := cfruntimecontext.MustGetRuntimeContextEnv().Get(queueName)
if inst.IsUndefined() {
@ -52,6 +57,11 @@ func NewProducer(queueName string) (*Producer, error) {
return &Producer{queue: inst}, nil
}
// Send sends a single message to a queue. This function allows setting send options for the message.
// If no options are provided, the default options are used (QueueContentTypeJSON and no delay).
//
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuesendoptions
func (p *Producer) Send(content any, opts ...SendOption) error {
if p.queue.IsUndefined() {
return errors.New("queue object not found")
@ -72,7 +82,8 @@ func (p *Producer) Send(content any, opts ...SendOption) error {
return err
}
func (p *Producer) SendBatch(messages []*BatchMessage) error {
// SendBatch sends multiple messages to a queue. This function allows setting options for each message.
func (p *Producer) SendBatch(messages []*BatchMessage, opts ...BatchSendOption) error {
if p.queue.IsUndefined() {
return errors.New("queue object not found")
}
@ -81,6 +92,14 @@ func (p *Producer) SendBatch(messages []*BatchMessage) error {
return nil
}
var options *batchSendOptions
if len(opts) > 0 {
options = &batchSendOptions{}
for _, opt := range opts {
opt(options)
}
}
jsArray := jsutil.NewArray(len(messages))
for i, message := range messages {
jsValue, err := message.toJS()
@ -90,16 +109,7 @@ func (p *Producer) SendBatch(messages []*BatchMessage) error {
jsArray.SetIndex(i, jsValue)
}
prom := p.queue.Call("sendBatch", jsArray)
prom := p.queue.Call("sendBatch", jsArray, options.toJS())
_, err := jsutil.AwaitPromise(prom)
return err
}
func (p *Producer) SendJsonBatch(messages ...any) error {
batch := make([]*BatchMessage, len(messages))
for i, message := range messages {
batch[i] = NewBatchMessage(message)
}
return p.SendBatch(batch)
}

View File

@ -44,8 +44,36 @@ func WithContentType(contentType QueueContentType) SendOption {
}
// WithDelay changes the number of seconds to delay the message.
func (q *Producer) WithDelay(d time.Duration) SendOption {
func WithDelay(d time.Duration) SendOption {
return func(o *sendOptions) {
o.DelaySeconds = int(d.Seconds())
}
}
type batchSendOptions struct {
// DelaySeconds - The number of seconds to delay the message.
// Default is 0
DelaySeconds int
}
func (o *batchSendOptions) toJS() js.Value {
if o == nil {
return js.Undefined()
}
obj := jsutil.NewObject()
if o.DelaySeconds != 0 {
obj.Set("delaySeconds", o.DelaySeconds)
}
return obj
}
type BatchSendOption func(*batchSendOptions)
// WithBatchDelay changes the number of seconds to delay the message.
func WithBatchDelay(d time.Duration) BatchSendOption {
return func(o *batchSendOptions) {
o.DelaySeconds = int(d.Seconds())
}
}

View File

@ -0,0 +1,211 @@
package queues
import (
"errors"
"fmt"
"syscall/js"
"testing"
"time"
"github.com/syumai/workers/internal/jsutil"
)
func validatingProducer(t *testing.T, validateFn func(message js.Value, options js.Value) error) *Producer {
sendFn := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
sendArg := args[0] // this should be batch (in case of SendBatch) or a single message (in case of Send)
var options js.Value
if len(args) > 1 {
options = args[1]
}
return jsutil.NewPromise(js.FuncOf(func(this js.Value, args []js.Value) interface{} {
resolve := args[0]
go func() {
if err := validateFn(sendArg, options); err != nil {
// must be non-fatal to avoid a deadlock
t.Errorf("validation failed: %v", err)
}
resolve.Invoke(js.Undefined())
}()
return js.Undefined()
}))
})
queue := jsutil.NewObject()
queue.Set("send", sendFn)
queue.Set("sendBatch", sendFn)
return &Producer{queue: queue}
}
func TestSend(t *testing.T) {
t.Run("text content type", func(t *testing.T) {
validation := func(message js.Value, options js.Value) error {
if message.Type() != js.TypeString {
return errors.New("message body must be a string")
}
if message.String() != "hello" {
return errors.New("message body must be 'hello'")
}
if options.Get("contentType").String() != "text" {
return errors.New("content type must be text")
}
return nil
}
producer := validatingProducer(t, validation)
err := producer.Send("hello", WithContentType(QueueContentTypeText))
if err != nil {
t.Fatalf("Send failed: %v", err)
}
})
t.Run("json content type", func(t *testing.T) {
validation := func(message js.Value, options js.Value) error {
if message.Type() != js.TypeString {
return errors.New("message body must be a string")
}
if message.String() != "hello" {
return errors.New("message body must be 'hello'")
}
if options.Get("contentType").String() != "json" {
return errors.New("content type must be json")
}
return nil
}
producer := validatingProducer(t, validation)
err := producer.Send("hello", WithContentType(QueueContentTypeJSON))
if err != nil {
t.Fatalf("Send failed: %v", err)
}
})
}
func TestSend_ContentTypeOption(t *testing.T) {
tests := []struct {
name string
options []SendOption
expectedContentType string
expectedDelaySec int
wantErr bool
}{
{
name: "text",
options: []SendOption{WithContentType(QueueContentTypeText)},
expectedContentType: "text",
},
{
name: "json",
options: []SendOption{WithContentType(QueueContentTypeJSON)},
expectedContentType: "json",
},
{
name: "default",
options: nil,
expectedContentType: "json",
},
{
name: "v8",
options: []SendOption{WithContentType(QueueContentTypeV8)},
expectedContentType: "v8",
},
{
name: "bytes",
options: []SendOption{WithContentType(QueueContentTypeBytes)},
expectedContentType: "bytes",
},
{
name: "delay",
options: []SendOption{WithDelay(5 * time.Second)},
expectedDelaySec: 5,
expectedContentType: "json",
},
{
name: "invalid content type",
options: []SendOption{WithContentType("invalid")},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
validation := func(message js.Value, options js.Value) error {
gotCT := options.Get("contentType").String()
if gotCT != string(tt.expectedContentType) {
return fmt.Errorf("expected content type %q, got %q", tt.expectedContentType, gotCT)
}
gotDelaySec := jsutil.MaybeInt(options.Get("delaySeconds"))
if gotDelaySec != tt.expectedDelaySec {
return fmt.Errorf("expected delay %d, got %d", tt.expectedDelaySec, gotDelaySec)
}
return nil
}
producer := validatingProducer(t, validation)
err := producer.Send("hello", tt.options...)
if (err != nil) != tt.wantErr {
t.Fatalf("expected error: %t, got %v", tt.wantErr, err)
}
})
}
}
func TestSendBatch_Defaults(t *testing.T) {
validation := func(batch js.Value, options js.Value) error {
if batch.Type() != js.TypeObject {
return errors.New("message batch must be an object (array)")
}
if batch.Length() != 2 {
return fmt.Errorf("expected 2 messages, got %d", batch.Length())
}
first := batch.Index(0)
if first.Get("body").String() != "hello" {
return fmt.Errorf("first message body must be 'hello', was %s", first.Get("body"))
}
if first.Get("options").Get("contentType").String() != "json" {
return fmt.Errorf("first message content type must be json, was %s", first.Get("options").Get("contentType"))
}
second := batch.Index(1)
if second.Get("body").String() != "world" {
return fmt.Errorf("second message body must be 'world', was %s", second.Get("body"))
}
if second.Get("options").Get("contentType").String() != "text" {
return fmt.Errorf("second message content type must be text, was %s", second.Get("options").Get("contentType"))
}
return nil
}
var batch []*BatchMessage = []*BatchMessage{
NewBatchMessage("hello"),
NewBatchMessage("world", WithContentType(QueueContentTypeText)),
}
producer := validatingProducer(t, validation)
err := producer.SendBatch(batch)
if err != nil {
t.Fatalf("SendBatch failed: %v", err)
}
}
func TestSendBatch_Options(t *testing.T) {
validation := func(_ js.Value, options js.Value) error {
if options.Get("delaySeconds").Int() != 5 {
return fmt.Errorf("expected delay 5, got %d", options.Get("delaySeconds").Int())
}
return nil
}
var batch []*BatchMessage = []*BatchMessage{
NewBatchMessage("hello"),
}
producer := validatingProducer(t, validation)
err := producer.SendBatch(batch, WithBatchDelay(5*time.Second))
if err != nil {
t.Fatalf("SendBatch failed: %v", err)
}
}