This commit is contained in:
a 2023-06-26 03:16:43 -05:00
parent 3bff27df38
commit 688b4b731e
No known key found for this signature in database
GPG Key ID: 374BC539FE795AF0
9 changed files with 323 additions and 0 deletions

1
_examples/tcp/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
build

12
_examples/tcp/Makefile Normal file
View File

@ -0,0 +1,12 @@
.PHONY: dev
dev:
wrangler dev
.PHONY: build
build:
go run ../../cmd/workers-assets-gen --mode go
GOOS=js GOARCH=wasm go build -o ./build/app.wasm .
.PHONY: deploy
deploy:
wrangler deploy

19
_examples/tcp/README.md Normal file
View File

@ -0,0 +1,19 @@
# tcp
## Development
### Requirements
This project requires these tools to be installed globally.
* wrangler
* tinygo
### Commands
```
make dev # run dev server
make build # build Go Wasm binary
make deploy # deploy worker
```

7
_examples/tcp/go.mod Normal file
View File

@ -0,0 +1,7 @@
module github.com/syumai/workers/_examples/fetch
go 1.18
require github.com/syumai/workers v0.0.0
replace github.com/syumai/workers => ../../

2
_examples/tcp/go.sum Normal file
View File

@ -0,0 +1,2 @@
github.com/syumai/workers v0.1.0 h1:z5QfQR2X+PCKzom7RodpI5J4D5YF7NT7Qwzb9AM9dgY=
github.com/syumai/workers v0.1.0/go.mod h1:alXIDhTyeTwSzh0ZgQ3cb9HQPyyYfIejupE4Z3efr14=

40
_examples/tcp/main.go Normal file
View File

@ -0,0 +1,40 @@
package main
import (
"bufio"
"net/http"
"time"
"github.com/syumai/workers"
"github.com/syumai/workers/cloudflare"
)
func main() {
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
dialer, err := cloudflare.NewDialer(req.Context(), nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
conn, err := dialer.Dial(req.Context(), "tcp", "tcpbin.com:4242")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
conn.SetDeadline(time.Now().Add(1 * time.Hour))
_, err = conn.Write([]byte("hello."))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
rd := bufio.NewReader(conn)
bts, err := rd.ReadBytes('.')
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write(bts)
})
workers.Serve(handler)
}

View File

@ -0,0 +1,6 @@
name = "tcp"
main = "./build/worker.mjs"
compatibility_date = "2023-02-24"
[build]
command = "make build"

204
cloudflare/socket.go Normal file
View File

