Add message producer

This commit is contained in:
mike.art 2024-10-11 11:41:24 +02:00
parent ad33cfb9ba
commit ee70c46c1e
10 changed files with 359 additions and 1 deletions

3
_examples/queues/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
build
node_modules
.wrangler

12
_examples/queues/Makefile Normal file
View File

@ -0,0 +1,12 @@
.PHONY: dev
dev:
npx wrangler dev --port 8787
.PHONY: build
build:
go run ../../cmd/workers-assets-gen
tinygo build -o ./build/app.wasm -target wasm -no-debug ./...
.PHONY: deploy
deploy:
npx wrangler deploy

7
_examples/queues/go.mod Normal file
View File

@ -0,0 +1,7 @@
module github.com/syumai/workers/_examples/queues
go 1.22.8
require github.com/syumai/workers v0.0.0
replace github.com/syumai/workers => ../../

0
_examples/queues/go.sum Normal file
View File

105
_examples/queues/main.go Normal file
View File

@ -0,0 +1,105 @@
package main
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"github.com/syumai/workers"
"github.com/syumai/workers/cloudflare/queues"
)
const queueName = "QUEUE"
func handleErr(w http.ResponseWriter, msg string, err error) {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(msg))
}
func main() {
http.HandleFunc("/", handleProduce)
workers.Serve(nil)
}
func handleProduce(w http.ResponseWriter, req *http.Request) {
if req.URL.Path != "/" {
w.WriteHeader(http.StatusNotFound)
return
}
if req.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
defer req.Body.Close()
q, err := queues.NewProducer(queueName)
if err != nil {
handleErr(w, "failed to init queue", err)
}
contentType := req.Header.Get("Content-Type")
switch contentType {
case "text/plain":
log.Println("Handling text content type")
err = produceText(q, req)
case "application/json":
log.Println("Handling json content type")
err = produceJson(q, req)
default:
log.Println("Handling bytes content type")
err = produceBytes(q, req)
}
if err != nil {
handleErr(w, "failed to handle request", err)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("message sent\n"))
}
func produceText(q *queues.Producer, req *http.Request) error {
content, err := io.ReadAll(req.Body)
if err != nil {
return fmt.Errorf("failed to read request body: %w", err)
}
if len(content) == 0 {
return fmt.Errorf("empty request body")
}
// text content type supports string and []byte messages
if err := q.Send(content, queues.WithContentType(queues.QueueContentTypeText)); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
return nil
}
func produceJson(q *queues.Producer, req *http.Request) error {
var data any
if err := json.NewDecoder(req.Body).Decode(&data); err != nil {
return fmt.Errorf("failed to read request body: %w", err)
}
// json content type is default and therefore can be omitted
// json content type supports messages of types that can be serialized to json
if err := q.Send(data); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
return nil
}
func produceBytes(q *queues.Producer, req *http.Request) error {
// bytes content type support messages of type []byte, string, and io.Reader
if err := q.Send(req.Body, queues.WithContentType(queues.QueueContentTypeBytes)); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}
return nil
}

View File

@ -0,0 +1,13 @@
name = "queues-producer"
main = "./build/worker.mjs"
compatibility_date = "2022-05-13"
compatibility_flags = [
"streams_enable_constructors"
]
[[queues.producers]]
queue = "my-queue"
binding = "QUEUE"
[build]
command = "make build"

View File

@ -0,0 +1,58 @@
package queues
import (
"fmt"
"io"
"syscall/js"
"github.com/syumai/workers/internal/jsutil"
)
type QueueContentType string
const (
QueueContentTypeJSON QueueContentType = "json"
QueueContentTypeText QueueContentType = "text"
QueueContentTypeBytes QueueContentType = "bytes"
QueueContentTypeV8 QueueContentType = "v8"
)
func (o QueueContentType) mapValue(val any) (js.Value, error) {
switch o {
case QueueContentTypeText:
switch v := val.(type) {
case string:
return js.ValueOf(v), nil
case []byte:
return js.ValueOf(string(v)), nil
default:
return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val)
}
case QueueContentTypeBytes:
var b []byte
switch v := val.(type) {
case string:
b = []byte(v)
case []byte:
b = v
case io.Reader:
var err error
b, err = io.ReadAll(v)
if err != nil {
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
}
default:
return js.Undefined(), fmt.Errorf("invalid value type for bytes content type: %T", val)
}
ua := jsutil.NewUint8Array(len(b))
js.CopyBytesToJS(ua, b)
return ua.Get("buffer"), nil
case QueueContentTypeJSON, QueueContentTypeV8:
return js.ValueOf(val), nil
}
return js.Undefined(), fmt.Errorf("unknown content type: %s", o)
}

