mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
unix socket support
Signed-off-by: Boris Rybalkin <ribalkin@gmail.com>
This commit is contained in:
parent
b6437ef4b3
commit
ae73ace4a4
1
.gitignore
vendored
1
.gitignore
vendored
@ -4,3 +4,4 @@ node_modules
|
|||||||
# Go workspaces
|
# Go workspaces
|
||||||
go.work
|
go.work
|
||||||
go.work.sum
|
go.work.sum
|
||||||
|
.idea
|
@ -4,9 +4,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||||
@ -39,16 +41,30 @@ type HTTPClient struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewHTTPClient(shortTimeout, longTimeout time.Duration, destHomeServer string) *HTTPClient {
|
func NewHTTPClient(shortTimeout, longTimeout time.Duration, destHomeServer string) *HTTPClient {
|
||||||
|
baseUrl := destHomeServer
|
||||||
|
if strings.HasPrefix(destHomeServer, "/") {
|
||||||
|
baseUrl = "http://unix"
|
||||||
|
}
|
||||||
|
|
||||||
return &HTTPClient{
|
return &HTTPClient{
|
||||||
LongTimeoutClient: &http.Client{
|
LongTimeoutClient: newClient(longTimeout, destHomeServer),
|
||||||
Timeout: longTimeout,
|
Client: newClient(shortTimeout, destHomeServer),
|
||||||
Transport: otelhttp.NewTransport(http.DefaultTransport),
|
DestinationServer: baseUrl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClient(timeout time.Duration, destHomeServer string) *http.Client {
|
||||||
|
transport := http.DefaultTransport
|
||||||
|
if strings.HasPrefix(destHomeServer, "/") {
|
||||||
|
transport = &http.Transport{
|
||||||
|
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
|
||||||
|
return net.Dial("unix", destHomeServer)
|
||||||
},
|
},
|
||||||
Client: &http.Client{
|
}
|
||||||
Timeout: shortTimeout,
|
}
|
||||||
Transport: otelhttp.NewTransport(http.DefaultTransport),
|
return &http.Client{
|
||||||
},
|
Timeout: timeout,
|
||||||
DestinationServer: destHomeServer,
|
Transport: otelhttp.NewTransport(transport),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,7 +82,7 @@ func (v *HTTPClient) Versions(ctx context.Context) ([]string, error) {
|
|||||||
return nil, fmt.Errorf("/versions returned HTTP %d", res.StatusCode)
|
return nil, fmt.Errorf("/versions returned HTTP %d", res.StatusCode)
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
body, err := ioutil.ReadAll(res.Body)
|
body, err := io.ReadAll(res.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -99,7 +115,7 @@ func (v *HTTPClient) WhoAmI(ctx context.Context, accessToken string) (string, st
|
|||||||
return "", "", fmt.Errorf("/whoami returned HTTP %d", res.StatusCode)
|
return "", "", fmt.Errorf("/whoami returned HTTP %d", res.StatusCode)
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
body, err := ioutil.ReadAll(res.Body)
|
body, err := io.ReadAll(res.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", err
|
return "", "", err
|
||||||
}
|
}
|
||||||
|
25
v3.go
25
v3.go
@ -4,7 +4,10 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"embed"
|
"embed"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/fs"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
@ -216,6 +219,11 @@ func RunSyncV3Server(h http.Handler, bindAddr, destV2Server, tlsCert, tlsKey str
|
|||||||
|
|
||||||
// Block forever
|
// Block forever
|
||||||
var err error
|
var err error
|
||||||
|
if strings.HasPrefix(bindAddr, "/") {
|
||||||
|
logger.Info().Msgf("listening on unix socket %s", bindAddr)
|
||||||
|
listener := unixSocketListener(bindAddr)
|
||||||
|
err = http.Serve(listener, srv)
|
||||||
|
} else {
|
||||||
if tlsCert != "" && tlsKey != "" {
|
if tlsCert != "" && tlsKey != "" {
|
||||||
logger.Info().Msgf("listening TLS on %s", bindAddr)
|
logger.Info().Msgf("listening TLS on %s", bindAddr)
|
||||||
err = http.ListenAndServeTLS(bindAddr, tlsCert, tlsKey, srv)
|
err = http.ListenAndServeTLS(bindAddr, tlsCert, tlsKey, srv)
|
||||||
@ -223,6 +231,7 @@ func RunSyncV3Server(h http.Handler, bindAddr, destV2Server, tlsCert, tlsKey str
|
|||||||
logger.Info().Msgf("listening on %s", bindAddr)
|
logger.Info().Msgf("listening on %s", bindAddr)
|
||||||
err = http.ListenAndServe(bindAddr, srv)
|
err = http.ListenAndServe(bindAddr, srv)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sentry.CaptureException(err)
|
sentry.CaptureException(err)
|
||||||
// TODO: Fatal() calls os.Exit. Will that give time for sentry.Flush() to run?
|
// TODO: Fatal() calls os.Exit. Will that give time for sentry.Flush() to run?
|
||||||
@ -230,6 +239,22 @@ func RunSyncV3Server(h http.Handler, bindAddr, destV2Server, tlsCert, tlsKey str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func unixSocketListener(bindAddr string) net.Listener {
|
||||||
|
err := os.Remove(bindAddr)
|
||||||
|
if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||||
|
logger.Fatal().Err(err).Msg("failed to remove existing unix socket")
|
||||||
|
}
|
||||||
|
listener, err := net.Listen("unix", bindAddr)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal().Err(err).Msg("failed to serve unix socket")
|
||||||
|
}
|
||||||
|
err = os.Chmod(bindAddr, 0755)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal().Err(err).Msg("failed to set unix socket permissions")
|
||||||
|
}
|
||||||
|
return listener
|
||||||
|
}
|
||||||
|
|
||||||
type HandlerError struct {
|
type HandlerError struct {
|
||||||
StatusCode int
|
StatusCode int
|
||||||
Err error
|
Err error
|
||||||
|
Loading…
x
Reference in New Issue
Block a user