@ -0,0 +1,204 @@
package cloudflare
import (
"context"
"io"
"net"
"os"
"syscall/js"
"time"
"github.com/syumai/workers/cloudflare/internal/cfruntimecontext"
"github.com/syumai/workers/internal/jsutil"
)
type Dialer struct {
connect js.Value
opts *SocketOptions
}
type SocketOptions struct {
SecureTransport string `json:"secureTransport"`
AllowHalfOpen bool `json:"allowHalfOpen"`
}
// NewDialer
func NewDialer(ctx context.Context, options *SocketOptions) (*Dialer, error) {
connect, err := cfruntimecontext.GetRuntimeContextValue(ctx, "connect")
if err != nil {
return nil, err
}
return &Dialer{connect: connect, opts: options}, nil
}
func (d *Dialer) Dial(ctx context.Context, network, addr string) (net.Conn, error) {
switch network {
case "tcp":
default:
panic("not implemented")
}
optionsObj := jsutil.NewObject()
if d.opts != nil {
if d.opts.AllowHalfOpen {
optionsObj.Set("allowHalfOpen", true)
}
if d.opts.SecureTransport != "" {
optionsObj.Set("secureTransport", d.opts.SecureTransport)
}
}
sock := &TCPSocket{}
sock.socket = d.connect.Invoke(addr, optionsObj)
sock.writer = sock.socket.Get("writable").Call("getWriter")
sock.reader = sock.socket.Get("readable").Call("getReader")
sock.options = d.opts
sock.goSide, sock.jsSide = net.Pipe()
sock.ctx, sock.cn = context.WithCancel(context.Background())
{
sock.rd = jsutil.ConvertReadableStreamToReader(sock.reader)
}
return sock, nil
}
type TCPSocket struct {
socket js.Value
writer js.Value
reader js.Value
rd io.Reader
wr io.Writer
options *SocketOptions
readDeadline time.Time
writeDeadline time.Time
goSide net.Conn
jsSide net.Conn
ctx context.Context
cn context.CancelFunc
}
// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
func (t *TCPSocket) Read(b []byte) (n int, err error) {
//ctx, cn := context.WithDeadline(t.ctx, t.readDeadline)
//defer cn()
//done := make(chan struct{})
//go func() {
n, err = t.rd.Read(b)
// close(done)
//}()
//select {
//case <-done:
// return
//case <-ctx.Done():
// return 0, os.ErrDeadlineExceeded
//}
return
}
// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
func (t *TCPSocket) Write(b []byte) (n int, err error) {
ctx, cn := context.WithDeadline(t.ctx, t.writeDeadline)
defer cn()
done := make(chan struct{})
go func() {
arr := jsutil.NewUint8Array(len(b))
js.CopyBytesToJS(arr, b)
_, err = jsutil.AwaitPromise(t.writer.Call("write", arr))
if err == nil {
n = len(b)
}
close(done)
}()
select {
case <-done:
return
case <-ctx.Done():
return 0, os.ErrDeadlineExceeded
}
}
// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (t *TCPSocket) Close() error {
t.cn()
t.socket.Call("close")
return nil
}
// CloseRead closes the read side of the connection.
func (t *TCPSocket) CloseRead() error {
t.reader.Call("close")
return nil
}
// CloseWrite closes the write side of the connection.
func (t *TCPSocket) CloseWrite() error {
t.writer.Call("close")
return nil
}
// LocalAddr returns the local network address, if known.
func (t *TCPSocket) LocalAddr() net.Addr {
return nil
}
// RemoteAddr returns the remote network address, if known.
func (t *TCPSocket) RemoteAddr() net.Addr {
return nil
}
// SetDeadline sets the read and write deadlines associated
// with the connection. It is equivalent to calling both
// SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations
// fail instead of blocking. The deadline applies to all future
// and pending I/O, not just the immediately following call to
// Read or Write. After a deadline has been exceeded, the
// connection can be refreshed by setting a deadline in the future.
//
// If the deadline is exceeded a call to Read or Write or to other
// I/O methods will return an error that wraps os.ErrDeadlineExceeded.
// This can be tested using errors.Is(err, os.ErrDeadlineExceeded).
// The error's Timeout method will return true, but note that there
// are other possible errors for which the Timeout method will
// return true even if the deadline has not been exceeded.
//
// An idle timeout can be implemented by repeatedly extending
// the deadline after successful Read or Write calls.
//
// A zero value for t means I/O operations will not time out.
func (t *TCPSocket) SetDeadline(deadline time.Time) error {
t.SetReadDeadline(deadline)
t.SetWriteDeadline(deadline)
return nil
}
// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
func (t *TCPSocket) SetReadDeadline(deadline time.Time) error {
t.goSide.SetReadDeadline(deadline)
return nil
}
// SetWriteDeadline sets the deadline for future Write calls
// and any currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that
// some of the data was successfully written.
// A zero value for t means Write will not time out.
func (t *TCPSocket) SetWriteDeadline(deadline time.Time) error {
t.writeDeadline = deadline
return nil
}

View File

@ -7,6 +7,38 @@ import (
"syscall/js"
)
// streamReaderToReader implements io.Reader sourced from ReadableStream
// https://developers.cloudflare.com/workers/runtime-apis/streams/readablestream/
type readableStreamToReader struct {
buf bytes.Buffer
streamReader js.Value
}
// Read reads bytes from ReadableStreamDefaultReader.
func (sr *readableStreamToReader) Read(p []byte) (n int, err error) {
if sr.buf.Len() == 0 {
promise, err := AwaitPromise(sr.streamReader.Call("read"))
if err != nil {
return 0, err
}
result := promise.Get("value")
chunk := make([]byte, result.Get("byteLength").Int())
_ = js.CopyBytesToGo(chunk, result)
_, err = sr.buf.Write(chunk)
if err != nil {
return 0, err
}
}
return sr.buf.Read(p)
}
// ConvertReadableStreamToReader converts ReadableStreamDefaultReader to io.Reader.
func ConvertReadableStreamToReader(sr js.Value) io.Reader {
return &readableStreamToReader{
streamReader: sr,
}
}
// streamReaderToReader implements io.Reader sourced from ReadableStreamDefaultReader.
// - ReadableStreamDefaultReader: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader
// - This implementation is based on: https://deno.land/std@0.139.0/streams/conversion.ts#L76