From 688b4b731e7c1a6da5ebe7f0bb2a4e1038597a27 Mon Sep 17 00:00:00 2001 From: a Date: Mon, 26 Jun 2023 03:16:43 -0500 Subject: [PATCH] socket --- _examples/tcp/.gitignore | 1 + _examples/tcp/Makefile | 12 +++ _examples/tcp/README.md | 19 ++++ _examples/tcp/go.mod | 7 ++ _examples/tcp/go.sum | 2 + _examples/tcp/main.go | 40 +++++++ _examples/tcp/wrangler.toml | 6 ++ cloudflare/socket.go | 204 ++++++++++++++++++++++++++++++++++++ internal/jsutil/stream.go | 32 ++++++ 9 files changed, 323 insertions(+) create mode 100644 _examples/tcp/.gitignore create mode 100644 _examples/tcp/Makefile create mode 100644 _examples/tcp/README.md create mode 100644 _examples/tcp/go.mod create mode 100644 _examples/tcp/go.sum create mode 100644 _examples/tcp/main.go create mode 100644 _examples/tcp/wrangler.toml create mode 100644 cloudflare/socket.go diff --git a/_examples/tcp/.gitignore b/_examples/tcp/.gitignore new file mode 100644 index 0000000..c795b05 --- /dev/null +++ b/_examples/tcp/.gitignore @@ -0,0 +1 @@ +build \ No newline at end of file diff --git a/_examples/tcp/Makefile b/_examples/tcp/Makefile new file mode 100644 index 0000000..171f487 --- /dev/null +++ b/_examples/tcp/Makefile @@ -0,0 +1,12 @@ +.PHONY: dev +dev: + wrangler dev + +.PHONY: build +build: + go run ../../cmd/workers-assets-gen --mode go + GOOS=js GOARCH=wasm go build -o ./build/app.wasm . + +.PHONY: deploy +deploy: + wrangler deploy diff --git a/_examples/tcp/README.md b/_examples/tcp/README.md new file mode 100644 index 0000000..614ed55 --- /dev/null +++ b/_examples/tcp/README.md @@ -0,0 +1,19 @@ +# tcp + + +## Development + +### Requirements + +This project requires these tools to be installed globally. + +* wrangler +* tinygo + +### Commands + +``` +make dev # run dev server +make build # build Go Wasm binary +make deploy # deploy worker +``` diff --git a/_examples/tcp/go.mod b/_examples/tcp/go.mod new file mode 100644 index 0000000..b1625c4 --- /dev/null +++ b/_examples/tcp/go.mod @@ -0,0 +1,7 @@ +module github.com/syumai/workers/_examples/fetch + +go 1.18 + +require github.com/syumai/workers v0.0.0 + +replace github.com/syumai/workers => ../../ diff --git a/_examples/tcp/go.sum b/_examples/tcp/go.sum new file mode 100644 index 0000000..8c27871 --- /dev/null +++ b/_examples/tcp/go.sum @@ -0,0 +1,2 @@ +github.com/syumai/workers v0.1.0 h1:z5QfQR2X+PCKzom7RodpI5J4D5YF7NT7Qwzb9AM9dgY= +github.com/syumai/workers v0.1.0/go.mod h1:alXIDhTyeTwSzh0ZgQ3cb9HQPyyYfIejupE4Z3efr14= diff --git a/_examples/tcp/main.go b/_examples/tcp/main.go new file mode 100644 index 0000000..dc668df --- /dev/null +++ b/_examples/tcp/main.go @@ -0,0 +1,40 @@ +package main + +import ( + "bufio" + "net/http" + "time" + + "github.com/syumai/workers" + "github.com/syumai/workers/cloudflare" +) + +func main() { + handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + dialer, err := cloudflare.NewDialer(req.Context(), nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + conn, err := dialer.Dial(req.Context(), "tcp", "tcpbin.com:4242") + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer conn.Close() + conn.SetDeadline(time.Now().Add(1 * time.Hour)) + _, err = conn.Write([]byte("hello.")) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + rd := bufio.NewReader(conn) + bts, err := rd.ReadBytes('.') + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Write(bts) + }) + workers.Serve(handler) +} diff --git a/_examples/tcp/wrangler.toml b/_examples/tcp/wrangler.toml new file mode 100644 index 0000000..6872846 --- /dev/null +++ b/_examples/tcp/wrangler.toml @@ -0,0 +1,6 @@ +name = "tcp" +main = "./build/worker.mjs" +compatibility_date = "2023-02-24" + +[build] +command = "make build" diff --git a/cloudflare/socket.go b/cloudflare/socket.go new file mode 100644 index 0000000..2163c3e --- /dev/null +++ b/cloudflare/socket.go @@ -0,0 +1,204 @@ +package cloudflare + +import ( + "context" + "io" + "net" + "os" + "syscall/js" + "time" + + "github.com/syumai/workers/cloudflare/internal/cfruntimecontext" + "github.com/syumai/workers/internal/jsutil" +) + +type Dialer struct { + connect js.Value + opts *SocketOptions +} + +type SocketOptions struct { + SecureTransport string `json:"secureTransport"` + AllowHalfOpen bool `json:"allowHalfOpen"` +} + +// NewDialer +func NewDialer(ctx context.Context, options *SocketOptions) (*Dialer, error) { + connect, err := cfruntimecontext.GetRuntimeContextValue(ctx, "connect") + if err != nil { + return nil, err + } + return &Dialer{connect: connect, opts: options}, nil +} + +func (d *Dialer) Dial(ctx context.Context, network, addr string) (net.Conn, error) { + switch network { + case "tcp": + default: + panic("not implemented") + } + optionsObj := jsutil.NewObject() + if d.opts != nil { + if d.opts.AllowHalfOpen { + optionsObj.Set("allowHalfOpen", true) + } + if d.opts.SecureTransport != "" { + optionsObj.Set("secureTransport", d.opts.SecureTransport) + } + } + sock := &TCPSocket{} + sock.socket = d.connect.Invoke(addr, optionsObj) + sock.writer = sock.socket.Get("writable").Call("getWriter") + sock.reader = sock.socket.Get("readable").Call("getReader") + sock.options = d.opts + + sock.goSide, sock.jsSide = net.Pipe() + + sock.ctx, sock.cn = context.WithCancel(context.Background()) + + { + sock.rd = jsutil.ConvertReadableStreamToReader(sock.reader) + } + + return sock, nil +} + +type TCPSocket struct { + socket js.Value + writer js.Value + reader js.Value + + rd io.Reader + wr io.Writer + + options *SocketOptions + + readDeadline time.Time + writeDeadline time.Time + + goSide net.Conn + jsSide net.Conn + + ctx context.Context + cn context.CancelFunc +} + +// Read reads data from the connection. +// Read can be made to time out and return an error after a fixed +// time limit; see SetDeadline and SetReadDeadline. +func (t *TCPSocket) Read(b []byte) (n int, err error) { + //ctx, cn := context.WithDeadline(t.ctx, t.readDeadline) + //defer cn() + //done := make(chan struct{}) + //go func() { + n, err = t.rd.Read(b) + // close(done) + //}() + //select { + //case <-done: + // return + //case <-ctx.Done(): + // return 0, os.ErrDeadlineExceeded + //} + + return +} + +// Write writes data to the connection. +// Write can be made to time out and return an error after a fixed +// time limit; see SetDeadline and SetWriteDeadline. +func (t *TCPSocket) Write(b []byte) (n int, err error) { + ctx, cn := context.WithDeadline(t.ctx, t.writeDeadline) + defer cn() + done := make(chan struct{}) + go func() { + arr := jsutil.NewUint8Array(len(b)) + js.CopyBytesToJS(arr, b) + _, err = jsutil.AwaitPromise(t.writer.Call("write", arr)) + if err == nil { + n = len(b) + } + close(done) + }() + select { + case <-done: + return + case <-ctx.Done(): + return 0, os.ErrDeadlineExceeded + } +} + +// Close closes the connection. +// Any blocked Read or Write operations will be unblocked and return errors. +func (t *TCPSocket) Close() error { + t.cn() + t.socket.Call("close") + return nil +} + +// CloseRead closes the read side of the connection. +func (t *TCPSocket) CloseRead() error { + t.reader.Call("close") + return nil +} + +// CloseWrite closes the write side of the connection. +func (t *TCPSocket) CloseWrite() error { + t.writer.Call("close") + return nil +} + +// LocalAddr returns the local network address, if known. +func (t *TCPSocket) LocalAddr() net.Addr { + return nil +} + +// RemoteAddr returns the remote network address, if known. +func (t *TCPSocket) RemoteAddr() net.Addr { + return nil +} + +// SetDeadline sets the read and write deadlines associated +// with the connection. It is equivalent to calling both +// SetReadDeadline and SetWriteDeadline. +// +// A deadline is an absolute time after which I/O operations +// fail instead of blocking. The deadline applies to all future +// and pending I/O, not just the immediately following call to +// Read or Write. After a deadline has been exceeded, the +// connection can be refreshed by setting a deadline in the future. +// +// If the deadline is exceeded a call to Read or Write or to other +// I/O methods will return an error that wraps os.ErrDeadlineExceeded. +// This can be tested using errors.Is(err, os.ErrDeadlineExceeded). +// The error's Timeout method will return true, but note that there +// are other possible errors for which the Timeout method will +// return true even if the deadline has not been exceeded. +// +// An idle timeout can be implemented by repeatedly extending +// the deadline after successful Read or Write calls. +// +// A zero value for t means I/O operations will not time out. +func (t *TCPSocket) SetDeadline(deadline time.Time) error { + t.SetReadDeadline(deadline) + t.SetWriteDeadline(deadline) + return nil +} + +// SetReadDeadline sets the deadline for future Read calls +// and any currently-blocked Read call. +// A zero value for t means Read will not time out. +func (t *TCPSocket) SetReadDeadline(deadline time.Time) error { + t.goSide.SetReadDeadline(deadline) + return nil +} + +// SetWriteDeadline sets the deadline for future Write calls +// and any currently-blocked Write call. +// Even if write times out, it may return n > 0, indicating that +// some of the data was successfully written. +// A zero value for t means Write will not time out. +func (t *TCPSocket) SetWriteDeadline(deadline time.Time) error { + t.writeDeadline = deadline + return nil +} diff --git a/internal/jsutil/stream.go b/internal/jsutil/stream.go index 60120b8..2614e31 100644 --- a/internal/jsutil/stream.go +++ b/internal/jsutil/stream.go @@ -7,6 +7,38 @@ import ( "syscall/js" ) +// streamReaderToReader implements io.Reader sourced from ReadableStream +// https://developers.cloudflare.com/workers/runtime-apis/streams/readablestream/ +type readableStreamToReader struct { + buf bytes.Buffer + streamReader js.Value +} + +// Read reads bytes from ReadableStreamDefaultReader. +func (sr *readableStreamToReader) Read(p []byte) (n int, err error) { + if sr.buf.Len() == 0 { + promise, err := AwaitPromise(sr.streamReader.Call("read")) + if err != nil { + return 0, err + } + result := promise.Get("value") + chunk := make([]byte, result.Get("byteLength").Int()) + _ = js.CopyBytesToGo(chunk, result) + _, err = sr.buf.Write(chunk) + if err != nil { + return 0, err + } + } + return sr.buf.Read(p) +} + +// ConvertReadableStreamToReader converts ReadableStreamDefaultReader to io.Reader. +func ConvertReadableStreamToReader(sr js.Value) io.Reader { + return &readableStreamToReader{ + streamReader: sr, + } +} + // streamReaderToReader implements io.Reader sourced from ReadableStreamDefaultReader. // - ReadableStreamDefaultReader: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader // - This implementation is based on: https://deno.land/std@0.139.0/streams/conversion.ts#L76