mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-02-03 20:40:26 -05:00
Add e2e tests, metrics and events for podcertificaterequests v1beta1
This commit is contained in:
parent
08a9e4fca7
commit
3f8444210c
12 changed files with 880 additions and 26 deletions
|
|
@ -952,6 +952,7 @@ func NewMainKubelet(ctx context.Context,
|
|||
podCertificateManager := podcertificate.NewIssuingManager(
|
||||
kubeDeps.KubeClient,
|
||||
klet.podManager,
|
||||
kubeDeps.Recorder,
|
||||
kubeInformers.Certificates().V1beta1().PodCertificateRequests(),
|
||||
nodeInformer,
|
||||
nodeName,
|
||||
|
|
@ -960,6 +961,8 @@ func NewMainKubelet(ctx context.Context,
|
|||
klet.podCertificateManager = podCertificateManager
|
||||
kubeInformers.Start(ctx.Done())
|
||||
go podCertificateManager.Run(ctx)
|
||||
|
||||
metrics.RegisterCollectors(collectors.PodCertificateCollectorFor(podCertificateManager))
|
||||
} else {
|
||||
klet.podCertificateManager = &podcertificate.NoOpManager{}
|
||||
klog.InfoS("Not starting PodCertificateRequest manager because we are in static kubelet mode or the PodCertificateProjection feature gate is disabled")
|
||||
|
|
|
|||
66
pkg/kubelet/metrics/collectors/podcertificate_metrics.go
Normal file
66
pkg/kubelet/metrics/collectors/podcertificate_metrics.go
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"k8s.io/component-base/metrics"
|
||||
kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
|
||||
"k8s.io/kubernetes/pkg/kubelet/podcertificate"
|
||||
)
|
||||
|
||||
var (
|
||||
// A gauge vector (implemented by a custom collector) reporting the current
|
||||
// number of pod certificate projected volume sources being maintained by
|
||||
// this kubelet instance.
|
||||
podCertificateStatesDesc = metrics.NewDesc(
|
||||
metrics.BuildFQName("", kubeletmetrics.KubeletSubsystem, kubeletmetrics.PodCertificateStatesKey),
|
||||
"Gauge vector reporting the number of pod certificate projected volume sources, faceted by signer_name and state.",
|
||||
[]string{"signer_name", "state"},
|
||||
nil,
|
||||
metrics.ALPHA,
|
||||
"",
|
||||
)
|
||||
)
|
||||
|
||||
type podCertificateCollector struct {
|
||||
metrics.BaseStableCollector
|
||||
manager podcertificate.Manager
|
||||
}
|
||||
|
||||
func PodCertificateCollectorFor(m podcertificate.Manager) *podCertificateCollector {
|
||||
return &podCertificateCollector{
|
||||
manager: m,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *podCertificateCollector) DescribeWithStability(ch chan<- *metrics.Desc) {
|
||||
ch <- podCertificateStatesDesc
|
||||
}
|
||||
|
||||
func (c *podCertificateCollector) CollectWithStability(ch chan<- metrics.Metric) {
|
||||
report := c.manager.MetricReport()
|
||||
|
||||
for k, count := range report.PodCertificateStates {
|
||||
ch <- metrics.NewLazyConstMetric(
|
||||
podCertificateStatesDesc,
|
||||
metrics.GaugeValue,
|
||||
float64(count),
|
||||
k.SignerName,
|
||||
k.State,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
Copyright 2025 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package collectors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
"k8s.io/kubernetes/pkg/kubelet/podcertificate"
|
||||
)
|
||||
|
||||
type fakePodCertificateManager struct {
|
||||
report *podcertificate.MetricReport
|
||||
}
|
||||
|
||||
func (m *fakePodCertificateManager) TrackPod(ctx context.Context, pod *corev1.Pod) {}
|
||||
func (m *fakePodCertificateManager) ForgetPod(ctx context.Context, pod *corev1.Pod) {}
|
||||
func (m *fakePodCertificateManager) GetPodCertificateCredentialBundle(ctx context.Context, namespace, podName, podUID, volumeName string, sourceIndex int) (privKey []byte, certChain []byte, err error) {
|
||||
return nil, nil, fmt.Errorf("unimplemented")
|
||||
}
|
||||
func (m *fakePodCertificateManager) MetricReport() *podcertificate.MetricReport {
|
||||
return m.report
|
||||
}
|
||||
|
||||
func TestPodCertificateCollector(t *testing.T) {
|
||||
collector := &podCertificateCollector{
|
||||
manager: &fakePodCertificateManager{
|
||||
report: &podcertificate.MetricReport{
|
||||
PodCertificateStates: map[podcertificate.SignerAndState]int{
|
||||
{SignerName: "example.com/foo", State: "fresh"}: 1,
|
||||
{SignerName: "example.com/foo", State: "overdue_for_refresh"}: 1,
|
||||
{SignerName: "example.com/foo", State: "expired"}: 0,
|
||||
{SignerName: "example.com/bar", State: "fresh"}: 2,
|
||||
{SignerName: "example.com/bar", State: "overdue_for_refresh"}: 0,
|
||||
{SignerName: "example.com/bar", State: "expired"}: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Fixed metadata on type and help text. We prepend this to every expected
|
||||
// output so we only have to modify a single place when doing adjustments.
|
||||
const metadata = `
|
||||
# HELP kubelet_podcertificate_states [ALPHA] Gauge vector reporting the number of pod certificate projected volume sources, faceted by signer_name and state.
|
||||
# TYPE kubelet_podcertificate_states gauge
|
||||
`
|
||||
|
||||
want := metadata + `
|
||||
kubelet_podcertificate_states{signer_name="example.com/foo",state="fresh"} 1.0
|
||||
kubelet_podcertificate_states{signer_name="example.com/foo",state="overdue_for_refresh"} 1.0
|
||||
kubelet_podcertificate_states{signer_name="example.com/foo",state="expired"} 0.0
|
||||
kubelet_podcertificate_states{signer_name="example.com/bar",state="fresh"} 2.0
|
||||
kubelet_podcertificate_states{signer_name="example.com/bar",state="overdue_for_refresh"} 0.0
|
||||
kubelet_podcertificate_states{signer_name="example.com/bar",state="expired"} 1.0
|
||||
`
|
||||
|
||||
metrics := []string{
|
||||
"kubelet_podcertificate_states",
|
||||
}
|
||||
|
||||
if err := testutil.CustomCollectAndCompare(collector, strings.NewReader(want), metrics...); err != nil {
|
||||
t.Errorf("unexpected collecting result:\n%s", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -176,6 +176,9 @@ const (
|
|||
PodInfeasibleResizesKey = "pod_infeasible_resizes_total"
|
||||
PodInProgressResizesKey = "pod_in_progress_resizes"
|
||||
PodDeferredAcceptedResizesKey = "pod_deferred_accepted_resizes_total"
|
||||
|
||||
// Metric key for podcertificate states.
|
||||
PodCertificateStatesKey = "podcertificate_states"
|
||||
)
|
||||
|
||||
type imageSizeBucket struct {
|
||||
|
|
|
|||
|
|
@ -46,6 +46,7 @@ import (
|
|||
certlistersv1beta1 "k8s.io/client-go/listers/certificates/v1beta1"
|
||||
corelistersv1 "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/utils/clock"
|
||||
|
|
@ -68,12 +69,30 @@ type Manager interface {
|
|||
// GetPodCertificateCredentialBundle is called by the volume host to
|
||||
// retrieve the credential bundle for a given pod certificate volume.
|
||||
GetPodCertificateCredentialBundle(ctx context.Context, namespace, podName, podUID, volumeName string, sourceIndex int) (privKey []byte, certChain []byte, err error)
|
||||
|
||||
// MetricReport returns a snapshot of current pod certificate states for this manager.
|
||||
MetricReport() *MetricReport
|
||||
}
|
||||
|
||||
// MetricReport contains metrics about the current state of pod certificate projected volume sources.
|
||||
type MetricReport struct {
|
||||
PodCertificateStates map[SignerAndState]int
|
||||
}
|
||||
|
||||
// SignerAndState represents a combination of a signer name and the state of a pod certificate.
|
||||
type SignerAndState struct {
|
||||
SignerName string
|
||||
State string
|
||||
}
|
||||
|
||||
// After this amount of time (plus jitter), we can assume that a PCR that we
|
||||
// created, but isn't showing up on our informer, must have been deleted.
|
||||
const assumeDeletedThreshold = 10 * time.Minute
|
||||
|
||||
// After this amount of time since the BeginRefreshAt of the certificate, we
|
||||
// consider a certificate to be overdue for refresh.
|
||||
const refreshOverdueDuration = 10 * time.Minute
|
||||
|
||||
// IssuingManager is the main implementation of Manager.
|
||||
//
|
||||
// The core construct is a workqueue that contains one entry for each
|
||||
|
|
@ -92,6 +111,8 @@ type IssuingManager struct {
|
|||
|
||||
podManager PodManager
|
||||
|
||||
recorder record.EventRecorder
|
||||
|
||||
projectionQueue workqueue.TypedRateLimitingInterface[projectionKey]
|
||||
|
||||
pcrInformer cache.SharedIndexInformer
|
||||
|
|
@ -136,6 +157,7 @@ type projectionRecord struct {
|
|||
// Interface type for all projection record states.
|
||||
type credState interface {
|
||||
getCredBundle() (privKey, certChain []byte, err error)
|
||||
metricsState(now time.Time) string
|
||||
}
|
||||
|
||||
type credStateInitial struct {
|
||||
|
|
@ -145,6 +167,10 @@ func (c *credStateInitial) getCredBundle() ([]byte, []byte, error) {
|
|||
return nil, nil, fmt.Errorf("credential bundle is not issued yet")
|
||||
}
|
||||
|
||||
func (c *credStateInitial) metricsState(_ time.Time) string {
|
||||
return "not_yet_issued"
|
||||
}
|
||||
|
||||
type credStateWait struct {
|
||||
privateKey []byte
|
||||
pcrName string
|
||||
|
|
@ -157,6 +183,10 @@ func (c *credStateWait) getCredBundle() ([]byte, []byte, error) {
|
|||
return nil, nil, fmt.Errorf("credential bundle is not issued yet")
|
||||
}
|
||||
|
||||
func (c *credStateWait) metricsState(_ time.Time) string {
|
||||
return "not_yet_issued"
|
||||
}
|
||||
|
||||
type credStateDenied struct {
|
||||
Reason string
|
||||
Message string
|
||||
|
|
@ -166,6 +196,10 @@ func (c *credStateDenied) getCredBundle() ([]byte, []byte, error) {
|
|||
return nil, nil, fmt.Errorf("PodCertificateRequest was permanently denied: reason=%q message=%q", c.Reason, c.Message)
|
||||
}
|
||||
|
||||
func (c *credStateDenied) metricsState(_ time.Time) string {
|
||||
return "denied"
|
||||
}
|
||||
|
||||
type credStateFailed struct {
|
||||
Reason string
|
||||
Message string
|
||||
|
|
@ -175,20 +209,40 @@ func (c *credStateFailed) getCredBundle() ([]byte, []byte, error) {
|
|||
return nil, nil, fmt.Errorf("PodCertificateRequest was permanently failed: reason=%q message=%q", c.Reason, c.Message)
|
||||
}
|
||||
|
||||
func (c *credStateFailed) metricsState(_ time.Time) string {
|
||||
return "failed"
|
||||
}
|
||||
|
||||
type credStateFresh struct {
|
||||
privateKey []byte
|
||||
certChain []byte
|
||||
beginRefreshAt time.Time
|
||||
privateKey []byte
|
||||
certChain []byte
|
||||
beginRefreshAt time.Time
|
||||
notAfter time.Time
|
||||
eventEmittedForOverdueForRefresh bool
|
||||
eventEmittedForExpiration bool
|
||||
}
|
||||
|
||||
func (c *credStateFresh) getCredBundle() ([]byte, []byte, error) {
|
||||
return c.privateKey, c.certChain, nil
|
||||
}
|
||||
|
||||
func (c *credStateFresh) metricsState(now time.Time) string {
|
||||
if now.After(c.notAfter) {
|
||||
return "expired"
|
||||
}
|
||||
if now.After(c.beginRefreshAt.Add(refreshOverdueDuration)) {
|
||||
return "overdue_for_refresh"
|
||||
}
|
||||
return "fresh"
|
||||
}
|
||||
|
||||
type credStateWaitRefresh struct {
|
||||
privateKey []byte
|
||||
certChain []byte
|
||||
beginRefreshAt time.Time
|
||||
privateKey []byte
|
||||
certChain []byte
|
||||
beginRefreshAt time.Time
|
||||
notAfter time.Time
|
||||
eventEmittedForOverdueForRefresh bool
|
||||
eventEmittedForExpiration bool
|
||||
|
||||
refreshPrivateKey []byte
|
||||
refreshPCRName string
|
||||
|
|
@ -201,13 +255,24 @@ func (c *credStateWaitRefresh) getCredBundle() ([]byte, []byte, error) {
|
|||
return c.privateKey, c.certChain, nil
|
||||
}
|
||||
|
||||
func (c *credStateWaitRefresh) metricsState(now time.Time) string {
|
||||
if now.After(c.notAfter) {
|
||||
return "expired"
|
||||
}
|
||||
if now.After(c.beginRefreshAt.Add(refreshOverdueDuration)) {
|
||||
return "overdue_for_refresh"
|
||||
}
|
||||
return "fresh"
|
||||
}
|
||||
|
||||
var _ Manager = (*IssuingManager)(nil)
|
||||
|
||||
func NewIssuingManager(kc kubernetes.Interface, podManager PodManager, pcrInformer certinformersv1beta1.PodCertificateRequestInformer, nodeInformer coreinformersv1.NodeInformer, nodeName types.NodeName, clock clock.WithTicker) *IssuingManager {
|
||||
func NewIssuingManager(kc kubernetes.Interface, podManager PodManager, recorder record.EventRecorder, pcrInformer certinformersv1beta1.PodCertificateRequestInformer, nodeInformer coreinformersv1.NodeInformer, nodeName types.NodeName, clock clock.WithTicker) *IssuingManager {
|
||||
m := &IssuingManager{
|
||||
kc: kc,
|
||||
|
||||
podManager: podManager,
|
||||
recorder: recorder,
|
||||
projectionQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[projectionKey]()),
|
||||
|
||||
pcrInformer: pcrInformer.Informer(),
|
||||
|
|
@ -276,6 +341,7 @@ func (m *IssuingManager) Run(ctx context.Context) {
|
|||
if !cache.WaitForCacheSync(ctx.Done(), m.pcrInformer.HasSynced, m.nodeInformer.HasSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
go wait.JitterUntilWithContext(ctx, m.runRefreshPass, 1*time.Minute, 1.0, false)
|
||||
go wait.UntilWithContext(ctx, m.runProjectionProcessor, time.Second)
|
||||
<-ctx.Done()
|
||||
|
|
@ -430,19 +496,24 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
|
|||
Message: cond.Message,
|
||||
}
|
||||
klog.V(4).InfoS("PodCertificateRequest denied, moving to credStateDenied", "key", key, "pcr", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name)
|
||||
eventMessage := fmt.Sprintf("PodCertificateRequest %s was denied, reason=%q, message=%q", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name, cond.Reason, cond.Message)
|
||||
m.recorder.Eventf(pod, corev1.EventTypeWarning, certificatesv1beta1.PodCertificateRequestConditionTypeDenied, cond.Reason, eventMessage)
|
||||
return nil
|
||||
case certificatesv1beta1.PodCertificateRequestConditionTypeFailed:
|
||||
rec.curState = &credStateFailed{
|
||||
Reason: cond.Reason,
|
||||
Message: cond.Message,
|
||||
}
|
||||
klog.V(4).InfoS("PodCertificateRequest denied, moving to credStateFailed", "key", key, "pcr", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name)
|
||||
klog.V(4).InfoS("PodCertificateRequest failed, moving to credStateFailed", "key", key, "pcr", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name)
|
||||
eventMessage := fmt.Sprintf("PodCertificateRequest %s failed, reason=%q, message=%q", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name, cond.Reason, cond.Message)
|
||||
m.recorder.Eventf(pod, corev1.EventTypeWarning, certificatesv1beta1.PodCertificateRequestConditionTypeFailed, cond.Reason, eventMessage)
|
||||
return nil
|
||||
case certificatesv1beta1.PodCertificateRequestConditionTypeIssued:
|
||||
rec.curState = &credStateFresh{
|
||||
privateKey: state.privateKey,
|
||||
certChain: cleanCertificateChain([]byte(pcr.Status.CertificateChain)),
|
||||
beginRefreshAt: pcr.Status.BeginRefreshAt.Time.Add(jitterDuration()),
|
||||
notAfter: pcr.Status.NotAfter.Time,
|
||||
}
|
||||
klog.V(4).InfoS("PodCertificateRequest issued, moving to credStateFresh", "key", key, "pcr", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name)
|
||||
return nil
|
||||
|
|
@ -476,6 +547,20 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
|
|||
|
||||
klog.V(4).InfoS("Time to refresh", "key", key)
|
||||
|
||||
// The current time is more than 10 minutes past the most recently issued certificate's `beginRefreshAt` timestamp but the state has not been labeled with overdue for refresh.
|
||||
if m.clock.Now().After(state.beginRefreshAt.Add(refreshOverdueDuration)) && !state.eventEmittedForOverdueForRefresh {
|
||||
klog.V(4).InfoS("Refresh overdue", "key", key)
|
||||
m.recorder.Eventf(pod, corev1.EventTypeWarning, "CertificateOverdueForRefresh", "PodCertificate refresh overdue")
|
||||
state.eventEmittedForOverdueForRefresh = true
|
||||
}
|
||||
|
||||
// The current time is past the most recently issued certificate's `notAfter` timestamp but the state has not been labelled with expired.
|
||||
if m.clock.Now().After(state.notAfter) && !state.eventEmittedForExpiration {
|
||||
klog.V(4).InfoS("Certificates expired", "key", key)
|
||||
m.recorder.Eventf(pod, corev1.EventTypeWarning, "CertificateExpired", "PodCertificate expired")
|
||||
state.eventEmittedForExpiration = true
|
||||
}
|
||||
|
||||
// We fetch the service account so we can know its UID. Ideally, Kubelet
|
||||
// would have a central component that tracks all service accounts related
|
||||
// to pods on the node using a single-item watch.
|
||||
|
|
@ -502,9 +587,12 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
|
|||
}
|
||||
|
||||
rec.curState = &credStateWaitRefresh{
|
||||
privateKey: state.privateKey,
|
||||
certChain: state.certChain,
|
||||
beginRefreshAt: state.beginRefreshAt,
|
||||
privateKey: state.privateKey,
|
||||
certChain: state.certChain,
|
||||
beginRefreshAt: state.beginRefreshAt,
|
||||
notAfter: state.notAfter,
|
||||
eventEmittedForOverdueForRefresh: state.eventEmittedForOverdueForRefresh,
|
||||
eventEmittedForExpiration: state.eventEmittedForExpiration,
|
||||
|
||||
refreshPrivateKey: privKey,
|
||||
refreshPCRName: pcr.ObjectMeta.Name,
|
||||
|
|
@ -531,9 +619,12 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
|
|||
// remember creating the PCR, then we must be in case 2. Return to
|
||||
// credStateFresh so we create a new PCR.
|
||||
rec.curState = &credStateFresh{
|
||||
privateKey: state.privateKey,
|
||||
certChain: state.certChain,
|
||||
beginRefreshAt: state.beginRefreshAt,
|
||||
privateKey: state.privateKey,
|
||||
certChain: state.certChain,
|
||||
beginRefreshAt: state.beginRefreshAt,
|
||||
notAfter: state.notAfter,
|
||||
eventEmittedForOverdueForRefresh: state.eventEmittedForOverdueForRefresh,
|
||||
eventEmittedForExpiration: state.eventEmittedForExpiration,
|
||||
}
|
||||
return fmt.Errorf("PodCertificateRequest appears to have been deleted")
|
||||
} else if err != nil {
|
||||
|
|
@ -550,19 +641,24 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
|
|||
Message: cond.Message,
|
||||
}
|
||||
klog.V(4).InfoS("PodCertificateRequest denied, moving to credStateDenied", "key", key, "pcr", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name)
|
||||
eventMessage := fmt.Sprintf("PodCertificateRequest %s was denied, reason=%q, message=%q", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name, cond.Reason, cond.Message)
|
||||
m.recorder.Eventf(pod, corev1.EventTypeWarning, certificatesv1beta1.PodCertificateRequestConditionTypeDenied, cond.Reason, eventMessage)
|
||||
return nil
|
||||
case certificatesv1beta1.PodCertificateRequestConditionTypeFailed:
|
||||
rec.curState = &credStateFailed{
|
||||
Reason: cond.Reason,
|
||||
Message: cond.Message,
|
||||
}
|
||||
klog.V(4).InfoS("PodCertificateRequest denied, moving to credStateFailed", "key", key, "pcr", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name)
|
||||
klog.V(4).InfoS("PodCertificateRequest failed, moving to credStateFailed", "key", key, "pcr", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name)
|
||||
eventMessage := fmt.Sprintf("PodCertificateRequest %s failed, reason=%q, message=%q", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name, cond.Reason, cond.Message)
|
||||
m.recorder.Eventf(pod, corev1.EventTypeWarning, certificatesv1beta1.PodCertificateRequestConditionTypeFailed, cond.Reason, eventMessage)
|
||||
return nil
|
||||
case certificatesv1beta1.PodCertificateRequestConditionTypeIssued:
|
||||
rec.curState = &credStateFresh{
|
||||
privateKey: state.refreshPrivateKey,
|
||||
certChain: cleanCertificateChain([]byte(pcr.Status.CertificateChain)),
|
||||
beginRefreshAt: pcr.Status.BeginRefreshAt.Time.Add(jitterDuration()),
|
||||
notAfter: pcr.Status.NotAfter.Time,
|
||||
}
|
||||
klog.V(4).InfoS("PodCertificateRequest issued, moving to credStateFresh", "key", key, "pcr", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name)
|
||||
return nil
|
||||
|
|
@ -573,6 +669,20 @@ func (m *IssuingManager) handleProjection(ctx context.Context, key projectionKey
|
|||
// projection from the workqueue. It will be redriven when the
|
||||
// PodCertificateRequest gets an update.
|
||||
klog.V(4).InfoS("PodCertificateRequest not in terminal state, remaining in credStateWaitRefresh", "key", key, "pcr", pcr.ObjectMeta.Namespace+"/"+pcr.ObjectMeta.Name)
|
||||
|
||||
// The current time is more than 10 minutes past the most recently issued certificate's `beginRefreshAt` timestamp but the state has not been labeled with overdue for refresh.
|
||||
if m.clock.Now().After(state.beginRefreshAt.Add(refreshOverdueDuration)) && !state.eventEmittedForOverdueForRefresh {
|
||||
klog.V(4).InfoS("Refresh overdue", "key", key)
|
||||
m.recorder.Eventf(pod, corev1.EventTypeWarning, "CertificateOverdueForRefresh", "PodCertificate refresh overdue")
|
||||
state.eventEmittedForOverdueForRefresh = true
|
||||
}
|
||||
|
||||
// The current time is past the most recently issued certificate's `notAfter` timestamp but the state has not been labeled with expired.
|
||||
if m.clock.Now().After(state.notAfter) && !state.eventEmittedForExpiration {
|
||||
klog.V(4).InfoS("Certificates expired", "key", key)
|
||||
m.recorder.Eventf(pod, corev1.EventTypeWarning, "CertificateExpired", "PodCertificate expired")
|
||||
state.eventEmittedForExpiration = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -708,6 +818,62 @@ func (m *IssuingManager) GetPodCertificateCredentialBundle(ctx context.Context,
|
|||
return rec.curState.getCredBundle()
|
||||
}
|
||||
|
||||
func (m *IssuingManager) MetricReport() *MetricReport {
|
||||
report := &MetricReport{
|
||||
PodCertificateStates: map[SignerAndState]int{},
|
||||
}
|
||||
|
||||
// Iterate through all pods and their podCertificate projected volume sources
|
||||
// instead of iterating through credStore, so that we can use the SignerName
|
||||
// of the podCertificate projection source.
|
||||
allPods := m.podManager.GetPods()
|
||||
for _, pod := range allPods {
|
||||
for _, v := range pod.Spec.Volumes {
|
||||
if v.Projected == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for sourceIndex, source := range v.Projected.Sources {
|
||||
if source.PodCertificate == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
key := projectionKey{
|
||||
Namespace: pod.ObjectMeta.Namespace,
|
||||
PodName: pod.ObjectMeta.Name,
|
||||
PodUID: string(pod.ObjectMeta.UID),
|
||||
VolumeName: v.Name,
|
||||
SourceIndex: sourceIndex,
|
||||
}
|
||||
|
||||
var rec *projectionRecord
|
||||
func() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
rec = m.credStore[key]
|
||||
}()
|
||||
if rec == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
func() {
|
||||
rec.lock.Lock()
|
||||
defer rec.lock.Unlock()
|
||||
|
||||
metricsKey := SignerAndState{
|
||||
SignerName: source.PodCertificate.SignerName,
|
||||
State: rec.curState.metricsState(m.clock.Now()),
|
||||
}
|
||||
report.PodCertificateStates[metricsKey]++
|
||||
}()
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return report
|
||||
}
|
||||
|
||||
func hashBytes(in []byte) []byte {
|
||||
out := sha256.Sum256(in)
|
||||
return out[:]
|
||||
|
|
@ -827,3 +993,7 @@ func (m *NoOpManager) ForgetPod(ctx context.Context, pod *corev1.Pod) {
|
|||
func (m *NoOpManager) GetPodCertificateCredentialBundle(ctx context.Context, namespace, podName, podUID, volumeName string, sourceIndex int) ([]byte, []byte, error) {
|
||||
return nil, nil, fmt.Errorf("unimplemented")
|
||||
}
|
||||
|
||||
func (m *NoOpManager) MetricReport() *MetricReport {
|
||||
return &MetricReport{}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -219,6 +219,7 @@ func TestFullFlow(t *testing.T) {
|
|||
node1PodCertificateManager := NewIssuingManager(
|
||||
kc,
|
||||
node1PodManager,
|
||||
nil,
|
||||
informerFactory.Certificates().V1beta1().PodCertificateRequests(),
|
||||
informerFactory.Core().V1().Nodes(),
|
||||
types.NodeName(node1.ObjectMeta.Name),
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/podcertificate"
|
||||
)
|
||||
|
||||
type recordingPodCertificateManager struct {
|
||||
|
|
@ -46,6 +47,10 @@ func (f *recordingPodCertificateManager) TrackPod(ctx context.Context, pod *core
|
|||
|
||||
func (f *recordingPodCertificateManager) ForgetPod(ctx context.Context, pod *corev1.Pod) {}
|
||||
|
||||
func (f *recordingPodCertificateManager) MetricReport() *podcertificate.MetricReport {
|
||||
return &podcertificate.MetricReport{}
|
||||
}
|
||||
|
||||
// Check that GetPodCertificateCredentialBundle forwards its arguments in the
|
||||
// correct order. Seems excessive, but we got here because I put the arguments
|
||||
// in the wrong order...
|
||||
|
|
|
|||
468
test/e2e/auth/projected_podcertificate.go
Normal file
468
test/e2e/auth/projected_podcertificate.go
Normal file
|
|
@ -0,0 +1,468 @@
|
|||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package auth
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/onsi/ginkgo/v2"
|
||||
"github.com/onsi/gomega"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
certutil "k8s.io/client-go/util/cert"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
|
||||
"k8s.io/kubernetes/test/utils/hermeticpodcertificatesigner"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image" // Import imageutils
|
||||
admissionapi "k8s.io/pod-security-admission/api"
|
||||
admissiontest "k8s.io/pod-security-admission/test"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
const (
|
||||
// spiffeSignerName is the name of the signer for SPIFFE certificates.
|
||||
spiffeSignerName = "row-major.net/spiffe"
|
||||
)
|
||||
|
||||
var _ = SIGDescribe("Projected PodCertificate",
|
||||
framework.WithFeatureGate(features.PodCertificateRequest),
|
||||
framework.WithFeatureGate(features.ClusterTrustBundle),
|
||||
framework.WithFeatureGate(features.ClusterTrustBundleProjection),
|
||||
func() {
|
||||
f := framework.NewDefaultFramework("projected-podcertificate")
|
||||
f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
|
||||
var (
|
||||
signerCtx context.Context
|
||||
cancelSigner context.CancelFunc
|
||||
signer *hermeticpodcertificatesigner.Controller
|
||||
caKeys []crypto.PrivateKey
|
||||
caCerts [][]byte
|
||||
)
|
||||
|
||||
ginkgo.BeforeEach(func(ctx context.Context) {
|
||||
ginkgo.By("Starting in-process pod certificate signer...")
|
||||
signerCtx, cancelSigner = context.WithCancel(context.Background())
|
||||
ginkgo.DeferCleanup(func(ctx context.Context) {
|
||||
ginkgo.By("Stopping in-process pod certificate signer...")
|
||||
if cancelSigner != nil {
|
||||
cancelSigner()
|
||||
}
|
||||
})
|
||||
|
||||
var err error
|
||||
caKeys, caCerts, err = hermeticpodcertificatesigner.GenerateCAHierarchy(1) // Generate CA once
|
||||
if err != nil {
|
||||
framework.Failf("failed to generate CA for signer: %v", err)
|
||||
}
|
||||
|
||||
signer = hermeticpodcertificatesigner.New(clock.RealClock{}, spiffeSignerName, caKeys, caCerts, f.ClientSet)
|
||||
go signer.Run(signerCtx)
|
||||
})
|
||||
|
||||
ginkgo.It("should allow server and client pods to establish an mTLS connection", func(ctx context.Context) {
|
||||
namespace := f.Namespace.Name
|
||||
ginkgo.By("Using namespace: " + namespace)
|
||||
|
||||
securityContext := generateContainerSecurityContext()
|
||||
serverDeployment, serverService := createServerObjects(namespace, securityContext)
|
||||
ginkgo.By("Creating server deployment...")
|
||||
_, err := f.ClientSet.AppsV1().Deployments(namespace).Create(ctx, serverDeployment, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
framework.Failf("failed to create server deployment: %v", err)
|
||||
}
|
||||
|
||||
ginkgo.By("Creating server service...")
|
||||
_, err = f.ClientSet.CoreV1().Services(namespace).Create(ctx, serverService, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
framework.Failf("failed to create server service: %v", err)
|
||||
}
|
||||
|
||||
clientDeployment := createClientObjects(namespace, securityContext)
|
||||
ginkgo.By("Creating client deployment...")
|
||||
_, err = f.ClientSet.AppsV1().Deployments(namespace).Create(ctx, clientDeployment, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
framework.Failf("failed to create client deployment: %v", err)
|
||||
}
|
||||
|
||||
ginkgo.By("Waiting for mTLS connection to be built...")
|
||||
|
||||
// The mtlsclient now logs successful polls, check for that pattern.
|
||||
gomega.Eventually(ctx, func(ctx context.Context) (string, error) {
|
||||
clientLabels := labels.Set{"app": "client"}
|
||||
podList, listErr := f.ClientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: clientLabels.String()})
|
||||
if listErr != nil || len(podList.Items) == 0 {
|
||||
return "", fmt.Errorf("failed to get client pods: %w", err)
|
||||
}
|
||||
return e2epod.GetPodLogs(ctx, f.ClientSet, namespace, podList.Items[0].Name, "client")
|
||||
}, 30*time.Second, 2*time.Second).Should(gomega.MatchRegexp(`Got response body: Client Identity: spiffe://`),
|
||||
"client logs did not contain expected success message pattern")
|
||||
})
|
||||
|
||||
ginkgo.It("should honor UserAnnotations for SPIFFE URI path", func(ctx context.Context) {
|
||||
namespace := f.Namespace.Name
|
||||
ginkgo.By("Using namespace: " + namespace)
|
||||
|
||||
// Use customerPath to override the path in the spiffeID.
|
||||
customPath := "/custom-workload"
|
||||
userAnnotations := map[string]string{
|
||||
"spiffe/path-overriding": customPath, // Match the key supported the signer
|
||||
}
|
||||
inspectorPod := createInspectorPod(namespace, "path-override-pod", userAnnotations, nil)
|
||||
|
||||
ginkgo.By("Creating inspector pod with UserAnnotations...")
|
||||
_, err := f.ClientSet.CoreV1().Pods(namespace).Create(ctx, inspectorPod, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
framework.Failf("failed to create inspector pod: %v", err)
|
||||
}
|
||||
|
||||
ginkgo.By("Waiting for inspector pod to be running...")
|
||||
err = e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, inspectorPod)
|
||||
if err != nil {
|
||||
framework.Failf("inspector pod failed to start: %v", err)
|
||||
}
|
||||
|
||||
ginkgo.By("Fetching and parsing certificate from pod logs...")
|
||||
logs, err := e2epod.GetPodLogs(ctx, f.ClientSet, namespace, inspectorPod.Name, inspectorPod.Spec.Containers[0].Name)
|
||||
if err != nil {
|
||||
framework.Failf("failed to get pod logs: %v", err)
|
||||
}
|
||||
certs, err := certutil.ParseCertsPEM([]byte(logs))
|
||||
if err != nil {
|
||||
framework.Failf("failed to parse certificate chain from PEM block: %v", err)
|
||||
}
|
||||
gomega.Expect(certs).ToNot(gomega.BeEmpty(), "should parse at least one certificate from the bundle")
|
||||
|
||||
cert := certs[0]
|
||||
foundSPIFFEURI := false
|
||||
for _, uri := range cert.URIs {
|
||||
if uri.Scheme == "spiffe" {
|
||||
foundSPIFFEURI = true
|
||||
ginkgo.By("Found SPIFFE URI: " + uri.String())
|
||||
gomega.Expect(uri.Path).To(gomega.Equal(customPath), "SPIFFE URI path should match the custom path from UserAnnotations")
|
||||
break
|
||||
}
|
||||
}
|
||||
gomega.Expect(foundSPIFFEURI).To(gomega.BeTrueBecause("should find a SPIFFE URI in the certificate's SANs"))
|
||||
})
|
||||
|
||||
ginkgo.Describe("MaxExpirationSeconds validations", func() {
|
||||
|
||||
ginkgo.It("should issue certificate with default life time (24h) when MaxExpirationSeconds is not set", func(ctx context.Context) {
|
||||
namespace := f.Namespace.Name
|
||||
ginkgo.By("Using namespace: " + namespace)
|
||||
|
||||
// Create pod without MaxExpirationSeconds
|
||||
testPod := createInspectorPod(namespace, "default-duration-pod", nil, nil)
|
||||
ginkgo.By("Creating pod without MaxExpirationSeconds...")
|
||||
createdPod, err := f.ClientSet.CoreV1().Pods(namespace).Create(ctx, testPod, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
framework.Failf("failed to create pod: %v", err)
|
||||
}
|
||||
|
||||
ginkgo.By("Waiting for pod to be running...")
|
||||
err = e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, createdPod)
|
||||
if err != nil {
|
||||
framework.Failf("Pod failed to start: %v", err)
|
||||
}
|
||||
|
||||
ginkgo.By("Fetching certificate and verifying duration...")
|
||||
logs, err := e2epod.GetPodLogs(ctx, f.ClientSet, namespace, createdPod.Name, createdPod.Spec.Containers[0].Name)
|
||||
if err != nil {
|
||||
framework.Failf("failed to get pod logs: %v", err)
|
||||
}
|
||||
certs, err := certutil.ParseCertsPEM([]byte(logs))
|
||||
if err != nil {
|
||||
framework.Failf("failed to parse certificate: %v", err)
|
||||
}
|
||||
gomega.Expect(certs).ToNot(gomega.BeEmpty())
|
||||
cert := certs[0]
|
||||
lifeTime := cert.NotAfter.Sub(cert.NotBefore)
|
||||
expectedDuration := 24 * time.Hour // Default from signer code
|
||||
ginkgo.By(fmt.Sprintf("Verifying certificate duration %v is close to %v", lifeTime, expectedDuration))
|
||||
gomega.Expect(lifeTime).To(gomega.BeNumerically("==", expectedDuration), "Certificate duration should be 24 hours")
|
||||
})
|
||||
|
||||
ginkgo.It("should issue certificate with specified duration (1h) when MaxExpirationSeconds is set", func(ctx context.Context) {
|
||||
namespace := f.Namespace.Name
|
||||
ginkgo.By("Using namespace: " + namespace)
|
||||
|
||||
// Create pod requesting 1 hour
|
||||
requestedSeconds := int32(3600)
|
||||
testPod := createInspectorPod(namespace, "one-hour-duration-pod", nil, &requestedSeconds)
|
||||
ginkgo.By("Creating pod requesting 1 hour MaxExpirationSeconds...")
|
||||
createdPod, err := f.ClientSet.CoreV1().Pods(namespace).Create(ctx, testPod, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
framework.Failf("failed to create pod: %v", err)
|
||||
}
|
||||
|
||||
ginkgo.By("Waiting for pod to be running...")
|
||||
err = e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, createdPod)
|
||||
if err != nil {
|
||||
framework.Failf("Pod failed to start: %v", err)
|
||||
}
|
||||
|
||||
ginkgo.By("Fetching certificate and verifying duration...")
|
||||
logs, err := e2epod.GetPodLogs(ctx, f.ClientSet, namespace, createdPod.Name, createdPod.Spec.Containers[0].Name)
|
||||
if err != nil {
|
||||
framework.Failf("failed to get pod logs: %v", err)
|
||||
}
|
||||
certs, err := certutil.ParseCertsPEM([]byte(logs))
|
||||
if err != nil {
|
||||
framework.Failf("failed to parse certificate: %v", err)
|
||||
}
|
||||
gomega.Expect(certs).ToNot(gomega.BeEmpty())
|
||||
cert := certs[0]
|
||||
lifeTime := cert.NotAfter.Sub(cert.NotBefore)
|
||||
expectedDuration := time.Duration(requestedSeconds) * time.Second
|
||||
ginkgo.By(fmt.Sprintf("Verifying certificate duration %v is close to %v", lifeTime, expectedDuration))
|
||||
gomega.Expect(lifeTime).To(gomega.BeNumerically("==", expectedDuration), "Certificate duration should be 1 hour")
|
||||
})
|
||||
|
||||
ginkgo.It("should fail pod startup when MaxExpirationSeconds exceeds maximum (91d)", func(ctx context.Context) {
|
||||
namespace := f.Namespace.Name
|
||||
ginkgo.By("Using namespace: " + namespace)
|
||||
|
||||
// Exceeds 91 days
|
||||
tooLongSeconds := int32((91 * 24 * 60 * 60) + 1)
|
||||
testPod := createInspectorPod(namespace, "too-long-duration-pod", nil, &tooLongSeconds)
|
||||
ginkgo.By("Creating pod requesting >91d MaxExpirationSeconds...")
|
||||
_, err := f.ClientSet.CoreV1().Pods(namespace).Create(ctx, testPod, metav1.CreateOptions{})
|
||||
if err == nil {
|
||||
framework.Fail("Error message does not match the expected.")
|
||||
}
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "if provided, maxExpirationSeconds must be <= 7862400") {
|
||||
framework.Failf("failed to create pod: %v", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
ginkgo.It("should fail pod startup when MaxExpirationSeconds is less than minimum (1h)", func(ctx context.Context) {
|
||||
namespace := f.Namespace.Name
|
||||
ginkgo.By("Using namespace: " + namespace)
|
||||
|
||||
// Less than 1 hour
|
||||
tooShortSeconds := int32(3599)
|
||||
testPod := createInspectorPod(namespace, "too-short-duration-pod", nil, &tooShortSeconds)
|
||||
ginkgo.By("Creating pod requesting <1h MaxExpirationSeconds...")
|
||||
_, err := f.ClientSet.CoreV1().Pods(namespace).Create(ctx, testPod, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
if !strings.Contains(err.Error(), "if provided, maxExpirationSeconds must be >= 3600") {
|
||||
framework.Fail("Error message does not match the expected")
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
func generateContainerSecurityContext() *v1.SecurityContext {
|
||||
desiredPSALevel := admissionapi.LevelRestricted
|
||||
desiredVersion := admissionapi.GetAPIVersion()
|
||||
minimalPod, err := admissiontest.GetMinimalValidPod(desiredPSALevel, desiredVersion)
|
||||
if err != nil {
|
||||
framework.Failf("failed to get minimal valid pod: %v", err)
|
||||
}
|
||||
return minimalPod.Spec.Containers[0].SecurityContext
|
||||
}
|
||||
|
||||
// createServerObjects creates the Deployment and Service objects for the mTLS server.
|
||||
func createServerObjects(namespace string, securityContext *v1.SecurityContext) (*appsv1.Deployment, *v1.Service) {
|
||||
replicas := int32(1)
|
||||
serverLabels := map[string]string{"app": "server"}
|
||||
|
||||
signerNameVar := spiffeSignerName
|
||||
serverDeployment := &appsv1.Deployment{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "server",
|
||||
Namespace: namespace,
|
||||
Labels: serverLabels,
|
||||
},
|
||||
Spec: appsv1.DeploymentSpec{
|
||||
Replicas: &replicas,
|
||||
Selector: &metav1.LabelSelector{MatchLabels: serverLabels},
|
||||
Template: v1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{Labels: serverLabels},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "server",
|
||||
Image: imageutils.GetE2EImage(imageutils.Agnhost), // Use agnhost image >= 2.59
|
||||
Args: []string{
|
||||
"mtlsserver",
|
||||
"--listen=0.0.0.0:443",
|
||||
"--server-creds=/run/tls-config/spiffe-cred-bundle.pem",
|
||||
"--spiffe-trust-bundle=/run/tls-config/spiffe-trust-bundle.pem",
|
||||
},
|
||||
ImagePullPolicy: v1.PullAlways,
|
||||
SecurityContext: securityContext,
|
||||
VolumeMounts: []v1.VolumeMount{{Name: "tls-config", MountPath: "/run/tls-config"}},
|
||||
},
|
||||
},
|
||||
Volumes: []v1.Volume{{
|
||||
Name: "tls-config",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Projected: &v1.ProjectedVolumeSource{
|
||||
Sources: []v1.VolumeProjection{
|
||||
{
|
||||
PodCertificate: &v1.PodCertificateProjection{
|
||||
SignerName: spiffeSignerName,
|
||||
CredentialBundlePath: "spiffe-cred-bundle.pem",
|
||||
KeyType: "ECDSAP256",
|
||||
},
|
||||
},
|
||||
{
|
||||
ClusterTrustBundle: &v1.ClusterTrustBundleProjection{
|
||||
SignerName: &signerNameVar,
|
||||
LabelSelector: &metav1.LabelSelector{},
|
||||
Path: "spiffe-trust-bundle.pem",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}},
|
||||
RestartPolicy: v1.RestartPolicyAlways,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
serverService := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "server", Namespace: namespace},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: v1.ServiceTypeClusterIP,
|
||||
Ports: []v1.ServicePort{{Name: "https", Port: 443}},
|
||||
Selector: serverLabels,
|
||||
},
|
||||
}
|
||||
|
||||
return serverDeployment, serverService
|
||||
}
|
||||
|
||||
// createClientObjects creates the Deployment object for the mTLS client.
|
||||
func createClientObjects(namespace string, securityContext *v1.SecurityContext) *appsv1.Deployment {
|
||||
replicas := int32(1)
|
||||
clientLabels := map[string]string{"app": "client"}
|
||||
fetchURL := "https://server." + namespace + ".svc/spiffe-echo"
|
||||
signerNameVar := spiffeSignerName
|
||||
|
||||
return &appsv1.Deployment{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "client",
|
||||
Namespace: namespace,
|
||||
Labels: clientLabels,
|
||||
},
|
||||
Spec: appsv1.DeploymentSpec{
|
||||
Replicas: &replicas,
|
||||
Selector: &metav1.LabelSelector{MatchLabels: clientLabels},
|
||||
Template: v1.PodTemplateSpec{
|
||||
ObjectMeta: metav1.ObjectMeta{Labels: clientLabels},
|
||||
Spec: v1.PodSpec{
|
||||
RestartPolicy: v1.RestartPolicyAlways,
|
||||
Containers: []v1.Container{{
|
||||
Name: "client",
|
||||
Image: imageutils.GetE2EImage(imageutils.Agnhost), // Use agnhost image >= 2.59
|
||||
Args: []string{
|
||||
"mtlsclient",
|
||||
"--fetch-url=" + fetchURL,
|
||||
"--server-trust-bundle=/run/tls-config/spiffe-trust-bundle.pem",
|
||||
"--client-cred-bundle=/run/tls-config/spiffe-cred-bundle.pem",
|
||||
},
|
||||
ImagePullPolicy: v1.PullAlways,
|
||||
SecurityContext: securityContext,
|
||||
VolumeMounts: []v1.VolumeMount{{Name: "tls-config", MountPath: "/run/tls-config"}},
|
||||
}},
|
||||
Volumes: []v1.Volume{{
|
||||
Name: "tls-config",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Projected: &v1.ProjectedVolumeSource{
|
||||
Sources: []v1.VolumeProjection{
|
||||
{
|
||||
ClusterTrustBundle: &v1.ClusterTrustBundleProjection{
|
||||
SignerName: &signerNameVar,
|
||||
LabelSelector: &metav1.LabelSelector{},
|
||||
Path: "spiffe-trust-bundle.pem",
|
||||
},
|
||||
},
|
||||
{
|
||||
PodCertificate: &v1.PodCertificateProjection{
|
||||
SignerName: spiffeSignerName,
|
||||
CredentialBundlePath: "spiffe-cred-bundle.pem",
|
||||
KeyType: "ECDSAP256",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// createInspectorPod creates a pod designed to print its certificate and wait, for inspection purposes.
|
||||
func createInspectorPod(namespace, podName string, userAnnotations map[string]string, maxExpirationSeconds *int32) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: podName,
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "inspector",
|
||||
Image: imageutils.GetE2EImage(imageutils.Agnhost),
|
||||
// Use standard shell commands available in the agnhost base image
|
||||
Command: []string{"/bin/sh", "-c", "cat /run/tls-config/spiffe-cred-bundle.pem && sleep infinity"},
|
||||
VolumeMounts: []v1.VolumeMount{
|
||||
{Name: "tls-config", MountPath: "/run/tls-config"},
|
||||
},
|
||||
},
|
||||
},
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
Name: "tls-config",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
Projected: &v1.ProjectedVolumeSource{
|
||||
Sources: []v1.VolumeProjection{
|
||||
{
|
||||
PodCertificate: &v1.PodCertificateProjection{
|
||||
SignerName: spiffeSignerName,
|
||||
CredentialBundlePath: "spiffe-cred-bundle.pem",
|
||||
KeyType: "ECDSAP256",
|
||||
UserAnnotations: userAnnotations,
|
||||
MaxExpirationSeconds: maxExpirationSeconds,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
RestartPolicy: v1.RestartPolicyNever,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -21,7 +21,6 @@ package podcertificatesigner
|
|||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
|
@ -51,8 +50,6 @@ func init() {
|
|||
}
|
||||
|
||||
func run(cmd *cobra.Command, args []string) error {
|
||||
flag.Set("logtostderr", "true")
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
|
|
|
|||
|
|
@ -170,6 +170,7 @@ func TestPodCertificateManager(t *testing.T) {
|
|||
node1PodCertificateManager := podcertificate.NewIssuingManager(
|
||||
node1Client,
|
||||
node1PodManager,
|
||||
nil,
|
||||
node1PCRInformerFactory.Certificates().V1beta1().PodCertificateRequests(),
|
||||
node1NodeInformerFactory.Core().V1().Nodes(),
|
||||
types.NodeName(node1.ObjectMeta.Name),
|
||||
|
|
|
|||
|
|
@ -27,9 +27,11 @@ import (
|
|||
"fmt"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
certsv1beta1 "k8s.io/api/certificates/v1beta1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
|
|
@ -48,16 +50,15 @@ const SpiffePathKey = "spiffe/path-overriding"
|
|||
|
||||
// Controller is an in-memory signing controller for PodCertificateRequests.
|
||||
type Controller struct {
|
||||
clock clock.PassiveClock
|
||||
|
||||
clock clock.PassiveClock
|
||||
signerName string
|
||||
|
||||
kc kubernetes.Interface
|
||||
pcrInformer cache.SharedIndexInformer
|
||||
pcrQueue workqueue.TypedRateLimitingInterface[string]
|
||||
|
||||
caKeys []crypto.PrivateKey
|
||||
caCerts [][]byte
|
||||
pcrLister certlistersv1beta1.PodCertificateRequestLister
|
||||
caKeys []crypto.PrivateKey
|
||||
caCerts [][]byte
|
||||
}
|
||||
|
||||
// New creates a new Controller.
|
||||
|
|
@ -74,6 +75,7 @@ func New(clock clock.PassiveClock, signerName string, caKeys []crypto.PrivateKey
|
|||
kc: kc,
|
||||
pcrInformer: pcrInformer,
|
||||
pcrQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
|
||||
pcrLister: certlistersv1beta1.NewPodCertificateRequestLister(pcrInformer.GetIndexer()),
|
||||
caKeys: caKeys,
|
||||
caCerts: caCerts,
|
||||
}
|
||||
|
|
@ -107,15 +109,67 @@ func New(clock clock.PassiveClock, signerName string, caKeys []crypto.PrivateKey
|
|||
|
||||
func (c *Controller) Run(ctx context.Context) {
|
||||
defer c.pcrQueue.ShutDown()
|
||||
prefix := strings.Replace(c.signerName, "/", ":", 1)
|
||||
ctbName := prefix + ":primary-bundle"
|
||||
defer func() {
|
||||
klog.Infof("Deleting ClusterTrustBundle %s", ctbName)
|
||||
err := c.kc.CertificatesV1beta1().ClusterTrustBundles().Delete(context.Background(), ctbName, metav1.DeleteOptions{})
|
||||
if err != nil && !k8serrors.IsNotFound(err) {
|
||||
klog.Errorf("Failed to delete ClusterTrustBundle %s: %v", ctbName, err)
|
||||
}
|
||||
}()
|
||||
|
||||
go c.pcrInformer.Run(ctx.Done())
|
||||
if !cache.WaitForCacheSync(ctx.Done(), c.pcrInformer.HasSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
|
||||
go wait.JitterUntilWithContext(ctx, c.ensureTrustBundle, 1*time.Minute, 1.0, true)
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func (c *Controller) ensureTrustBundle(ctx context.Context) {
|
||||
// Create a ClusterTrustBundle with the signer's CA.
|
||||
prefix := strings.Replace(c.signerName, "/", ":", 1)
|
||||
ctbName := prefix + ":primary-bundle"
|
||||
caCertPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.caCerts[0]})
|
||||
wantCTB := &certsv1beta1.ClusterTrustBundle{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: ctbName},
|
||||
Spec: certsv1beta1.ClusterTrustBundleSpec{
|
||||
SignerName: c.signerName,
|
||||
TrustBundle: string(caCertPEM),
|
||||
},
|
||||
}
|
||||
|
||||
klog.Infof("Getting ClusterTrustBundle %s", ctbName)
|
||||
ctb, err := c.kc.CertificatesV1beta1().ClusterTrustBundles().Get(ctx, wantCTB.ObjectMeta.Name, metav1.GetOptions{})
|
||||
if k8serrors.IsNotFound(err) {
|
||||
_, err := c.kc.CertificatesV1beta1().ClusterTrustBundles().Create(ctx, wantCTB, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to create ClusterTrustBundle %s: %v", ctbName, err)
|
||||
return
|
||||
}
|
||||
return
|
||||
} else if err != nil {
|
||||
klog.Errorf("Failed to get ClusterTrustBundle %s: %v", ctbName, err)
|
||||
return
|
||||
}
|
||||
|
||||
if apiequality.Semantic.DeepEqual(wantCTB.Spec, ctb.Spec) {
|
||||
klog.Info("ClusterTrustBundle already in correct state")
|
||||
return
|
||||
}
|
||||
|
||||
ctb = ctb.DeepCopy()
|
||||
ctb.Spec = wantCTB.Spec
|
||||
|
||||
_, err = c.kc.CertificatesV1beta1().ClusterTrustBundles().Update(ctx, ctb, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to update ClusterTrustBundle %s: %v", ctbName, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) runWorker(ctx context.Context) {
|
||||
for c.processNextWorkItem(ctx) {
|
||||
}
|
||||
|
|
@ -136,7 +190,7 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
pcr, err := certlistersv1beta1.NewPodCertificateRequestLister(c.pcrInformer.GetIndexer()).PodCertificateRequests(namespace).Get(name)
|
||||
pcr, err := c.pcrLister.PodCertificateRequests(namespace).Get(name)
|
||||
if k8serrors.IsNotFound(err) {
|
||||
c.pcrQueue.Forget(key)
|
||||
return true
|
||||
|
|
@ -204,12 +258,17 @@ func (c *Controller) handlePCR(ctx context.Context, pcr *certsv1beta1.PodCertifi
|
|||
notBefore := c.clock.Now().Add(-2 * time.Minute)
|
||||
notAfter := notBefore.Add(lifetime)
|
||||
beginRefreshAt := notAfter.Add(-30 * time.Minute)
|
||||
// Construct DNS names
|
||||
dnsNames := []string{
|
||||
fmt.Sprintf("server.%s.svc", pcr.ObjectMeta.Namespace),
|
||||
}
|
||||
template := &x509.Certificate{
|
||||
URIs: []*url.URL{spiffeURI},
|
||||
NotBefore: notBefore,
|
||||
NotAfter: notAfter,
|
||||
KeyUsage: x509.KeyUsageDigitalSignature,
|
||||
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
|
||||
DNSNames: dnsNames,
|
||||
}
|
||||
|
||||
signingCert, err := x509.ParseCertificate(c.caCerts[len(c.caCerts)-1])
|
||||
|
|
@ -261,7 +320,6 @@ func (c *Controller) handlePCR(ctx context.Context, pcr *certsv1beta1.PodCertifi
|
|||
if err != nil {
|
||||
return fmt.Errorf("while updating PodCertificateRequest: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -209,7 +209,7 @@ const (
|
|||
func initImageConfigs(list RegistryList) (map[ImageID]Config, map[ImageID]Config) {
|
||||
configs := map[ImageID]Config{}
|
||||
configs[AgnhostPrev] = Config{list.PromoterE2eRegistry, "agnhost", "2.55"}
|
||||
configs[Agnhost] = Config{list.PromoterE2eRegistry, "agnhost", "2.57"}
|
||||
configs[Agnhost] = Config{list.PromoterE2eRegistry, "agnhost", "2.59"}
|
||||
configs[AgnhostPrivate] = Config{list.PrivateRegistry, "agnhost", "2.6"}
|
||||
configs[APIServer] = Config{list.PromoterE2eRegistry, "sample-apiserver", "1.29.2"}
|
||||
configs[AppArmorLoader] = Config{list.PromoterE2eRegistry, "apparmor-loader", "1.4"}
|
||||
|
|
|
|||
Loading…
Reference in a new issue