mirror of
https://github.com/syumai/workers.git
synced 2025-03-10 17:29:11 +00:00
Merge pull request #125 from meandnano/feature/queues
Add message producer
This commit is contained in:
commit
dfdd7e760e
3
_examples/queues/.gitignore
vendored
Normal file
3
_examples/queues/.gitignore
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
build
|
||||
node_modules
|
||||
.wrangler
|
12
_examples/queues/Makefile
Normal file
12
_examples/queues/Makefile
Normal file
@ -0,0 +1,12 @@
|
||||
.PHONY: dev
|
||||
dev:
|
||||
npx wrangler dev --port 8787
|
||||
|
||||
.PHONY: build
|
||||
build:
|
||||
go run ../../cmd/workers-assets-gen
|
||||
tinygo build -o ./build/app.wasm -target wasm -no-debug ./...
|
||||
|
||||
.PHONY: deploy
|
||||
deploy:
|
||||
npx wrangler deploy
|
38
_examples/queues/README.md
Normal file
38
_examples/queues/README.md
Normal file
@ -0,0 +1,38 @@
|
||||
# queues
|
||||
|
||||
An example of using Cloudflare Workers that interact with [Cloudflare Queues](https://developers.cloudflare.com/queues/).
|
||||
|
||||
## Running
|
||||
|
||||
### Requirements
|
||||
|
||||
This project requires these tools to be installed globally.
|
||||
|
||||
* wrangler
|
||||
* tinygo
|
||||
|
||||
### Supported commands
|
||||
|
||||
```
|
||||
make dev # run dev server
|
||||
make build # build Go Wasm binary
|
||||
make deploy # deploy worker
|
||||
```
|
||||
|
||||
### Interacting with the local queue
|
||||
|
||||
1. Start the dev server.
|
||||
```sh
|
||||
make dev
|
||||
```
|
||||
|
||||
2. Send a message to the queue.
|
||||
```sh
|
||||
curl -v -X POST http://localhost:8787/ -d '{"message": "Hello, World!"}' -H "Content-Type: application/json"
|
||||
```
|
||||
|
||||
3. Observe the response and server logs
|
||||
|
||||
4. You can pass `text/plain` content type to write queue message as the string or omit the `Content-Type` header to write queue message as
|
||||
byte array.
|
||||
|
7
_examples/queues/go.mod
Normal file
7
_examples/queues/go.mod
Normal file
@ -0,0 +1,7 @@
|
||||
module github.com/syumai/workers/_examples/queues
|
||||
|
||||
go 1.22.8
|
||||
|
||||
require github.com/syumai/workers v0.0.0
|
||||
|
||||
replace github.com/syumai/workers => ../../
|
0
_examples/queues/go.sum
Normal file
0
_examples/queues/go.sum
Normal file
105
_examples/queues/main.go
Normal file
105
_examples/queues/main.go
Normal file
@ -0,0 +1,105 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/syumai/workers"
|
||||
"github.com/syumai/workers/cloudflare/queues"
|
||||
)
|
||||
|
||||
const queueName = "QUEUE"
|
||||
|
||||
func handleErr(w http.ResponseWriter, msg string, err error) {
|
||||
log.Println(err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(msg))
|
||||
}
|
||||
|
||||
func main() {
|
||||
http.HandleFunc("/", handleProduce)
|
||||
workers.Serve(nil)
|
||||
}
|
||||
func handleProduce(w http.ResponseWriter, req *http.Request) {
|
||||
if req.URL.Path != "/" {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
if req.Method != http.MethodPost {
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
defer req.Body.Close()
|
||||
|
||||
q, err := queues.NewProducer(queueName)
|
||||
if err != nil {
|
||||
handleErr(w, "failed to init queue", err)
|
||||
}
|
||||
|
||||
contentType := req.Header.Get("Content-Type")
|
||||
switch contentType {
|
||||
case "text/plain":
|
||||
log.Println("Handling text content type")
|
||||
err = produceText(q, req)
|
||||
case "application/json":
|
||||
log.Println("Handling json content type")
|
||||
err = produceJson(q, req)
|
||||
default:
|
||||
log.Println("Handling bytes content type")
|
||||
err = produceBytes(q, req)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
handleErr(w, "failed to handle request", err)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("message sent\n"))
|
||||
}
|
||||
|
||||
func produceText(q *queues.Producer, req *http.Request) error {
|
||||
content, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read request body: %w", err)
|
||||
}
|
||||
if len(content) == 0 {
|
||||
return fmt.Errorf("empty request body")
|
||||
}
|
||||
|
||||
// text content type supports string and []byte messages
|
||||
if err := q.Send(content, queues.WithContentType(queues.QueueContentTypeText)); err != nil {
|
||||
return fmt.Errorf("failed to send message: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func produceJson(q *queues.Producer, req *http.Request) error {
|
||||
var data any
|
||||
if err := json.NewDecoder(req.Body).Decode(&data); err != nil {
|
||||
return fmt.Errorf("failed to read request body: %w", err)
|
||||
}
|
||||
|
||||
// json content type is default and therefore can be omitted
|
||||
// json content type supports messages of types that can be serialized to json
|
||||
if err := q.Send(data); err != nil {
|
||||
return fmt.Errorf("failed to send message: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func produceBytes(q *queues.Producer, req *http.Request) error {
|
||||
// bytes content type support messages of type []byte, string, and io.Reader
|
||||
if err := q.Send(req.Body, queues.WithContentType(queues.QueueContentTypeBytes)); err != nil {
|
||||
return fmt.Errorf("failed to send message: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
13
_examples/queues/wrangler.toml
Normal file
13
_examples/queues/wrangler.toml
Normal file
@ -0,0 +1,13 @@
|
||||
name = "queues-producer"
|
||||
main = "./build/worker.mjs"
|
||||
compatibility_date = "2022-05-13"
|
||||
compatibility_flags = [
|
||||
"streams_enable_constructors"
|
||||
]
|
||||
|
||||
[[queues.producers]]
|
||||
queue = "my-queue"
|
||||
binding = "QUEUE"
|
||||
|
||||
[build]
|
||||
command = "make build"
|
82
cloudflare/queues/content_type.go
Normal file
82
cloudflare/queues/content_type.go
Normal file
@ -0,0 +1,82 @@
|
||||
package queues
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"syscall/js"
|
||||
|
||||
"github.com/syumai/workers/internal/jsutil"
|
||||
)
|
||||
|
||||
// QueueContentType represents the content type of a message produced to a queue.
|
||||
// This information mostly affects how the message body is represented in the Cloudflare UI and is NOT
|
||||
// propagated to the consumer side.
|
||||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuescontenttype
|
||||
type QueueContentType string
|
||||
|
||||
const (
|
||||
// QueueContentTypeJSON is the default content type for the produced queue message.
|
||||
// The message body is NOT being marshaled before sending and is passed to js.ValueOf directly.
|
||||
// Make sure the body is serializable to JSON.
|
||||
// - https://pkg.go.dev/syscall/js#ValueOf
|
||||
QueueContentTypeJSON QueueContentType = "json"
|
||||
|
||||
// QueueContentTypeV8 is currently treated the same as QueueContentTypeJSON.
|
||||
QueueContentTypeV8 QueueContentType = "v8"
|
||||
|
||||
// QueueContentTypeText is used to send a message as a string.
|
||||
// Supported body types are string, []byte and io.Reader.
|
||||
QueueContentTypeText QueueContentType = "text"
|
||||
|
||||
// QueueContentTypeBytes is used to send a message as a byte array.
|
||||
// Supported body types are string, []byte, and io.Reader.
|
||||
QueueContentTypeBytes QueueContentType = "bytes"
|
||||
)
|
||||
|
||||
func (o QueueContentType) mapValue(val any) (js.Value, error) {
|
||||
switch o {
|
||||
case QueueContentTypeText:
|
||||
switch v := val.(type) {
|
||||
case string:
|
||||
return js.ValueOf(v), nil
|
||||
case []byte:
|
||||
return js.ValueOf(string(v)), nil
|
||||
case io.Reader:
|
||||
b, err := io.ReadAll(v)
|
||||
if err != nil {
|
||||
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
|
||||
}
|
||||
return js.ValueOf(string(b)), nil
|
||||
default:
|
||||
return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val)
|
||||
}
|
||||
|
||||
case QueueContentTypeBytes:
|
||||
var b []byte
|
||||
switch v := val.(type) {
|
||||
case string:
|
||||
b = []byte(v)
|
||||
case []byte:
|
||||
b = v
|
||||
case io.Reader:
|
||||
var err error
|
||||
b, err = io.ReadAll(v)
|
||||
if err != nil {
|
||||
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
|
||||
}
|
||||
default:
|
||||
return js.Undefined(), fmt.Errorf("invalid value type for bytes content type: %T", val)
|
||||
}
|
||||
|
||||
ua := jsutil.NewUint8Array(len(b))
|
||||
js.CopyBytesToJS(ua, b)
|
||||
// accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should
|
||||
// be used instead and with Uint8Array.buffer as a value, the send simply fails
|
||||
return ua, nil
|
||||
|
||||
case QueueContentTypeJSON, QueueContentTypeV8:
|
||||
return js.ValueOf(val), nil
|
||||
}
|
||||
|
||||
return js.Undefined(), fmt.Errorf("unknown content type: %s", o)
|
||||
}
|
205
cloudflare/queues/content_type_test.go
Normal file
205
cloudflare/queues/content_type_test.go
Normal file
@ -0,0 +1,205 @@
|
||||
package queues
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"syscall/js"
|
||||
"testing"
|
||||
|
||||
"github.com/syumai/workers/internal/jsutil"
|
||||
)
|
||||
|
||||
func TestContentType_mapValue(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
contentType QueueContentType
|
||||
val any
|
||||
want js.Value
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "string as text",
|
||||
contentType: QueueContentTypeText,
|
||||
val: "hello",
|
||||
want: js.ValueOf("hello"),
|
||||
},
|
||||
{
|
||||
name: "[]byte as text",
|
||||
contentType: QueueContentTypeText,
|
||||
val: []byte("hello"),
|
||||
want: js.ValueOf("hello"),
|
||||
},
|
||||
{
|
||||
name: "io.Reader as text",
|
||||
contentType: QueueContentTypeText,
|
||||
val: bytes.NewBufferString("hello"),
|
||||
want: js.ValueOf("hello"),
|
||||
},
|
||||
{
|
||||
name: "number as text",
|
||||
contentType: QueueContentTypeText,
|
||||
val: 42,
|
||||
want: js.Undefined(),
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "function as text",
|
||||
contentType: QueueContentTypeText,
|
||||
val: func() {},
|
||||
want: js.Undefined(),
|
||||
wantErr: true,
|
||||
},
|
||||
|
||||
{
|
||||
name: "string as json",
|
||||
contentType: QueueContentTypeJSON,
|
||||
val: "hello",
|
||||
want: js.ValueOf("hello"),
|
||||
},
|
||||
{
|
||||
name: "number as json",
|
||||
contentType: QueueContentTypeJSON,
|
||||
val: 42,
|
||||
want: js.ValueOf(42),
|
||||
},
|
||||
{
|
||||
name: "bool as json",
|
||||
contentType: QueueContentTypeJSON,
|
||||
val: true,
|
||||
want: js.ValueOf(true),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := tt.contentType.mapValue(tt.val)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Fatalf("%s.mapValue() error = %v, wantErr %v", tt.contentType, err, tt.wantErr)
|
||||
}
|
||||
if got.String() != tt.want.String() {
|
||||
t.Errorf("%s.mapValue() = %v, want %v", tt.contentType, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestContentType_mapValue_bytes(t *testing.T) {
|
||||
jsOf := func(b []byte) js.Value {
|
||||
ua := jsutil.NewUint8Array(len(b))
|
||||
js.CopyBytesToJS(ua, b)
|
||||
return ua
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
val any
|
||||
want js.Value
|
||||
}{
|
||||
{
|
||||
name: "[]byte as bytes",
|
||||
val: []byte("hello"),
|
||||
want: jsOf([]byte("hello")),
|
||||
},
|
||||
{
|
||||
name: "string as bytes",
|
||||
val: "hello",
|
||||
want: jsOf([]byte("hello"))},
|
||||
{
|
||||
name: "io.Reader as bytes",
|
||||
val: bytes.NewBufferString("hello"),
|
||||
want: jsOf([]byte("hello")),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := QueueContentTypeBytes.mapValue(tt.val)
|
||||
if err != nil {
|
||||
t.Fatalf("%s.mapValue() got error = %v", QueueContentTypeBytes, err)
|
||||
}
|
||||
if got.Type() != tt.want.Type() {
|
||||
t.Errorf("%s.mapValue() = type %v, want type %v", QueueContentTypeBytes, got, tt.want)
|
||||
}
|
||||
if got.String() != tt.want.String() {
|
||||
t.Errorf("%s.mapValue() = %v, want %v", QueueContentTypeBytes, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestContentType_mapValue_map(t *testing.T) {
|
||||
val := map[string]interface{}{
|
||||
"Name": "Alice",
|
||||
"Age": 42,
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
contentType QueueContentType
|
||||
}{
|
||||
{
|
||||
name: "json",
|
||||
contentType: QueueContentTypeJSON,
|
||||
},
|
||||
{
|
||||
name: "v8",
|
||||
contentType: QueueContentTypeV8,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
||||
got, err := tt.contentType.mapValue(val)
|
||||
if err != nil {
|
||||
t.Fatalf("QueueContentTypeJSON.mapValue() got error = %v", err)
|
||||
}
|
||||
if got.Type() != js.TypeObject {
|
||||
t.Errorf("QueueContentTypeJSON.mapValue() = type %v, want type %v", got, js.TypeObject)
|
||||
}
|
||||
if got.Get("Name").String() != "Alice" {
|
||||
t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Name").String(), "Alice")
|
||||
}
|
||||
if got.Get("Age").Int() != 42 {
|
||||
t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Age").Int(), 42)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type User struct {
|
||||
Name string
|
||||
}
|
||||
|
||||
func TestContentType_mapValue_unsupported_types(t *testing.T) {
|
||||
t.Run("struct as json", func(t *testing.T) {
|
||||
defer func() {
|
||||
if p := recover(); p == nil {
|
||||
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
|
||||
}
|
||||
}()
|
||||
|
||||
val := User{Name: "Alice"}
|
||||
_, _ = QueueContentTypeJSON.mapValue(val)
|
||||
})
|
||||
|
||||
t.Run("slice of structs as json", func(t *testing.T) {
|
||||
defer func() {
|
||||
if p := recover(); p == nil {
|
||||
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
|
||||
}
|
||||
}()
|
||||
|
||||
val := User{Name: "Alice"}
|
||||
_, _ = QueueContentTypeJSON.mapValue([]User{val})
|
||||
})
|
||||
|
||||
t.Run("slice of bytes as json", func(t *testing.T) {
|
||||
defer func() {
|
||||
if p := recover(); p == nil {
|
||||
t.Fatalf("QueueContentTypeJSON.mapValue() did not panic")
|
||||
}
|
||||
}()
|
||||
|
||||
_, _ = QueueContentTypeJSON.mapValue([]byte("hello"))
|
||||
})
|
||||
}
|
115
cloudflare/queues/producer.go
Normal file
115
cloudflare/queues/producer.go
Normal file
@ -0,0 +1,115 @@
|
||||
package queues
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"syscall/js"
|
||||
|
||||
"github.com/syumai/workers/cloudflare/internal/cfruntimecontext"
|
||||
"github.com/syumai/workers/internal/jsutil"
|
||||
)
|
||||
|
||||
type BatchMessage struct {
|
||||
body any
|
||||
options *sendOptions
|
||||
}
|
||||
|
||||
// NewBatchMessage creates a single message to be batched before sending to a queue.
|
||||
func NewBatchMessage(body any, opts ...SendOption) *BatchMessage {
|
||||
options := defaultSendOptions()
|
||||
for _, opt := range opts {
|
||||
opt(options)
|
||||
}
|
||||
return &BatchMessage{body: body, options: options}
|
||||
}
|
||||
|
||||
func (m *BatchMessage) toJS() (js.Value, error) {
|
||||
if m == nil {
|
||||
return js.Undefined(), errors.New("message is nil")
|
||||
}
|
||||
|
||||
jsValue, err := m.options.ContentType.mapValue(m.body)
|
||||
if err != nil {
|
||||
return js.Undefined(), err
|
||||
}
|
||||
|
||||
obj := jsutil.NewObject()
|
||||
obj.Set("body", jsValue)
|
||||
obj.Set("options", m.options.toJS())
|
||||
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
type Producer struct {
|
||||
// queue - Objects that Queue API belongs to. Default is Global
|
||||
queue js.Value
|
||||
}
|
||||
|
||||
// NewProducer creates a new Producer object to send messages to a queue.
|
||||
// queueName is the name of the queue environment var to send messages to.
|
||||
// In Cloudflare API documentation, this object represents the Queue.
|
||||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer
|
||||
func NewProducer(queueName string) (*Producer, error) {
|
||||
inst := cfruntimecontext.MustGetRuntimeContextEnv().Get(queueName)
|
||||
if inst.IsUndefined() {
|
||||
return nil, fmt.Errorf("%s is undefined", queueName)
|
||||
}
|
||||
return &Producer{queue: inst}, nil
|
||||
}
|
||||
|
||||
// Send sends a single message to a queue. This function allows setting send options for the message.
|
||||
// If no options are provided, the default options are used (QueueContentTypeJSON and no delay).
|
||||
//
|
||||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer
|
||||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuesendoptions
|
||||
func (p *Producer) Send(content any, opts ...SendOption) error {
|
||||
if p.queue.IsUndefined() {
|
||||
return errors.New("queue object not found")
|
||||
}
|
||||
|
||||
options := defaultSendOptions()
|
||||
for _, opt := range opts {
|
||||
opt(options)
|
||||
}
|
||||
|
||||
jsValue, err := options.ContentType.mapValue(content)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
prom := p.queue.Call("send", jsValue, options.toJS())
|
||||
_, err = jsutil.AwaitPromise(prom)
|
||||
return err
|
||||
}
|
||||
|
||||
// SendBatch sends multiple messages to a queue. This function allows setting options for each message.
|
||||
func (p *Producer) SendBatch(messages []*BatchMessage, opts ...BatchSendOption) error {
|
||||
if p.queue.IsUndefined() {
|
||||
return errors.New("queue object not found")
|
||||
}
|
||||
|
||||
if len(messages) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var options *batchSendOptions
|
||||
if len(opts) > 0 {
|
||||
options = &batchSendOptions{}
|
||||
for _, opt := range opts {
|
||||
opt(options)
|
||||
}
|
||||
}
|
||||
|
||||
jsArray := jsutil.NewArray(len(messages))
|
||||
for i, message := range messages {
|
||||
jsValue, err := message.toJS()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to convert message %d to JS: %w", i, err)
|
||||
}
|
||||
jsArray.SetIndex(i, jsValue)
|
||||
}
|
||||
|
||||
prom := p.queue.Call("sendBatch", jsArray, options.toJS())
|
||||
_, err := jsutil.AwaitPromise(prom)
|
||||
return err
|
||||
}
|
79
cloudflare/queues/producer_opts.go
Normal file
79
cloudflare/queues/producer_opts.go
Normal file
@ -0,0 +1,79 @@
|
||||
package queues
|
||||
|
||||
import (
|
||||
"syscall/js"
|
||||
"time"
|
||||
|
||||
"github.com/syumai/workers/internal/jsutil"
|
||||
)
|
||||
|
||||
type sendOptions struct {
|
||||
// ContentType - Content type of the message
|
||||
// Default is "json"
|
||||
ContentType QueueContentType
|
||||
|
||||
// DelaySeconds - The number of seconds to delay the message.
|
||||
// Default is 0
|
||||
DelaySeconds int
|
||||
}
|
||||
|
||||
func defaultSendOptions() *sendOptions {
|
||||
return &sendOptions{
|
||||
ContentType: QueueContentTypeJSON,
|
||||
}
|
||||
}
|
||||
|
||||
func (o *sendOptions) toJS() js.Value {
|
||||
obj := jsutil.NewObject()
|
||||
obj.Set("contentType", string(o.ContentType))
|
||||
|
||||
if o.DelaySeconds != 0 {
|
||||
obj.Set("delaySeconds", o.DelaySeconds)
|
||||
}
|
||||
|
||||
return obj
|
||||
}
|
||||
|
||||
type SendOption func(*sendOptions)
|
||||
|
||||
// WithContentType changes the content type of the message.
|
||||
func WithContentType(contentType QueueContentType) SendOption {
|
||||
return func(o *sendOptions) {
|
||||
o.ContentType = contentType
|
||||
}
|
||||
}
|
||||
|
||||
// WithDelay changes the number of seconds to delay the message.
|
||||
func WithDelay(d time.Duration) SendOption {
|
||||
return func(o *sendOptions) {
|
||||
o.DelaySeconds = int(d.Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
type batchSendOptions struct {
|
||||
// DelaySeconds - The number of seconds to delay the message.
|
||||
// Default is 0
|
||||
DelaySeconds int
|
||||
}
|
||||
|
||||
func (o *batchSendOptions) toJS() js.Value {
|
||||
if o == nil {
|
||||
return js.Undefined()
|
||||
}
|
||||
|
||||
obj := jsutil.NewObject()
|
||||
if o.DelaySeconds != 0 {
|
||||
obj.Set("delaySeconds", o.DelaySeconds)
|
||||
}
|
||||
|
||||
return obj
|
||||
}
|
||||
|
||||
type BatchSendOption func(*batchSendOptions)
|
||||
|
||||
// WithBatchDelay changes the number of seconds to delay the message.
|
||||
func WithBatchDelay(d time.Duration) BatchSendOption {
|
||||
return func(o *batchSendOptions) {
|
||||
o.DelaySeconds = int(d.Seconds())
|
||||
}
|
||||
}
|
211
cloudflare/queues/producer_test.go
Normal file
211
cloudflare/queues/producer_test.go
Normal file
@ -0,0 +1,211 @@
|
||||
package queues
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"syscall/js"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/syumai/workers/internal/jsutil"
|
||||
)
|
||||
|
||||
func validatingProducer(t *testing.T, validateFn func(message js.Value, options js.Value) error) *Producer {
|
||||
sendFn := js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
sendArg := args[0] // this should be batch (in case of SendBatch) or a single message (in case of Send)
|
||||
var options js.Value
|
||||
if len(args) > 1 {
|
||||
options = args[1]
|
||||
}
|
||||
return jsutil.NewPromise(js.FuncOf(func(this js.Value, args []js.Value) interface{} {
|
||||
resolve := args[0]
|
||||
go func() {
|
||||
if err := validateFn(sendArg, options); err != nil {
|
||||
// must be non-fatal to avoid a deadlock
|
||||
t.Errorf("validation failed: %v", err)
|
||||
}
|
||||
resolve.Invoke(js.Undefined())
|
||||
}()
|
||||
return js.Undefined()
|
||||
}))
|
||||
})
|
||||
|
||||
queue := jsutil.NewObject()
|
||||
queue.Set("send", sendFn)
|
||||
queue.Set("sendBatch", sendFn)
|
||||
|
||||
return &Producer{queue: queue}
|
||||
}
|
||||
|
||||
func TestSend(t *testing.T) {
|
||||
t.Run("text content type", func(t *testing.T) {
|
||||
validation := func(message js.Value, options js.Value) error {
|
||||
if message.Type() != js.TypeString {
|
||||
return errors.New("message body must be a string")
|
||||
}
|
||||
if message.String() != "hello" {
|
||||
return errors.New("message body must be 'hello'")
|
||||
}
|
||||
if options.Get("contentType").String() != "text" {
|
||||
return errors.New("content type must be text")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
producer := validatingProducer(t, validation)
|
||||
err := producer.Send("hello", WithContentType(QueueContentTypeText))
|
||||
if err != nil {
|
||||
t.Fatalf("Send failed: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("json content type", func(t *testing.T) {
|
||||
validation := func(message js.Value, options js.Value) error {
|
||||
if message.Type() != js.TypeString {
|
||||
return errors.New("message body must be a string")
|
||||
}
|
||||
if message.String() != "hello" {
|
||||
return errors.New("message body must be 'hello'")
|
||||
}
|
||||
if options.Get("contentType").String() != "json" {
|
||||
return errors.New("content type must be json")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
producer := validatingProducer(t, validation)
|
||||
err := producer.Send("hello", WithContentType(QueueContentTypeJSON))
|
||||
if err != nil {
|
||||
t.Fatalf("Send failed: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestSend_ContentTypeOption(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
options []SendOption
|
||||
expectedContentType string
|
||||
expectedDelaySec int
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "text",
|
||||
options: []SendOption{WithContentType(QueueContentTypeText)},
|
||||
expectedContentType: "text",
|
||||
},
|
||||
{
|
||||
name: "json",
|
||||
options: []SendOption{WithContentType(QueueContentTypeJSON)},
|
||||
expectedContentType: "json",
|
||||
},
|
||||
{
|
||||
name: "default",
|
||||
options: nil,
|
||||
expectedContentType: "json",
|
||||
},
|
||||
{
|
||||
name: "v8",
|
||||
options: []SendOption{WithContentType(QueueContentTypeV8)},
|
||||
expectedContentType: "v8",
|
||||
},
|
||||
{
|
||||
name: "bytes",
|
||||
options: []SendOption{WithContentType(QueueContentTypeBytes)},
|
||||
expectedContentType: "bytes",
|
||||
},
|
||||
|
||||
{
|
||||
name: "delay",
|
||||
options: []SendOption{WithDelay(5 * time.Second)},
|
||||
expectedDelaySec: 5,
|
||||
expectedContentType: "json",
|
||||
},
|
||||
|
||||
{
|
||||
name: "invalid content type",
|
||||
options: []SendOption{WithContentType("invalid")},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
validation := func(message js.Value, options js.Value) error {
|
||||
gotCT := options.Get("contentType").String()
|
||||
if gotCT != string(tt.expectedContentType) {
|
||||
return fmt.Errorf("expected content type %q, got %q", tt.expectedContentType, gotCT)
|
||||
}
|
||||
gotDelaySec := jsutil.MaybeInt(options.Get("delaySeconds"))
|
||||
if gotDelaySec != tt.expectedDelaySec {
|
||||
return fmt.Errorf("expected delay %d, got %d", tt.expectedDelaySec, gotDelaySec)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
producer := validatingProducer(t, validation)
|
||||
err := producer.Send("hello", tt.options...)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Fatalf("expected error: %t, got %v", tt.wantErr, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendBatch_Defaults(t *testing.T) {
|
||||
validation := func(batch js.Value, options js.Value) error {
|
||||
if batch.Type() != js.TypeObject {
|
||||
return errors.New("message batch must be an object (array)")
|
||||
}
|
||||
if batch.Length() != 2 {
|
||||
return fmt.Errorf("expected 2 messages, got %d", batch.Length())
|
||||
}
|
||||
first := batch.Index(0)
|
||||
if first.Get("body").String() != "hello" {
|
||||
return fmt.Errorf("first message body must be 'hello', was %s", first.Get("body"))
|
||||
}
|
||||
if first.Get("options").Get("contentType").String() != "json" {
|
||||
return fmt.Errorf("first message content type must be json, was %s", first.Get("options").Get("contentType"))
|
||||
}
|
||||
|
||||
second := batch.Index(1)
|
||||
if second.Get("body").String() != "world" {
|
||||
return fmt.Errorf("second message body must be 'world', was %s", second.Get("body"))
|
||||
}
|
||||
if second.Get("options").Get("contentType").String() != "text" {
|
||||
return fmt.Errorf("second message content type must be text, was %s", second.Get("options").Get("contentType"))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var batch []*BatchMessage = []*BatchMessage{
|
||||
NewBatchMessage("hello"),
|
||||
NewBatchMessage("world", WithContentType(QueueContentTypeText)),
|
||||
}
|
||||
|
||||
producer := validatingProducer(t, validation)
|
||||
err := producer.SendBatch(batch)
|
||||
if err != nil {
|
||||
t.Fatalf("SendBatch failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendBatch_Options(t *testing.T) {
|
||||
validation := func(_ js.Value, options js.Value) error {
|
||||
if options.Get("delaySeconds").Int() != 5 {
|
||||
return fmt.Errorf("expected delay 5, got %d", options.Get("delaySeconds").Int())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var batch []*BatchMessage = []*BatchMessage{
|
||||
NewBatchMessage("hello"),
|
||||
}
|
||||
|
||||
producer := validatingProducer(t, validation)
|
||||
err := producer.SendBatch(batch, WithBatchDelay(5*time.Second))
|
||||
if err != nil {
|
||||
t.Fatalf("SendBatch failed: %v", err)
|
||||
}
|
||||
}
|
@ -26,6 +26,10 @@ func NewObject() js.Value {
|
||||
return ObjectClass.New()
|
||||
}
|
||||
|
||||
func NewArray(size int) js.Value {
|
||||
return ArrayClass.New(size)
|
||||
}
|
||||
|
||||
func NewUint8Array(size int) js.Value {
|
||||
return Uint8ArrayClass.New(size)
|
||||
}
|
||||
@ -89,7 +93,7 @@ func StrRecordToMap(v js.Value) map[string]string {
|
||||
return result
|
||||
}
|
||||
|
||||
// MaybeString returns string value of given JavaScript value or returns nil if the value is undefined.
|
||||
// MaybeString returns string value of given JavaScript value or returns "" if the value is undefined.
|
||||
func MaybeString(v js.Value) string {
|
||||
if v.IsUndefined() {
|
||||
return ""
|
||||
|
Loading…
x
Reference in New Issue
Block a user