diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go index d4ceab84f06..1b91f30516d 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go @@ -169,6 +169,7 @@ func (c *connection) newSpdyStream(stream *spdystream.Stream) { err := c.newStreamHandler(stream, replySent) rejectStream := (err != nil) if rejectStream { + //nolint:logcheck // Hopefully this never gets triggered. klog.Warningf("Stream rejected: %v", err) stream.Reset() return @@ -195,6 +196,7 @@ func (c *connection) sendPings(period time.Duration) { case <-t.C: } if _, err := c.ping(); err != nil { + //nolint:logcheck // Hopefully this never gets triggered. klog.V(3).Infof("SPDY Ping failed: %v", err) // Continue, in case this is a transient failure. // c.conn.CloseChan above will tell us when the connection is diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go index d30ae2fa3dc..c15e7afcc43 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go @@ -105,14 +105,14 @@ func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Reque conn, bufrw, err := hijacker.Hijack() if err != nil { - runtime.HandleError(fmt.Errorf("unable to upgrade: error hijacking response: %v", err)) + runtime.HandleErrorWithContext(req.Context(), err, "Unable to upgrade: error hijacking response") return nil } connWithBuf := &connWrapper{Conn: conn, bufReader: bufrw.Reader} spdyConn, err := NewServerConnectionWithPings(connWithBuf, newStreamHandler, u.pingPeriod) if err != nil { - runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err)) + runtime.HandleErrorWithContext(req.Context(), err, "Unable to upgrade: error creating SPDY server connection") return nil } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go index 2e477fee2ae..93c9040ab96 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go @@ -126,8 +126,16 @@ func IsWebSocketRequestWithTunnelingProtocol(req *http.Request) bool { // IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the // read and write deadlines are pushed every time a new message is received. +// +// Contextual logging: IgnoreReceivesWithLogger should be used instead of IgnoreReceives in code which uses contextual logging. func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) { - defer runtime.HandleCrash() + IgnoreReceivesWithLogger(klog.Background(), ws, timeout) +} + +// IgnoreReceivesWithLogger reads from a WebSocket until it is closed, then returns. If timeout is set, the +// read and write deadlines are pushed every time a new message is received. +func IgnoreReceivesWithLogger(logger klog.Logger, ws *websocket.Conn, timeout time.Duration) { + defer runtime.HandleCrashWithLogger(logger) var data []byte for { resetTimeout(ws, timeout) @@ -236,7 +244,7 @@ func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.R // "conn.ready" and then blocks until serving is complete. select { case <-conn.ready: - klog.V(8).Infof("websocket server initialized--serving") + klog.FromContext(req.Context()).V(8).Info("websocket server initialized--serving") case <-serveHTTPComplete: // websocket server returned before completing initialization; cleanup and return error. conn.closeNonThreadSafe() //nolint:errcheck @@ -330,13 +338,19 @@ func (conn *Conn) handle(ws *websocket.Conn) { conn.initialize(ws) defer conn.Close() supportsStreamClose := protocolSupportsStreamClose(conn.selectedProtocol) + // conn.handle is typically used on the server-side and thus we have a request, + // but don't assume that and use klog.Background as fallback. + logger := klog.Background() + if req := ws.Request(); req != nil { + logger = klog.FromContext(req.Context()) + } for { conn.resetTimeout() var data []byte if err := websocket.Message.Receive(ws, &data); err != nil { if err != io.EOF { - klog.Errorf("Error on socket receive: %v", err) + logger.Error(err, "Error on socket receive") } break } @@ -345,15 +359,15 @@ func (conn *Conn) handle(ws *websocket.Conn) { } if supportsStreamClose && data[0] == remotecommand.StreamClose { if len(data) != 2 { - klog.Errorf("Single channel byte should follow stream close signal. Got %d bytes", len(data)-1) + logger.Error(nil, "Single channel byte should follow stream close signal", "receivedLength", len(data)-1) break } else { channel := data[1] if int(channel) >= len(conn.channels) { - klog.Errorf("Close is targeted for a channel %d that is not valid, possible protocol error", channel) + logger.Error(nil, "Close is targeted for a channel that is not valid, possible protocol error", "channel", channel) break } - klog.V(4).Infof("Received half-close signal from client; close %d stream", channel) + logger.V(4).Info("Received half-close signal from client, close stream", "channel", channel) conn.channels[channel].Close() // After first Close, other closes are noop. } continue @@ -364,11 +378,11 @@ func (conn *Conn) handle(ws *websocket.Conn) { } data = data[1:] if int(channel) >= len(conn.channels) { - klog.V(6).Infof("Frame is targeted for a reader %d that is not valid, possible protocol error", channel) + logger.V(6).Info("Frame is targeted for a reader that is not valid, possible protocol error", "channel", channel) continue } if _, err := conn.channels[channel].DataFromSocket(data); err != nil { - klog.Errorf("Unable to write frame (%d bytes) to %d: %v", len(data), channel, err) + logger.Error(err, "Unable to write frame", "sendLength", len(data), "channel", channel, "err", err) continue } } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/stream.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/stream.go index ba7e6a519af..1e8135e1a70 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/stream.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/stream.go @@ -17,6 +17,7 @@ limitations under the License. package wsstream import ( + "context" "encoding/base64" "io" "net/http" @@ -26,6 +27,7 @@ import ( "golang.org/x/net/websocket" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/klog/v2" ) // The WebSocket subprotocol "binary.k8s.io" will only send messages to the @@ -56,6 +58,7 @@ func NewDefaultReaderProtocols() map[string]ReaderProtocolConfig { // Reader supports returning an arbitrary byte stream over a websocket channel. type Reader struct { + logger klog.Logger err chan error r io.Reader ping bool @@ -63,7 +66,7 @@ type Reader struct { protocols map[string]ReaderProtocolConfig selectedProtocol string - handleCrash func(additionalHandlers ...func(interface{})) // overridable for testing + handleCrash func(ctx context.Context, additionalHandlers ...func(context.Context, interface{})) // overridable for testing } // NewReader creates a WebSocket pipe that will copy the contents of r to a provided @@ -72,13 +75,26 @@ type Reader struct { // // The protocols parameter maps subprotocol names to StreamProtocols. The empty string // subprotocol name is used if websocket.Config.Protocol is empty. +// +//logcheck:context // NewReaderWithLogger should be used instead of NewReader in code which supports contextual logging. func NewReader(r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader { + return NewReaderWithLogger(klog.Background(), r, ping, protocols) +} + +// NewReaderWithLogger creates a WebSocket pipe that will copy the contents of r to a provided +// WebSocket connection. If ping is true, a zero length message will be sent to the client +// before the stream begins reading. +// +// The protocols parameter maps subprotocol names to StreamProtocols. The empty string +// subprotocol name is used if websocket.Config.Protocol is empty. +func NewReaderWithLogger(logger klog.Logger, r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader { return &Reader{ + logger: logger, r: r, err: make(chan error), ping: ping, protocols: protocols, - handleCrash: runtime.HandleCrash, + handleCrash: runtime.HandleCrashWithContext, } } @@ -100,7 +116,7 @@ func (r *Reader) handshake(config *websocket.Config, req *http.Request) error { // method completes. func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error { go func() { - defer r.handleCrash() + defer r.handleCrash(req.Context()) websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req) }() return <-r.err @@ -122,10 +138,10 @@ func (r *Reader) handle(ws *websocket.Conn) { defer closeConn() go func() { - defer runtime.HandleCrash() + defer runtime.HandleCrashWithLogger(r.logger) // This blocks until the connection is closed. // Client should not send anything. - IgnoreReceives(ws, r.timeout) + IgnoreReceivesWithLogger(r.logger, ws, r.timeout) // Once the client closes, we should also close closeConn() }() diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/stream_test.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/stream_test.go index 226bc3f210f..c7ea1aad707 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/stream_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/stream_test.go @@ -18,6 +18,7 @@ package wsstream import ( "bytes" + "context" "encoding/base64" "fmt" "io" @@ -32,6 +33,7 @@ import ( func TestStream(t *testing.T) { input := "some random text" + //nolint:logcheck // Intentionally uses the old API. r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols()) r.SetIdleTimeout(time.Second) data, err := readWebSocket(r, t, nil) @@ -45,6 +47,7 @@ func TestStream(t *testing.T) { func TestStreamPing(t *testing.T) { input := "some random text" + //nolint:logcheck // Intentionally uses the old API. r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols()) r.SetIdleTimeout(time.Second) err := expectWebSocketFrames(r, t, nil, [][]byte{ @@ -59,6 +62,7 @@ func TestStreamPing(t *testing.T) { func TestStreamBase64(t *testing.T) { input := "some random text" encoded := base64.StdEncoding.EncodeToString([]byte(input)) + //nolint:logcheck // Intentionally uses the old API. r := NewReader(bytes.NewBuffer([]byte(input)), true, NewDefaultReaderProtocols()) data, err := readWebSocket(r, t, nil, "base64.binary.k8s.io") if !reflect.DeepEqual(data, []byte(encoded)) { @@ -72,6 +76,7 @@ func TestStreamBase64(t *testing.T) { func TestStreamVersionedBase64(t *testing.T) { input := "some random text" encoded := base64.StdEncoding.EncodeToString([]byte(input)) + //nolint:logcheck // Intentionally uses the old API. r := NewReader(bytes.NewBuffer([]byte(input)), true, map[string]ReaderProtocolConfig{ "": {Binary: true}, "binary.k8s.io": {Binary: true}, @@ -100,6 +105,7 @@ func TestStreamVersionedCopy(t *testing.T) { } } input := "some random text" + //nolint:logcheck // Intentionally uses the old API. r := NewReader(bytes.NewBuffer([]byte(input)), true, supportedProtocols) s, addr := newServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { err := r.Copy(w, req) @@ -145,6 +151,7 @@ func TestStreamError(t *testing.T) { }, err: fmt.Errorf("bad read"), } + //nolint:logcheck // Intentionally uses the old API. r := NewReader(errs, false, NewDefaultReaderProtocols()) data, err := readWebSocket(r, t, nil) @@ -165,10 +172,11 @@ func TestStreamSurvivesPanic(t *testing.T) { }, panicMessage: "bad read", } + //nolint:logcheck // Intentionally uses the old API. r := NewReader(errs, false, NewDefaultReaderProtocols()) // do not call runtime.HandleCrash() in handler. Otherwise, the tests are interrupted. - r.handleCrash = func(additionalHandlers ...func(interface{})) { recover() } + r.handleCrash = func(_ context.Context, additionalHandlers ...func(context.Context, interface{})) { recover() } data, err := readWebSocket(r, t, nil) if !reflect.DeepEqual(data, []byte(input)) { @@ -191,6 +199,7 @@ func TestStreamClosedDuringRead(t *testing.T) { err: fmt.Errorf("stuff"), pause: ch, } + //nolint:logcheck // Intentionally uses the old API. r := NewReader(errs, false, NewDefaultReaderProtocols()) data, err := readWebSocket(r, t, func(c *websocket.Conn) { diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go index 8cc1810af13..8912804c50e 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go @@ -132,9 +132,11 @@ func SetTransportDefaults(t *http.Transport) *http.Transport { t = SetOldTransportDefaults(t) // Allow clients to disable http2 if needed. if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 { + //nolint:logcheck // Should be rare, not worth converting. klog.Info("HTTP2 has been explicitly disabled") } else if allowsHTTP2(t) { if err := configureHTTP2Transport(t); err != nil { + //nolint:logcheck // Should be rare, not worth converting. klog.Warningf("Transport failed http2 configuration: %v", err) } } @@ -148,6 +150,7 @@ func readIdleTimeoutSeconds() int { if s := os.Getenv("HTTP2_READ_IDLE_TIMEOUT_SECONDS"); len(s) > 0 { i, err := strconv.Atoi(s) if err != nil { + //nolint:logcheck // Should be rare, not worth converting. klog.Warningf("Illegal HTTP2_READ_IDLE_TIMEOUT_SECONDS(%q): %v."+ " Default value %d is used", s, err, ret) return ret @@ -162,6 +165,7 @@ func pingTimeoutSeconds() int { if s := os.Getenv("HTTP2_PING_TIMEOUT_SECONDS"); len(s) > 0 { i, err := strconv.Atoi(s) if err != nil { + //nolint:logcheck // Should be rare, not worth converting. klog.Warningf("Illegal HTTP2_PING_TIMEOUT_SECONDS(%q): %v."+ " Default value %d is used", s, err, ret) return ret @@ -256,6 +260,7 @@ func CloseIdleConnectionsFor(transport http.RoundTripper) { case RoundTripperWrapper: CloseIdleConnectionsFor(transport.WrappedRoundTripper()) default: + //nolint:logcheck // Should be rare, not worth converting. klog.Warningf("unknown transport type: %T", transport) } } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/interface.go b/staging/src/k8s.io/apimachinery/pkg/util/net/interface.go index 01d028e727d..3ccf227af02 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/interface.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/interface.go @@ -201,12 +201,12 @@ func parseIP(str string, family AddressFamily) (net.IP, error) { return net.IP(bytes), nil } -func isInterfaceUp(intf *net.Interface) bool { +func isInterfaceUp(logger klog.Logger, intf *net.Interface) bool { if intf == nil { return false } if intf.Flags&net.FlagUp != 0 { - klog.V(4).Infof("Interface %v is up", intf.Name) + logger.V(4).Info("Interface is up", "interface", intf.Name) return true } return false @@ -218,23 +218,23 @@ func isLoopbackOrPointToPoint(intf *net.Interface) bool { // getMatchingGlobalIP returns the first valid global unicast address of the given // 'family' from the list of 'addrs'. -func getMatchingGlobalIP(addrs []net.Addr, family AddressFamily) (net.IP, error) { +func getMatchingGlobalIP(logger klog.Logger, addrs []net.Addr, family AddressFamily) (net.IP, error) { if len(addrs) > 0 { for i := range addrs { - klog.V(4).Infof("Checking addr %s.", addrs[i].String()) + logger.V(4).Info("Checking for matching global IP", "address", addrs[i]) ip, _, err := netutils.ParseCIDRSloppy(addrs[i].String()) if err != nil { return nil, err } if memberOf(ip, family) { if ip.IsGlobalUnicast() { - klog.V(4).Infof("IP found %v", ip) + logger.V(4).Info("IP found", "IP", ip) return ip, nil } else { - klog.V(4).Infof("Non-global unicast address found %v", ip) + logger.V(4).Info("Non-global unicast address found", "IP", ip) } } else { - klog.V(4).Infof("%v is not an IPv%d address", ip, int(family)) + logger.V(4).Info("IP address has wrong version", "IP", ip, "IPVersion", int(family)) } } @@ -244,23 +244,23 @@ func getMatchingGlobalIP(addrs []net.Addr, family AddressFamily) (net.IP, error) // getIPFromInterface gets the IPs on an interface and returns a global unicast address, if any. The // interface must be up, the IP must in the family requested, and the IP must be a global unicast address. -func getIPFromInterface(intfName string, forFamily AddressFamily, nw networkInterfacer) (net.IP, error) { +func getIPFromInterface(logger klog.Logger, intfName string, forFamily AddressFamily, nw networkInterfacer) (net.IP, error) { intf, err := nw.InterfaceByName(intfName) if err != nil { return nil, err } - if isInterfaceUp(intf) { + if isInterfaceUp(logger, intf) { addrs, err := nw.Addrs(intf) if err != nil { return nil, err } - klog.V(4).Infof("Interface %q has %d addresses :%v.", intfName, len(addrs), addrs) - matchingIP, err := getMatchingGlobalIP(addrs, forFamily) + logger.V(4).Info("Found addresses for interface", "interface", intfName, "numAddresses", len(addrs), "addresses", addrs) + matchingIP, err := getMatchingGlobalIP(logger, addrs, forFamily) if err != nil { return nil, err } if matchingIP != nil { - klog.V(4).Infof("Found valid IPv%d address %v for interface %q.", int(forFamily), matchingIP, intfName) + logger.V(4).Info("Found valid address", "IPVersion", int(forFamily), "IP", matchingIP, "interface", intfName) return matchingIP, nil } } @@ -269,13 +269,13 @@ func getIPFromInterface(intfName string, forFamily AddressFamily, nw networkInte // getIPFromLoopbackInterface gets the IPs on a loopback interface and returns a global unicast address, if any. // The loopback interface must be up, the IP must in the family requested, and the IP must be a global unicast address. -func getIPFromLoopbackInterface(forFamily AddressFamily, nw networkInterfacer) (net.IP, error) { +func getIPFromLoopbackInterface(logger klog.Logger, forFamily AddressFamily, nw networkInterfacer) (net.IP, error) { intfs, err := nw.Interfaces() if err != nil { return nil, err } for _, intf := range intfs { - if !isInterfaceUp(&intf) { + if !isInterfaceUp(logger, &intf) { continue } if intf.Flags&(net.FlagLoopback) != 0 { @@ -283,13 +283,13 @@ func getIPFromLoopbackInterface(forFamily AddressFamily, nw networkInterfacer) ( if err != nil { return nil, err } - klog.V(4).Infof("Interface %q has %d addresses :%v.", intf.Name, len(addrs), addrs) - matchingIP, err := getMatchingGlobalIP(addrs, forFamily) + logger.V(4).Info("Found addresses for interface", "interface", intf.Name, "numAddresses", len(addrs), "addresses", addrs) + matchingIP, err := getMatchingGlobalIP(logger, addrs, forFamily) if err != nil { return nil, err } if matchingIP != nil { - klog.V(4).Infof("Found valid IPv%d address %v for interface %q.", int(forFamily), matchingIP, intf.Name) + logger.V(4).Info("Found valid address", "IPVersion", int(forFamily), "IP", matchingIP, "interface", intf.Name) return matchingIP, nil } } @@ -309,7 +309,7 @@ func memberOf(ip net.IP, family AddressFamily) bool { // chooseIPFromHostInterfaces looks at all system interfaces, trying to find one that is up that // has a global unicast address (non-loopback, non-link local, non-point2point), and returns the IP. // addressFamilies determines whether it prefers IPv4 or IPv6 -func chooseIPFromHostInterfaces(nw networkInterfacer, addressFamilies AddressFamilyPreference) (net.IP, error) { +func chooseIPFromHostInterfaces(logger klog.Logger, nw networkInterfacer, addressFamilies AddressFamilyPreference) (net.IP, error) { intfs, err := nw.Interfaces() if err != nil { return nil, err @@ -318,14 +318,14 @@ func chooseIPFromHostInterfaces(nw networkInterfacer, addressFamilies AddressFam return nil, fmt.Errorf("no interfaces found on host.") } for _, family := range addressFamilies { - klog.V(4).Infof("Looking for system interface with a global IPv%d address", uint(family)) + logger.V(4).Info("Looking for system interface with a global address", "IPVersion", uint(family)) for _, intf := range intfs { - if !isInterfaceUp(&intf) { - klog.V(4).Infof("Skipping: down interface %q", intf.Name) + if !isInterfaceUp(logger, &intf) { + logger.V(4).Info("Skipping: interface is down", "interface", intf.Name) continue } if isLoopbackOrPointToPoint(&intf) { - klog.V(4).Infof("Skipping: LB or P2P interface %q", intf.Name) + logger.V(4).Info("Skipping: is LB or P2P", "interface", intf.Name) continue } addrs, err := nw.Addrs(&intf) @@ -333,7 +333,7 @@ func chooseIPFromHostInterfaces(nw networkInterfacer, addressFamilies AddressFam return nil, err } if len(addrs) == 0 { - klog.V(4).Infof("Skipping: no addresses on interface %q", intf.Name) + logger.V(4).Info("Skipping: no addresses", "interface", intf.Name) continue } for _, addr := range addrs { @@ -342,15 +342,15 @@ func chooseIPFromHostInterfaces(nw networkInterfacer, addressFamilies AddressFam return nil, fmt.Errorf("unable to parse CIDR for interface %q: %s", intf.Name, err) } if !memberOf(ip, family) { - klog.V(4).Infof("Skipping: no address family match for %q on interface %q.", ip, intf.Name) + logger.V(4).Info("Skipping: no address family match", "IP", ip, "interface", intf.Name) continue } // TODO: Decide if should open up to allow IPv6 LLAs in future. if !ip.IsGlobalUnicast() { - klog.V(4).Infof("Skipping: non-global address %q on interface %q.", ip, intf.Name) + logger.V(4).Info("Skipping: non-global address", "IP", ip, "interface", intf.Name) continue } - klog.V(4).Infof("Found global unicast address %q on interface %q.", ip, intf.Name) + logger.V(4).Info("Found global unicast address", "IP", ip, "interface", intf.Name) return ip, nil } } @@ -363,20 +363,31 @@ func chooseIPFromHostInterfaces(nw networkInterfacer, addressFamilies AddressFam // interfaces. Otherwise, it will use IPv4 and IPv6 route information to return the // IP of the interface with a gateway on it (with priority given to IPv4). For a node // with no internet connection, it returns error. +// +//logcheck:context // [ChooseHostInterfaceWithLogger] should be used instead of ChooseHostInterface in code which supports contextual logging. func ChooseHostInterface() (net.IP, error) { - return chooseHostInterface(preferIPv4) + return ChooseHostInterfaceWithLogger(klog.Background()) } -func chooseHostInterface(addressFamilies AddressFamilyPreference) (net.IP, error) { +// ChooseHostInterfaceWithLogger is a method used fetch an IP for a daemon. +// If there is no routing info file, it will choose a global IP from the system +// interfaces. Otherwise, it will use IPv4 and IPv6 route information to return the +// IP of the interface with a gateway on it (with priority given to IPv4). For a node +// with no internet connection, it returns error. +func ChooseHostInterfaceWithLogger(logger klog.Logger) (net.IP, error) { + return chooseHostInterface(logger, preferIPv4) +} + +func chooseHostInterface(logger klog.Logger, addressFamilies AddressFamilyPreference) (net.IP, error) { var nw networkInterfacer = networkInterface{} if _, err := os.Stat(ipv4RouteFile); os.IsNotExist(err) { - return chooseIPFromHostInterfaces(nw, addressFamilies) + return chooseIPFromHostInterfaces(logger, nw, addressFamilies) } routes, err := getAllDefaultRoutes() if err != nil { return nil, err } - return chooseHostInterfaceFromRoute(routes, nw, addressFamilies) + return chooseHostInterfaceFromRoute(logger, routes, nw, addressFamilies) } // networkInterfacer defines an interface for several net library functions. Production @@ -427,36 +438,36 @@ func getAllDefaultRoutes() ([]Route, error) { // global IP address from the interface for the route. If there are routes but no global // address is obtained from the interfaces, it checks if the loopback interface has a global address. // addressFamilies determines whether it prefers IPv4 or IPv6 -func chooseHostInterfaceFromRoute(routes []Route, nw networkInterfacer, addressFamilies AddressFamilyPreference) (net.IP, error) { +func chooseHostInterfaceFromRoute(logger klog.Logger, routes []Route, nw networkInterfacer, addressFamilies AddressFamilyPreference) (net.IP, error) { for _, family := range addressFamilies { - klog.V(4).Infof("Looking for default routes with IPv%d addresses", uint(family)) + logger.V(4).Info("Looking for default routes with IP addresses", "IPVersion", uint(family)) for _, route := range routes { if route.Family != family { continue } - klog.V(4).Infof("Default route transits interface %q", route.Interface) - finalIP, err := getIPFromInterface(route.Interface, family, nw) + logger.V(4).Info("Default route transits interface", "interface", route.Interface) + finalIP, err := getIPFromInterface(logger, route.Interface, family, nw) if err != nil { return nil, err } if finalIP != nil { - klog.V(4).Infof("Found active IP %v ", finalIP) + logger.V(4).Info("Found active IP", "IP", finalIP) return finalIP, nil } // In case of network setups where default routes are present, but network // interfaces use only link-local addresses (e.g. as described in RFC5549). // the global IP is assigned to the loopback interface, and we should use it - loopbackIP, err := getIPFromLoopbackInterface(family, nw) + loopbackIP, err := getIPFromLoopbackInterface(logger, family, nw) if err != nil { return nil, err } if loopbackIP != nil { - klog.V(4).Infof("Found active IP %v on Loopback interface", loopbackIP) + logger.V(4).Info("Found active IP on Loopback interface", "IP", loopbackIP) return loopbackIP, nil } } } - klog.V(4).Infof("No active IP found by looking at default routes") + logger.V(4).Info("No active IP found by looking at default routes") return nil, fmt.Errorf("unable to select an IP from default routes.") } @@ -465,14 +476,25 @@ func chooseHostInterfaceFromRoute(routes []Route, nw networkInterfacer, addressF // If bindAddress is unspecified or loopback, it returns the default IP of the same // address family as bindAddress. // Otherwise, it just returns bindAddress. +// +//logcheck:context // [ResolveBindAddressWithLogger] should be used instead of ResolveBindAddress in code which supports contextual logging. func ResolveBindAddress(bindAddress net.IP) (net.IP, error) { + return ResolveBindAddressWithLogger(klog.Background(), bindAddress) +} + +// ResolveBindAddressWithLogger returns the IP address of a daemon, based on the given bindAddress: +// If bindAddress is unset, it returns the host's default IP, as with ChooseHostInterface(). +// If bindAddress is unspecified or loopback, it returns the default IP of the same +// address family as bindAddress. +// Otherwise, it just returns bindAddress. +func ResolveBindAddressWithLogger(logger klog.Logger, bindAddress net.IP) (net.IP, error) { addressFamilies := preferIPv4 if bindAddress != nil && memberOf(bindAddress, familyIPv6) { addressFamilies = preferIPv6 } if bindAddress == nil || bindAddress.IsUnspecified() || bindAddress.IsLoopback() { - hostIP, err := chooseHostInterface(addressFamilies) + hostIP, err := chooseHostInterface(logger, addressFamilies) if err != nil { return nil, err } @@ -485,10 +507,20 @@ func ResolveBindAddress(bindAddress net.IP) (net.IP, error) { // This is required in case of network setups where default routes are present, but network // interfaces use only link-local addresses (e.g. as described in RFC5549). // e.g when using BGP to announce a host IP over link-local ip addresses and this ip address is attached to the lo interface. +// +//logcheck:context // [ChooseBindAddressForInterfaceWithLogger] should be used instead of ChooseBindAddressForInterface in code which supports contextual logging. func ChooseBindAddressForInterface(intfName string) (net.IP, error) { + return ChooseBindAddressForInterfaceWithLogger(klog.Background(), intfName) +} + +// ChooseBindAddressForInterfaceWithLogger choose a global IP for a specific interface, with priority given to IPv4. +// This is required in case of network setups where default routes are present, but network +// interfaces use only link-local addresses (e.g. as described in RFC5549). +// e.g when using BGP to announce a host IP over link-local ip addresses and this ip address is attached to the lo interface. +func ChooseBindAddressForInterfaceWithLogger(logger klog.Logger, intfName string) (net.IP, error) { var nw networkInterfacer = networkInterface{} for _, family := range preferIPv4 { - ip, err := getIPFromInterface(intfName, family, nw) + ip, err := getIPFromInterface(logger, intfName, family, nw) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/interface_test.go b/staging/src/k8s.io/apimachinery/pkg/util/net/interface_test.go index 77ff70a19c5..b791aa10901 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/net/interface_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/net/interface_test.go @@ -23,6 +23,7 @@ import ( "strings" "testing" + "k8s.io/klog/v2/ktesting" netutils "k8s.io/utils/net" ) @@ -242,6 +243,7 @@ func TestParseIP(t *testing.T) { } func TestIsInterfaceUp(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testCases := []struct { tcase string intf *net.Interface @@ -252,7 +254,7 @@ func TestIsInterfaceUp(t *testing.T) { {"no interface", nil, false}, } for _, tc := range testCases { - it := isInterfaceUp(tc.intf) + it := isInterfaceUp(logger, tc.intf) if it != tc.expected { t.Errorf("case[%v]: expected %v, got %v .", tc.tcase, tc.expected, it) } @@ -269,6 +271,7 @@ func (a addrStruct) String() string { } func TestFinalIP(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testCases := []struct { tcase string addr []net.Addr @@ -289,7 +292,7 @@ func TestFinalIP(t *testing.T) { {"no addresses", []net.Addr{}, familyIPv4, nil}, } for _, tc := range testCases { - ip, err := getMatchingGlobalIP(tc.addr, tc.family) + ip, err := getMatchingGlobalIP(logger, tc.addr, tc.family) if !ip.Equal(tc.expected) { t.Errorf("case[%v]: expected %v, got %v .err : %v", tc.tcase, tc.expected, ip, err) } @@ -549,6 +552,7 @@ func (_ networkInterfaceWithInvalidAddr) Interfaces() ([]net.Interface, error) { } func TestGetIPFromInterface(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testCases := []struct { tcase string nwname string @@ -567,7 +571,7 @@ func TestGetIPFromInterface(t *testing.T) { {"bad addr", "eth3", familyIPv4, networkInterfaceWithInvalidAddr{}, nil, "invalid CIDR"}, } for _, tc := range testCases { - ip, err := getIPFromInterface(tc.nwname, tc.family, tc.nw) + ip, err := getIPFromInterface(logger, tc.nwname, tc.family, tc.nw) if err != nil { if !strings.Contains(err.Error(), tc.errStrFrag) { t.Errorf("case[%s]: Error string %q does not contain %q", tc.tcase, err, tc.errStrFrag) @@ -581,6 +585,7 @@ func TestGetIPFromInterface(t *testing.T) { } func TestGetIPFromLoopbackInterface(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testCases := []struct { tcase string family AddressFamily @@ -594,7 +599,7 @@ func TestGetIPFromLoopbackInterface(t *testing.T) { {"no global ipv6", familyIPv6, loopbackNetworkInterface{}, nil, ""}, } for _, tc := range testCases { - ip, err := getIPFromLoopbackInterface(tc.family, tc.nw) + ip, err := getIPFromLoopbackInterface(logger, tc.family, tc.nw) if err != nil { if !strings.Contains(err.Error(), tc.errStrFrag) { t.Errorf("case[%s]: Error string %q does not contain %q", tc.tcase, err, tc.errStrFrag) @@ -608,6 +613,7 @@ func TestGetIPFromLoopbackInterface(t *testing.T) { } func TestChooseHostInterfaceFromRoute(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testCases := []struct { tcase string routes []Route @@ -636,7 +642,7 @@ func TestChooseHostInterfaceFromRoute(t *testing.T) { {"fail get IP", routeV4, networkInterfaceFailGetAddrs{}, preferIPv4, nil}, } for _, tc := range testCases { - ip, err := chooseHostInterfaceFromRoute(tc.routes, tc.nw, tc.order) + ip, err := chooseHostInterfaceFromRoute(logger, tc.routes, tc.nw, tc.order) if !ip.Equal(tc.expected) { t.Errorf("case[%v]: expected %v, got %+v .err : %v", tc.tcase, tc.expected, ip, err) } @@ -663,6 +669,7 @@ func TestMemberOf(t *testing.T) { } func TestGetIPFromHostInterfaces(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) testCases := []struct { tcase string nw networkInterfacer @@ -688,7 +695,7 @@ func TestGetIPFromHostInterfaces(t *testing.T) { } for _, tc := range testCases { - ip, err := chooseIPFromHostInterfaces(tc.nw, tc.order) + ip, err := chooseIPFromHostInterfaces(logger, tc.nw, tc.order) if !ip.Equal(tc.expected) { t.Errorf("case[%s]: expected %+v, got %+v with err : %v", tc.tcase, tc.expected, ip, err) } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go index e5196d1ee83..d6ba23d416c 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/dial.go @@ -39,7 +39,7 @@ func DialURL(ctx context.Context, url *url.URL, transport http.RoundTripper) (ne dialer, err := utilnet.DialerFor(transport) if err != nil { - klog.V(5).Infof("Unable to unwrap transport %T to get dialer: %v", transport, err) + klog.FromContext(ctx).V(5).Info("Unable to unwrap transport to get dialer", "type", fmt.Sprintf("%T", transport), "err", err) } switch url.Scheme { @@ -53,7 +53,7 @@ func DialURL(ctx context.Context, url *url.URL, transport http.RoundTripper) (ne // Get the tls config from the transport if we recognize it tlsConfig, err := utilnet.TLSClientConfig(transport) if err != nil { - klog.V(5).Infof("Unable to unwrap transport %T to get at TLS config: %v", transport, err) + klog.FromContext(ctx).V(5).Info("Unable to unwrap transport to get at TLS config", "type", fmt.Sprintf("%T", transport), "err", err) } if dialer != nil { @@ -65,7 +65,7 @@ func DialURL(ctx context.Context, url *url.URL, transport http.RoundTripper) (ne } if tlsConfig == nil { // tls.Client requires non-nil config - klog.Warning("using custom dialer with no TLSClientConfig. Defaulting to InsecureSkipVerify") + klog.FromContext(ctx).Info("Warning: using custom dialer with no TLSClientConfig, defaulting to InsecureSkipVerify") // tls.Handshake() requires ServerName or InsecureSkipVerify tlsConfig = &tls.Config{ InsecureSkipVerify: true, diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/transport.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/transport.go index 5a2dd6e14c8..1c17f53df03 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/transport.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/transport.go @@ -249,7 +249,7 @@ func (t *Transport) rewriteResponse(req *http.Request, resp *http.Response) (*ht // This is fine default: // Some encoding we don't understand-- don't try to parse this - klog.Errorf("Proxy encountered encoding %v for text/html; can't understand this so not fixing links.", encoding) + klog.FromContext(req.Context()).Error(nil, "Proxy encountered unknown encoding for text/html, can't understand this so not fixing links", "encoding", encoding) return resp, nil } @@ -258,7 +258,7 @@ func (t *Transport) rewriteResponse(req *http.Request, resp *http.Response) (*ht } err := rewriteHTML(reader, writer, urlRewriter) if err != nil { - klog.Errorf("Failed to rewrite URLs: %v", err) + klog.FromContext(req.Context()).Error(err, "Failed to rewrite URLs") return resp, err } diff --git a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go index 8c30a366de9..812168462d3 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go @@ -307,8 +307,9 @@ func (noSuppressPanicError) Write(p []byte) (n int, err error) { // tryUpgrade returns true if the request was handled. func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool { + logger := klog.FromContext(req.Context()) if !httpstream.IsUpgradeRequest(req) { - klog.V(6).Infof("Request was not an upgrade") + logger.V(6).Info("Request was not an upgrade") return false } @@ -332,15 +333,15 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques // Only append X-Forwarded-For in the upgrade path, since httputil.NewSingleHostReverseProxy // handles this in the non-upgrade path. utilnet.AppendForwardedForHeader(clone) - klog.V(6).Infof("Connecting to backend proxy (direct dial) %s\n Headers: %v", &location, clone.Header) + logger.V(6).Info("Connecting to backend proxy (direct dial)", "location", &location, "headers", clone.Header) if h.UseLocationHost { clone.Host = h.Location.Host } clone.URL = &location - klog.V(6).Infof("UpgradeAwareProxy: dialing for SPDY upgrade with headers: %v", clone.Header) + logger.V(6).Info("UpgradeAwareProxy: dialing for SPDY upgrade with headers", "headers", clone.Header) backendConn, err = h.DialForUpgrade(clone) if err != nil { - klog.V(6).Infof("Proxy connection error: %v", err) + logger.V(6).Info("Proxy connection error", "err", err) h.Responder.Error(w, req, err) return true } @@ -349,7 +350,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques // determine the http response code from the backend by reading from rawResponse+backendConn backendHTTPResponse, headerBytes, err := getResponse(io.MultiReader(bytes.NewReader(rawResponse), backendConn)) if err != nil { - klog.V(6).Infof("Proxy connection error: %v", err) + logger.V(6).Info("Proxy connection error", "err", err) h.Responder.Error(w, req, err) return true } @@ -363,7 +364,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques // return a generic error here. if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols && backendHTTPResponse.StatusCode < 400 { err := fmt.Errorf("invalid upgrade response: status code %d", backendHTTPResponse.StatusCode) - klog.Errorf("Proxy upgrade error: %v", err) + logger.Error(err, "Proxy upgrade error") h.Responder.Error(w, req, err) return true } @@ -372,13 +373,13 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques // hijacking should be the last step in the upgrade. requestHijacker, ok := w.(http.Hijacker) if !ok { - klog.Errorf("Unable to hijack response writer: %T", w) + logger.Error(nil, "Unable to hijack response writer", "type", fmt.Sprintf("%T", w)) h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w)) return true } requestHijackedConn, _, err := requestHijacker.Hijack() if err != nil { - klog.Errorf("Unable to hijack response: %v", err) + logger.Error(err, "Unable to hijack response") h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err)) return true } @@ -386,7 +387,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques if backendHTTPResponse.StatusCode != http.StatusSwitchingProtocols { // If the backend did not upgrade the request, echo the response from the backend to the client and return, closing the connection. - klog.V(6).Infof("Proxy upgrade error, status code %d", backendHTTPResponse.StatusCode) + logger.V(6).Info("Proxy upgrade error", "statusCode", backendHTTPResponse.StatusCode) // set read/write deadlines deadline := time.Now().Add(10 * time.Second) backendConn.SetReadDeadline(deadline) @@ -394,7 +395,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques // write the response to the client err := backendHTTPResponse.Write(requestHijackedConn) if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - klog.Errorf("Error proxying data from backend to client: %v", err) + logger.Error(err, "Error proxying data from backend to client") } // Indicate we handled the request return true @@ -402,9 +403,9 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques // Forward raw response bytes back to client. if len(rawResponse) > 0 { - klog.V(6).Infof("Writing %d bytes to hijacked connection", len(rawResponse)) + logger.V(6).Info("Writing to hijacked connection", "length", len(rawResponse)) if _, err = requestHijackedConn.Write(rawResponse); err != nil { - utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err)) + utilruntime.HandleErrorWithLogger(logger, err, "Error proxying response from backend to client") } } @@ -424,7 +425,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques } _, err := io.Copy(writer, requestHijackedConn) if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - klog.Errorf("Error proxying data from client to backend: %v", err) + logger.Error(err, "Error proxying data from client to backend") } close(writerComplete) }() @@ -438,7 +439,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques } _, err := io.Copy(requestHijackedConn, reader) if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - klog.Errorf("Error proxying data from backend to client: %v", err) + logger.Error(err, "Error proxying data from backend to client") } close(readerComplete) }() @@ -449,7 +450,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques case <-writerComplete: case <-readerComplete: } - klog.V(6).Infof("Disconnecting from backend proxy %s\n Headers: %v", &location, clone.Header) + logger.V(6).Info("Disconnecting from backend proxy", "location", &location, "headers", clone.Header) return true }