View File

@ -0,0 +1,105 @@
package queues
import (
"errors"
"fmt"
"syscall/js"
"github.com/syumai/workers/cloudflare/internal/cfruntimecontext"
"github.com/syumai/workers/internal/jsutil"
)
type BatchMessage struct {
body any
options *sendOptions
}
func NewBatchMessage(body any, opts ...SendOption) *BatchMessage {
options := defaultSendOptions()
for _, opt := range opts {
opt(options)
}
return &BatchMessage{body: body, options: options}
}
func (m *BatchMessage) toJS() (js.Value, error) {
if m == nil {
return js.Undefined(), errors.New("message is nil")
}
jsValue, err := m.options.ContentType.mapValue(m.body)
if err != nil {
return js.Undefined(), err
}
obj := jsutil.NewObject()
obj.Set("body", jsValue)
obj.Set("options", m.options.toJS())
return obj, nil
}
type Producer struct {
// queue - Objects that Queue API belongs to. Default is Global
queue js.Value
}
func NewProducer(queueName string) (*Producer, error) {
inst := cfruntimecontext.MustGetRuntimeContextEnv().Get(queueName)
if inst.IsUndefined() {
return nil, fmt.Errorf("%s is undefined", queueName)
}
return &Producer{queue: inst}, nil
}
func (p *Producer) Send(content any, opts ...SendOption) error {
if p.queue.IsUndefined() {
return errors.New("queue object not found")
}
options := defaultSendOptions()
for _, opt := range opts {
opt(options)
}
jsValue, err := options.ContentType.mapValue(content)
if err != nil {
return err
}
prom := p.queue.Call("send", jsValue, options.toJS())
_, err = jsutil.AwaitPromise(prom)
return err
}
func (p *Producer) SendBatch(messages []*BatchMessage) error {
if p.queue.IsUndefined() {
return errors.New("queue object not found")
}
if len(messages) == 0 {
return nil
}
jsArray := jsutil.NewArray(len(messages))
for i, message := range messages {
jsValue, err := message.toJS()
if err != nil {
return fmt.Errorf("failed to convert message %d to JS: %w", i, err)
}
jsArray.SetIndex(i, jsValue)
}
prom := p.queue.Call("sendBatch", jsArray)
_, err := jsutil.AwaitPromise(prom)
return err
}
func (p *Producer) SendJsonBatch(messages ...any) error {
batch := make([]*BatchMessage, len(messages))
for i, message := range messages {
batch[i] = NewBatchMessage(message)
}
return p.SendBatch(batch)
}

View File

@ -0,0 +1,51 @@
package queues
import (
"syscall/js"
"time"
"github.com/syumai/workers/internal/jsutil"
)
type sendOptions struct {
// ContentType - Content type of the message
// Default is "json"
ContentType QueueContentType
// DelaySeconds - The number of seconds to delay the message.
// Default is 0
DelaySeconds int
}
func defaultSendOptions() *sendOptions {
return &sendOptions{
ContentType: QueueContentTypeJSON,
}
}
func (o *sendOptions) toJS() js.Value {
obj := jsutil.NewObject()
obj.Set("contentType", string(o.ContentType))
if o.DelaySeconds != 0 {
obj.Set("delaySeconds", o.DelaySeconds)
}
return obj
}
type SendOption func(*sendOptions)
// WithContentType changes the content type of the message.
func WithContentType(contentType QueueContentType) SendOption {
return func(o *sendOptions) {
o.ContentType = contentType
}
}
// WithDelay changes the number of seconds to delay the message.
func (q *Producer) WithDelay(d time.Duration) SendOption {
return func(o *sendOptions) {
o.DelaySeconds = int(d.Seconds())
}
}

View File

@ -26,6 +26,10 @@ func NewObject() js.Value {
return ObjectClass.New()
}
func NewArray(size int) js.Value {
return ArrayClass.New(size)
}
func NewUint8Array(size int) js.Value {
return Uint8ArrayClass.New(size)
}
@ -89,7 +93,7 @@ func StrRecordToMap(v js.Value) map[string]string {
return result
}
// MaybeString returns string value of given JavaScript value or returns nil if the value is undefined.
// MaybeString returns string value of given JavaScript value or returns "" if the value is undefined.
func MaybeString(v js.Value) string {
if v.IsUndefined() {
return ""