update queues example

This commit is contained in:
syumai 2024-11-10 00:27:12 +09:00
parent d0919c3abf
commit 15f2b81fcf
2 changed files with 14 additions and 18 deletions

View File

@ -1,6 +1,6 @@
.PHONY: dev .PHONY: dev
dev: dev:
npx wrangler dev --port 8787 wrangler dev
.PHONY: build .PHONY: build
build: build:
@ -9,4 +9,4 @@ build:
.PHONY: deploy .PHONY: deploy
deploy: deploy:
npx wrangler deploy wrangler deploy

View File

@ -23,6 +23,7 @@ func main() {
http.HandleFunc("/", handleProduce) http.HandleFunc("/", handleProduce)
workers.Serve(nil) workers.Serve(nil)
} }
func handleProduce(w http.ResponseWriter, req *http.Request) { func handleProduce(w http.ResponseWriter, req *http.Request) {
if req.URL.Path != "/" { if req.URL.Path != "/" {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
@ -48,7 +49,7 @@ func handleProduce(w http.ResponseWriter, req *http.Request) {
err = produceText(q, req) err = produceText(q, req)
case "application/json": case "application/json":
log.Println("Handling json content type") log.Println("Handling json content type")
err = produceJson(q, req) err = produceJSON(q, req)
default: default:
log.Println("Handling bytes content type") log.Println("Handling bytes content type")
err = produceBytes(q, req) err = produceBytes(q, req)
@ -68,38 +69,33 @@ func produceText(q *queues.Producer, req *http.Request) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to read request body: %w", err) return fmt.Errorf("failed to read request body: %w", err)
} }
if len(content) == 0 { // text content type supports string
return fmt.Errorf("empty request body") if err := q.SendText(string(content)); err != nil {
}
// text content type supports string and []byte messages
if err := q.Send(content, queues.WithContentType(queues.QueueContentTypeText)); err != nil {
return fmt.Errorf("failed to send message: %w", err) return fmt.Errorf("failed to send message: %w", err)
} }
return nil return nil
} }
func produceJson(q *queues.Producer, req *http.Request) error { func produceJSON(q *queues.Producer, req *http.Request) error {
var data any var data any
if err := json.NewDecoder(req.Body).Decode(&data); err != nil { if err := json.NewDecoder(req.Body).Decode(&data); err != nil {
return fmt.Errorf("failed to read request body: %w", err) return fmt.Errorf("failed to read request body: %w", err)
} }
// json content type is default and therefore can be omitted
// json content type supports messages of types that can be serialized to json // json content type supports messages of types that can be serialized to json
if err := q.Send(data); err != nil { if err := q.SendJSON(data); err != nil {
return fmt.Errorf("failed to send message: %w", err) return fmt.Errorf("failed to send message: %w", err)
} }
return nil return nil
} }
func produceBytes(q *queues.Producer, req *http.Request) error { func produceBytes(q *queues.Producer, req *http.Request) error {
// bytes content type support messages of type []byte, string, and io.Reader content, err := io.ReadAll(req.Body)
if err := q.Send(req.Body, queues.WithContentType(queues.QueueContentTypeBytes)); err != nil { if err != nil {
return fmt.Errorf("failed to read request body: %w", err)
}
// bytes content type support messages of type []byte
if err := q.SendBytes(content); err != nil {
return fmt.Errorf("failed to send message: %w", err) return fmt.Errorf("failed to send message: %w", err)
} }
return nil return nil
} }