mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-04-02 15:55:45 -04:00
Merge pull request #136576 from pohly/log-client-go-apimachinery-network-util
apimachinery: contextual logging in network util code
This commit is contained in:
commit
0ed3bf2a9d
11 changed files with 168 additions and 82 deletions
|
|
@ -169,6 +169,7 @@ func (c *connection) newSpdyStream(stream *spdystream.Stream) {
|
|||
err := c.newStreamHandler(stream, replySent)
|
||||
rejectStream := (err != nil)
|
||||
if rejectStream {
|
||||
//nolint:logcheck // Hopefully this never gets triggered.
|
||||
klog.Warningf("Stream rejected: %v", err)
|
||||
stream.Reset()
|
||||
return
|
||||
|
|
@ -195,6 +196,7 @@ func (c *connection) sendPings(period time.Duration) {
|
|||
case <-t.C:
|
||||
}
|
||||
if _, err := c.ping(); err != nil {
|
||||
//nolint:logcheck // Hopefully this never gets triggered.
|
||||
klog.V(3).Infof("SPDY Ping failed: %v", err)
|
||||
// Continue, in case this is a transient failure.
|
||||
// c.conn.CloseChan above will tell us when the connection is
|
||||
|
|
|
|||
|
|
@ -105,14 +105,14 @@ func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Reque
|
|||
|
||||
conn, bufrw, err := hijacker.Hijack()
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("unable to upgrade: error hijacking response: %v", err))
|
||||
runtime.HandleErrorWithContext(req.Context(), err, "Unable to upgrade: error hijacking response")
|
||||
return nil
|
||||
}
|
||||
|
||||
connWithBuf := &connWrapper{Conn: conn, bufReader: bufrw.Reader}
|
||||
spdyConn, err := NewServerConnectionWithPings(connWithBuf, newStreamHandler, u.pingPeriod)
|
||||
if err != nil {
|
||||
runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err))
|
||||
runtime.HandleErrorWithContext(req.Context(), err, "Unable to upgrade: error creating SPDY server connection")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -126,8 +126,16 @@ func IsWebSocketRequestWithTunnelingProtocol(req *http.Request) bool {
|
|||
|
||||
// IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
|
||||
// read and write deadlines are pushed every time a new message is received.
|
||||
//
|
||||
// Contextual logging: IgnoreReceivesWithLogger should be used instead of IgnoreReceives in code which uses contextual logging.
|
||||
func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) {
|
||||
defer runtime.HandleCrash()
|
||||
IgnoreReceivesWithLogger(klog.Background(), ws, timeout)
|
||||
}
|
||||
|
||||
// IgnoreReceivesWithLogger reads from a WebSocket until it is closed, then returns. If timeout is set, the
|
||||
// read and write deadlines are pushed every time a new message is received.
|
||||
func IgnoreReceivesWithLogger(logger klog.Logger, ws *websocket.Conn, timeout time.Duration) {
|
||||
defer runtime.HandleCrashWithLogger(logger)
|
||||
var data []byte
|
||||
for {
|
||||
resetTimeout(ws, timeout)
|
||||
|
|
@ -236,7 +244,7 @@ func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.R
|
|||
// "conn.ready" and then blocks until serving is complete.
|
||||
select {
|
||||
case <-conn.ready:
|
||||
klog.V(8).Infof("websocket server initialized--serving")
|
||||
klog.FromContext(req.Context()).V(8).Info("websocket server initialized--serving")
|
||||
case <-serveHTTPComplete:
|
||||
// websocket server returned before completing initialization; cleanup and return error.
|
||||
conn.closeNonThreadSafe() //nolint:errcheck
|
||||
|
|
@ -330,13 +338,19 @@ func (conn *Conn) handle(ws *websocket.Conn) {
|
|||
conn.initialize(ws)
|
||||
defer conn.Close()
|
||||
supportsStreamClose := protocolSupportsStreamClose(conn.selectedProtocol)
|
||||
// conn.handle is typically used on the server-side and thus we have a request,
|
||||
// but don't assume that and use klog.Background as fallback.
|
||||
logger := klog.Background()
|
||||
if req := ws.Request(); req != nil {
|
||||
logger = klog.FromContext(req.Context())
|
||||
}
|
||||
|
||||
for {
|
||||
conn.resetTimeout()
|
||||
var data []byte
|
||||
if err := websocket.Message.Receive(ws, &data); err != nil {
|
||||
if err != io.EOF {
|
||||
klog.Errorf("Error on socket receive: %v", err)
|
||||
logger.Error(err, "Error on socket receive")
|
||||
}
|
||||
break
|
||||
}
|
||||
|
|
@ -345,15 +359,15 @@ func (conn *Conn) handle(ws *websocket.Conn) {
|
|||
}
|
||||
if supportsStreamClose && data[0] == remotecommand.StreamClose {
|
||||
if len(data) != 2 {
|
||||
klog.Errorf("Single channel byte should follow stream close signal. Got %d bytes", len(data)-1)
|
||||
logger.Error(nil, "Single channel byte should follow stream close signal", "receivedLength", len(data)-1)
|
||||
break
|
||||
} else {
|
||||
channel := data[1]
|
||||
if int(channel) >= len(conn.channels) {
|
||||
klog.Errorf("Close is targeted for a channel %d that is not valid, possible protocol error", channel)
|
||||
logger.Error(nil, "Close is targeted for a channel that is not valid, possible protocol error", "channel", channel)
|
||||
break
|
||||
}
|
||||
klog.V(4).Infof("Received half-close signal from client; close %d stream", channel)
|
||||
logger.V(4).Info("Received half-close signal from client, close stream", "channel", channel)
|
||||
conn.channels[channel].Close() // After first Close, other closes are noop.
|
||||
}
|
||||
continue
|
||||
|
|
@ -364,11 +378,11 @@ func (conn *Conn) handle(ws *websocket.Conn) {
|
|||
}
|
||||
data = data[1:]
|
||||
if int(channel) >= len(conn.channels) {
|
||||
klog.V(6).Infof("Frame is targeted for a reader %d that is not valid, possible protocol error", channel)
|
||||
logger.V(6).Info("Frame is targeted for a reader that is not valid, possible protocol error", "channel", channel)
|
||||
continue
|
||||
}
|
||||
if _, err := conn.channels[channel].DataFromSocket(data); err != nil {
|
||||
klog.Errorf("Unable to write frame (%d bytes) to %d: %v", len(data), channel, err)
|
||||
logger.Error(err, "Unable to write frame", "sendLength", len(data), "channel", channel, "err", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package wsstream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"io"
|
||||
"net/http"
|
||||
|
|
@ -26,6 +27,7 @@ import (
|
|||
"golang.org/x/net/websocket"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// The WebSocket subprotocol "binary.k8s.io" will only send messages to the
|
||||
|
|
@ -56,6 +58,7 @@ func NewDefaultReaderProtocols() map[string]ReaderProtocolConfig {
|
|||
|
||||
// Reader supports returning an arbitrary byte stream over a websocket channel.
|
||||
type Reader struct {
|
||||
logger klog.Logger
|
||||
err chan error
|
||||
r io.Reader
|
||||
ping bool
|
||||
|
|
@ -63,7 +66,7 @@ type Reader struct {
|
|||
protocols map[string]ReaderProtocolConfig
|
||||
selectedProtocol string
|
||||
|
||||
handleCrash func(additionalHandlers ...func(interface{})) // overridable for testing
|
||||
handleCrash func(ctx context.Context, additionalHandlers ...func(context.Context, interface{})) // overridable for testing
|
||||
}
|
||||
|
||||
// NewReader creates a WebSocket pipe that will copy the contents of r to a provided
|
||||
|
|
@ -72,13 +75,26 @@ type Reader struct {
|
|||
//
|
||||
// The protocols parameter maps subprotocol names to StreamProtocols. The empty string
|
||||
// subprotocol name is used if websocket.Config.Protocol is empty.
|
||||
//
|
||||
//logcheck:context // NewReaderWithLogger should be used instead of NewReader in code which supports contextual logging.
|
||||
func NewReader(r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader {
|
||||
return NewReaderWithLogger(klog.Background(), r, ping, protocols)
|
||||
}
|
||||
|
||||
// NewReaderWithLogger creates a WebSocket pipe that will copy the contents of r to a provided
|
||||
// WebSocket connection. If ping is true, a zero length message will be sent to the client
|
||||
// before the stream begins reading.
|
||||
//
|
||||
// The protocols parameter maps subprotocol names to StreamProtocols. The empty string
|
||||
// subprotocol name is used if websocket.Config.Protocol is empty.
|
||||
func NewReaderWithLogger(logger klog.Logger, r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader {
|
||||
return &Reader{
|
||||
logger: logger,
|
||||
r: r,
|
||||
err: make(chan error),
|
||||
ping: ping,
|
||||
protocols: protocols,
|
||||
handleCrash: runtime.HandleCrash,
|
||||
handleCrash: runtime.HandleCrashWithContext,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -100,7 +116,7 @@ func (r *Reader) handshake(config *websocket.Config, req *http.Request) error {
|
|||
// method completes.
|
||||
func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error {
|
||||
go func() {
|
||||
defer r.handleCrash()
|
||||
defer r.handleCrash(req.Context())
|
||||
websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req)
|
||||
}()
|
||||
return <-r.err
|
||||
|
|
@ -122,10 +138,10 @@ func (r *Reader) handle(ws *websocket.Conn) {
|
|||
defer closeConn()
|
||||
|
||||
go func() {
|
||||
defer runtime.HandleCrash()
|
||||
defer runtime.HandleCrashWithLogger(r.logger)
|
||||
// This blocks until the connection is closed.
|
||||
// Client should not send anything.
|
||||
IgnoreReceives(ws, r.timeout)
|
||||
IgnoreReceivesWithLogger(r.logger, ws, r.timeout)
|
||||
// Once the client closes, we should also close
|
||||
closeConn()
|
||||
}()
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package wsstream
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
|
|
@ -32,6 +33,7 @@ import (
|
|||
|
||||
func TestStream(t *testing.T) {
|
||||
input := "some random text"
|
||||
//nolint:logcheck // Intentionally uses the old API.
|
||||
r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols())
|
||||
r.SetIdleTimeout(time.Second)
|
||||
data, err := readWebSocket(r, t, nil)
|
||||
|
|
@ -45,6 +47,7 @@ func TestStream(t *testing.T) {
|
|||
|
||||
func TestStreamPing(t *testing.T) {
|
||||
input := "some random text"
|
||||
//nolint:logcheck // Intentionally uses the old API.
|
||||
r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols())
|
||||
r.SetIdleTimeout(time.Second)
|
||||
err := expectWebSocketFrames(r, t, nil, [][]byte{
|
||||
|
|
@ -59,6 +62,7 @@ func TestStreamPing(t *testing.T) {
|
|||
func TestStreamBase64(t *testing.T) {
|
||||
input := "some random text"
|
||||
encoded := base64.StdEncoding.EncodeToString([]byte(input))
|
||||
//nolint:logcheck // Intentionally uses the old API.
|
||||
r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols())
|
||||
data, err := readWebSocket(r, t, nil, "base64.binary.k8s.io")
|
||||
if !reflect.DeepEqual(data, []byte(encoded)) {
|
||||
|
|
@ -72,6 +76,7 @@ func TestStreamBase64(t *testing.T) {
|
|||
func TestStreamVersionedBase64(t *testing.T) {
|
||||
input := "some random text"
|
||||
encoded := base64.StdEncoding.EncodeToString([]byte(input))
|
||||
//nolint:logcheck // Intentionally uses the old API.
|
||||
r := NewReader(bytes.NewBuffer([]byte(input)), true, map[string]ReaderProtocolConfig{
|
||||
"": {Binary: true},
|
||||
"binary.k8s.io": {Binary: true},
|
||||
|
|
@ -100,6 +105,7 @@ func TestStreamVersionedCopy(t *testing.T) {
|
|||
}
|
||||
}
|
||||
input := "some random text"
|
||||
//nolint:logcheck // Intentionally uses the old API.
|
||||
r := NewReader(bytes.NewBuffer([]byte(input)), true, supportedProtocols)
|
||||
s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
err := r.Copy(w, req)
|
||||
|
|
@ -145,6 +151,7 @@ func TestStreamError(t *testing.T) {
|
|||
},
|
||||
err: fmt.Errorf("bad read"),
|
||||
}
|
||||
//nolint:logcheck // Intentionally uses the old API.
|
||||
r := NewReader(errs, false, NewDefaultReaderProtocols())
|
||||
|
||||
data, err := readWebSocket(r, t, nil)
|
||||
|
|
@ -165,10 +172,11 @@ func TestStreamSurvivesPanic(t *testing.T) {
|
|||
},
|
||||
panicMessage: "bad read",
|
||||
}
|
||||
//nolint:logcheck // Intentionally uses the old API.
|
||||
r := NewReader(errs, false, NewDefaultReaderProtocols())
|
||||
|
||||
// do not call runtime.HandleCrash() in handler. Otherwise, the tests are interrupted.
|
||||
r.handleCrash = func(additionalHandlers ...func(interface{})) { recover() }
|
||||
r.handleCrash = func(_ context.Context, additionalHandlers ...func(context.Context, interface{})) { recover() }
|
||||
|
||||
data, err := readWebSocket(r, t, nil)
|
||||
if !reflect.DeepEqual(data, []byte(input)) {
|
||||
|
|
@ -191,6 +199,7 @@ func TestStreamClosedDuringRead(t *testing.T) {
|
|||
err: fmt.Errorf("stuff"),
|
||||
pause: ch,
|
||||
}
|
||||
//nolint:logcheck // Intentionally uses the old API.
|
||||
r := NewReader(errs, false, NewDefaultReaderProtocols())
|
||||
|
||||
data, err := readWebSocket(r, t, func(c *websocket.Conn) {
|
||||
|
|
|
|||
|
|
@ -132,9 +132,11 @@ func SetTransportDefaults(t *http.Transport) *http.Transport {
|
|||
t = SetOldTransportDefaults(t)
|
||||
// Allow clients to disable http2 if needed.
|
||||
if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 {
|
||||
//nolint:logcheck // Should be rare, not worth converting.
|
||||
klog.Info("HTTP2 has been explicitly disabled")
|
||||
} else if allowsHTTP2(t) {
|
||||
if err := configureHTTP2Transport(t); err != nil {
|
||||
//nolint:logcheck // Should be rare, not worth converting.
|
||||
klog.Warningf("Transport failed http2 configuration: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -148,6 +150,7 @@ func readIdleTimeoutSeconds() int {
|
|||
if s := os.Getenv("HTTP2_READ_IDLE_TIMEOUT_SECONDS"); len(s) > 0 {
|
||||
i, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
//nolint:logcheck // Should be rare, not worth converting.
|
||||
klog.Warningf("Illegal HTTP2_READ_IDLE_TIMEOUT_SECONDS(%q): %v."+
|
||||
" Default value %d is used", s, err, ret)
|
||||
return ret
|
||||
|
|
@ -162,6 +165,7 @@ func pingTimeoutSeconds() int {
|
|||
if s := os.Getenv("HTTP2_PING_TIMEOUT_SECONDS"); len(s) > 0 {
|
||||
i, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
//nolint:logcheck // Should be rare, not worth converting.
|
||||
klog.Warningf("Illegal HTTP2_PING_TIMEOUT_SECONDS(%q): %v."+
|
||||
" Default value %d is used", s, err, ret)
|
||||
return ret
|
||||
|
|
@ -256,6 +260,7 @@ func CloseIdleConnectionsFor(transport http.RoundTripper) {
|
|||
case RoundTripperWrapper:
|
||||
CloseIdleConnectionsFor(transport.WrappedRoundTripper())
|
||||
default:
|
||||
//nolint:logcheck // Should be rare, not worth converting.
|
||||
klog.Warningf("unknown transport type: %T", transport)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -201,12 +201,12 @@ func parseIP(str string, family AddressFamily) (net.IP, error) {
|
|||
return net.IP(bytes), nil
|
||||
}
|
||||
|
||||
func isInterfaceUp(intf *net.Interface) bool {
|
||||
func isInterfaceUp(logger klog.Logger, intf *net.Interface) bool {
|
||||
if intf == nil {
|
||||
return false
|
||||
}
|
||||
if intf.Flags&net.FlagUp != 0 {
|
||||
klog.V(4).Infof("Interface %v is up", intf.Name)
|
||||
logger.V(4).Info("Interface is up", "interface", intf.Name)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
|
@ -218,23 +218,23 @@ func isLoopbackOrPointToPoint(intf *net.Interface) bool {
|
|||
|
||||
// getMatchingGlobalIP returns the first valid global unicast address of the given
|
||||
// 'family' from the list of 'addrs'.
|
||||
func getMatchingGlobalIP(addrs []net.Addr, family AddressFamily) (net.IP, error) {
|
||||
func getMatchingGlobalIP(logger klog.Logger, addrs []net.Addr, family AddressFamily) (net.IP, error) {
|
||||
if len(addrs) > 0 {
|
||||
for i := range addrs {
|
||||
klog.V(4).Infof("Checking addr %s.", addrs[i].String())
|
||||
logger.V(4).Info("Checking for matching global IP", "address", addrs[i])
|
||||
ip, _, err := netutils.ParseCIDRSloppy(addrs[i].String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if memberOf(ip, family) {
|
||||
if ip.IsGlobalUnicast() {
|
||||
klog.V(4).Infof("IP found %v", ip)
|
||||
logger.V(4).Info("IP found", "IP", ip)
|
||||
return ip, nil
|
||||
} else {
|
||||
klog.V(4).Infof("Non-global unicast address found %v", ip)
|
||||
logger.V(4).Info("Non-global unicast address found", "IP", ip)
|
||||
}
|
||||
} else {
|
||||
klog.V(4).Infof("%v is not an IPv%d address", ip, int(family))
|
||||
logger.V(4).Info("IP address has wrong version", "IP", ip, "IPVersion", int(family))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -244,23 +244,23 @@ func getMatchingGlobalIP(addrs []net.Addr, family AddressFamily) (net.IP, error)
|
|||
|
||||
// getIPFromInterface gets the IPs on an interface and returns a global unicast address, if any. The
|
||||
// interface must be up, the IP must in the family requested, and the IP must be a global unicast address.
|
||||
func getIPFromInterface(intfName string, forFamily AddressFamily, nw networkInterfacer) (net.IP, error) {
|
||||
func getIPFromInterface(logger klog.Logger, intfName string, forFamily AddressFamily, nw networkInterfacer) (net.IP, error) {
|
||||
intf, err := nw.InterfaceByName(intfName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if isInterfaceUp(intf) {
|
||||
if isInterfaceUp(logger, intf) {
|
||||
addrs, err := nw.Addrs(intf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
klog.V(4).Infof("Interface %q has %d addresses :%v.", intfName, len(addrs), addrs)
|
||||
matchingIP, err := getMatchingGlobalIP(addrs, forFamily)
|
||||
logger.V(4).Info("Found addresses for interface", "interface", intfName, "numAddresses", len(addrs), "addresses", addrs)
|
||||
matchingIP, err := getMatchingGlobalIP(logger, addrs, forFamily)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if matchingIP != nil {
|
||||
klog.V(4).Infof("Found valid IPv%d address %v for interface %q.", int(forFamily), matchingIP, intfName)
|
||||
logger.V(4).Info("Found valid address", "IPVersion", int(forFamily), "IP", matchingIP, "interface", intfName)
|
||||
return matchingIP, nil
|
||||
}
|
||||
}
|
||||
|
|
@ -269,13 +269,13 @@ func getIPFromInterface(intfName string, forFamily AddressFamily, nw networkInte
|
|||
|
||||
// getIPFromLoopbackInterface gets the IPs on a loopback interface and returns a global unicast address, if any.
|
||||
// The loopback interface must be up, the IP must in the family requested, and the IP must be a global unicast address.
|
||||
func getIPFromLoopbackInterface(forFamily AddressFamily, nw networkInterfacer) (net.IP, error) {
|
||||
func getIPFromLoopbackInterface(logger klog.Logger, forFamily AddressFamily, nw networkInterfacer) (net.IP, error) {
|
||||
intfs, err := nw.Interfaces()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, intf := range intfs {
|
||||
if !isInterfaceUp(&intf) {
|
||||
if !isInterfaceUp(logger, &intf) {
|
||||
continue
|
||||
}
|
||||
if intf.Flags&(net.FlagLoopback) != 0 {
|
||||
|
|
@ -283,13 +283,13 @@ func getIPFromLoopbackInterface(forFamily AddressFamily, nw networkInterfacer) (
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
klog.V(4).Infof("Interface %q has %d addresses :%v.", intf.Name, len(addrs), addrs)
|
||||
matchingIP, err := getMatchingGlobalIP(addrs, forFamily)
|
||||
logger.V(4).Info("Found addresses for interface", "interface", intf.Name, "numAddresses", len(addrs), "addresses", addrs)
|
||||
matchingIP, err := getMatchingGlobalIP(logger, addrs, forFamily)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if matchingIP != nil {
|
||||
klog.V(4).Infof("Found valid IPv%d address %v for interface %q.", int(forFamily), matchingIP, intf.Name)
|
||||
logger.V(4).Info("Found valid address", "IPVersion", int(forFamily), "IP", matchingIP, "interface", intf.Name)
|
||||
return matchingIP, nil
|
||||
}
|
||||
}
|
||||
|
|
@ -309,7 +309,7 @@ func memberOf(ip net.IP, family AddressFamily) bool {
|
|||
// chooseIPFromHostInterfaces looks at all system interfaces, trying to find one that is up that
|
||||
// has a global unicast address (non-loopback, non-link local, non-point2point), and returns the IP.
|
||||
// addressFamilies determines whether it prefers IPv4 or IPv6
|
||||
func chooseIPFromHostInterfaces(nw networkInterfacer, addressFamilies AddressFamilyPreference) (net.IP, error) {
|
||||
func chooseIPFromHostInterfaces(logger klog.Logger, nw networkInterfacer, addressFamilies AddressFamilyPreference) (net.IP, error) {
|
||||
intfs, err := nw.Interfaces()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -318,14 +318,14 @@ func chooseIPFromHostInterfaces(nw networkInterfacer, addressFamilies AddressFam
|
|||
return nil, fmt.Errorf("no interfaces found on host.")
|
||||
}
|
||||
for _, family := range addressFamilies {
|
||||
klog.V(4).Infof("Looking for system interface with a global IPv%d address", uint(family))
|
||||
logger.V(4).Info("Looking for system interface with a global address", "IPVersion", uint(family))
|
||||
for _, intf := range intfs {
|
||||
if !isInterfaceUp(&intf) {
|
||||
klog.V(4).Infof("Skipping: down interface %q", intf.Name)
|
||||
if !isInterfaceUp(logger, &intf) {
|
||||
logger.V(4).Info("Skipping: interface is down", "interface", intf.Name)
|
||||
continue
|
||||
}
|
||||
if isLoopbackOrPointToPoint(&intf) {
|
||||
klog.V(4).Infof("Skipping: LB or P2P interface %q", intf.Name)
|
||||
logger.V(4).Info("Skipping: is LB or P2P", "interface", intf.Name)
|
||||
continue
|
||||
}
|
||||
addrs, err := nw.Addrs(&intf)
|
||||
|
|
@ -333,7 +333,7 @@ func chooseIPFromHostInterfaces(nw networkInterfacer, addressFamilies AddressFam
|
|||
return nil, err
|
||||
}
|
||||
if len(addrs) == 0 {
|
||||
klog.V(4).Infof("Skipping: no addresses on interface %q", intf.Name)
|
||||
logger.V(4).Info("Skipping: no addresses", "interface", intf.Name)
|
||||
continue
|
||||
}
|
||||
for _, addr := range addrs {
|
||||
|
|
@ -342,15 +342,15 @@ func chooseIPFromHostInterfaces(nw networkInterfacer, addressFamilies AddressFam
|
|||
return nil, fmt.Errorf("unable to parse CIDR for interface %q: %s", intf.Name, err)
|
||||
}
|
||||
if !memberOf(ip, family) {
|
||||
klog.V(4).Infof("Skipping: no address family match for %q on interface %q.", ip, intf.Name)
|
||||
logger.V(4).Info("Skipping: no address family match", "IP", ip, "interface", intf.Name)
|
||||
continue
|
||||
}
|
||||
// TODO: Decide if should open up to allow IPv6 LLAs in future.
|
||||
if !ip.IsGlobalUnicast() {
|
||||
klog.V(4).Infof("Skipping: non-global address %q on interface %q.", ip, intf.Name)
|
||||
logger.V(4).Info("Skipping: non-global address", "IP", ip, "interface", intf.Name)
|
||||
continue
|
||||
}
|
||||
klog.V(4).Infof("Found global unicast address %q on interface %q.", ip, intf.Name)
|
||||
logger.V(4).Info("Found global unicast address", "IP", ip, "interface", intf.Name)
|
||||
return ip, nil
|
||||
}
|
||||
}
|
||||
|
|
@ -363,20 +363,31 @@ func chooseIPFromHostInterfaces(nw networkInterfacer, addressFamilies AddressFam
|
|||
// interfaces. Otherwise, it will use IPv4 and IPv6 route information to return the
|
||||
// IP of the interface with a gateway on it (with priority given to IPv4). For a node
|
||||
// with no internet connection, it returns error.
|
||||
//
|
||||
//logcheck:context // [ChooseHostInterfaceWithLogger] should be used instead of ChooseHostInterface in code which supports contextual logging.
|
||||
func ChooseHostInterface() (net.IP, error) {
|
||||
return chooseHostInterface(preferIPv4)
|
||||
return ChooseHostInterfaceWithLogger(klog.Background())
|
||||
}
|
||||
|
||||
func chooseHostInterface(addressFamilies AddressFamilyPreference) (net.IP, error) {
|
||||
// ChooseHostInterfaceWithLogger is a method used fetch an IP for a daemon.
|
||||
// If there is no routing info file, it will choose a global IP from the system
|
||||
// interfaces. Otherwise, it will use IPv4 and IPv6 route information to return the
|
||||
// IP of the interface with a gateway on it (with priority given to IPv4). For a node
|
||||
// with no internet connection, it returns error.
|
||||
func ChooseHostInterfaceWithLogger(logger klog.Logger) (net.IP, error) {
|
||||
return chooseHostInterface(logger, preferIPv4)
|
||||
}
|
||||
|
||||
func chooseHostInterface(logger klog.Logger, addressFamilies AddressFamilyPreference) (net.IP, error) {
|
||||
var nw networkInterfacer = networkInterface{}
|
||||
if _, err := os.Stat(ipv4RouteFile); os.IsNotExist(err) {
|
||||
return chooseIPFromHostInterfaces(nw, addressFamilies)
|
||||
return chooseIPFromHostInterfaces(logger, nw, addressFamilies)
|
||||
}
|
||||
routes, err := getAllDefaultRoutes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return chooseHostInterfaceFromRoute(routes, nw, addressFamilies)
|
||||
return chooseHostInterfaceFromRoute(logger, routes, nw, addressFamilies)
|
||||
}
|
||||
|
||||
// networkInterfacer defines an interface for several net library functions. Production
|
||||
|
|
@ -427,36 +438,36 @@ func getAllDefaultRoutes() ([]Route, error) {
|
|||
// global IP address from the interface for the route. If there are routes but no global
|
||||
// address is obtained from the interfaces, it checks if the loopback interface has a global address.
|
||||
// addressFamilies determines whether it prefers IPv4 or IPv6
|
||||
func chooseHostInterfaceFromRoute(routes []Route, nw networkInterfacer, addressFamilies AddressFamilyPreference) (net.IP, error) {
|
||||
func chooseHostInterfaceFromRoute(logger klog.Logger, routes []Route, nw networkInterfacer, addressFamilies AddressFamilyPreference) (net.IP, error) {
|
||||
for _, family := range addressFamilies {
|
||||
klog.V(4).Infof("Looking for default routes with IPv%d addresses", uint(family))
|
||||
logger.V(4).Info("Looking for default routes with IP addresses", "IPVersion", uint(family))
|
||||
for _, route := range routes {
|
||||
if route.Family != family {
|
||||
continue
|
||||
}
|
||||
klog.V(4).Infof("Default route transits interface %q", route.Interface)
|
||||
finalIP, err := getIPFromInterface(route.Interface, family, nw)
|
||||
logger.V(4).Info("Default route transits interface", "interface", route.Interface)
|
||||
finalIP, err := getIPFromInterface(logger, route.Interface, family, nw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if finalIP != nil {
|
||||
klog.V(4).Infof("Found active IP %v ", finalIP)
|
||||
logger.V(4).Info("Found active IP", "IP", finalIP)
|
||||
return finalIP, nil
|
||||
}
|
||||
// In case of network setups where default routes are present, but network
|
||||
// interfaces use only link-local addresses (e.g. as described in RFC5549).
|
||||
// the global IP is assigned to the loopback interface, and we should use it
|
||||
loopbackIP, err := getIPFromLoopbackInterface(family, nw)
|
||||
loopbackIP, err := getIPFromLoopbackInterface(logger, family, nw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if loopbackIP != nil {
|
||||
klog.V(4).Infof("Found active IP %v on Loopback interface", loopbackIP)
|
||||
logger.V(4).Info("Found active IP on Loopback interface", "IP", loopbackIP)
|
||||
return loopbackIP, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
klog.V(4).Infof("No active IP found by looking at default routes")
|
||||
logger.V(4).Info("No active IP found by looking at default routes")
|
||||
return nil, fmt.Errorf("unable to select an IP from default routes.")
|
||||
}
|
||||
|
||||
|
|
@ -465,14 +476,25 @@ func chooseHostInterfaceFromRoute(routes []Route, nw networkInterfacer, addressF
|
|||
// If bindAddress is unspecified or loopback, it returns the default IP of the same
|
||||
// address family as bindAddress.
|
||||
// Otherwise, it just returns bindAddress.
|
||||
//
|
||||
//logcheck:context // [ResolveBindAddressWithLogger] should be used instead of ResolveBindAddress in code which supports contextual logging.
|
||||
func ResolveBindAddress(bindAddress net.IP) (net.IP, error) {
|
||||
return ResolveBindAddressWithLogger(klog.Background(), bindAddress)
|
||||
}
|
||||
|
||||
// ResolveBindAddressWithLogger returns the IP address of a daemon, based on the given bindAddress:
|
||||
// If bindAddress is unset, it returns the host's default IP, as with ChooseHostInterface().
|
||||
// If bindAddress is unspecified or loopback, it returns the default IP of the same
|
||||
// address family as bindAddress.
|
||||
// Otherwise, it just returns bindAddress.
|
||||
func ResolveBindAddressWithLogger(logger klog.Logger, bindAddress net.IP) (net.IP, error) {
|
||||
addressFamilies := preferIPv4
|
||||
if bindAddress != nil && memberOf(bindAddress, familyIPv6) {
|
||||
addressFamilies = preferIPv6
|
||||
}
|
||||
|
||||
if bindAddress == nil || bindAddress.IsUnspecified() || bindAddress.IsLoopback() {
|
||||
hostIP, err := chooseHostInterface(addressFamilies)
|
||||
hostIP, err := chooseHostInterface(logger, addressFamilies)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -485,10 +507,20 @@ func ResolveBindAddress(bindAddress net.IP) (net.IP, error) {
|
|||
// This is required in case of network setups where default routes are present, but network
|
||||
// interfaces use only link-local addresses (e.g. as described in RFC5549).
|
||||
// e.g when using BGP to announce a host IP over link-local ip addresses and this ip address is attached to the lo interface.
|
||||
//
|
||||
//logcheck:context // [ChooseBindAddressForInterfaceWithLogger] should be used instead of ChooseBindAddressForInterface in code which supports contextual logging.
|
||||
func ChooseBindAddressForInterface(intfName string) (net.IP, error) {
|
||||
return ChooseBindAddressForInterfaceWithLogger(klog.Background(), intfName)
|
||||
}
|
||||
|
||||
// ChooseBindAddressForInterfaceWithLogger choose a global IP for a specific interface, with priority given to IPv4.
|
||||
// This is required in case of network setups where default routes are present, but network
|
||||
// interfaces use only link-local addresses (e.g. as described in RFC5549).
|
||||
// e.g when using BGP to announce a host IP over link-local ip addresses and this ip address is attached to the lo interface.
|
||||
func ChooseBindAddressForInterfaceWithLogger(logger klog.Logger, intfName string) (net.IP, error) {
|
||||
var nw networkInterfacer = networkInterface{}
|
||||
for _, family := range preferIPv4 {
|
||||
ip, err := getIPFromInterface(intfName, family, nw)
|
||||
ip, err := getIPFromInterface(logger, intfName, family, nw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import (
|
|||
"strings"
|
||||
"testing"
|
||||
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
netutils "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
|
|
@ -242,6 +243,7 @@ func TestParseIP(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestIsInterfaceUp(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testCases := []struct {
|
||||
tcase string
|
||||
intf *net.Interface
|
||||
|
|
@ -252,7 +254,7 @@ func TestIsInterfaceUp(t *testing.T) {
|
|||
{"no interface", nil, false},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
it := isInterfaceUp(tc.intf)
|
||||
it := isInterfaceUp(logger, tc.intf)
|
||||
if it != tc.expected {
|
||||
t.Errorf("case[%v]: expected %v, got %v .", tc.tcase, tc.expected, it)
|
||||
}
|
||||
|
|
@ -269,6 +271,7 @@ func (a addrStruct) String() string {
|
|||
}
|
||||
|
||||
func TestFinalIP(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testCases := []struct {
|
||||
tcase string
|
||||
addr []net.Addr
|
||||
|
|
@ -289,7 +292,7 @@ func TestFinalIP(t *testing.T) {
|
|||
{"no addresses", []net.Addr{}, familyIPv4, nil},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
ip, err := getMatchingGlobalIP(tc.addr, tc.family)
|
||||
ip, err := getMatchingGlobalIP(logger, tc.addr, tc.family)
|
||||
if !ip.Equal(tc.expected) {
|
||||
t.Errorf("case[%v]: expected %v, got %v .err : %v", tc.tcase, tc.expected, ip, err)
|
||||
}
|
||||
|
|
@ -549,6 +552,7 @@ func (_ networkInterfaceWithInvalidAddr) Interfaces() ([]net.Interface, error) {
|
|||
}
|
||||
|
||||
func TestGetIPFromInterface(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testCases := []struct {
|
||||
tcase string
|
||||
nwname string
|
||||
|
|
@ -567,7 +571,7 @@ func TestGetIPFromInterface(t *testing.T) {
|
|||
{"bad addr", "eth3", familyIPv4, networkInterfaceWithInvalidAddr{}, nil, "invalid CIDR"},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
ip, err := getIPFromInterface(tc.nwname, tc.family, tc.nw)
|
||||
ip, err := getIPFromInterface(logger, tc.nwname, tc.family, tc.nw)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), tc.errStrFrag) {
|
||||
t.Errorf("case[%s]: Error string %q does not contain %q", tc.tcase, err, tc.errStrFrag)
|
||||
|
|
@ -581,6 +585,7 @@ func TestGetIPFromInterface(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGetIPFromLoopbackInterface(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testCases := []struct {
|
||||
tcase string
|
||||
family AddressFamily
|
||||
|
|
@ -594,7 +599,7 @@ func TestGetIPFromLoopbackInterface(t *testing.T) {
|
|||
{"no global ipv6", familyIPv6, loopbackNetworkInterface{}, nil, ""},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
ip, err := getIPFromLoopbackInterface(tc.family, tc.nw)
|
||||
ip, err := getIPFromLoopbackInterface(logger, tc.family, tc.nw)
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), tc.errStrFrag) {
|
||||
t.Errorf("case[%s]: Error string %q does not contain %q", tc.tcase, err, tc.errStrFrag)
|
||||
|
|
@ -608,6 +613,7 @@ func TestGetIPFromLoopbackInterface(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestChooseHostInterfaceFromRoute(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testCases := []struct {
|
||||
tcase string
|
||||
routes []Route
|
||||
|
|
@ -636,7 +642,7 @@ func TestChooseHostInterfaceFromRoute(t *testing.T) {
|
|||
{"fail get IP", routeV4, networkInterfaceFailGetAddrs{}, preferIPv4, nil},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
ip, err := chooseHostInterfaceFromRoute(tc.routes, tc.nw, tc.order)
|
||||
ip, err := chooseHostInterfaceFromRoute(logger, tc.routes, tc.nw, tc.order)
|
||||
if !ip.Equal(tc.expected) {
|
||||
t.Errorf("case[%v]: expected %v, got %+v .err : %v", tc.tcase, tc.expected, ip, err)
|
||||
}
|
||||
|
|
@ -663,6 +669,7 @@ func TestMemberOf(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGetIPFromHostInterfaces(t *testing.T) {
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
testCases := []struct {
|
||||
tcase string
|
||||
nw networkInterfacer
|
||||
|
|
@ -688,7 +695,7 @@ func TestGetIPFromHostInterfaces(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
ip, err := chooseIPFromHostInterfaces(tc.nw, tc.order)
|
||||
ip, err := chooseIPFromHostInterfaces(logger, tc.nw, tc.order)
|
||||
if !ip.Equal(tc.expected) {
|
||||
t.Errorf("case[%s]: expected %+v, got %+v with err : %v", tc.tcase, tc.expected, ip, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ func DialURL(ctx context.Context, url *url.URL, transport http.RoundTripper) (ne
|
|||
|
||||
dialer, err := utilnet.DialerFor(transport)
|
||||
if err != nil {
|
||||
klog.V(5).Infof("Unable to unwrap transport %T to get dialer: %v", transport, err)
|
||||
klog.FromContext(ctx).V(5).Info("Unable to unwrap transport to get dialer", "type", fmt.Sprintf("%T", transport), "err", err)
|
||||
}
|
||||
|
||||
switch url.Scheme {
|
||||
|
|
@ -53,7 +53,7 @@ func DialURL(ctx context.Context, url *url.URL, transport http.RoundTripper) (ne
|
|||
// Get the tls config from the transport if we recognize it
|
||||
tlsConfig, err := utilnet.TLSClientConfig(transport)
|
||||
if err != nil {
|
||||
klog.V(5).Infof("Unable to unwrap transport %T to get at TLS config: %v", transport, err)
|
||||
klog.FromContext(ctx).V(5).Info("Unable to unwrap transport to get at TLS config", "type", fmt.Sprintf("%T", transport), "err", err)
|
||||
}
|
||||
|
||||
if dialer != nil {
|
||||
|
|
@ -65,7 +65,7 @@ func DialURL(ctx context.Context, url *url.URL, transport http.RoundTripper) (ne
|
|||
}
|
||||
if tlsConfig == nil {
|
||||
// tls.Client requires non-nil config
|
||||
klog.Warning("using custom dialer with no TLSClientConfig. Defaulting to InsecureSkipVerify")
|
||||
klog.FromContext(ctx).Info("Warning: using custom dialer with no TLSClientConfig, defaulting to InsecureSkipVerify")
|
||||
// tls.Handshake() requires ServerName or InsecureSkipVerify
|
||||
tlsConfig = &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
|
|
|
|||
|
|
@ -249,7 +249,7 @@ func (t *Transport) rewriteResponse(req *http.Request, resp *http.Response) (*ht
|
|||
// This is fine
|
||||
default:
|
||||
// Some encoding we don't understand-- don't try to parse this
|
||||
klog.Errorf("Proxy encountered encoding %v for text/html; can't understand this so not fixing links.", encoding)
|
||||
klog.FromContext(req.Context()).Error(nil, "Proxy encountered unknown encoding for text/html, can't understand this so not fixing links", "encoding", encoding)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
|
|
@ -258,7 +258,7 @@ func (t *Transport) rewriteResponse(req *http.Request, resp *http.Response) (*ht
|
|||
}
|
||||
err := rewriteHTML(reader, writer, urlRewriter)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to rewrite URLs: %v", err)
|
||||
klog.FromContext(req.Context()).Error(err, "Failed to rewrite URLs")
|
||||
return resp, err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -307,8 +307,9 @@ func (noSuppressPanicError) Write(p []byte) (n int, err error) {
|
|||
|
||||
// tryUpgrade returns true if the request was handled.
|
||||
func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool {
|
||||
logger := klog.FromContext(req.Context())
|
||||
if !httpstream.IsUpgradeRequest(req) {
|
||||
klog.V(6).Infof("Request was not an upgrade")
|
||||
logger.V(6).Info("Request was not an upgrade")
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
@ -332,15 +333,15 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
|||
// Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy
|
||||
// handles this in the non-upgrade path.
|
||||
utilnet.AppendForwardedForHeader(clone)
|
||||
klog.V(6).Infof("Connecting to backend proxy (direct dial) %s\n Headers: %v", &location, clone.Header)
|
||||
logger.V(6).Info("Connecting to backend proxy (direct dial)", "location", &location, "headers", clone.Header)
|
||||
if h.UseLocationHost {
|
||||
clone.Host = h.Location.Host
|
||||
}
|
||||
clone.URL = &location
|
||||
klog.V(6).Infof("UpgradeAwareProxy: dialing for SPDY upgrade with headers: %v", clone.Header)
|
||||
logger.V(6).Info("UpgradeAwareProxy: dialing for SPDY upgrade with headers", "headers", clone.Header)
|
||||
backendConn, err = h.DialForUpgrade(clone)
|
||||
if err != nil {
|
||||
klog.V(6).Infof("Proxy connection error: %v", err)
|
||||
logger.V(6).Info("Proxy connection error", "err", err)
|
||||
h.Responder.Error(w, req, err)
|
||||
return true
|
||||
}
|
||||
|
|
@ -349,7 +350,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
|||
// determine the http response code from the backend by reading from rawResponse+backendConn
|
||||
backendHTTPResponse, headerBytes, err := getResponse(io.MultiReader(bytes.NewReader(rawResponse), backendConn))
|
||||
if err != nil {
|
||||
klog.V(6).Infof("Proxy connection error: %v", err)
|
||||
logger.V(6).Info("Proxy connection error", "err", err)
|
||||
h.Responder.Error(w, req, err)
|
||||
return true
|
||||
}
|
||||
|
|
@ -363,7 +364,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
|||
// return a generic error here.
|
||||
if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols && backendHTTPResponse.StatusCode < 400 {
|
||||
err := fmt.Errorf("invalid upgrade response: status code %d", backendHTTPResponse.StatusCode)
|
||||
klog.Errorf("Proxy upgrade error: %v", err)
|
||||
logger.Error(err, "Proxy upgrade error")
|
||||
h.Responder.Error(w, req, err)
|
||||
return true
|
||||
}
|
||||
|
|
@ -372,13 +373,13 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
|||
// hijacking should be the last step in the upgrade.
|
||||
requestHijacker, ok := w.(http.Hijacker)
|
||||
if !ok {
|
||||
klog.Errorf("Unable to hijack response writer: %T", w)
|
||||
logger.Error(nil, "Unable to hijack response writer", "type", fmt.Sprintf("%T", w))
|
||||
h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w))
|
||||
return true
|
||||
}
|
||||
requestHijackedConn, _, err := requestHijacker.Hijack()
|
||||
if err != nil {
|
||||
klog.Errorf("Unable to hijack response: %v", err)
|
||||
logger.Error(err, "Unable to hijack response")
|
||||
h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err))
|
||||
return true
|
||||
}
|
||||
|
|
@ -386,7 +387,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
|||
|
||||
if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols {
|
||||
// If the backend did not upgrade the request, echo the response from the backend to the client and return, closing the connection.
|
||||
klog.V(6).Infof("Proxy upgrade error, status code %d", backendHTTPResponse.StatusCode)
|
||||
logger.V(6).Info("Proxy upgrade error", "statusCode", backendHTTPResponse.StatusCode)
|
||||
// set read/write deadlines
|
||||
deadline := time.Now().Add(10 * time.Second)
|
||||
backendConn.SetReadDeadline(deadline)
|
||||
|
|
@ -394,7 +395,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
|||
// write the response to the client
|
||||
err := backendHTTPResponse.Write(requestHijackedConn)
|
||||
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
klog.Errorf("Error proxying data from backend to client: %v", err)
|
||||
logger.Error(err, "Error proxying data from backend to client")
|
||||
}
|
||||
// Indicate we handled the request
|
||||
return true
|
||||
|
|
@ -402,9 +403,9 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
|||
|
||||
// Forward raw response bytes back to client.
|
||||
if len(rawResponse) > 0 {
|
||||
klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse))
|
||||
logger.V(6).Info("Writing to hijacked connection", "length", len(rawResponse))
|
||||
if _, err = requestHijackedConn.Write(rawResponse); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
|
||||
utilruntime.HandleErrorWithLogger(logger, err, "Error proxying response from backend to client")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -424,7 +425,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
|||
}
|
||||
_, err := io.Copy(writer, requestHijackedConn)
|
||||
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
klog.Errorf("Error proxying data from client to backend: %v", err)
|
||||
logger.Error(err, "Error proxying data from client to backend")
|
||||
}
|
||||
close(writerComplete)
|
||||
}()
|
||||
|
|
@ -438,7 +439,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
|||
}
|
||||
_, err := io.Copy(requestHijackedConn, reader)
|
||||
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
klog.Errorf("Error proxying data from backend to client: %v", err)
|
||||
logger.Error(err, "Error proxying data from backend to client")
|
||||
}
|
||||
close(readerComplete)
|
||||
}()
|
||||
|
|
@ -449,7 +450,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
|
|||
case <-writerComplete:
|
||||
case <-readerComplete:
|
||||
}
|
||||
klog.V(6).Infof("Disconnecting from backend proxy %s\n Headers: %v", &location, clone.Header)
|
||||
logger.V(6).Info("Disconnecting from backend proxy", "location", &location, "headers", clone.Header)
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue