Merge pull request #66 from elee1766/main

tcp socket support
This commit is contained in:
syumai 2023-10-22 14:27:21 +09:00 committed by GitHub
commit 7f46a930e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 334 additions and 1 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
dist
build
build
.cf

2
_examples/tcp/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
build
node_modules

12
_examples/tcp/Makefile Normal file
View File

@ -0,0 +1,12 @@
.PHONY: dev
dev:
wrangler dev
.PHONY: build
build:
go run ../../cmd/workers-assets-gen --mode go
GOOS=js GOARCH=wasm go build -o ./build/app.wasm .
.PHONY: deploy
deploy:
wrangler deploy

20
_examples/tcp/README.md Normal file
View File

@ -0,0 +1,20 @@
# tcp
- makes a connection to tcpbin.com:4242
## Development
### Requirements
This project requires these tools to be installed globally.
* wrangler
* go
### Commands
```
make dev # run dev server
make build # build Go Wasm binary
make deploy # deploy worker
```

7
_examples/tcp/go.mod Normal file
View File

@ -0,0 +1,7 @@
module github.com/syumai/workers/_examples/fetch
go 1.18
require github.com/syumai/workers v0.0.0
replace github.com/syumai/workers => ../../

2
_examples/tcp/go.sum Normal file
View File

@ -0,0 +1,2 @@
github.com/syumai/workers v0.1.0 h1:z5QfQR2X+PCKzom7RodpI5J4D5YF7NT7Qwzb9AM9dgY=
github.com/syumai/workers v0.1.0/go.mod h1:alXIDhTyeTwSzh0ZgQ3cb9HQPyyYfIejupE4Z3efr14=

40
_examples/tcp/main.go Normal file
View File

@ -0,0 +1,40 @@
package main
import (
"bufio"
"net/http"
"time"
"github.com/syumai/workers"
"github.com/syumai/workers/cloudflare"
)
func main() {
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
dialer, err := cloudflare.NewDialer(req.Context(), nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
conn, err := dialer.Dial(req.Context(), "tcp", "tcpbin.com:4242")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
conn.SetDeadline(time.Now().Add(1 * time.Hour))
_, err = conn.Write([]byte("hello.\n"))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
rd := bufio.NewReader(conn)
bts, err := rd.ReadBytes('.')
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write(bts)
})
workers.Serve(handler)
}

View File

@ -0,0 +1,6 @@
name = "tcp"
main = "./build/worker.mjs"
compatibility_date = "2023-02-24"
[build]
command = "make build"

211
cloudflare/socket.go Normal file
View File

@ -0,0 +1,211 @@
package cloudflare
import (
"context"
"io"
"net"
"os"
"syscall/js"
"time"
"github.com/syumai/workers/cloudflare/internal/cfruntimecontext"
"github.com/syumai/workers/internal/jsutil"
)
type Dialer struct {
connect js.Value
opts *SocketOptions
ctx context.Context
}
type SocketOptions struct {
SecureTransport string `json:"secureTransport"`
AllowHalfOpen bool `json:"allowHalfOpen"`
}
// NewDialer
func NewDialer(ctx context.Context, options *SocketOptions) (*Dialer, error) {
connect, err := cfruntimecontext.GetRuntimeContextValue(ctx, "connect")
if err != nil {
return nil, err
}
return &Dialer{connect: connect, opts: options, ctx: ctx}, nil
}
func (d *Dialer) Dial(ctx context.Context, network, addr string) (net.Conn, error) {
switch network {
case "tcp":
default:
panic("not implemented")
}
optionsObj := jsutil.NewObject()
if d.opts != nil {
if d.opts.AllowHalfOpen {
optionsObj.Set("allowHalfOpen", true)
}
if d.opts.SecureTransport != "" {
optionsObj.Set("secureTransport", d.opts.SecureTransport)
}
}
sock := &TCPSocket{}
sock.socket = d.connect.Invoke(addr, optionsObj)
sock.options = d.opts
sock.init(d.ctx)
return sock, nil
}
func (sock *TCPSocket) init(ctx context.Context) {
sock.SetDeadline(time.Now().Add(999999 * time.Hour))
sock.writer = sock.socket.Get("writable").Call("getWriter")
sock.reader = sock.socket.Get("readable").Call("getReader")
sock.rd = jsutil.ConvertReadableStreamToReader(sock.reader)
sock.ctx, sock.cn = context.WithCancel(ctx)
return
}
type TCPSocket struct {
socket js.Value
writer js.Value
reader js.Value
rd io.Reader
options *SocketOptions
readDeadline time.Time
writeDeadline time.Time
ctx context.Context
cn context.CancelFunc
}
func (t *TCPSocket) Socket() js.Value {
return t.socket
}
// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
func (t *TCPSocket) Read(b []byte) (n int, err error) {
ctx, cn := context.WithDeadline(t.ctx, t.readDeadline)
defer cn()
done := make(chan struct{})
go func() {
n, err = t.rd.Read(b)
close(done)
}()
select {
case <-done:
return
case <-ctx.Done():
return 0, os.ErrDeadlineExceeded
}
}
// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
func (t *TCPSocket) Write(b []byte) (n int, err error) {
ctx, cn := context.WithDeadline(t.ctx, t.writeDeadline)
defer cn()
done := make(chan struct{})
go func() {
arr := jsutil.NewUint8Array(len(b))
js.CopyBytesToJS(arr, b)
_, err = jsutil.AwaitPromise(t.writer.Call("write", arr))
if err == nil {
n = len(b)
}
close(done)
}()
select {
case <-done:
return
case <-ctx.Done():
return 0, os.ErrDeadlineExceeded
}
}
// StartTls will call startTls on the socket
func (t *TCPSocket) StartTls() *TCPSocket {
sock := &TCPSocket{}
sock.socket = t.socket.Call("startTls")
sock.options = t.options
sock.init(t.ctx)
return sock
}
// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (t *TCPSocket) Close() error {
t.cn()
t.socket.Call("close")
return nil
}
// CloseRead closes the read side of the connection.
func (t *TCPSocket) CloseRead() error {
t.reader.Call("close")
return nil
}
// CloseWrite closes the write side of the connection.
func (t *TCPSocket) CloseWrite() error {
t.writer.Call("close")
return nil
}
// LocalAddr returns the local network address, if known.
func (t *TCPSocket) LocalAddr() net.Addr {
return nil
}
// RemoteAddr returns the remote network address, if known.
func (t *TCPSocket) RemoteAddr() net.Addr {
return nil
}
// SetDeadline sets the read and write deadlines associated
// with the connection. It is equivalent to calling both
// SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations
// fail instead of blocking. The deadline applies to all future
// and pending I/O, not just the immediately following call to
// Read or Write. After a deadline has been exceeded, the
// connection can be refreshed by setting a deadline in the future.
//
// If the deadline is exceeded a call to Read or Write or to other
// I/O methods will return an error that wraps os.ErrDeadlineExceeded.
// This can be tested using errors.Is(err, os.ErrDeadlineExceeded).
// The error's Timeout method will return true, but note that there
// are other possible errors for which the Timeout method will
// return true even if the deadline has not been exceeded.
//
// An idle timeout can be implemented by repeatedly extending
// the deadline after successful Read or Write calls.
//
// A zero value for t means I/O operations will not time out.
func (t *TCPSocket) SetDeadline(deadline time.Time) error {
t.SetReadDeadline(deadline)
t.SetWriteDeadline(deadline)
return nil
}
// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
func (t *TCPSocket) SetReadDeadline(deadline time.Time) error {
t.readDeadline = deadline
return nil
}
// SetWriteDeadline sets the deadline for future Write calls
// and any currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that
// some of the data was successfully written.
// A zero value for t means Write will not time out.
func (t *TCPSocket) SetWriteDeadline(deadline time.Time) error {
t.writeDeadline = deadline
return nil
}

View File

@ -7,6 +7,38 @@ import (
"syscall/js"
)
// streamReaderToReader implements io.Reader sourced from ReadableStream
// https://developers.cloudflare.com/workers/runtime-apis/streams/readablestream/
type readableStreamToReader struct {
buf bytes.Buffer
streamReader js.Value
}
// Read reads bytes from ReadableStreamDefaultReader.
func (sr *readableStreamToReader) Read(p []byte) (n int, err error) {
if sr.buf.Len() == 0 {
promise, err := AwaitPromise(sr.streamReader.Call("read"))
if err != nil {
return 0, err
}
result := promise.Get("value")
chunk := make([]byte, result.Get("byteLength").Int())
_ = js.CopyBytesToGo(chunk, result)
_, err = sr.buf.Write(chunk)
if err != nil {
return 0, err
}
}
return sr.buf.Read(p)
}
// ConvertReadableStreamToReader converts ReadableStreamDefaultReader to io.Reader.
func ConvertReadableStreamToReader(sr js.Value) io.Reader {
return &readableStreamToReader{
streamReader: sr,
}
}
// streamReaderToReader implements io.Reader sourced from ReadableStreamDefaultReader.
// - ReadableStreamDefaultReader: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader
// - This implementation is based on: https://deno.land/std@0.139.0/streams/conversion.ts#L76