diff --git a/internal/controller/controller.go b/internal/controller/controller.go index c80f063..d82ff21 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "strings" + "sync/atomic" "time" "github.com/github/deployment-tracker/internal/metadata" @@ -47,7 +48,8 @@ type ttlCache interface { } type deploymentRecordPoster interface { - PostOne(ctx context.Context, record *deploymentrecord.DeploymentRecord) error + PostOne(ctx context.Context, record *deploymentrecord.Record) error + PostCluster(ctx context.Context, records []*deploymentrecord.Record, cluster string) ([]byte, error) } type podMetadataAggregator interface { @@ -87,6 +89,7 @@ type Controller struct { // informerSyncTimeout is the maximum time allowed for all informers to sync // and prevents sync from hanging indefinitely. informerSyncTimeout time.Duration + syncing atomic.Bool } // New creates a new deployment tracker controller. @@ -147,10 +150,16 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato unknownArtifacts: amcache.NewExpiring(), informerSyncTimeout: informerSyncTimeoutDuration, } + cntrl.syncing.Store(true) // Add event handlers to the informer _, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { + // Skip adding sync events + if cntrl.syncing.Load() { + return + } + pod, ok := obj.(*corev1.Pod) if !ok { slog.Error("Invalid object returned", @@ -314,6 +323,12 @@ func (c *Controller) Run(ctx context.Context, workers int) error { return errors.New("timed out waiting for caches to sync - please ensure deployment tracker has the correct kubernetes permissions") } } + c.syncing.Store(false) + syncClusterPods := c.podInformer.GetIndexer().List() + err := c.processSyncEvents(ctx, syncClusterPods) + if err != nil { + return fmt.Errorf("sync events failed: %w", err) + } slog.Info("Starting workers", "count", workers, diff --git a/internal/controller/controller_integration_test.go b/internal/controller/controller_integration_test.go index fce91ed..066f020 100644 --- a/internal/controller/controller_integration_test.go +++ b/internal/controller/controller_integration_test.go @@ -26,19 +26,23 @@ import ( type mockRecordPoster struct { mu sync.Mutex - records []*deploymentrecord.DeploymentRecord + records []*deploymentrecord.Record err error // to simulate failures } -func (m *mockRecordPoster) PostOne(_ context.Context, record *deploymentrecord.DeploymentRecord) error { +func (m *mockRecordPoster) PostOne(_ context.Context, record *deploymentrecord.Record) error { m.mu.Lock() defer m.mu.Unlock() m.records = append(m.records, record) return m.err } +func (m *mockRecordPoster) PostCluster(_ context.Context, _ []*deploymentrecord.Record, _ string) ([]byte, error) { + return nil, nil +} + // Helper that allows tests to read captured records safely. -func (m *mockRecordPoster) getRecords() []*deploymentrecord.DeploymentRecord { +func (m *mockRecordPoster) getRecords() []*deploymentrecord.Record { m.mu.Lock() defer m.mu.Unlock() return slices.Clone(m.records) diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 2bba0f9..935fe3c 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/github/deployment-tracker/internal/metadata" "github.com/github/deployment-tracker/internal/workload" "github.com/github/deployment-tracker/pkg/deploymentrecord" "github.com/stretchr/testify/assert" @@ -23,35 +24,62 @@ import ( // mockPoster records all PostOne calls and returns a configurable error. type mockPoster struct { - mu sync.Mutex - calls int - lastErr error + mu sync.Mutex + calls int + clusterCalls int + clusterRecordCount int + lastErr error + clusterResp []byte + clusterErr error } -func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.DeploymentRecord) error { +func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.Record) error { m.mu.Lock() defer m.mu.Unlock() m.calls++ return m.lastErr } -func (m *mockPoster) getCalls() int { +func (m *mockPoster) PostCluster(_ context.Context, records []*deploymentrecord.Record, _ string) ([]byte, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.clusterCalls++ + m.clusterRecordCount = len(records) + return m.clusterResp, m.clusterErr +} + +func (m *mockPoster) getPostOneCalls() int { m.mu.Lock() defer m.mu.Unlock() return m.calls } +func (m *mockPoster) getPostClusterCalls() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.clusterCalls +} + // mockResolver is a test double for the workloadResolver interface. -type mockResolver struct{} +type mockResolver struct { + name string +} -func (*mockResolver) Resolve(_ *corev1.Pod) workload.Identity { - return workload.Identity{} +func (m *mockResolver) Resolve(_ *corev1.Pod) workload.Identity { + return workload.Identity{Name: m.name} } func (*mockResolver) IsActive(_ string, _ workload.Identity) bool { return false } +// mockMetadataAggregator is a test double for the podMetadataAggregator interface. +type mockMetadataAggregator struct{} + +func (*mockMetadataAggregator) BuildAggregatePodMetadata(_ context.Context, _ *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata { + return nil +} + // newTestController creates a minimal Controller suitable for unit-testing // recordContainer without a real Kubernetes cluster. func newTestController(poster *mockPoster) *Controller { @@ -64,6 +92,7 @@ func newTestController(poster *mockPoster) *Controller { Cluster: "test", }, workloadResolver: &mockResolver{}, + metadataAggregator: &mockMetadataAggregator{}, observedDeployments: amcache.NewExpiring(), unknownArtifacts: amcache.NewExpiring(), } diff --git a/internal/controller/reporting.go b/internal/controller/reporting.go index 9d95b9a..e9c0c72 100644 --- a/internal/controller/reporting.go +++ b/internal/controller/reporting.go @@ -2,6 +2,7 @@ package controller import ( "context" + "encoding/json" "errors" "fmt" "log/slog" @@ -14,7 +15,6 @@ import ( "github.com/github/deployment-tracker/pkg/deploymentrecord" "github.com/github/deployment-tracker/pkg/dtmetrics" "github.com/github/deployment-tracker/pkg/ociutil" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -92,14 +92,14 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { return nil } - var lastErr error - // Gather aggregate metadata for adds/updates var aggPodMetadata *metadata.AggregatePodMetadata if event.EventType != EventDeleted { aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) } + var lastErr error + // Record info for each container in the pod for _, container := range pod.Spec.Containers { if err := c.recordContainer(ctx, pod, container, event.EventType, wl.Name, aggPodMetadata); err != nil { @@ -117,99 +117,177 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { return lastErr } -// recordContainer records a single container's deployment info. -func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) error { - var cacheKey string +func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []any) error { + syncRecords := c.makeSyncRecords(ctx, syncClusterPods) + if len(syncRecords) == 0 { + slog.Info("No sync records to post") + return nil + } - status := deploymentrecord.StatusDeployed - if eventType == EventDeleted { - status = deploymentrecord.StatusDecommissioned + respBody, err := c.apiClient.PostCluster(ctx, syncRecords, c.cfg.Cluster) + var clusterNoRepositoriesError *deploymentrecord.ClusterNoRepositoriesError + if err != nil { + if errors.As(err, &clusterNoRepositoriesError) { + slog.Info("Cluster sync found no creatable records", + "org", c.cfg.Organization, + ) + return nil + } + slog.Error("Failed to post sync cluster records", + "error", err, + "record_count", len(syncRecords), + ) + return fmt.Errorf("failed to post sync cluster records: %w", err) + } + var deploymentRecords deploymentrecord.RecordsClusterResp + err = json.Unmarshal(respBody, &deploymentRecords) + if err != nil { + slog.Error("Failed to unmarshall response", + "error", err, + "record_count", len(syncRecords), + ) + return nil } + slog.Info("Successfully posted sync cluster records", + "created", len(deploymentRecords.DeploymentRecords), + "errors", len(deploymentRecords.Errors), + ) - dn := getARDeploymentName(pod, container, c.cfg.Template, workloadName) - digest := getContainerDigest(pod, container.Name) + c.fillCaches(deploymentRecords) + return nil +} - if dn == "" || digest == "" { - slog.Debug("Skipping container: missing deployment name or digest", - "namespace", pod.Namespace, - "pod", pod.Name, - "container", container.Name, - "deployment_name", dn, - "has_digest", digest != "", - ) +func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []any) []*deploymentrecord.Record { + seenSyncRecords := make(map[string]bool) + var syncRecords []*deploymentrecord.Record + for _, p := range syncClusterPods { + pod, ok := p.(*corev1.Pod) + if !ok { + slog.Error("Invalid object type in sync cluster pod list") + continue + } + + if pod.Status.Phase != corev1.PodRunning || !workload.HasSupportedOwner(pod) { + continue + } + + // Resolve the workload name for the deployment record. + wl := c.workloadResolver.Resolve(pod) + if wl.Name == "" { + slog.Debug("Could not resolve workload name for sync pod, skipping", + "namespace", pod.Namespace, + "pod", pod.Name, + ) + continue + } + + allContainers := make([]corev1.Container, 0, len(pod.Spec.Containers)+len(pod.Spec.InitContainers)) + allContainers = append(allContainers, pod.Spec.Containers...) + allContainers = append(allContainers, pod.Spec.InitContainers...) + + // Filter out containers already in syncRecords + var newContainers []corev1.Container + for _, container := range allContainers { + dn := getARDeploymentName(pod, container, c.cfg.Template, wl.Name) + digest := getContainerDigest(pod, container.Name) + if dn == "" || digest == "" { + continue + } + key := getCacheKey(EventCreated, dn, digest) + if seenSyncRecords[key] { + continue + } + seenSyncRecords[key] = true + newContainers = append(newContainers, container) + } + + if len(newContainers) == 0 { + continue + } + + // Only gather aggregate metadata if there are new containers + aggPodMetadata := c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) + + for _, container := range newContainers { + record := c.buildRecord(pod, container, EventCreated, wl.Name, aggPodMetadata) + if record != nil { + syncRecords = append(syncRecords, record) + } + } + } + + return syncRecords +} + +func (c *Controller) fillCaches(deploymentRecords deploymentrecord.RecordsClusterResp) { + slog.Info("Filling caches after posting sync cluster records") + // Fill observedDeployments cache with successful digests + for _, r := range deploymentRecords.DeploymentRecords { + cacheKey := getCacheKey(EventCreated, r.DeploymentName, r.Digest) + c.observedDeployments.Set(cacheKey, true, 2*time.Minute) + } + + // Fill unknownArtifacts cache with unknown digests + for _, r := range deploymentRecords.Errors { + if r.Cause == "not_found" { + c.unknownArtifacts.Set(r.Digest, true, unknownArtifactTTL) + } + } +} + +// recordContainer records a single container's deployment info. +func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) error { + // Create deployment record + record := c.buildRecord(pod, container, eventType, workloadName, aggPodMetadata) + if record == nil { + slog.Debug("Unable to build record for container, skipping", + "deployment_name", getARDeploymentName(pod, container, c.cfg.Template, workloadName)) return nil } // Check if we've already recorded this deployment - switch status { + var cacheKey string + switch record.Status { case deploymentrecord.StatusDeployed: - cacheKey = getCacheKey(EventCreated, dn, digest) + cacheKey = getCacheKey(EventCreated, record.DeploymentName, record.Digest) if _, exists := c.observedDeployments.Get(cacheKey); exists { slog.Debug("Deployment already observed, skipping post", - "deployment_name", dn, - "digest", digest, + "deployment_name", record.DeploymentName, + "digest", record.Digest, ) return nil } case deploymentrecord.StatusDecommissioned: - cacheKey = getCacheKey(EventDeleted, dn, digest) + cacheKey = getCacheKey(EventDeleted, record.DeploymentName, record.Digest) if _, exists := c.observedDeployments.Get(cacheKey); exists { slog.Debug("Deployment already deleted, skipping post", - "deployment_name", dn, - "digest", digest, + "deployment_name", record.DeploymentName, + "digest", record.Digest, ) return nil } default: - return fmt.Errorf("invalid status: %s", status) + return fmt.Errorf("invalid status: %s", record.Status) } // Check if this artifact was previously unknown (404 from the API) - if _, exists := c.unknownArtifacts.Get(digest); exists { + if _, exists := c.unknownArtifacts.Get(record.Digest); exists { dtmetrics.PostDeploymentRecordUnknownArtifactCacheHit.Inc() slog.Debug("Artifact previously returned 404, skipping post", - "deployment_name", dn, - "digest", digest, + "deployment_name", record.DeploymentName, + "digest", record.Digest, ) return nil } - // Extract image name and tag - imageName, version := ociutil.ExtractName(container.Image) - - // Format runtime risks and tags - var runtimeRisks []deploymentrecord.RuntimeRisk - var tags map[string]string - if aggPodMetadata != nil { - for risk := range aggPodMetadata.RuntimeRisks { - runtimeRisks = append(runtimeRisks, risk) - } - slices.Sort(runtimeRisks) - tags = aggPodMetadata.Tags - } - - // Create deployment record - record := deploymentrecord.NewDeploymentRecord( - imageName, - digest, - version, - c.cfg.LogicalEnvironment, - c.cfg.PhysicalEnvironment, - c.cfg.Cluster, - status, - dn, - runtimeRisks, - tags, - ) - if err := c.apiClient.PostOne(ctx, record); err != nil { // Return if no artifact is found and cache the digest var noArtifactErr *deploymentrecord.NoArtifactError if errors.As(err, &noArtifactErr) { - c.unknownArtifacts.Set(digest, true, unknownArtifactTTL) + c.unknownArtifacts.Set(record.Digest, true, unknownArtifactTTL) slog.Info("No artifact found, digest cached as unknown", - "deployment_name", dn, - "digest", digest, + "deployment_name", record.DeploymentName, + "digest", record.Digest, ) return nil } @@ -250,26 +328,75 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta ) // Update cache after successful post - switch status { + switch record.Status { case deploymentrecord.StatusDeployed: - cacheKey = getCacheKey(EventCreated, dn, digest) + cacheKey = getCacheKey(EventCreated, record.DeploymentName, record.Digest) c.observedDeployments.Set(cacheKey, true, 2*time.Minute) // If there was a previous delete event, remove that - cacheKey = getCacheKey(EventDeleted, dn, digest) + cacheKey = getCacheKey(EventDeleted, record.DeploymentName, record.Digest) c.observedDeployments.Delete(cacheKey) case deploymentrecord.StatusDecommissioned: - cacheKey = getCacheKey(EventDeleted, dn, digest) + cacheKey = getCacheKey(EventDeleted, record.DeploymentName, record.Digest) c.observedDeployments.Set(cacheKey, true, 2*time.Minute) // If there was a previous create event, remove that - cacheKey = getCacheKey(EventCreated, dn, digest) + cacheKey = getCacheKey(EventCreated, record.DeploymentName, record.Digest) c.observedDeployments.Delete(cacheKey) default: - return fmt.Errorf("invalid status: %s", status) + return fmt.Errorf("invalid status: %s", record.Status) } return nil } +func (c *Controller) buildRecord(pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) *deploymentrecord.Record { + dn := getARDeploymentName(pod, container, c.cfg.Template, workloadName) + digest := getContainerDigest(pod, container.Name) + + if dn == "" || digest == "" { + slog.Debug("Skipping container: missing deployment name or digest", + "namespace", pod.Namespace, + "pod", pod.Name, + "container", container.Name, + "deployment_name", dn, + "has_digest", digest != "", + ) + return nil + } + + status := deploymentrecord.StatusDeployed + if eventType == EventDeleted { + status = deploymentrecord.StatusDecommissioned + } + + // Extract image name and tag + imageName, version := ociutil.ExtractName(container.Image) + + // Format runtime risks and tags + var runtimeRisks []deploymentrecord.RuntimeRisk + var tags map[string]string + if aggPodMetadata != nil { + for risk := range aggPodMetadata.RuntimeRisks { + runtimeRisks = append(runtimeRisks, risk) + } + slices.Sort(runtimeRisks) + tags = aggPodMetadata.Tags + } + + // Create deployment record + return deploymentrecord.NewDeploymentRecord( + imageName, + digest, + version, + c.cfg.LogicalEnvironment, + c.cfg.PhysicalEnvironment, + c.cfg.Cluster, + status, + dn, + runtimeRisks, + tags, + ) +} + func getCacheKey(ev, dn, digest string) string { return ev + "||" + dn + "||" + digest } diff --git a/internal/controller/reporting_test.go b/internal/controller/reporting_test.go index b4073f2..ac8a379 100644 --- a/internal/controller/reporting_test.go +++ b/internal/controller/reporting_test.go @@ -2,12 +2,17 @@ package controller import ( "context" + "encoding/json" + "errors" + "fmt" "testing" "time" "github.com/github/deployment-tracker/pkg/deploymentrecord" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestRecordContainer_UnknownArtifactCachePopulatedOn404(t *testing.T) { @@ -22,7 +27,7 @@ func TestRecordContainer_UnknownArtifactCachePopulatedOn404(t *testing.T) { // First call should hit the API and get a 404 err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls()) + assert.Equal(t, 1, poster.getPostOneCalls()) // Digest should now be in the unknown artifacts cache _, exists := ctrl.unknownArtifacts.Get(digest) @@ -41,12 +46,12 @@ func TestRecordContainer_UnknownArtifactCacheSkipsAPICall(t *testing.T) { // First call — API returns 404, populates cache err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls()) + assert.Equal(t, 1, poster.getPostOneCalls()) // Second call — should be served from cache, no API call err = ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls(), "API should not be called for cached unknown artifact") + assert.Equal(t, 1, poster.getPostOneCalls(), "API should not be called for cached unknown artifact") } func TestRecordContainer_UnknownArtifactCacheAppliesToDecommission(t *testing.T) { @@ -61,12 +66,12 @@ func TestRecordContainer_UnknownArtifactCacheAppliesToDecommission(t *testing.T) // Deploy call — 404, populates cache err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls()) + assert.Equal(t, 1, poster.getPostOneCalls()) // Decommission call for same digest — should skip API err = ctrl.recordContainer(context.Background(), pod, container, EventDeleted, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls(), "decommission should also be skipped for cached unknown artifact") + assert.Equal(t, 1, poster.getPostOneCalls(), "decommission should also be skipped for cached unknown artifact") } func TestRecordContainer_UnknownArtifactCacheExpires(t *testing.T) { @@ -84,7 +89,7 @@ func TestRecordContainer_UnknownArtifactCacheExpires(t *testing.T) { // Immediately — should be cached err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 0, poster.getCalls(), "should skip API while cached") + assert.Equal(t, 0, poster.getPostOneCalls(), "should skip API while cached") // Wait for expiry time.Sleep(100 * time.Millisecond) @@ -92,7 +97,7 @@ func TestRecordContainer_UnknownArtifactCacheExpires(t *testing.T) { // After expiry — should call API again err = ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls(), "should call API after cache expiry") + assert.Equal(t, 1, poster.getPostOneCalls(), "should call API after cache expiry") } func TestRecordContainer_SuccessfulPostDoesNotPopulateUnknownCache(t *testing.T) { @@ -104,9 +109,158 @@ func TestRecordContainer_SuccessfulPostDoesNotPopulateUnknownCache(t *testing.T) err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls()) + assert.Equal(t, 1, poster.getPostOneCalls()) // Digest should NOT be in the unknown artifacts cache _, exists := ctrl.unknownArtifacts.Get(digest) assert.False(t, exists, "successful post should not cache digest as unknown") } + +func TestProcessSyncEvents_EmptyPodList(t *testing.T) { + t.Parallel() + poster := &mockPoster{} + ctrl := newTestController(poster) + + err := ctrl.processSyncEvents(context.Background(), []any{}) + require.NoError(t, err) + assert.Equal(t, 0, poster.getPostClusterCalls(), "PostCluster should not be called for empty pod list") +} + +func TestProcessSyncEvents_HappyPath(t *testing.T) { + t.Parallel() + digest := "sha256:abc123" + unknownDigest := "sha256:notfound999" + unauthorizedDigest := "sha256:unauthorized999" + clusterResp := deploymentrecord.RecordsClusterResp{ + TotalCount: 1, + DeploymentRecords: []*deploymentrecord.RecordResp{{ + Record: deploymentrecord.Record{ + BaseRecord: deploymentrecord.BaseRecord{ + DeploymentName: "default/test-deploy/app", + Digest: digest, + }, + }, + }}, + Errors: []*deploymentrecord.RecordErrorResp{ + { + Record: deploymentrecord.Record{ + BaseRecord: deploymentrecord.BaseRecord{ + Digest: unknownDigest, + }, + }, + Cause: "not_found", + }, + { + Record: deploymentrecord.Record{ + BaseRecord: deploymentrecord.BaseRecord{ + Digest: unauthorizedDigest, + }, + }, + Cause: "unauthorized", + }, + }, + } + respJSON, err := json.Marshal(clusterResp) + require.NoError(t, err) + + poster := &mockPoster{clusterResp: respJSON} + ctrl := newTestController(poster) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + + err = ctrl.processSyncEvents(context.Background(), []any{ + makeTestPod("app", "test-deploy-abc123", digest, "ReplicaSet"), + makeTestPod("unknown", "test-deploy-abc456", unknownDigest, "ReplicaSet"), + makeTestPod("unauthorized", "test-deploy-abc789", unauthorizedDigest, "Job"), + }) + require.NoError(t, err) + assert.Equal(t, 1, poster.getPostClusterCalls(), "PostCluster should be called once") + assert.Equal(t, 3, poster.clusterRecordCount, "PostCluster should receive 3 records") + + // Successful record should be in observedDeployments cache + cacheKey := getCacheKey(EventCreated, "default/test-deploy/app", digest) + _, exists := ctrl.observedDeployments.Get(cacheKey) + assert.True(t, exists, "successful record should populate observedDeployments cache") + + // not_found error should be in unknownArtifacts cache + _, exists = ctrl.unknownArtifacts.Get("sha256:notfound999") + assert.True(t, exists, "not_found error should populate unknownArtifacts cache") + + // unauthorized error should not be in unknownArtifacts cache + _, exists = ctrl.unknownArtifacts.Get("sha256:unauthorized999") + assert.False(t, exists, "unauthorized error should not populate unknownArtifacts cache") +} + +func TestProcessSyncEvents_DedupeContainers(t *testing.T) { + t.Parallel() + digest := "sha256:abc123" + poster := &mockPoster{} + ctrl := newTestController(poster) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + + pod := makeTestPod("app", "test-deploy-abc123", digest, "ReplicaSet") + + err := ctrl.processSyncEvents(context.Background(), []any{pod, pod}) + require.NoError(t, err) + assert.Equal(t, 1, poster.getPostClusterCalls(), "PostCluster should be called once") + assert.Equal(t, 1, poster.clusterRecordCount, "PostCluster should receive only 1 record") +} + +func TestProcessSyncEvents_PostCluster404(t *testing.T) { + t.Parallel() + poster := &mockPoster{ + clusterErr: &deploymentrecord.ClusterNoRepositoriesError{}, + } + ctrl := newTestController(poster) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + pod := makeTestPod("app", "test-deploy-abc123", "sha256:abc123", "ReplicaSet") + + err := ctrl.processSyncEvents(context.Background(), []any{pod}) + require.NoError(t, err, "ClusterNoRepositoriesError should not propagate") + assert.Equal(t, 1, poster.getPostClusterCalls()) + + // Caches should remain empty since no response was processed + cacheKey := getCacheKey(EventCreated, "default/test-deploy/app", "sha256:abc123") + _, exists := ctrl.observedDeployments.Get(cacheKey) + assert.False(t, exists, "observedDeployments should not be populated on 404") +} + +func TestProcessSyncEvents_PostCluster500(t *testing.T) { + t.Parallel() + poster := &mockPoster{ + clusterErr: errors.New("server error"), + } + ctrl := newTestController(poster) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + pod := makeTestPod("app", "test-deploy-abc123", "sha256:abc123", "ReplicaSet") + + err := ctrl.processSyncEvents(context.Background(), []any{pod}) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to post sync cluster records") + assert.Equal(t, 1, poster.getPostClusterCalls()) +} + +func makeTestPod(containerName string, parentName string, digest string, parentKind string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{{ + Kind: parentKind, + Name: parentName, + }}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + Image: "nginx:latest", + }}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{{ + Name: containerName, + ImageID: fmt.Sprintf("docker-pullable://nginx@%s", digest), + }}, + }, + } +} diff --git a/pkg/deploymentrecord/client.go b/pkg/deploymentrecord/client.go index 05915f0..b5adc30 100644 --- a/pkg/deploymentrecord/client.go +++ b/pkg/deploymentrecord/client.go @@ -11,6 +11,7 @@ import ( "math" "math/rand/v2" "net/http" + "net/url" "regexp" "strconv" "strings" @@ -164,6 +165,20 @@ func (c *ClientError) Unwrap() error { return c.err } +// ClusterNoRepositoriesError represents a 404 response from the cluster endpoint +// indicating no repositories were found for the given cluster. +type ClusterNoRepositoriesError struct { + err error +} + +func (c *ClusterNoRepositoriesError) Error() string { + return fmt.Sprintf("cluster_no_repositories_error: %s", c.err.Error()) +} + +func (c *ClusterNoRepositoriesError) Unwrap() error { + return c.err +} + // NoArtifactError represents a 404 client response whose body indicates "no artifacts found". type NoArtifactError struct { err error @@ -182,41 +197,120 @@ func (n *NoArtifactError) Unwrap() error { // PostOne posts a single deployment record to the GitHub deployment // records API. -func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { +func (c *Client) PostOne(ctx context.Context, record *Record) error { if record == nil { return errors.New("record cannot be nil") } - url := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record", c.baseURL, c.org) + singleURL := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record", c.baseURL, c.org) body, err := buildRequestBody(record) if err != nil { return fmt.Errorf("failed to marshal record: %w", err) } - bodyReader := bytes.NewReader(body) + respBody, statusCode, lastErr := c.postWithRetry(ctx, singleURL, body) + + var clientErr *ClientError + switch { + case errors.As(lastErr, &clientErr): + dtmetrics.PostDeploymentRecordClientError.Inc() + slog.Warn("client error, aborting", + "status_code", statusCode, + "url", singleURL, + "resp_msg", string(respBody), + ) + return fmt.Errorf("client error: %w", lastErr) + case statusCode >= 200 && statusCode < 300: + dtmetrics.PostDeploymentRecordOk.Inc() + return nil + case statusCode == 404: + dtmetrics.PostDeploymentRecordUnknownArtifact.Inc() + slog.Debug("no artifact attestation found, no record created", + "status_code", statusCode, + "container_name", record.Name, + "resp_msg", string(respBody), + "digest", record.Digest, + ) + return &NoArtifactError{err: fmt.Errorf("no attestation found for %s", record.Digest)} + default: + dtmetrics.PostDeploymentRecordHardFail.Inc() + slog.Error("all retries exhausted", + "count", c.retries, + "error", lastErr, + "container_name", record.Name, + ) + return fmt.Errorf("all retries exhausted: %w", lastErr) + } +} + +// PostCluster sends a full cluster state of records to GitHub deployment +// records cluster API. +func (c *Client) PostCluster(ctx context.Context, records []*Record, cluster string) ([]byte, error) { + if len(records) == 0 { + slog.Debug("Records is empty, skipping") + return nil, nil + } + + clusterURL := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record/cluster/%s", c.baseURL, c.org, url.PathEscape(cluster)) + body, err := buildClusterRequestBody(records) + if err != nil { + return nil, fmt.Errorf("failed to marshal records: %w", err) + } + + respBody, statusCode, lastErr := c.postWithRetry(ctx, clusterURL, body) + + var clientErr *ClientError + switch { + case errors.As(lastErr, &clientErr): + dtmetrics.PostDeploymentRecordClientError.Inc() + slog.Warn("client error, aborting", + "status_code", statusCode, + "url", clusterURL, + "resp_msg", string(respBody), + ) + return nil, fmt.Errorf("client error: %w", lastErr) + case statusCode >= 200 && statusCode < 300: + dtmetrics.PostDeploymentRecordOk.Inc() + return respBody, nil + case statusCode == 404: + dtmetrics.PostDeploymentRecordUnknownArtifact.Inc() + return nil, &ClusterNoRepositoriesError{err: errors.New("no repositories found")} + default: + dtmetrics.PostDeploymentRecordHardFail.Inc() + slog.Error("all retries exhausted", + "count", c.retries, + "error", lastErr, + "cluster", cluster, + ) + return nil, fmt.Errorf("all retries exhausted: %w", lastErr) + } +} + +func (c *Client) postWithRetry(ctx context.Context, targetURL string, body []byte) ([]byte, int, error) { + bodyReader := bytes.NewReader(body) var lastErr error // The first attempt is not a retry! for attempt := range c.retries + 1 { - if err = waitForBackoff(ctx, attempt); err != nil { - return err + if err := waitForBackoff(ctx, attempt); err != nil { + return nil, 0, err } - if err = c.waitForServerRateLimit(ctx); err != nil { - return err + if err := c.waitForServerRateLimit(ctx); err != nil { + return nil, 0, err } - if err = c.requestThrottler.Wait(ctx); err != nil { - return fmt.Errorf("request throttler wait failed: %w", err) + if err := c.requestThrottler.Wait(ctx); err != nil { + return nil, 0, fmt.Errorf("request throttler wait failed: %w", err) } // Reset reader position for retries bodyReader.Reset(body) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bodyReader) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL, bodyReader) if err != nil { - return fmt.Errorf("failed to create request: %w", err) + return nil, 0, fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Content-Type", "application/json") @@ -225,7 +319,7 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { // locking tok, err := c.transport.Token(ctx) if err != nil { - return fmt.Errorf("failed to get access token: %w", err) + return nil, 0, fmt.Errorf("failed to get access token: %w", err) } req.Header.Set("Authorization", "Bearer "+tok) } else if c.apiToken != "" { @@ -249,31 +343,16 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { continue } - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - // Drain and close response body to enable connection reuse - _, _ = io.Copy(io.Discard, resp.Body) - _ = resp.Body.Close() - dtmetrics.PostDeploymentRecordOk.Inc() - return nil - } - - // Drain and close response body to enable connection reuse by reading body for error logging - respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) - _, _ = io.Copy(io.Discard, resp.Body) + // Drain and close response body to enable connection reuse + respBody, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() switch { + case resp.StatusCode >= 200 && resp.StatusCode < 300: + return respBody, resp.StatusCode, nil case resp.StatusCode == 404: - // No artifact found - do not retry - dtmetrics.PostDeploymentRecordUnknownArtifact.Inc() - slog.Debug("no artifact attestation found, no record created", - "attempt", attempt, - "status_code", resp.StatusCode, - "container_name", record.Name, - "resp_msg", string(respBody), - "digest", record.Digest, - ) - return &NoArtifactError{err: fmt.Errorf("no attestation found for %s", record.Digest)} + // Not found - do not retry + return respBody, resp.StatusCode, nil case resp.StatusCode >= 400 && resp.StatusCode < 500: // Check headers that indicate rate limiting if resp.Header.Get("Retry-After") != "" || resp.Header.Get("X-Ratelimit-Remaining") == "0" { @@ -286,41 +365,27 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { "retry-after", resp.Header.Get("Retry-After"), "x-ratelimit-remaining", resp.Header.Get("X-Ratelimit-Remaining"), "retry_delay", retryDelay.Seconds(), - "container_name", record.Name, + "url", targetURL, "resp_msg", string(respBody), ) lastErr = fmt.Errorf("rate limited, attempt %d", attempt) continue } // Don't retry non rate limiting client errors - dtmetrics.PostDeploymentRecordClientError.Inc() - slog.Warn("client error, aborting", - "attempt", attempt, - "status_code", resp.StatusCode, - "container_name", record.Name, - "resp_msg", string(respBody), - ) - return &ClientError{err: fmt.Errorf("unexpected client err with status code %d", resp.StatusCode)} + return nil, resp.StatusCode, &ClientError{err: fmt.Errorf("unexpected client err with status code %d", resp.StatusCode)} default: // Retry with backoff dtmetrics.PostDeploymentRecordSoftFail.Inc() slog.Debug("retriable error", "attempt", attempt, "status_code", resp.StatusCode, - "container_name", record.Name, + "url", targetURL, "resp_msg", string(respBody), ) lastErr = fmt.Errorf("server error, attempt %d", attempt) } } - - dtmetrics.PostDeploymentRecordHardFail.Inc() - slog.Error("all retries exhausted", - "count", c.retries, - "error", lastErr, - "container_name", record.Name, - ) - return fmt.Errorf("all retries exhausted: %w", lastErr) + return nil, 0, lastErr } // waitForServerRateLimit blocks until the global server rate limit backoff has elapsed. @@ -400,13 +465,33 @@ func parseRateLimitDelay(resp *http.Response) time.Duration { // buildRequestBody adds return_records=false to a deployment record request body // which results in a minimal response payload. -func buildRequestBody(record *DeploymentRecord) ([]byte, error) { +func buildRequestBody(record *Record) ([]byte, error) { return json.Marshal(struct { - DeploymentRecord + Record ReturnRecords bool `json:"return_records"` }{ - DeploymentRecord: *record, - ReturnRecords: false, + Record: *record, + ReturnRecords: false, + }) +} + +// buildClusterRequestBody count the total records, builds ClusterRecordsBody, +// and returns []byte. +func buildClusterRequestBody(records []*Record) ([]byte, error) { + if len(records) == 0 { + return nil, nil + } + deploymentRecords := []BaseRecord{} + + for _, r := range records { + deploymentRecords = append(deploymentRecords, r.BaseRecord) + } + + return json.Marshal(ClusterRecordsBody{ + LogicalEnvironment: records[0].LogicalEnvironment, + PhysicalEnvironment: records[0].PhysicalEnvironment, + PartialSuccess: true, + Deployments: deploymentRecords, }) } diff --git a/pkg/deploymentrecord/client_test.go b/pkg/deploymentrecord/client_test.go index 9a2efb1..d602ac4 100644 --- a/pkg/deploymentrecord/client_test.go +++ b/pkg/deploymentrecord/client_test.go @@ -278,8 +278,8 @@ func TestValidOrgPattern(t *testing.T) { } } -// testRecord returns a minimal valid DeploymentRecord for testing. -func testRecord() *DeploymentRecord { +// testRecord returns a minimal valid Record for testing. +func testRecord() *Record { return NewDeploymentRecord( "ghcr.io/my-org/my-image", "sha256:abc123", @@ -309,7 +309,7 @@ func allCounters() []prometheus.Counter { func TestPostOne(t *testing.T) { tests := []struct { name string - record *DeploymentRecord + record *Record retries int handler http.HandlerFunc wantErr bool @@ -728,3 +728,167 @@ func TestPostOneRespectsRetryAfterAcrossGoroutines(t *testing.T) { } wg.Wait() } + +func TestPostCluster(t *testing.T) { + tests := []struct { + name string + records []*Record + handler http.HandlerFunc + wantErr bool + errType any + errContain string + wantBody bool + wantOk float64 + wantUnknownArtifact float64 + wantSoftFail float64 + wantHardFail float64 + wantClientError float64 + }{ + { + name: "empty records returns nil", + records: []*Record{}, + handler: func(_ http.ResponseWriter, _ *http.Request) { + t.Fatal("server should not be called with empty records") + }, + }, + { + name: "success on 207 returns body", + records: []*Record{testRecord()}, + handler: func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusMultiStatus) + _, _ = w.Write([]byte(`{"total_count":1,"deployment_records":[],"errors":[]}`)) + }, + wantBody: true, + wantOk: 1, + }, + { + name: "404 returns ClusterNoRepositoriesError", + records: []*Record{testRecord()}, + handler: func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNotFound) + }, + wantErr: true, + errType: &ClusterNoRepositoriesError{}, + wantUnknownArtifact: 1, + }, + { + name: "400 returns client error", + records: []*Record{testRecord()}, + handler: func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte("bad request")) + }, + wantErr: true, + errContain: "client error", + wantClientError: 1, + }, + { + name: "500 retries exhausted returns error", + records: []*Record{testRecord()}, + handler: func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }, + wantErr: true, + errContain: "all retries exhausted", + wantSoftFail: 1, + wantHardFail: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := httptest.NewServer(tt.handler) + t.Cleanup(srv.Close) + + client, err := NewClient(srv.URL, "test-org", WithRetries(0)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + + counters := allCounters() + snapshots := make([]float64, len(counters)) + for i, c := range counters { + snapshots[i] = testutil.ToFloat64(c) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + respBody, err := client.PostCluster(ctx, tt.records, "test-cluster") + + if tt.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + if tt.errType != nil { + switch tt.errType.(type) { + case *ClusterNoRepositoriesError: + var e *ClusterNoRepositoriesError + if !errors.As(err, &e) { + t.Errorf("expected ClusterNoRepositoriesError, got %T: %v", err, err) + } + default: + t.Fatalf("unexpected error type in test: %T", tt.errType) + } + } + if tt.errContain != "" && !strings.Contains(err.Error(), tt.errContain) { + t.Errorf("error %q should contain %q", err.Error(), tt.errContain) + } + } else if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if tt.wantBody && respBody == nil { + t.Error("expected non-nil response body") + } + + wantDeltas := []float64{ + tt.wantOk, + tt.wantUnknownArtifact, + 0, // rate limited + tt.wantSoftFail, + tt.wantHardFail, + tt.wantClientError, + } + names := []string{ + "PostDeploymentRecordOk", + "PostDeploymentRecordUnknownArtifact", + "PostDeploymentRecordRateLimited", + "PostDeploymentRecordSoftFail", + "PostDeploymentRecordHardFail", + "PostDeploymentRecordClientError", + } + for i, c := range counters { + got := testutil.ToFloat64(c) - snapshots[i] + if got != wantDeltas[i] { + t.Errorf("%s delta = %v, want %v", names[i], got, wantDeltas[i]) + } + } + }) + } +} + +func TestPostCluster_URLEscapesCluster(t *testing.T) { + var rawPath string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rawPath = r.URL.RawPath + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"total_count":0,"deployment_records":[]}`)) + })) + t.Cleanup(srv.Close) + + client, err := NewClient(srv.URL, "test-org", WithRetries(0)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + + _, err = client.PostCluster(context.Background(), []*Record{testRecord()}, "cluster/with spaces") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + wantSuffix := "/cluster/cluster%2Fwith%20spaces" + if !strings.HasSuffix(rawPath, wantSuffix) { + t.Errorf("raw path %q should end with %q", rawPath, wantSuffix) + } +} diff --git a/pkg/deploymentrecord/record.go b/pkg/deploymentrecord/record.go index 5ad140e..e8e6f6e 100644 --- a/pkg/deploymentrecord/record.go +++ b/pkg/deploymentrecord/record.go @@ -30,42 +30,81 @@ var validRuntimeRisks = map[RuntimeRisk]bool{ SensitiveData: true, } -// DeploymentRecord represents a deployment event record. -type DeploymentRecord struct { - Name string `json:"name"` - Digest string `json:"digest"` - Version string `json:"version,omitempty"` - LogicalEnvironment string `json:"logical_environment"` - PhysicalEnvironment string `json:"physical_environment"` - Cluster string `json:"cluster"` - Status string `json:"status"` - DeploymentName string `json:"deployment_name"` - RuntimeRisks []RuntimeRisk `json:"runtime_risks,omitempty"` - Tags map[string]string `json:"tags,omitempty"` +// BaseRecord represents a deployment record for the deployment record cluster endpoint. +type BaseRecord struct { + Name string `json:"name"` + Digest string `json:"digest"` + Version string `json:"version,omitempty"` + Status string `json:"status"` + DeploymentName string `json:"deployment_name"` + RuntimeRisks []RuntimeRisk `json:"runtime_risks,omitempty"` + Tags map[string]string `json:"tags,omitempty"` } -// NewDeploymentRecord creates a new DeploymentRecord with the given status. +// Record represents a deployment event record. +type Record struct { + BaseRecord + LogicalEnvironment string `json:"logical_environment"` + PhysicalEnvironment string `json:"physical_environment"` + Cluster string `json:"cluster"` +} + +// RecordResp represents the response of a created deployment record from the +// deployment record cluster endpoint. +type RecordResp struct { + Record + Created string `json:"created"` + UpdatedAt string `json:"updated_at"` + AttestationID int `json:"attestation_id"` +} + +// RecordErrorResp represents a failed deployment record from the +// deployment record cluster endpoint, including the cause of the failure. +type RecordErrorResp struct { + Record + Cause string `json:"cause"` +} + +// ClusterRecordsBody represents the post body for the deployment record cluster endpoint. +type ClusterRecordsBody struct { + LogicalEnvironment string `json:"logical_environment"` + PhysicalEnvironment string `json:"physical_environment"` + PartialSuccess bool `json:"partial_success"` + Deployments []BaseRecord `json:"deployments"` +} + +// RecordsClusterResp represents the response from the deployment record +// cluster endpoint, containing successfully created records and any errors. +type RecordsClusterResp struct { + TotalCount int `json:"total_count"` + DeploymentRecords []*RecordResp `json:"deployment_records"` + Errors []*RecordErrorResp `json:"errors,omitempty"` +} + +// NewDeploymentRecord creates a new Record with the given status. // Status must be either StatusDeployed or StatusDecommissioned. // //nolint:revive func NewDeploymentRecord(name, digest, version, logicalEnv, physicalEnv, - cluster, status, deploymentName string, runtimeRisks []RuntimeRisk, tags map[string]string) *DeploymentRecord { + cluster, status, deploymentName string, runtimeRisks []RuntimeRisk, tags map[string]string) *Record { // Validate status if status != StatusDeployed && status != StatusDecommissioned { status = StatusDeployed // default to deployed if invalid } - return &DeploymentRecord{ - Name: name, - Digest: digest, - Version: version, + return &Record{ LogicalEnvironment: logicalEnv, PhysicalEnvironment: physicalEnv, Cluster: cluster, - Status: status, - DeploymentName: deploymentName, - RuntimeRisks: runtimeRisks, - Tags: tags, + BaseRecord: BaseRecord{ + Name: name, + Digest: digest, + Version: version, + Status: status, + DeploymentName: deploymentName, + RuntimeRisks: runtimeRisks, + Tags: tags, + }, } }