2024-11-07 11:23:07 -05:00

224 lines
6.0 KiB
Go

package jsutil
import (
"bytes"
"fmt"
"io"
"syscall/js"
)
// RawJSBodyWriter is implemented by writers that can accept a body as a raw
// JavaScript value. readableStreamToReadCloser.WriteTo hands its underlying
// stream to such a writer instead of copying bytes through Go.
type RawJSBodyWriter interface {
// WriteRawJSBody receives the JavaScript value that represents the body.
WriteRawJSBody(body js.Value)
}
// RawJSBodyGetter is implemented by values that can expose the JavaScript
// value backing their body.
type RawJSBodyGetter interface {
// GetRawJSBody returns the JavaScript value that represents the body.
GetRawJSBody() js.Value
}
// readableStreamToReadCloser implements io.ReadCloser on top of a JavaScript
// ReadableStream, reading through the stream's ReadableStreamDefaultReader.
// - ReadableStreamDefaultReader: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader
// - This implementation is based on: https://deno.land/std@0.139.0/streams/conversion.ts#L76
type readableStreamToReadCloser struct {
// buf holds bytes of the most recently fetched chunk that Read has not yet
// returned to the caller.
buf bytes.Buffer
// stream is the JavaScript ReadableStream being read.
stream js.Value
// streamReader is the result of stream.getReader(); it stays nil until the
// first call to Read.
streamReader *js.Value
}
// Compile-time assertions that *readableStreamToReadCloser satisfies these
// interfaces.
var (
_ io.ReadCloser = (*readableStreamToReadCloser)(nil)
_ io.WriterTo = (*readableStreamToReadCloser)(nil)
_ RawJSBodyGetter = (*readableStreamToReadCloser)(nil)
)
// Read reads bytes from ReadableStreamDefaultReader.
func (sr *readableStreamToReadCloser) Read(p []byte) (n int, err error) {
if sr.streamReader == nil {
r := sr.stream.Call("getReader")
sr.streamReader = &r
}
if sr.buf.Len() == 0 {
resultCh := make(chan js.Value)
errCh := make(chan error)
promise := sr.streamReader.Call("read")
var then, catch js.Func
then = js.FuncOf(func(_ js.Value, args []js.Value) any {
defer then.Release()
result := args[0]
if result.Get("done").Bool() {
errCh <- io.EOF
return js.Undefined()
}
resultCh <- result.Get("value")
return js.Undefined()
})
catch = js.FuncOf(func(_ js.Value, args []js.Value) any {
defer catch.Release()
result := args[0]
errCh <- fmt.Errorf("JavaScript error on read: %s", result.Call("toString").String())
return js.Undefined()
})
promise.Call("then", then).Call("catch", catch)
select {
case result := <-resultCh:
chunk := make([]byte, result.Get("byteLength").Int())
_ = js.CopyBytesToGo(chunk, result)
// The length written is always the same as the length of chunk, so it can be discarded.
// - https://pkg.go.dev/bytes#Buffer.Write
_, err := sr.buf.Write(chunk)
if err != nil {
return 0, err
}
case err := <-errCh:
return 0, err
}
}
return sr.buf.Read(p)
}
func (sr *readableStreamToReadCloser) Close() error {
if sr.streamReader == nil {
return nil
}
sr.streamReader.Call("cancel")
return nil
}
// readerWrapper is a wrapper that hides readableStreamToReadCloser's WriteTo
// method by exposing only the embedded io.Reader. WriteTo passes it to io.Copy
// so the copy goes through Read.
type readerWrapper struct {
io.Reader
}
func (sr *readableStreamToReadCloser) WriteTo(w io.Writer) (n int64, err error) {
if w, ok := w.(RawJSBodyWriter); ok {
w.WriteRawJSBody(sr.stream)
return 0, nil
}
return io.Copy(w, &readerWrapper{sr})
}
// GetRawJSBody implements RawJSBodyGetter by returning the underlying
// JavaScript stream value.
func (sr *readableStreamToReadCloser) GetRawJSBody() js.Value {
return sr.stream
}
// ConvertReadableStreamToReadCloser converts ReadableStream to io.ReadCloser.
func ConvertReadableStreamToReadCloser(stream js.Value) io.ReadCloser {
return &readableStreamToReadCloser{
stream: stream,
}
}
// readerToReadableStream is the Go side of a JavaScript ReadableStream whose
// data comes from an io.ReadCloser. Its Pull and Cancel methods back the
// stream's pull and cancel callbacks.
// - ReadableStream: https://developer.mozilla.org/docs/Web/API/ReadableStream
// - This implementation is based on: https://deno.land/std@0.139.0/streams/conversion.ts#L230
type readerToReadableStream struct {
// reader is the source of the stream's data; it is closed when the stream
// ends, fails, or is cancelled.
reader io.ReadCloser
// chunkBuf is the scratch buffer passed to reader.Read on every Pull.
chunkBuf []byte
}
// Pull implements ReadableStream's pull method.
// - https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#pull
func (rs *readerToReadableStream) Pull(controller js.Value) error {
n, err := rs.reader.Read(rs.chunkBuf)
if n != 0 {
ua := NewUint8Array(n)
js.CopyBytesToJS(ua, rs.chunkBuf[:n])
controller.Call("enqueue", ua)
}
// Cloudflare Workers sometimes call `pull` to closed ReadableStream.
// When the call happens, `io.ErrClosedPipe` should be ignored.
if err == io.EOF || err == io.ErrClosedPipe {
controller.Call("close")
if err := rs.reader.Close(); err != nil {
return err
}
return nil
}
if err != nil {
controller.Call("error", Error(err.Error()))
if err := rs.reader.Close(); err != nil {
return err
}
return err
}
return nil
}
// Cancel implements ReadableStream's cancel method by closing the wrapped
// reader and returning the result of Close.
// - https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#cancel
func (rs *readerToReadableStream) Cancel() error {
return rs.reader.Close()
}
// defaultChunkSize is the size, in bytes, of the buffers used when copying
// from an io.Reader into a JavaScript stream.
// The value follows https://deno.land/std@0.139.0/streams/conversion.ts#L5
const defaultChunkSize = 16_640
// ConvertReaderToReadableStream converts io.ReadCloser to ReadableStream.
func ConvertReaderToReadableStream(reader io.ReadCloser) js.Value {
stream := &readerToReadableStream{
reader: reader,
chunkBuf: make([]byte, defaultChunkSize),
}
rsInit := NewObject()
rsInit.Set("pull", js.FuncOf(func(_ js.Value, args []js.Value) any {
var cb js.Func
cb = js.FuncOf(func(this js.Value, pArgs []js.Value) any {
defer cb.Release()
resolve := pArgs[0]
reject := pArgs[1]
controller := args[0]
go func() {
err := stream.Pull(controller)
if err != nil {
reject.Invoke(Error(err.Error()))
return
}
resolve.Invoke()
}()
return js.Undefined()
})
return NewPromise(cb)
}))
rsInit.Set("cancel", js.FuncOf(func(js.Value, []js.Value) any {
var cb js.Func
cb = js.FuncOf(func(this js.Value, pArgs []js.Value) any {
defer cb.Release()
resolve := pArgs[0]
reject := pArgs[1]
go func() {
err := stream.Cancel()
if err != nil {
reject.Invoke(Error(err.Error()))
return
}
resolve.Invoke()
}()
return js.Undefined()
})
return NewPromise(cb)
}))
return ReadableStreamClass.New(rsInit)
}
// ConvertReaderToFixedLengthStream converts io.ReadCloser to TransformStream.
func ConvertReaderToFixedLengthStream(rc io.ReadCloser, size int64) js.Value {
stream := FixedLengthStreamClass.New(js.ValueOf(size))
go func(writer js.Value) {
defer rc.Close()
chunk := make([]byte, min(size, defaultChunkSize))
for {
n, err := rc.Read(chunk)
if n > 0 {
b := Uint8ArrayClass.New(n)
js.CopyBytesToJS(b, chunk[:n])
writer.Call("write", b)
}
if err != nil {
writer.Call("close")
return
}
}
}(stream.Get("writable").Call("getWriter"))
return stream.Get("readable")
}