145 lines
4.2 KiB
Go
Raw Normal View History

2022-09-13 23:18:17 +09:00
package jsutil
2022-05-18 00:04:37 +09:00
import (
"bytes"
"fmt"
"io"
"syscall/js"
)
// streamReaderToReader implements io.Reader sourced from ReadableStreamDefaultReader.
2022-08-03 00:19:01 +09:00
// - ReadableStreamDefaultReader: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader
// - This implementation is based on: https://deno.land/std@0.139.0/streams/conversion.ts#L76
2022-05-18 00:04:37 +09:00
type streamReaderToReader struct {
// buf holds bytes copied out of the stream that Read has not yet returned to its caller.
buf bytes.Buffer
// streamReader is the JavaScript ReadableStreamDefaultReader whose read method supplies the data.
streamReader js.Value
}
// Read reads bytes from ReadableStreamDefaultReader.
func (sr *streamReaderToReader) Read(p []byte) (n int, err error) {
if sr.buf.Len() == 0 {
promise := sr.streamReader.Call("read")
resultCh := make(chan js.Value)
errCh := make(chan error)
var then, catch js.Func
then = js.FuncOf(func(_ js.Value, args []js.Value) any {
defer then.Release()
result := args[0]
if result.Get("done").Bool() {
errCh <- io.EOF
return js.Undefined()
}
resultCh <- result.Get("value")
return js.Undefined()
})
catch = js.FuncOf(func(_ js.Value, args []js.Value) any {
defer catch.Release()
result := args[0]
errCh <- fmt.Errorf("JavaScript error on read: %s", result.Call("toString").String())
return js.Undefined()
})
promise.Call("then", then).Call("catch", catch)
select {
case result := <-resultCh:
chunk := make([]byte, result.Get("byteLength").Int())
_ = js.CopyBytesToGo(chunk, result)
// The length written is always the same as the length of chunk, so it can be discarded.
2022-08-03 00:33:19 +09:00
// - https://pkg.go.dev/bytes#Buffer.Write
2022-05-18 00:04:37 +09:00
_, err := sr.buf.Write(chunk)
if err != nil {
return 0, err
}
case err := <-errCh:
return 0, err
}
}
return sr.buf.Read(p)
}
2022-09-13 23:16:58 +09:00
// ConvertStreamReaderToReader converts ReadableStreamDefaultReader to io.Reader.
func ConvertStreamReaderToReader(sr js.Value) io.Reader {
2022-05-18 00:04:37 +09:00
return &streamReaderToReader{
streamReader: sr,
}
}
// readerToReadableStream implements ReadableStream sourced from io.ReadCloser.
2022-08-03 00:19:01 +09:00
// - ReadableStream: https://developer.mozilla.org/docs/Web/API/ReadableStream
// - This implementation is based on: https://deno.land/std@0.139.0/streams/conversion.ts#L230
2022-05-18 00:04:37 +09:00
type readerToReadableStream struct {
// reader is the Go source that Pull reads from; Pull closes it at end of stream or on error, and Cancel closes it too.
reader io.ReadCloser
// chunkBuf is the scratch buffer Pull reads into before copying the bytes to JavaScript.
chunkBuf []byte
}
// Pull implements ReadableStream's pull method.
2022-08-03 00:19:01 +09:00
// - https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#pull
2022-05-18 00:04:37 +09:00
func (rs *readerToReadableStream) Pull(controller js.Value) error {
n, err := rs.reader.Read(rs.chunkBuf)
2023-05-27 10:47:27 +09:00
if n != 0 {
ua := NewUint8Array(n)
js.CopyBytesToJS(ua, rs.chunkBuf[:n])
controller.Call("enqueue", ua)
}
// Cloudflare Workers sometimes call `pull` to closed ReadableStream.
// When the call happens, `io.ErrClosedPipe` should be ignored.
if err == io.EOF || err == io.ErrClosedPipe {
controller.Call("close")
2022-05-18 00:04:37 +09:00
if err := rs.reader.Close(); err != nil {
return err
}
return nil
}
if err != nil {
2022-09-13 23:15:47 +09:00
jsErr := ErrorClass.New(err.Error())
2022-05-18 00:04:37 +09:00
controller.Call("error", jsErr)
if err := rs.reader.Close(); err != nil {
return err
}
return err
}
return nil
}
// Cancel implements ReadableStream's cancel method.
2022-08-03 00:19:01 +09:00
// - https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#cancel
2022-05-18 00:04:37 +09:00
func (rs *readerToReadableStream) Cancel() error {
return rs.reader.Close()
}
// defaultChunkSize is the length in bytes of the chunk buffer that
// ConvertReaderToReadableStream allocates for each stream.
// Reference: https://deno.land/std@0.139.0/streams/conversion.ts#L5
const defaultChunkSize = 16_640
2022-09-13 23:16:58 +09:00
// ConvertReaderToReadableStream converts io.ReadCloser to ReadableStream.
func ConvertReaderToReadableStream(reader io.ReadCloser) js.Value {
2022-05-18 00:04:37 +09:00
stream := &readerToReadableStream{
reader: reader,
chunkBuf: make([]byte, defaultChunkSize),
}
2022-09-13 23:15:47 +09:00
rsInit := NewObject()
2022-05-18 00:04:37 +09:00
rsInit.Set("pull", js.FuncOf(func(_ js.Value, args []js.Value) any {
var cb js.Func
cb = js.FuncOf(func(this js.Value, pArgs []js.Value) any {
defer cb.Release()
resolve := pArgs[0]
reject := pArgs[1]
controller := args[0]
err := stream.Pull(controller)
if err != nil {
2022-09-13 23:15:47 +09:00
reject.Invoke(ErrorClass.New(err.Error()))
2022-05-18 00:04:37 +09:00
return js.Undefined()
}
resolve.Invoke()
return js.Undefined()
})
2022-09-13 23:15:47 +09:00
return NewPromise(cb)
2022-05-18 00:04:37 +09:00
}))
rsInit.Set("cancel", js.FuncOf(func(js.Value, []js.Value) any {
err := stream.Cancel()
if err != nil {
panic(err)
}
return js.Undefined()
}))
2022-09-13 23:15:47 +09:00
return ReadableStreamClass.New(rsInit)
2022-05-18 00:04:37 +09:00
}