From bceccaf5a1ed608cdea6002ff77f39bc27a7dbb7 Mon Sep 17 00:00:00 2001 From: Eric Pickard Date: Fri, 22 May 2026 14:44:17 -0400 Subject: [PATCH 1/8] add logic to process syncEvents, prevent sync events from entering queue Signed-off-by: Eric Pickard --- internal/controller/controller.go | 11 ++ internal/controller/reporting.go | 188 +++++++++++++++++++----------- 2 files changed, 132 insertions(+), 67 deletions(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index c80f063..842ad5d 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "strings" + "sync/atomic" "time" "github.com/github/deployment-tracker/internal/metadata" @@ -87,6 +88,7 @@ type Controller struct { // informerSyncTimeout is the maximum time allowed for all informers to sync // and prevents sync from hanging indefinitely. informerSyncTimeout time.Duration + syncing atomic.Bool } // New creates a new deployment tracker controller. 
@@ -147,10 +149,16 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato unknownArtifacts: amcache.NewExpiring(), informerSyncTimeout: informerSyncTimeoutDuration, } + cntrl.syncing.Store(true) // Add event handlers to the informer _, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj any) { + // Skip adding sync events + if cntrl.syncing.Load() { + return + } + pod, ok := obj.(*corev1.Pod) if !ok { slog.Error("Invalid object returned", @@ -314,6 +322,9 @@ func (c *Controller) Run(ctx context.Context, workers int) error { return errors.New("timed out waiting for caches to sync - please ensure deployment tracker has the correct kubernetes permissions") } } + c.syncing.Store(false) + syncClusterPods := c.podInformer.GetIndexer().List() + c.processSyncEvents(ctx, syncClusterPods) slog.Info("Starting workers", "count", workers, diff --git a/internal/controller/reporting.go b/internal/controller/reporting.go index 9d95b9a..fd030b9 100644 --- a/internal/controller/reporting.go +++ b/internal/controller/reporting.go @@ -14,7 +14,6 @@ import ( "github.com/github/deployment-tracker/pkg/deploymentrecord" "github.com/github/deployment-tracker/pkg/dtmetrics" "github.com/github/deployment-tracker/pkg/ociutil" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -92,14 +91,14 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { return nil } - var lastErr error - // Gather aggregate metadata for adds/updates var aggPodMetadata *metadata.AggregatePodMetadata if event.EventType != EventDeleted { aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) } + var lastErr error + // Record info for each container in the pod for _, container := range pod.Spec.Containers { if err := c.recordContainer(ctx, pod, container, event.EventType, wl.Name, aggPodMetadata); err != nil { @@ -117,99 +116,105 @@ func (c *Controller) 
processEvent(ctx context.Context, event PodEvent) error { return lastErr } -// recordContainer records a single container's deployment info. -func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) error { - var cacheKey string +func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []interface{}) { + syncRecords := []*deploymentrecord.DeploymentRecord{} + for _, p := range syncClusterPods { + pod, ok := p.(*corev1.Pod) + if !ok { + slog.Error("Invalid object type in sync cluster pod list") + continue + } - status := deploymentrecord.StatusDeployed - if eventType == EventDeleted { - status = deploymentrecord.StatusDecommissioned + // Resolve the workload name for the deployment record. + wl := c.workloadResolver.Resolve(pod) + if wl.Name == "" { + slog.Debug("Could not resolve workload name for sync pod, skipping", + "namespace", pod.Namespace, + "pod", pod.Name, + ) + continue + } + if pod.Status.Phase != corev1.PodRunning || !workload.HasSupportedOwner(pod) { + continue + } + + // Gather aggregate metadata for adds/updates + var aggPodMetadata *metadata.AggregatePodMetadata + aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) + + // Record info for each container in the pod + for _, container := range pod.Spec.Containers { + record := c.buildRecord(pod, container, EventCreated, wl.Name, aggPodMetadata) + if record != nil { + syncRecords = append(syncRecords, record) + } + } + + // Also record init containers + for _, container := range pod.Spec.InitContainers { + record := c.buildRecord(pod, container, EventCreated, wl.Name, aggPodMetadata) + if record != nil { + syncRecords = append(syncRecords, record) + } + } } - dn := getARDeploymentName(pod, container, c.cfg.Template, workloadName) - digest := getContainerDigest(pod, container.Name) + slog.Info("Sync Records: ", "len", 
len(syncRecords), "content", syncRecords) +} - if dn == "" || digest == "" { - slog.Debug("Skipping container: missing deployment name or digest", - "namespace", pod.Namespace, - "pod", pod.Name, - "container", container.Name, - "deployment_name", dn, - "has_digest", digest != "", - ) +// recordContainer records a single container's deployment info. +func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) error { + // Create deployment record + record := c.buildRecord(pod, container, eventType, workloadName, aggPodMetadata) + if record == nil { + slog.Debug("Unable to build record for container, skipping", + "deployment_name", getARDeploymentName(pod, container, c.cfg.Template, workloadName)) return nil } // Check if we've already recorded this deployment - switch status { + var cacheKey string + switch record.Status { case deploymentrecord.StatusDeployed: - cacheKey = getCacheKey(EventCreated, dn, digest) + cacheKey = getCacheKey(EventCreated, record.DeploymentName, record.Digest) if _, exists := c.observedDeployments.Get(cacheKey); exists { slog.Debug("Deployment already observed, skipping post", - "deployment_name", dn, - "digest", digest, + "deployment_name", record.DeploymentName, + "digest", record.Digest, ) return nil } case deploymentrecord.StatusDecommissioned: - cacheKey = getCacheKey(EventDeleted, dn, digest) + cacheKey = getCacheKey(EventDeleted, record.DeploymentName, record.Digest) if _, exists := c.observedDeployments.Get(cacheKey); exists { slog.Debug("Deployment already deleted, skipping post", - "deployment_name", dn, - "digest", digest, + "deployment_name", record.DeploymentName, + "digest", record.Digest, ) return nil } default: - return fmt.Errorf("invalid status: %s", status) + return fmt.Errorf("invalid status: %s", record.Status) } // Check if this artifact was previously unknown (404 from the API) - if _, exists := 
c.unknownArtifacts.Get(digest); exists { + if _, exists := c.unknownArtifacts.Get(record.Digest); exists { dtmetrics.PostDeploymentRecordUnknownArtifactCacheHit.Inc() slog.Debug("Artifact previously returned 404, skipping post", - "deployment_name", dn, - "digest", digest, + "deployment_name", record.DeploymentName, + "digest", record.Digest, ) return nil } - // Extract image name and tag - imageName, version := ociutil.ExtractName(container.Image) - - // Format runtime risks and tags - var runtimeRisks []deploymentrecord.RuntimeRisk - var tags map[string]string - if aggPodMetadata != nil { - for risk := range aggPodMetadata.RuntimeRisks { - runtimeRisks = append(runtimeRisks, risk) - } - slices.Sort(runtimeRisks) - tags = aggPodMetadata.Tags - } - - // Create deployment record - record := deploymentrecord.NewDeploymentRecord( - imageName, - digest, - version, - c.cfg.LogicalEnvironment, - c.cfg.PhysicalEnvironment, - c.cfg.Cluster, - status, - dn, - runtimeRisks, - tags, - ) - if err := c.apiClient.PostOne(ctx, record); err != nil { // Return if no artifact is found and cache the digest var noArtifactErr *deploymentrecord.NoArtifactError if errors.As(err, &noArtifactErr) { - c.unknownArtifacts.Set(digest, true, unknownArtifactTTL) + c.unknownArtifacts.Set(record.Digest, true, unknownArtifactTTL) slog.Info("No artifact found, digest cached as unknown", - "deployment_name", dn, - "digest", digest, + "deployment_name", record.DeploymentName, + "digest", record.Digest, ) return nil } @@ -250,26 +255,75 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta ) // Update cache after successful post - switch status { + switch record.Status { case deploymentrecord.StatusDeployed: - cacheKey = getCacheKey(EventCreated, dn, digest) + cacheKey = getCacheKey(EventCreated, record.DeploymentName, record.Digest) c.observedDeployments.Set(cacheKey, true, 2*time.Minute) // If there was a previous delete event, remove that - cacheKey = 
getCacheKey(EventDeleted, dn, digest) + cacheKey = getCacheKey(EventDeleted, record.DeploymentName, record.Digest) c.observedDeployments.Delete(cacheKey) case deploymentrecord.StatusDecommissioned: - cacheKey = getCacheKey(EventDeleted, dn, digest) + cacheKey = getCacheKey(EventDeleted, record.DeploymentName, record.Digest) c.observedDeployments.Set(cacheKey, true, 2*time.Minute) // If there was a previous create event, remove that - cacheKey = getCacheKey(EventCreated, dn, digest) + cacheKey = getCacheKey(EventCreated, record.DeploymentName, record.Digest) c.observedDeployments.Delete(cacheKey) default: - return fmt.Errorf("invalid status: %s", status) + return fmt.Errorf("invalid status: %s", record.Status) } return nil } +func (c *Controller) buildRecord(pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) *deploymentrecord.DeploymentRecord { + dn := getARDeploymentName(pod, container, c.cfg.Template, workloadName) + digest := getContainerDigest(pod, container.Name) + + if dn == "" || digest == "" { + slog.Debug("Skipping container: missing deployment name or digest", + "namespace", pod.Namespace, + "pod", pod.Name, + "container", container.Name, + "deployment_name", dn, + "has_digest", digest != "", + ) + return nil + } + + status := deploymentrecord.StatusDeployed + if eventType == EventDeleted { + status = deploymentrecord.StatusDecommissioned + } + + // Extract image name and tag + imageName, version := ociutil.ExtractName(container.Image) + + // Format runtime risks and tags + var runtimeRisks []deploymentrecord.RuntimeRisk + var tags map[string]string + if aggPodMetadata != nil { + for risk := range aggPodMetadata.RuntimeRisks { + runtimeRisks = append(runtimeRisks, risk) + } + slices.Sort(runtimeRisks) + tags = aggPodMetadata.Tags + } + + // Create deployment record + return deploymentrecord.NewDeploymentRecord( + imageName, + digest, + version, + c.cfg.LogicalEnvironment, + 
c.cfg.PhysicalEnvironment, + c.cfg.Cluster, + status, + dn, + runtimeRisks, + tags, + ) +} + func getCacheKey(ev, dn, digest string) string { return ev + "||" + dn + "||" + digest } From 9fbb4334796d7a3f9adb95eda4ca123f05b2b22c Mon Sep 17 00:00:00 2001 From: Eric Pickard Date: Fri, 22 May 2026 16:56:54 -0400 Subject: [PATCH 2/8] add PostCluster to client Signed-off-by: Eric Pickard --- internal/controller/controller.go | 6 +- .../controller/controller_integration_test.go | 4 + internal/controller/controller_test.go | 4 + internal/controller/reporting.go | 24 ++- pkg/deploymentrecord/client.go | 139 ++++++++++++------ pkg/deploymentrecord/record.go | 48 +++--- 6 files changed, 159 insertions(+), 66 deletions(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 842ad5d..6147fcb 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -49,6 +49,7 @@ type ttlCache interface { type deploymentRecordPoster interface { PostOne(ctx context.Context, record *deploymentrecord.DeploymentRecord) error + PostCluster(ctx context.Context, records []*deploymentrecord.DeploymentRecord, cluster string) ([]byte, error) } type podMetadataAggregator interface { @@ -324,7 +325,10 @@ func (c *Controller) Run(ctx context.Context, workers int) error { } c.syncing.Store(false) syncClusterPods := c.podInformer.GetIndexer().List() - c.processSyncEvents(ctx, syncClusterPods) + err := c.processSyncEvents(ctx, syncClusterPods) + if err != nil { + return fmt.Errorf("sync events failed: %w", err) + } slog.Info("Starting workers", "count", workers, diff --git a/internal/controller/controller_integration_test.go b/internal/controller/controller_integration_test.go index fce91ed..31f8307 100644 --- a/internal/controller/controller_integration_test.go +++ b/internal/controller/controller_integration_test.go @@ -37,6 +37,10 @@ func (m *mockRecordPoster) PostOne(_ context.Context, record *deploymentrecord.D return m.err } +func (m 
*mockRecordPoster) PostCluster(_ context.Context, _ []*deploymentrecord.DeploymentRecord, _ string) ([]byte, error) { + return nil, nil +} + // Helper that allows tests to read captured records safely. func (m *mockRecordPoster) getRecords() []*deploymentrecord.DeploymentRecord { m.mu.Lock() diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 2bba0f9..822263e 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -35,6 +35,10 @@ func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.DeploymentRe return m.lastErr } +func (m *mockPoster) PostCluster(_ context.Context, _ []*deploymentrecord.DeploymentRecord, _ string) ([]byte, error) { + return nil, nil +} + func (m *mockPoster) getCalls() int { m.mu.Lock() defer m.mu.Unlock() diff --git a/internal/controller/reporting.go b/internal/controller/reporting.go index fd030b9..51f5740 100644 --- a/internal/controller/reporting.go +++ b/internal/controller/reporting.go @@ -116,7 +116,7 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { return lastErr } -func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []interface{}) { +func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []interface{}) error { syncRecords := []*deploymentrecord.DeploymentRecord{} for _, p := range syncClusterPods { pod, ok := p.(*corev1.Pod) @@ -125,6 +125,10 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []in continue } + if pod.Status.Phase != corev1.PodRunning || !workload.HasSupportedOwner(pod) { + continue + } + // Resolve the workload name for the deployment record. 
wl := c.workloadResolver.Resolve(pod) if wl.Name == "" { @@ -134,13 +138,9 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []in ) continue } - if pod.Status.Phase != corev1.PodRunning || !workload.HasSupportedOwner(pod) { - continue - } // Gather aggregate metadata for adds/updates - var aggPodMetadata *metadata.AggregatePodMetadata - aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) + aggPodMetadata := c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) // Record info for each container in the pod for _, container := range pod.Spec.Containers { @@ -159,7 +159,17 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []in } } - slog.Info("Sync Records: ", "len", len(syncRecords), "content", syncRecords) + respBody, err := c.apiClient.PostCluster(ctx, syncRecords, c.cfg.Cluster) + if err != nil { + slog.Error("Failed to post sync cluster records", + "error", err, + "record_count", len(syncRecords), + ) + return fmt.Errorf("failed to post sync cluster records: %w", err) + } + slog.Info("Successfully posted sync cluster records", "body", string(respBody)) + + return nil } // recordContainer records a single container's deployment info. 
diff --git a/pkg/deploymentrecord/client.go b/pkg/deploymentrecord/client.go index 05915f0..618aa1f 100644 --- a/pkg/deploymentrecord/client.go +++ b/pkg/deploymentrecord/client.go @@ -194,21 +194,81 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { return fmt.Errorf("failed to marshal record: %w", err) } - bodyReader := bytes.NewReader(body) + respBody, statusCode, lastErr := c.PostWithRetry(ctx, url, body) + + switch { + case statusCode >= 200 && statusCode < 300: + dtmetrics.PostDeploymentRecordOk.Inc() + return nil + case statusCode == 404: + dtmetrics.PostDeploymentRecordUnknownArtifact.Inc() + slog.Debug("no artifact attestation found, no record created", + "status_code", statusCode, + "container_name", record.Name, + "resp_msg", string(respBody), + "digest", record.Digest, + ) + return &NoArtifactError{err: fmt.Errorf("no attestation found for %s", record.Digest)} + default: + dtmetrics.PostDeploymentRecordHardFail.Inc() + slog.Error("all retries exhausted", + "count", c.retries, + "error", lastErr, + "container_name", record.Name, + ) + return fmt.Errorf("all retries exhausted: %w", lastErr) + } +} + +// PostCluster sends a full cluster state of records to GitHub deployment +// records cluster API. 
+func (c *Client) PostCluster(ctx context.Context, records []*DeploymentRecord, cluster string) ([]byte, error) { + if len(records) <= 0 { + slog.Debug("Records is empty, skipping") + return nil, nil + } + url := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record/cluster/%s", c.baseURL, c.org, cluster) + + body, err := buildClusterRequestBody(records) + if err != nil { + return nil, fmt.Errorf("failed to marshal records: %w", err) + } + + respBody, statusCode, lastErr := c.PostWithRetry(ctx, url, body) + + switch { + case statusCode >= 200 && statusCode < 300: + dtmetrics.PostDeploymentRecordOk.Inc() + return respBody, nil + case statusCode == 404: + return nil, fmt.Errorf("cluster endpoint not found") + default: + dtmetrics.PostDeploymentRecordHardFail.Inc() + slog.Error("all retries exhausted", + "count", c.retries, + "error", lastErr, + "cluster", cluster, + ) + return nil, fmt.Errorf("all retries exhausted: %w", lastErr) + } +} + +func (c *Client) PostWithRetry(ctx context.Context, url string, body []byte) ([]byte, int, error) { + bodyReader := bytes.NewReader(body) var lastErr error // The first attempt is not a retry! 
for attempt := range c.retries + 1 { - if err = waitForBackoff(ctx, attempt); err != nil { - return err + if err := waitForBackoff(ctx, attempt); err != nil { + return nil, 0, err } - if err = c.waitForServerRateLimit(ctx); err != nil { - return err + if err := c.waitForServerRateLimit(ctx); err != nil { + return nil, 0, err } - if err = c.requestThrottler.Wait(ctx); err != nil { - return fmt.Errorf("request throttler wait failed: %w", err) + if err := c.requestThrottler.Wait(ctx); err != nil { + return nil, 0, fmt.Errorf("request throttler wait failed: %w", err) } // Reset reader position for retries @@ -216,7 +276,7 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bodyReader) if err != nil { - return fmt.Errorf("failed to create request: %w", err) + return nil, 0, fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Content-Type", "application/json") @@ -225,7 +285,7 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { // locking tok, err := c.transport.Token(ctx) if err != nil { - return fmt.Errorf("failed to get access token: %w", err) + return nil, 0, fmt.Errorf("failed to get access token: %w", err) } req.Header.Set("Authorization", "Bearer "+tok) } else if c.apiToken != "" { @@ -249,31 +309,16 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { continue } - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - // Drain and close response body to enable connection reuse - _, _ = io.Copy(io.Discard, resp.Body) - _ = resp.Body.Close() - dtmetrics.PostDeploymentRecordOk.Inc() - return nil - } - - // Drain and close response body to enable connection reuse by reading body for error logging - respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) - _, _ = io.Copy(io.Discard, resp.Body) + // Drain and close response body to enable connection reuse + respBody, _ := io.ReadAll(resp.Body) _ = 
resp.Body.Close() switch { + case resp.StatusCode >= 200 && resp.StatusCode < 300: + return respBody, resp.StatusCode, nil case resp.StatusCode == 404: // No artifact found - do not retry - dtmetrics.PostDeploymentRecordUnknownArtifact.Inc() - slog.Debug("no artifact attestation found, no record created", - "attempt", attempt, - "status_code", resp.StatusCode, - "container_name", record.Name, - "resp_msg", string(respBody), - "digest", record.Digest, - ) - return &NoArtifactError{err: fmt.Errorf("no attestation found for %s", record.Digest)} + return respBody, resp.StatusCode, nil case resp.StatusCode >= 400 && resp.StatusCode < 500: // Check headers that indicate rate limiting if resp.Header.Get("Retry-After") != "" || resp.Header.Get("X-Ratelimit-Remaining") == "0" { @@ -286,7 +331,7 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { "retry-after", resp.Header.Get("Retry-After"), "x-ratelimit-remaining", resp.Header.Get("X-Ratelimit-Remaining"), "retry_delay", retryDelay.Seconds(), - "container_name", record.Name, + "url", url, "resp_msg", string(respBody), ) lastErr = fmt.Errorf("rate limited, attempt %d", attempt) @@ -297,30 +342,23 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { slog.Warn("client error, aborting", "attempt", attempt, "status_code", resp.StatusCode, - "container_name", record.Name, + "url", url, "resp_msg", string(respBody), ) - return &ClientError{err: fmt.Errorf("unexpected client err with status code %d", resp.StatusCode)} + return nil, resp.StatusCode, &ClientError{err: fmt.Errorf("unexpected client err with status code %d", resp.StatusCode)} default: // Retry with backoff dtmetrics.PostDeploymentRecordSoftFail.Inc() slog.Debug("retriable error", "attempt", attempt, "status_code", resp.StatusCode, - "container_name", record.Name, + "url", url, "resp_msg", string(respBody), ) lastErr = fmt.Errorf("server error, attempt %d", attempt) } } - - 
dtmetrics.PostDeploymentRecordHardFail.Inc() - slog.Error("all retries exhausted", - "count", c.retries, - "error", lastErr, - "container_name", record.Name, - ) - return fmt.Errorf("all retries exhausted: %w", lastErr) + return nil, 0, lastErr } // waitForServerRateLimit blocks until the global server rate limit backoff has elapsed. @@ -410,6 +448,25 @@ func buildRequestBody(record *DeploymentRecord) ([]byte, error) { }) } +// buildClusterRequestBody count the total records, builds DeploymentRecords, +// and returns []byte. +func buildClusterRequestBody(records []*DeploymentRecord) ([]byte, error) { + if len(records) <= 0 { + return nil, nil + } + deploymentRecords := []DeploymentRecordBase{} + + for _, r := range records { + deploymentRecords = append(deploymentRecords, r.DeploymentRecordBase) + } + + return json.Marshal(DeploymentRecords{ + LogicalEnvironment: records[0].LogicalEnvironment, + PhysicalEnvironment: records[0].PhysicalEnvironment, + Deployments: deploymentRecords, + }) +} + func waitForBackoff(ctx context.Context, attempt int) error { if attempt > 0 { backoff := time.Duration(math.Pow(2, diff --git a/pkg/deploymentrecord/record.go b/pkg/deploymentrecord/record.go index 5ad140e..ce04613 100644 --- a/pkg/deploymentrecord/record.go +++ b/pkg/deploymentrecord/record.go @@ -32,16 +32,28 @@ var validRuntimeRisks = map[RuntimeRisk]bool{ // DeploymentRecord represents a deployment event record. 
type DeploymentRecord struct { - Name string `json:"name"` - Digest string `json:"digest"` - Version string `json:"version,omitempty"` - LogicalEnvironment string `json:"logical_environment"` - PhysicalEnvironment string `json:"physical_environment"` - Cluster string `json:"cluster"` - Status string `json:"status"` - DeploymentName string `json:"deployment_name"` - RuntimeRisks []RuntimeRisk `json:"runtime_risks,omitempty"` - Tags map[string]string `json:"tags,omitempty"` + DeploymentRecordBase + LogicalEnvironment string `json:"logical_environment"` + PhysicalEnvironment string `json:"physical_environment"` + Cluster string `json:"cluster"` +} + +// DeploymentRecordBase represents a deployment record for the deployment record cluster endpoint. +type DeploymentRecordBase struct { + Name string `json:"name"` + Digest string `json:"digest"` + Version string `json:"version,omitempty"` + Status string `json:"status"` + DeploymentName string `json:"deployment_name"` + RuntimeRisks []RuntimeRisk `json:"runtime_risks,omitempty"` + Tags map[string]string `json:"tags,omitempty"` +} + +// DeploymentRecords represents the post body for the deployment record cluster endpoint. +type DeploymentRecords struct { + LogicalEnvironment string `json:"logical_environment"` + PhysicalEnvironment string `json:"physical_environment"` + Deployments []DeploymentRecordBase `json:"deployments"` } // NewDeploymentRecord creates a new DeploymentRecord with the given status. 
@@ -56,16 +68,18 @@ func NewDeploymentRecord(name, digest, version, logicalEnv, physicalEnv, } return &DeploymentRecord{ - Name: name, - Digest: digest, - Version: version, LogicalEnvironment: logicalEnv, PhysicalEnvironment: physicalEnv, Cluster: cluster, - Status: status, - DeploymentName: deploymentName, - RuntimeRisks: runtimeRisks, - Tags: tags, + DeploymentRecordBase: DeploymentRecordBase{ + Name: name, + Digest: digest, + Version: version, + Status: status, + DeploymentName: deploymentName, + RuntimeRisks: runtimeRisks, + Tags: tags, + }, } } From 6279e19f9fccfd41187ffb82e83b3d36514b28b1 Mon Sep 17 00:00:00 2001 From: Eric Pickard Date: Fri, 22 May 2026 17:55:16 -0400 Subject: [PATCH 3/8] add logic to fillCaches after PostCluster Signed-off-by: Eric Pickard --- internal/controller/reporting.go | 36 +++++++++++++++++++++++++++++++- pkg/deploymentrecord/client.go | 28 ++++++++++++++++++------- pkg/deploymentrecord/record.go | 36 +++++++++++++++++++++++++------- 3 files changed, 83 insertions(+), 17 deletions(-) diff --git a/internal/controller/reporting.go b/internal/controller/reporting.go index 51f5740..795e4e9 100644 --- a/internal/controller/reporting.go +++ b/internal/controller/reporting.go @@ -2,6 +2,7 @@ package controller import ( "context" + "encoding/json" "errors" "fmt" "log/slog" @@ -158,6 +159,10 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []in } } } + if len(syncRecords) == 0 { + slog.Info("No sync records to post") + return nil + } respBody, err := c.apiClient.PostCluster(ctx, syncRecords, c.cfg.Cluster) if err != nil { @@ -167,11 +172,40 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []in ) return fmt.Errorf("failed to post sync cluster records: %w", err) } - slog.Info("Successfully posted sync cluster records", "body", string(respBody)) + var deploymentRecords deploymentrecord.DeploymentRecordsClusterResp + err = json.Unmarshal(respBody, &deploymentRecords) + if err != nil { + 
slog.Error("Failed to unmarshall response", + "error", err, + "record_count", len(syncRecords), + ) + return nil + } + slog.Info("Successfully posted sync cluster records", + "created", len(deploymentRecords.DeploymentRecords), + "errors", len(deploymentRecords.Errors), + ) + c.fillCaches(deploymentRecords) return nil } +func (c *Controller) fillCaches(deploymentRecords deploymentrecord.DeploymentRecordsClusterResp) { + slog.Info("Filling caches after posting sync cluster records") + // Fill observedDeployments cache with successful digests + for _, r := range deploymentRecords.DeploymentRecords { + cacheKey := getCacheKey(EventCreated, r.DeploymentName, r.Digest) + c.observedDeployments.Set(cacheKey, true, 2*time.Minute) + } + + // Fill unknownArtifacts cache with unknown digests + for _, r := range deploymentRecords.Errors { + if r.Cause == "not_found" { + c.unknownArtifacts.Set(r.Digest, true, unknownArtifactTTL) + } + } +} + // recordContainer records a single container's deployment info. 
func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) error { // Create deployment record diff --git a/pkg/deploymentrecord/client.go b/pkg/deploymentrecord/client.go index 618aa1f..98547ef 100644 --- a/pkg/deploymentrecord/client.go +++ b/pkg/deploymentrecord/client.go @@ -196,7 +196,16 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { respBody, statusCode, lastErr := c.PostWithRetry(ctx, url, body) + var clientErr *ClientError switch { + case errors.As(lastErr, &clientErr): + dtmetrics.PostDeploymentRecordClientError.Inc() + slog.Warn("client error, aborting", + "status_code", statusCode, + "url", url, + "resp_msg", string(respBody), + ) + return fmt.Errorf("client error: %w", lastErr) case statusCode >= 200 && statusCode < 300: dtmetrics.PostDeploymentRecordOk.Inc() return nil @@ -237,11 +246,21 @@ func (c *Client) PostCluster(ctx context.Context, records []*DeploymentRecord, c respBody, statusCode, lastErr := c.PostWithRetry(ctx, url, body) + var clientErr *ClientError switch { + case errors.As(lastErr, &clientErr): + dtmetrics.PostDeploymentRecordClientError.Inc() + slog.Warn("client error, aborting", + "status_code", statusCode, + "url", url, + "resp_msg", string(respBody), + ) + return nil, fmt.Errorf("client error: %w", lastErr) case statusCode >= 200 && statusCode < 300: dtmetrics.PostDeploymentRecordOk.Inc() return respBody, nil case statusCode == 404: + dtmetrics.PostDeploymentRecordHardFail.Inc() return nil, fmt.Errorf("cluster endpoint not found") default: dtmetrics.PostDeploymentRecordHardFail.Inc() @@ -317,7 +336,7 @@ func (c *Client) PostWithRetry(ctx context.Context, url string, body []byte) ([] case resp.StatusCode >= 200 && resp.StatusCode < 300: return respBody, resp.StatusCode, nil case resp.StatusCode == 404: - // No artifact found - do not retry + // Not found - do not retry return 
respBody, resp.StatusCode, nil case resp.StatusCode >= 400 && resp.StatusCode < 500: // Check headers that indicate rate limiting @@ -338,13 +357,6 @@ func (c *Client) PostWithRetry(ctx context.Context, url string, body []byte) ([] continue } // Don't retry non rate limiting client errors - dtmetrics.PostDeploymentRecordClientError.Inc() - slog.Warn("client error, aborting", - "attempt", attempt, - "status_code", resp.StatusCode, - "url", url, - "resp_msg", string(respBody), - ) return nil, resp.StatusCode, &ClientError{err: fmt.Errorf("unexpected client err with status code %d", resp.StatusCode)} default: // Retry with backoff diff --git a/pkg/deploymentrecord/record.go b/pkg/deploymentrecord/record.go index ce04613..aba10ac 100644 --- a/pkg/deploymentrecord/record.go +++ b/pkg/deploymentrecord/record.go @@ -30,14 +30,6 @@ var validRuntimeRisks = map[RuntimeRisk]bool{ SensitiveData: true, } -// DeploymentRecord represents a deployment event record. -type DeploymentRecord struct { - DeploymentRecordBase - LogicalEnvironment string `json:"logical_environment"` - PhysicalEnvironment string `json:"physical_environment"` - Cluster string `json:"cluster"` -} - // DeploymentRecordBase represents a deployment record for the deployment record cluster endpoint. type DeploymentRecordBase struct { Name string `json:"name"` @@ -49,6 +41,28 @@ type DeploymentRecordBase struct { Tags map[string]string `json:"tags,omitempty"` } +// DeploymentRecord represents a deployment event record. +type DeploymentRecord struct { + DeploymentRecordBase + LogicalEnvironment string `json:"logical_environment"` + PhysicalEnvironment string `json:"physical_environment"` + Cluster string `json:"cluster"` +} + +// DeploymentRecordResp represents the response of a created deployment record from the +// deployment record cluster endpoint. 
+type DeploymentRecordResp struct { + DeploymentRecord + Created string `json:"created"` + UpdatedAt string `json:"updated_at"` + AttestationId int `json:"attestation_id"` +} + +type DeploymentRecordErrorResp struct { + DeploymentRecord + Cause string `json:"cause"` +} + // DeploymentRecords represents the post body for the deployment record cluster endpoint. type DeploymentRecords struct { LogicalEnvironment string `json:"logical_environment"` @@ -56,6 +70,12 @@ type DeploymentRecords struct { Deployments []DeploymentRecordBase `json:"deployments"` } +type DeploymentRecordsClusterResp struct { + TotalCount int `json:"total_count"` + DeploymentRecords []*DeploymentRecordResp `json:"deployment_records"` + Errors []*DeploymentRecordErrorResp `json:"errors,omitempty"` +} + // NewDeploymentRecord creates a new DeploymentRecord with the given status. // Status must be either StatusDeployed or StatusDecommissioned. // From 85e4e6e82c1f7288418402a8bd133693396c4c6e Mon Sep 17 00:00:00 2001 From: Eric Pickard Date: Thu, 28 May 2026 14:26:28 -0400 Subject: [PATCH 4/8] enable partial success, refactors and improvements Signed-off-by: Eric Pickard --- internal/controller/reporting.go | 107 ++++++++++++++++++++----------- pkg/deploymentrecord/client.go | 26 ++++++-- pkg/deploymentrecord/record.go | 3 +- 3 files changed, 90 insertions(+), 46 deletions(-) diff --git a/internal/controller/reporting.go b/internal/controller/reporting.go index 795e4e9..abef295 100644 --- a/internal/controller/reporting.go +++ b/internal/controller/reporting.go @@ -118,7 +118,48 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { } func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []interface{}) error { - syncRecords := []*deploymentrecord.DeploymentRecord{} + syncRecords := c.makeSyncRecords(ctx, syncClusterPods) + if len(syncRecords) == 0 { + slog.Info("No sync records to post") + return nil + } + + respBody, err := 
c.apiClient.PostCluster(ctx, syncRecords, c.cfg.Cluster) + var clusterNoRepositoriesError *deploymentrecord.ClusterNoRepositoriesError + if err != nil { + if errors.As(err, &clusterNoRepositoriesError) { + slog.Info("Cluster sync found no creatable records", + "org", c.cfg.Organization, + ) + return nil + } + slog.Error("Failed to post sync cluster records", + "error", err, + "record_count", len(syncRecords), + ) + return fmt.Errorf("failed to post sync cluster records: %w", err) + } + var deploymentRecords deploymentrecord.DeploymentRecordsClusterResp + err = json.Unmarshal(respBody, &deploymentRecords) + if err != nil { + slog.Error("Failed to unmarshall response", + "error", err, + "record_count", len(syncRecords), + ) + return nil + } + slog.Info("Successfully posted sync cluster records", + "created", len(deploymentRecords.DeploymentRecords), + "errors", len(deploymentRecords.Errors), + ) + + c.fillCaches(deploymentRecords) + return nil +} + +func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []interface{}) []*deploymentrecord.DeploymentRecord { + seenSyncRecords := make(map[string]bool) + var syncRecords []*deploymentrecord.DeploymentRecord for _, p := range syncClusterPods { pod, ok := p.(*corev1.Pod) if !ok { @@ -140,54 +181,42 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []in continue } - // Gather aggregate metadata for adds/updates - aggPodMetadata := c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) - - // Record info for each container in the pod - for _, container := range pod.Spec.Containers { - record := c.buildRecord(pod, container, EventCreated, wl.Name, aggPodMetadata) - if record != nil { - syncRecords = append(syncRecords, record) + allContainers := make([]corev1.Container, 0, len(pod.Spec.Containers)+len(pod.Spec.InitContainers)) + allContainers = append(allContainers, pod.Spec.Containers...) + allContainers = append(allContainers, pod.Spec.InitContainers...) 
+ + // Filter out containers already in syncRecords + var newContainers []corev1.Container + for _, container := range allContainers { + dn := getARDeploymentName(pod, container, c.cfg.Template, wl.Name) + digest := getContainerDigest(pod, container.Name) + if dn == "" || digest == "" { + continue + } + key := getCacheKey(EventCreated, dn, digest) + if seenSyncRecords[key] { + continue } + seenSyncRecords[key] = true + newContainers = append(newContainers, container) } - // Also record init containers - for _, container := range pod.Spec.InitContainers { + if len(newContainers) == 0 { + continue + } + + // Only gather aggregate metadata if there are new containers + aggPodMetadata := c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) + + for _, container := range newContainers { record := c.buildRecord(pod, container, EventCreated, wl.Name, aggPodMetadata) if record != nil { syncRecords = append(syncRecords, record) } } } - if len(syncRecords) == 0 { - slog.Info("No sync records to post") - return nil - } - - respBody, err := c.apiClient.PostCluster(ctx, syncRecords, c.cfg.Cluster) - if err != nil { - slog.Error("Failed to post sync cluster records", - "error", err, - "record_count", len(syncRecords), - ) - return fmt.Errorf("failed to post sync cluster records: %w", err) - } - var deploymentRecords deploymentrecord.DeploymentRecordsClusterResp - err = json.Unmarshal(respBody, &deploymentRecords) - if err != nil { - slog.Error("Failed to unmarshall response", - "error", err, - "record_count", len(syncRecords), - ) - return nil - } - slog.Info("Successfully posted sync cluster records", - "created", len(deploymentRecords.DeploymentRecords), - "errors", len(deploymentRecords.Errors), - ) - c.fillCaches(deploymentRecords) - return nil + return syncRecords } func (c *Controller) fillCaches(deploymentRecords deploymentrecord.DeploymentRecordsClusterResp) { diff --git a/pkg/deploymentrecord/client.go b/pkg/deploymentrecord/client.go index 
98547ef..86df00e 100644 --- a/pkg/deploymentrecord/client.go +++ b/pkg/deploymentrecord/client.go @@ -11,6 +11,7 @@ import ( "math" "math/rand/v2" "net/http" + "net/url" "regexp" "strconv" "strings" @@ -164,6 +165,18 @@ func (c *ClientError) Unwrap() error { return c.err } +type ClusterNoRepositoriesError struct { + err error +} + +func (c *ClusterNoRepositoriesError) Error() string { + return fmt.Sprintf("cluster_no_repositories_error: %s", c.err.Error()) +} + +func (c *ClusterNoRepositoriesError) Unwrap() error { + return c.err +} + // NoArtifactError represents a 404 client response whose body indicates "no artifacts found". type NoArtifactError struct { err error @@ -194,7 +207,7 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { return fmt.Errorf("failed to marshal record: %w", err) } - respBody, statusCode, lastErr := c.PostWithRetry(ctx, url, body) + respBody, statusCode, lastErr := c.postWithRetry(ctx, url, body) var clientErr *ClientError switch { @@ -237,14 +250,14 @@ func (c *Client) PostCluster(ctx context.Context, records []*DeploymentRecord, c return nil, nil } - url := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record/cluster/%s", c.baseURL, c.org, cluster) + url := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record/cluster/%s", c.baseURL, c.org, url.PathEscape(cluster)) body, err := buildClusterRequestBody(records) if err != nil { return nil, fmt.Errorf("failed to marshal records: %w", err) } - respBody, statusCode, lastErr := c.PostWithRetry(ctx, url, body) + respBody, statusCode, lastErr := c.postWithRetry(ctx, url, body) var clientErr *ClientError switch { @@ -260,8 +273,8 @@ func (c *Client) PostCluster(ctx context.Context, records []*DeploymentRecord, c dtmetrics.PostDeploymentRecordOk.Inc() return respBody, nil case statusCode == 404: - dtmetrics.PostDeploymentRecordHardFail.Inc() - return nil, fmt.Errorf("cluster endpoint not found") + dtmetrics.PostDeploymentRecordUnknownArtifact.Inc() + 
return nil, &ClusterNoRepositoriesError{err: fmt.Errorf("no repositories found")} default: dtmetrics.PostDeploymentRecordHardFail.Inc() slog.Error("all retries exhausted", @@ -273,7 +286,7 @@ func (c *Client) PostCluster(ctx context.Context, records []*DeploymentRecord, c } } -func (c *Client) PostWithRetry(ctx context.Context, url string, body []byte) ([]byte, int, error) { +func (c *Client) postWithRetry(ctx context.Context, url string, body []byte) ([]byte, int, error) { bodyReader := bytes.NewReader(body) var lastErr error // The first attempt is not a retry! @@ -475,6 +488,7 @@ func buildClusterRequestBody(records []*DeploymentRecord) ([]byte, error) { return json.Marshal(DeploymentRecords{ LogicalEnvironment: records[0].LogicalEnvironment, PhysicalEnvironment: records[0].PhysicalEnvironment, + PartialSuccess: true, Deployments: deploymentRecords, }) } diff --git a/pkg/deploymentrecord/record.go b/pkg/deploymentrecord/record.go index aba10ac..bf46ff2 100644 --- a/pkg/deploymentrecord/record.go +++ b/pkg/deploymentrecord/record.go @@ -55,7 +55,7 @@ type DeploymentRecordResp struct { DeploymentRecord Created string `json:"created"` UpdatedAt string `json:"updated_at"` - AttestationId int `json:"attestation_id"` + AttestationID int `json:"attestation_id"` } type DeploymentRecordErrorResp struct { @@ -67,6 +67,7 @@ type DeploymentRecordErrorResp struct { type DeploymentRecords struct { LogicalEnvironment string `json:"logical_environment"` PhysicalEnvironment string `json:"physical_environment"` + PartialSuccess bool `json:"partial_success"` Deployments []DeploymentRecordBase `json:"deployments"` } From 1e60f34b6cab1e6bf2b67a4afd9e3dc3857397d4 Mon Sep 17 00:00:00 2001 From: Eric Pickard Date: Thu, 28 May 2026 15:17:01 -0400 Subject: [PATCH 5/8] add tests for processSyncEvents Signed-off-by: Eric Pickard --- internal/controller/controller_test.go | 43 +++++-- internal/controller/reporting_test.go | 169 +++++++++++++++++++++++-- 2 files changed, 195 insertions(+), 
17 deletions(-) diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 822263e..85486b1 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/github/deployment-tracker/internal/metadata" "github.com/github/deployment-tracker/internal/workload" "github.com/github/deployment-tracker/pkg/deploymentrecord" "github.com/stretchr/testify/assert" @@ -23,9 +24,13 @@ import ( // mockPoster records all PostOne calls and returns a configurable error. type mockPoster struct { - mu sync.Mutex - calls int - lastErr error + mu sync.Mutex + calls int + clusterCalls int + clusterRecordCount int + lastErr error + clusterResp []byte + clusterErr error } func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.DeploymentRecord) error { @@ -35,27 +40,46 @@ func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.DeploymentRe return m.lastErr } -func (m *mockPoster) PostCluster(_ context.Context, _ []*deploymentrecord.DeploymentRecord, _ string) ([]byte, error) { - return nil, nil +func (m *mockPoster) PostCluster(_ context.Context, records []*deploymentrecord.DeploymentRecord, _ string) ([]byte, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.clusterCalls++ + m.clusterRecordCount = len(records) + return m.clusterResp, m.clusterErr } -func (m *mockPoster) getCalls() int { +func (m *mockPoster) getPostOneCalls() int { m.mu.Lock() defer m.mu.Unlock() return m.calls } +func (m *mockPoster) getPostClusterCalls() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.clusterCalls +} + // mockResolver is a test double for the workloadResolver interface. 
-type mockResolver struct{} +type mockResolver struct { + name string +} -func (*mockResolver) Resolve(_ *corev1.Pod) workload.Identity { - return workload.Identity{} +func (m *mockResolver) Resolve(_ *corev1.Pod) workload.Identity { + return workload.Identity{Name: m.name} } func (*mockResolver) IsActive(_ string, _ workload.Identity) bool { return false } +// mockMetadataAggregator is a test double for the podMetadataAggregator interface. +type mockMetadataAggregator struct{} + +func (*mockMetadataAggregator) BuildAggregatePodMetadata(_ context.Context, _ *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata { + return nil +} + // newTestController creates a minimal Controller suitable for unit-testing // recordContainer without a real Kubernetes cluster. func newTestController(poster *mockPoster) *Controller { @@ -68,6 +92,7 @@ func newTestController(poster *mockPoster) *Controller { Cluster: "test", }, workloadResolver: &mockResolver{}, + metadataAggregator: &mockMetadataAggregator{}, observedDeployments: amcache.NewExpiring(), unknownArtifacts: amcache.NewExpiring(), } diff --git a/internal/controller/reporting_test.go b/internal/controller/reporting_test.go index b4073f2..3fb97e1 100644 --- a/internal/controller/reporting_test.go +++ b/internal/controller/reporting_test.go @@ -2,12 +2,16 @@ package controller import ( "context" + "encoding/json" + "fmt" "testing" "time" "github.com/github/deployment-tracker/pkg/deploymentrecord" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestRecordContainer_UnknownArtifactCachePopulatedOn404(t *testing.T) { @@ -22,7 +26,7 @@ func TestRecordContainer_UnknownArtifactCachePopulatedOn404(t *testing.T) { // First call should hit the API and get a 404 err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, 
poster.getCalls()) + assert.Equal(t, 1, poster.getPostOneCalls()) // Digest should now be in the unknown artifacts cache _, exists := ctrl.unknownArtifacts.Get(digest) @@ -41,12 +45,12 @@ func TestRecordContainer_UnknownArtifactCacheSkipsAPICall(t *testing.T) { // First call — API returns 404, populates cache err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls()) + assert.Equal(t, 1, poster.getPostOneCalls()) // Second call — should be served from cache, no API call err = ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls(), "API should not be called for cached unknown artifact") + assert.Equal(t, 1, poster.getPostOneCalls(), "API should not be called for cached unknown artifact") } func TestRecordContainer_UnknownArtifactCacheAppliesToDecommission(t *testing.T) { @@ -61,12 +65,12 @@ func TestRecordContainer_UnknownArtifactCacheAppliesToDecommission(t *testing.T) // Deploy call — 404, populates cache err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls()) + assert.Equal(t, 1, poster.getPostOneCalls()) // Decommission call for same digest — should skip API err = ctrl.recordContainer(context.Background(), pod, container, EventDeleted, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls(), "decommission should also be skipped for cached unknown artifact") + assert.Equal(t, 1, poster.getPostOneCalls(), "decommission should also be skipped for cached unknown artifact") } func TestRecordContainer_UnknownArtifactCacheExpires(t *testing.T) { @@ -84,7 +88,7 @@ func TestRecordContainer_UnknownArtifactCacheExpires(t *testing.T) { // Immediately — should be cached err := ctrl.recordContainer(context.Background(), pod, 
container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 0, poster.getCalls(), "should skip API while cached") + assert.Equal(t, 0, poster.getPostOneCalls(), "should skip API while cached") // Wait for expiry time.Sleep(100 * time.Millisecond) @@ -92,7 +96,7 @@ func TestRecordContainer_UnknownArtifactCacheExpires(t *testing.T) { // After expiry — should call API again err = ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls(), "should call API after cache expiry") + assert.Equal(t, 1, poster.getPostOneCalls(), "should call API after cache expiry") } func TestRecordContainer_SuccessfulPostDoesNotPopulateUnknownCache(t *testing.T) { @@ -104,9 +108,158 @@ func TestRecordContainer_SuccessfulPostDoesNotPopulateUnknownCache(t *testing.T) err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) - assert.Equal(t, 1, poster.getCalls()) + assert.Equal(t, 1, poster.getPostOneCalls()) // Digest should NOT be in the unknown artifacts cache _, exists := ctrl.unknownArtifacts.Get(digest) assert.False(t, exists, "successful post should not cache digest as unknown") } + +func TestProcessSyncEvents_EmptyPodList(t *testing.T) { + t.Parallel() + poster := &mockPoster{} + ctrl := newTestController(poster) + + err := ctrl.processSyncEvents(context.Background(), []interface{}{}) + require.NoError(t, err) + assert.Equal(t, 0, poster.getPostClusterCalls(), "PostCluster should not be called for empty pod list") +} + +func TestProcessSyncEvents_HappyPath(t *testing.T) { + t.Parallel() + digest := "sha256:abc123" + unknownDigest := "sha256:notfound999" + unauthorizedDigest := "sha256:unauthorized999" + clusterResp := deploymentrecord.DeploymentRecordsClusterResp{ + TotalCount: 1, + DeploymentRecords: []*deploymentrecord.DeploymentRecordResp{{ + DeploymentRecord: 
deploymentrecord.DeploymentRecord{ + DeploymentRecordBase: deploymentrecord.DeploymentRecordBase{ + DeploymentName: "default/test-deploy/app", + Digest: digest, + }, + }, + }}, + Errors: []*deploymentrecord.DeploymentRecordErrorResp{ + { + DeploymentRecord: deploymentrecord.DeploymentRecord{ + DeploymentRecordBase: deploymentrecord.DeploymentRecordBase{ + Digest: unknownDigest, + }, + }, + Cause: "not_found", + }, + { + DeploymentRecord: deploymentrecord.DeploymentRecord{ + DeploymentRecordBase: deploymentrecord.DeploymentRecordBase{ + Digest: unauthorizedDigest, + }, + }, + Cause: "unauthorized", + }, + }, + } + respJSON, err := json.Marshal(clusterResp) + require.NoError(t, err) + + poster := &mockPoster{clusterResp: respJSON} + ctrl := newTestController(poster) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + + err = ctrl.processSyncEvents(context.Background(), []interface{}{ + makePod("app", "test-deploy-abc123", digest, "ReplicaSet"), + makePod("unknown", "test-deploy-abc123", unknownDigest, "ReplicaSet"), + makePod("unauthorized", "test-deploy-abc123", unauthorizedDigest, "ReplicaSet"), + }) + require.NoError(t, err) + assert.Equal(t, 1, poster.getPostClusterCalls(), "PostCluster should be called once") + assert.Equal(t, 3, poster.clusterRecordCount, "PostCluster should receive 3 records") + + // Successful record should be in observedDeployments cache + cacheKey := getCacheKey(EventCreated, "default/test-deploy/app", digest) + _, exists := ctrl.observedDeployments.Get(cacheKey) + assert.True(t, exists, "successful record should populate observedDeployments cache") + + // not_found error should be in unknownArtifacts cache + _, exists = ctrl.unknownArtifacts.Get("sha256:notfound999") + assert.True(t, exists, "not_found error should populate unknownArtifacts cache") + + // unauthorized error should not be in unknownArtifacts cache + _, exists = ctrl.unknownArtifacts.Get("sha256:unauthorized999") + assert.False(t, exists, "unauthorized error 
should not populate unknownArtifacts cache") +} + +func TestProcessSyncEvents_DedupeContainers(t *testing.T) { + t.Parallel() + digest := "sha256:abc123" + poster := &mockPoster{} + ctrl := newTestController(poster) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + + pod := makePod("app", "test-deploy-abc123", digest, "ReplicaSet") + + err := ctrl.processSyncEvents(context.Background(), []interface{}{pod, pod}) + require.NoError(t, err) + assert.Equal(t, 1, poster.getPostClusterCalls(), "PostCluster should be called once") + assert.Equal(t, 1, poster.clusterRecordCount, "PostCluster should receive only 1 record") +} + +func TestProcessSyncEvents_PostCluster404(t *testing.T) { + t.Parallel() + poster := &mockPoster{ + clusterErr: &deploymentrecord.ClusterNoRepositoriesError{}, + } + ctrl := newTestController(poster) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + pod := makePod("app", "test-deploy-abc123", "sha256:abc123", "ReplicaSet") + + err := ctrl.processSyncEvents(context.Background(), []interface{}{pod}) + require.NoError(t, err, "ClusterNoRepositoriesError should not propagate") + assert.Equal(t, 1, poster.getPostClusterCalls()) + + // Caches should remain empty since no response was processed + cacheKey := getCacheKey(EventCreated, "default/test-deploy/app", "sha256:abc123") + _, exists := ctrl.observedDeployments.Get(cacheKey) + assert.False(t, exists, "observedDeployments should not be populated on 404") +} + +func TestProcessSyncEvents_PostCluster500(t *testing.T) { + t.Parallel() + poster := &mockPoster{ + clusterErr: fmt.Errorf("server error"), + } + ctrl := newTestController(poster) + ctrl.workloadResolver = &mockResolver{name: "test-deploy"} + pod := makePod("app", "test-deploy-abc123", "sha256:abc123", "ReplicaSet") + + err := ctrl.processSyncEvents(context.Background(), []interface{}{pod}) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to post sync cluster records") + assert.Equal(t, 1, 
poster.getPostClusterCalls()) +} + +func makePod(containerName string, parentName string, digest string, parentKind string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + OwnerReferences: []metav1.OwnerReference{{ + Kind: parentKind, + Name: parentName, + }}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: containerName, + Image: "nginx:latest", + }}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{{ + Name: containerName, + ImageID: fmt.Sprintf("docker-pullable://nginx@%s", digest), + }}, + }, + } +} From 92408e468ba269ecacb885d3d1b3f3629f7f7c0b Mon Sep 17 00:00:00 2001 From: Eric Pickard Date: Thu, 28 May 2026 15:30:07 -0400 Subject: [PATCH 6/8] add tests for PostCluster Signed-off-by: Eric Pickard --- pkg/deploymentrecord/client_test.go | 164 ++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) diff --git a/pkg/deploymentrecord/client_test.go b/pkg/deploymentrecord/client_test.go index 9a2efb1..d854e05 100644 --- a/pkg/deploymentrecord/client_test.go +++ b/pkg/deploymentrecord/client_test.go @@ -728,3 +728,167 @@ func TestPostOneRespectsRetryAfterAcrossGoroutines(t *testing.T) { } wg.Wait() } + +func TestPostCluster(t *testing.T) { + tests := []struct { + name string + records []*DeploymentRecord + handler http.HandlerFunc + wantErr bool + errType any + errContain string + wantBody bool + wantOk float64 + wantUnknownArtifact float64 + wantSoftFail float64 + wantHardFail float64 + wantClientError float64 + }{ + { + name: "empty records returns nil", + records: []*DeploymentRecord{}, + handler: func(_ http.ResponseWriter, _ *http.Request) { + t.Fatal("server should not be called with empty records") + }, + }, + { + name: "success on 207 returns body", + records: []*DeploymentRecord{testRecord()}, + handler: func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusMultiStatus) + _, _ = 
w.Write([]byte(`{"total_count":1,"deployment_records":[],"errors":[]}`)) + }, + wantBody: true, + wantOk: 1, + }, + { + name: "404 returns ClusterNoRepositoriesError", + records: []*DeploymentRecord{testRecord()}, + handler: func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNotFound) + }, + wantErr: true, + errType: &ClusterNoRepositoriesError{}, + wantUnknownArtifact: 1, + }, + { + name: "400 returns client error", + records: []*DeploymentRecord{testRecord()}, + handler: func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte("bad request")) + }, + wantErr: true, + errContain: "client error", + wantClientError: 1, + }, + { + name: "500 retries exhausted returns error", + records: []*DeploymentRecord{testRecord()}, + handler: func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }, + wantErr: true, + errContain: "all retries exhausted", + wantSoftFail: 1, + wantHardFail: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := httptest.NewServer(tt.handler) + t.Cleanup(srv.Close) + + client, err := NewClient(srv.URL, "test-org", WithRetries(0)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + + counters := allCounters() + snapshots := make([]float64, len(counters)) + for i, c := range counters { + snapshots[i] = testutil.ToFloat64(c) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + respBody, err := client.PostCluster(ctx, tt.records, "test-cluster") + + if tt.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + if tt.errType != nil { + switch tt.errType.(type) { + case *ClusterNoRepositoriesError: + var e *ClusterNoRepositoriesError + if !errors.As(err, &e) { + t.Errorf("expected ClusterNoRepositoriesError, got %T: %v", err, err) + } + default: + t.Fatalf("unexpected error type in test: %T", tt.errType) + } + } + if 
tt.errContain != "" && !strings.Contains(err.Error(), tt.errContain) { + t.Errorf("error %q should contain %q", err.Error(), tt.errContain) + } + } else if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if tt.wantBody && respBody == nil { + t.Error("expected non-nil response body") + } + + wantDeltas := []float64{ + tt.wantOk, + tt.wantUnknownArtifact, + 0, // rate limited + tt.wantSoftFail, + tt.wantHardFail, + tt.wantClientError, + } + names := []string{ + "PostDeploymentRecordOk", + "PostDeploymentRecordUnknownArtifact", + "PostDeploymentRecordRateLimited", + "PostDeploymentRecordSoftFail", + "PostDeploymentRecordHardFail", + "PostDeploymentRecordClientError", + } + for i, c := range counters { + got := testutil.ToFloat64(c) - snapshots[i] + if got != wantDeltas[i] { + t.Errorf("%s delta = %v, want %v", names[i], got, wantDeltas[i]) + } + } + }) + } +} + +func TestPostCluster_URLEscapesCluster(t *testing.T) { + var rawPath string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rawPath = r.URL.RawPath + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"total_count":0,"deployment_records":[]}`)) + })) + t.Cleanup(srv.Close) + + client, err := NewClient(srv.URL, "test-org", WithRetries(0)) + if err != nil { + t.Fatalf("failed to create client: %v", err) + } + + _, err = client.PostCluster(context.Background(), []*DeploymentRecord{testRecord()}, "cluster/with spaces") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + wantSuffix := "/cluster/cluster%2Fwith%20spaces" + if !strings.HasSuffix(rawPath, wantSuffix) { + t.Errorf("raw path %q should end with %q", rawPath, wantSuffix) + } +} From a48254e8bcd6d87c51a28dcc684cb509854c072e Mon Sep 17 00:00:00 2001 From: Eric Pickard Date: Fri, 29 May 2026 16:43:22 -0400 Subject: [PATCH 7/8] update url variable names Signed-off-by: Eric Pickard --- pkg/deploymentrecord/client.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff 
--git a/pkg/deploymentrecord/client.go b/pkg/deploymentrecord/client.go index 86df00e..80e1a0f 100644 --- a/pkg/deploymentrecord/client.go +++ b/pkg/deploymentrecord/client.go @@ -200,14 +200,14 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { return errors.New("record cannot be nil") } - url := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record", c.baseURL, c.org) + singleUrl := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record", c.baseURL, c.org) body, err := buildRequestBody(record) if err != nil { return fmt.Errorf("failed to marshal record: %w", err) } - respBody, statusCode, lastErr := c.postWithRetry(ctx, url, body) + respBody, statusCode, lastErr := c.postWithRetry(ctx, singleUrl, body) var clientErr *ClientError switch { @@ -215,7 +215,7 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { dtmetrics.PostDeploymentRecordClientError.Inc() slog.Warn("client error, aborting", "status_code", statusCode, - "url", url, + "url", singleUrl, "resp_msg", string(respBody), ) return fmt.Errorf("client error: %w", lastErr) @@ -250,14 +250,14 @@ func (c *Client) PostCluster(ctx context.Context, records []*DeploymentRecord, c return nil, nil } - url := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record/cluster/%s", c.baseURL, c.org, url.PathEscape(cluster)) + clusterUrl := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record/cluster/%s", c.baseURL, c.org, url.PathEscape(cluster)) body, err := buildClusterRequestBody(records) if err != nil { return nil, fmt.Errorf("failed to marshal records: %w", err) } - respBody, statusCode, lastErr := c.postWithRetry(ctx, url, body) + respBody, statusCode, lastErr := c.postWithRetry(ctx, clusterUrl, body) var clientErr *ClientError switch { @@ -265,7 +265,7 @@ func (c *Client) PostCluster(ctx context.Context, records []*DeploymentRecord, c dtmetrics.PostDeploymentRecordClientError.Inc() slog.Warn("client error, aborting", 
"status_code", statusCode, - "url", url, + "url", clusterUrl, "resp_msg", string(respBody), ) return nil, fmt.Errorf("client error: %w", lastErr) From 3bfff1a679defd7dfb66648664d557157d469b05 Mon Sep 17 00:00:00 2001 From: Eric Pickard Date: Fri, 29 May 2026 17:36:33 -0400 Subject: [PATCH 8/8] refactor record.go, address linting items Signed-off-by: Eric Pickard --- internal/controller/controller.go | 4 +- .../controller/controller_integration_test.go | 8 +-- internal/controller/controller_test.go | 4 +- internal/controller/reporting.go | 12 ++--- internal/controller/reporting_test.go | 45 ++++++++-------- pkg/deploymentrecord/client.go | 50 +++++++++--------- pkg/deploymentrecord/client_test.go | 20 +++---- pkg/deploymentrecord/record.go | 52 ++++++++++--------- 8 files changed, 101 insertions(+), 94 deletions(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 6147fcb..d82ff21 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -48,8 +48,8 @@ type ttlCache interface { } type deploymentRecordPoster interface { - PostOne(ctx context.Context, record *deploymentrecord.DeploymentRecord) error - PostCluster(ctx context.Context, records []*deploymentrecord.DeploymentRecord, cluster string) ([]byte, error) + PostOne(ctx context.Context, record *deploymentrecord.Record) error + PostCluster(ctx context.Context, records []*deploymentrecord.Record, cluster string) ([]byte, error) } type podMetadataAggregator interface { diff --git a/internal/controller/controller_integration_test.go b/internal/controller/controller_integration_test.go index 31f8307..066f020 100644 --- a/internal/controller/controller_integration_test.go +++ b/internal/controller/controller_integration_test.go @@ -26,23 +26,23 @@ import ( type mockRecordPoster struct { mu sync.Mutex - records []*deploymentrecord.DeploymentRecord + records []*deploymentrecord.Record err error // to simulate failures } -func (m *mockRecordPoster) 
PostOne(_ context.Context, record *deploymentrecord.DeploymentRecord) error { +func (m *mockRecordPoster) PostOne(_ context.Context, record *deploymentrecord.Record) error { m.mu.Lock() defer m.mu.Unlock() m.records = append(m.records, record) return m.err } -func (m *mockRecordPoster) PostCluster(_ context.Context, _ []*deploymentrecord.DeploymentRecord, _ string) ([]byte, error) { +func (m *mockRecordPoster) PostCluster(_ context.Context, _ []*deploymentrecord.Record, _ string) ([]byte, error) { return nil, nil } // Helper that allows tests to read captured records safely. -func (m *mockRecordPoster) getRecords() []*deploymentrecord.DeploymentRecord { +func (m *mockRecordPoster) getRecords() []*deploymentrecord.Record { m.mu.Lock() defer m.mu.Unlock() return slices.Clone(m.records) diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 85486b1..935fe3c 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -33,14 +33,14 @@ type mockPoster struct { clusterErr error } -func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.DeploymentRecord) error { +func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.Record) error { m.mu.Lock() defer m.mu.Unlock() m.calls++ return m.lastErr } -func (m *mockPoster) PostCluster(_ context.Context, records []*deploymentrecord.DeploymentRecord, _ string) ([]byte, error) { +func (m *mockPoster) PostCluster(_ context.Context, records []*deploymentrecord.Record, _ string) ([]byte, error) { m.mu.Lock() defer m.mu.Unlock() m.clusterCalls++ diff --git a/internal/controller/reporting.go b/internal/controller/reporting.go index abef295..e9c0c72 100644 --- a/internal/controller/reporting.go +++ b/internal/controller/reporting.go @@ -117,7 +117,7 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { return lastErr } -func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods 
[]interface{}) error { +func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []any) error { syncRecords := c.makeSyncRecords(ctx, syncClusterPods) if len(syncRecords) == 0 { slog.Info("No sync records to post") @@ -139,7 +139,7 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []in ) return fmt.Errorf("failed to post sync cluster records: %w", err) } - var deploymentRecords deploymentrecord.DeploymentRecordsClusterResp + var deploymentRecords deploymentrecord.RecordsClusterResp err = json.Unmarshal(respBody, &deploymentRecords) if err != nil { slog.Error("Failed to unmarshall response", @@ -157,9 +157,9 @@ func (c *Controller) processSyncEvents(ctx context.Context, syncClusterPods []in return nil } -func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []interface{}) []*deploymentrecord.DeploymentRecord { +func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []any) []*deploymentrecord.Record { seenSyncRecords := make(map[string]bool) - var syncRecords []*deploymentrecord.DeploymentRecord + var syncRecords []*deploymentrecord.Record for _, p := range syncClusterPods { pod, ok := p.(*corev1.Pod) if !ok { @@ -219,7 +219,7 @@ func (c *Controller) makeSyncRecords(ctx context.Context, syncClusterPods []inte return syncRecords } -func (c *Controller) fillCaches(deploymentRecords deploymentrecord.DeploymentRecordsClusterResp) { +func (c *Controller) fillCaches(deploymentRecords deploymentrecord.RecordsClusterResp) { slog.Info("Filling caches after posting sync cluster records") // Fill observedDeployments cache with successful digests for _, r := range deploymentRecords.DeploymentRecords { @@ -348,7 +348,7 @@ func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, conta return nil } -func (c *Controller) buildRecord(pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) 
*deploymentrecord.DeploymentRecord { +func (c *Controller) buildRecord(pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) *deploymentrecord.Record { dn := getARDeploymentName(pod, container, c.cfg.Template, workloadName) digest := getContainerDigest(pod, container.Name) diff --git a/internal/controller/reporting_test.go b/internal/controller/reporting_test.go index 3fb97e1..ac8a379 100644 --- a/internal/controller/reporting_test.go +++ b/internal/controller/reporting_test.go @@ -3,6 +3,7 @@ package controller import ( "context" "encoding/json" + "errors" "fmt" "testing" "time" @@ -120,7 +121,7 @@ func TestProcessSyncEvents_EmptyPodList(t *testing.T) { poster := &mockPoster{} ctrl := newTestController(poster) - err := ctrl.processSyncEvents(context.Background(), []interface{}{}) + err := ctrl.processSyncEvents(context.Background(), []any{}) require.NoError(t, err) assert.Equal(t, 0, poster.getPostClusterCalls(), "PostCluster should not be called for empty pod list") } @@ -130,28 +131,28 @@ func TestProcessSyncEvents_HappyPath(t *testing.T) { digest := "sha256:abc123" unknownDigest := "sha256:notfound999" unauthorizedDigest := "sha256:unauthorized999" - clusterResp := deploymentrecord.DeploymentRecordsClusterResp{ + clusterResp := deploymentrecord.RecordsClusterResp{ TotalCount: 1, - DeploymentRecords: []*deploymentrecord.DeploymentRecordResp{{ - DeploymentRecord: deploymentrecord.DeploymentRecord{ - DeploymentRecordBase: deploymentrecord.DeploymentRecordBase{ + DeploymentRecords: []*deploymentrecord.RecordResp{{ + Record: deploymentrecord.Record{ + BaseRecord: deploymentrecord.BaseRecord{ DeploymentName: "default/test-deploy/app", Digest: digest, }, }, }}, - Errors: []*deploymentrecord.DeploymentRecordErrorResp{ + Errors: []*deploymentrecord.RecordErrorResp{ { - DeploymentRecord: deploymentrecord.DeploymentRecord{ - DeploymentRecordBase: deploymentrecord.DeploymentRecordBase{ + Record: 
deploymentrecord.Record{ + BaseRecord: deploymentrecord.BaseRecord{ Digest: unknownDigest, }, }, Cause: "not_found", }, { - DeploymentRecord: deploymentrecord.DeploymentRecord{ - DeploymentRecordBase: deploymentrecord.DeploymentRecordBase{ + Record: deploymentrecord.Record{ + BaseRecord: deploymentrecord.BaseRecord{ Digest: unauthorizedDigest, }, }, @@ -166,10 +167,10 @@ func TestProcessSyncEvents_HappyPath(t *testing.T) { ctrl := newTestController(poster) ctrl.workloadResolver = &mockResolver{name: "test-deploy"} - err = ctrl.processSyncEvents(context.Background(), []interface{}{ - makePod("app", "test-deploy-abc123", digest, "ReplicaSet"), - makePod("unknown", "test-deploy-abc123", unknownDigest, "ReplicaSet"), - makePod("unauthorized", "test-deploy-abc123", unauthorizedDigest, "ReplicaSet"), + err = ctrl.processSyncEvents(context.Background(), []any{ + makeTestPod("app", "test-deploy-abc123", digest, "ReplicaSet"), + makeTestPod("unknown", "test-deploy-abc456", unknownDigest, "ReplicaSet"), + makeTestPod("unauthorized", "test-deploy-abc789", unauthorizedDigest, "Job"), }) require.NoError(t, err) assert.Equal(t, 1, poster.getPostClusterCalls(), "PostCluster should be called once") @@ -196,9 +197,9 @@ func TestProcessSyncEvents_DedupeContainers(t *testing.T) { ctrl := newTestController(poster) ctrl.workloadResolver = &mockResolver{name: "test-deploy"} - pod := makePod("app", "test-deploy-abc123", digest, "ReplicaSet") + pod := makeTestPod("app", "test-deploy-abc123", digest, "ReplicaSet") - err := ctrl.processSyncEvents(context.Background(), []interface{}{pod, pod}) + err := ctrl.processSyncEvents(context.Background(), []any{pod, pod}) require.NoError(t, err) assert.Equal(t, 1, poster.getPostClusterCalls(), "PostCluster should be called once") assert.Equal(t, 1, poster.clusterRecordCount, "PostCluster should receive only 1 record") @@ -211,9 +212,9 @@ func TestProcessSyncEvents_PostCluster404(t *testing.T) { } ctrl := newTestController(poster) 
ctrl.workloadResolver = &mockResolver{name: "test-deploy"} - pod := makePod("app", "test-deploy-abc123", "sha256:abc123", "ReplicaSet") + pod := makeTestPod("app", "test-deploy-abc123", "sha256:abc123", "ReplicaSet") - err := ctrl.processSyncEvents(context.Background(), []interface{}{pod}) + err := ctrl.processSyncEvents(context.Background(), []any{pod}) require.NoError(t, err, "ClusterNoRepositoriesError should not propagate") assert.Equal(t, 1, poster.getPostClusterCalls()) @@ -226,19 +227,19 @@ func TestProcessSyncEvents_PostCluster404(t *testing.T) { func TestProcessSyncEvents_PostCluster500(t *testing.T) { t.Parallel() poster := &mockPoster{ - clusterErr: fmt.Errorf("server error"), + clusterErr: errors.New("server error"), } ctrl := newTestController(poster) ctrl.workloadResolver = &mockResolver{name: "test-deploy"} - pod := makePod("app", "test-deploy-abc123", "sha256:abc123", "ReplicaSet") + pod := makeTestPod("app", "test-deploy-abc123", "sha256:abc123", "ReplicaSet") - err := ctrl.processSyncEvents(context.Background(), []interface{}{pod}) + err := ctrl.processSyncEvents(context.Background(), []any{pod}) require.Error(t, err) assert.Contains(t, err.Error(), "failed to post sync cluster records") assert.Equal(t, 1, poster.getPostClusterCalls()) } -func makePod(containerName string, parentName string, digest string, parentKind string) *corev1.Pod { +func makeTestPod(containerName string, parentName string, digest string, parentKind string) *corev1.Pod { return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pod", diff --git a/pkg/deploymentrecord/client.go b/pkg/deploymentrecord/client.go index 80e1a0f..b5adc30 100644 --- a/pkg/deploymentrecord/client.go +++ b/pkg/deploymentrecord/client.go @@ -165,6 +165,8 @@ func (c *ClientError) Unwrap() error { return c.err } +// ClusterNoRepositoriesError represents a 404 response from the cluster endpoint +// indicating no repositories were found for the given cluster. 
type ClusterNoRepositoriesError struct { err error } @@ -195,19 +197,19 @@ func (n *NoArtifactError) Unwrap() error { // PostOne posts a single deployment record to the GitHub deployment // records API. -func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { +func (c *Client) PostOne(ctx context.Context, record *Record) error { if record == nil { return errors.New("record cannot be nil") } - singleUrl := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record", c.baseURL, c.org) + singleURL := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record", c.baseURL, c.org) body, err := buildRequestBody(record) if err != nil { return fmt.Errorf("failed to marshal record: %w", err) } - respBody, statusCode, lastErr := c.postWithRetry(ctx, singleUrl, body) + respBody, statusCode, lastErr := c.postWithRetry(ctx, singleURL, body) var clientErr *ClientError switch { @@ -215,7 +217,7 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { dtmetrics.PostDeploymentRecordClientError.Inc() slog.Warn("client error, aborting", "status_code", statusCode, - "url", singleUrl, + "url", singleURL, "resp_msg", string(respBody), ) return fmt.Errorf("client error: %w", lastErr) @@ -244,20 +246,20 @@ func (c *Client) PostOne(ctx context.Context, record *DeploymentRecord) error { // PostCluster sends a full cluster state of records to GitHub deployment // records cluster API. 
-func (c *Client) PostCluster(ctx context.Context, records []*DeploymentRecord, cluster string) ([]byte, error) { - if len(records) <= 0 { +func (c *Client) PostCluster(ctx context.Context, records []*Record, cluster string) ([]byte, error) { + if len(records) == 0 { slog.Debug("Records is empty, skipping") return nil, nil } - clusterUrl := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record/cluster/%s", c.baseURL, c.org, url.PathEscape(cluster)) + clusterURL := fmt.Sprintf("%s/orgs/%s/artifacts/metadata/deployment-record/cluster/%s", c.baseURL, c.org, url.PathEscape(cluster)) body, err := buildClusterRequestBody(records) if err != nil { return nil, fmt.Errorf("failed to marshal records: %w", err) } - respBody, statusCode, lastErr := c.postWithRetry(ctx, clusterUrl, body) + respBody, statusCode, lastErr := c.postWithRetry(ctx, clusterURL, body) var clientErr *ClientError switch { @@ -265,7 +267,7 @@ func (c *Client) PostCluster(ctx context.Context, records []*DeploymentRecord, c dtmetrics.PostDeploymentRecordClientError.Inc() slog.Warn("client error, aborting", "status_code", statusCode, - "url", clusterUrl, + "url", clusterURL, "resp_msg", string(respBody), ) return nil, fmt.Errorf("client error: %w", lastErr) @@ -274,7 +276,7 @@ func (c *Client) PostCluster(ctx context.Context, records []*DeploymentRecord, c return respBody, nil case statusCode == 404: dtmetrics.PostDeploymentRecordUnknownArtifact.Inc() - return nil, &ClusterNoRepositoriesError{err: fmt.Errorf("no repositories found")} + return nil, &ClusterNoRepositoriesError{err: errors.New("no repositories found")} default: dtmetrics.PostDeploymentRecordHardFail.Inc() slog.Error("all retries exhausted", @@ -286,7 +288,7 @@ func (c *Client) PostCluster(ctx context.Context, records []*DeploymentRecord, c } } -func (c *Client) postWithRetry(ctx context.Context, url string, body []byte) ([]byte, int, error) { +func (c *Client) postWithRetry(ctx context.Context, targetURL string, body []byte) ([]byte, int, 
error) { bodyReader := bytes.NewReader(body) var lastErr error // The first attempt is not a retry! @@ -306,7 +308,7 @@ func (c *Client) postWithRetry(ctx context.Context, url string, body []byte) ([] // Reset reader position for retries bodyReader.Reset(body) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bodyReader) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL, bodyReader) if err != nil { return nil, 0, fmt.Errorf("failed to create request: %w", err) } @@ -363,7 +365,7 @@ func (c *Client) postWithRetry(ctx context.Context, url string, body []byte) ([] "retry-after", resp.Header.Get("Retry-After"), "x-ratelimit-remaining", resp.Header.Get("X-Ratelimit-Remaining"), "retry_delay", retryDelay.Seconds(), - "url", url, + "url", targetURL, "resp_msg", string(respBody), ) lastErr = fmt.Errorf("rate limited, attempt %d", attempt) @@ -377,7 +379,7 @@ func (c *Client) postWithRetry(ctx context.Context, url string, body []byte) ([] slog.Debug("retriable error", "attempt", attempt, "status_code", resp.StatusCode, - "url", url, + "url", targetURL, "resp_msg", string(respBody), ) lastErr = fmt.Errorf("server error, attempt %d", attempt) @@ -463,29 +465,29 @@ func parseRateLimitDelay(resp *http.Response) time.Duration { // buildRequestBody adds return_records=false to a deployment record request body // which results in a minimal response payload. -func buildRequestBody(record *DeploymentRecord) ([]byte, error) { +func buildRequestBody(record *Record) ([]byte, error) { return json.Marshal(struct { - DeploymentRecord + Record ReturnRecords bool `json:"return_records"` }{ - DeploymentRecord: *record, - ReturnRecords: false, + Record: *record, + ReturnRecords: false, }) } -// buildClusterRequestBody count the total records, builds DeploymentRecords, +// buildClusterRequestBody counts the total records, builds ClusterRecordsBody, // and returns []byte. 
-func buildClusterRequestBody(records []*DeploymentRecord) ([]byte, error) { - if len(records) <= 0 { +func buildClusterRequestBody(records []*Record) ([]byte, error) { + if len(records) == 0 { return nil, nil } - deploymentRecords := []DeploymentRecordBase{} + deploymentRecords := []BaseRecord{} for _, r := range records { - deploymentRecords = append(deploymentRecords, r.DeploymentRecordBase) + deploymentRecords = append(deploymentRecords, r.BaseRecord) } - return json.Marshal(DeploymentRecords{ + return json.Marshal(ClusterRecordsBody{ LogicalEnvironment: records[0].LogicalEnvironment, PhysicalEnvironment: records[0].PhysicalEnvironment, PartialSuccess: true, diff --git a/pkg/deploymentrecord/client_test.go b/pkg/deploymentrecord/client_test.go index d854e05..d602ac4 100644 --- a/pkg/deploymentrecord/client_test.go +++ b/pkg/deploymentrecord/client_test.go @@ -278,8 +278,8 @@ func TestValidOrgPattern(t *testing.T) { } } -// testRecord returns a minimal valid DeploymentRecord for testing. -func testRecord() *DeploymentRecord { +// testRecord returns a minimal valid Record for testing. 
+func testRecord() *Record { return NewDeploymentRecord( "ghcr.io/my-org/my-image", "sha256:abc123", @@ -309,7 +309,7 @@ func allCounters() []prometheus.Counter { func TestPostOne(t *testing.T) { tests := []struct { name string - record *DeploymentRecord + record *Record retries int handler http.HandlerFunc wantErr bool @@ -732,7 +732,7 @@ func TestPostOneRespectsRetryAfterAcrossGoroutines(t *testing.T) { func TestPostCluster(t *testing.T) { tests := []struct { name string - records []*DeploymentRecord + records []*Record handler http.HandlerFunc wantErr bool errType any @@ -746,14 +746,14 @@ func TestPostCluster(t *testing.T) { }{ { name: "empty records returns nil", - records: []*DeploymentRecord{}, + records: []*Record{}, handler: func(_ http.ResponseWriter, _ *http.Request) { t.Fatal("server should not be called with empty records") }, }, { name: "success on 207 returns body", - records: []*DeploymentRecord{testRecord()}, + records: []*Record{testRecord()}, handler: func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusMultiStatus) _, _ = w.Write([]byte(`{"total_count":1,"deployment_records":[],"errors":[]}`)) @@ -763,7 +763,7 @@ func TestPostCluster(t *testing.T) { }, { name: "404 returns ClusterNoRepositoriesError", - records: []*DeploymentRecord{testRecord()}, + records: []*Record{testRecord()}, handler: func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotFound) }, @@ -773,7 +773,7 @@ func TestPostCluster(t *testing.T) { }, { name: "400 returns client error", - records: []*DeploymentRecord{testRecord()}, + records: []*Record{testRecord()}, handler: func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusBadRequest) _, _ = w.Write([]byte("bad request")) @@ -784,7 +784,7 @@ func TestPostCluster(t *testing.T) { }, { name: "500 retries exhausted returns error", - records: []*DeploymentRecord{testRecord()}, + records: []*Record{testRecord()}, handler: func(w http.ResponseWriter, _ *http.Request) { 
w.WriteHeader(http.StatusInternalServerError) }, @@ -882,7 +882,7 @@ func TestPostCluster_URLEscapesCluster(t *testing.T) { t.Fatalf("failed to create client: %v", err) } - _, err = client.PostCluster(context.Background(), []*DeploymentRecord{testRecord()}, "cluster/with spaces") + _, err = client.PostCluster(context.Background(), []*Record{testRecord()}, "cluster/with spaces") if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/deploymentrecord/record.go b/pkg/deploymentrecord/record.go index bf46ff2..e8e6f6e 100644 --- a/pkg/deploymentrecord/record.go +++ b/pkg/deploymentrecord/record.go @@ -30,8 +30,8 @@ var validRuntimeRisks = map[RuntimeRisk]bool{ SensitiveData: true, } -// DeploymentRecordBase represents a deployment record for the deployment record cluster endpoint. -type DeploymentRecordBase struct { +// BaseRecord represents a deployment record for the deployment record cluster endpoint. +type BaseRecord struct { Name string `json:"name"` Digest string `json:"digest"` Version string `json:"version,omitempty"` @@ -41,58 +41,62 @@ type DeploymentRecordBase struct { Tags map[string]string `json:"tags,omitempty"` } -// DeploymentRecord represents a deployment event record. -type DeploymentRecord struct { - DeploymentRecordBase +// Record represents a deployment event record. +type Record struct { + BaseRecord LogicalEnvironment string `json:"logical_environment"` PhysicalEnvironment string `json:"physical_environment"` Cluster string `json:"cluster"` } -// DeploymentRecordResp represents the response of a created deployment record from the +// RecordResp represents the response of a created deployment record from the // deployment record cluster endpoint. 
-type DeploymentRecordResp struct { - DeploymentRecord +type RecordResp struct { + Record Created string `json:"created"` UpdatedAt string `json:"updated_at"` AttestationID int `json:"attestation_id"` } -type DeploymentRecordErrorResp struct { - DeploymentRecord +// RecordErrorResp represents a failed deployment record from the +// deployment record cluster endpoint, including the cause of the failure. +type RecordErrorResp struct { + Record Cause string `json:"cause"` } -// DeploymentRecords represents the post body for the deployment record cluster endpoint. -type DeploymentRecords struct { - LogicalEnvironment string `json:"logical_environment"` - PhysicalEnvironment string `json:"physical_environment"` - PartialSuccess bool `json:"partial_success"` - Deployments []DeploymentRecordBase `json:"deployments"` +// ClusterRecordsBody represents the post body for the deployment record cluster endpoint. +type ClusterRecordsBody struct { + LogicalEnvironment string `json:"logical_environment"` + PhysicalEnvironment string `json:"physical_environment"` + PartialSuccess bool `json:"partial_success"` + Deployments []BaseRecord `json:"deployments"` } -type DeploymentRecordsClusterResp struct { - TotalCount int `json:"total_count"` - DeploymentRecords []*DeploymentRecordResp `json:"deployment_records"` - Errors []*DeploymentRecordErrorResp `json:"errors,omitempty"` +// RecordsClusterResp represents the response from the deployment record +// cluster endpoint, containing successfully created records and any errors. +type RecordsClusterResp struct { + TotalCount int `json:"total_count"` + DeploymentRecords []*RecordResp `json:"deployment_records"` + Errors []*RecordErrorResp `json:"errors,omitempty"` } -// NewDeploymentRecord creates a new DeploymentRecord with the given status. +// NewDeploymentRecord creates a new Record with the given status. // Status must be either StatusDeployed or StatusDecommissioned. 
// //nolint:revive func NewDeploymentRecord(name, digest, version, logicalEnv, physicalEnv, - cluster, status, deploymentName string, runtimeRisks []RuntimeRisk, tags map[string]string) *DeploymentRecord { + cluster, status, deploymentName string, runtimeRisks []RuntimeRisk, tags map[string]string) *Record { // Validate status if status != StatusDeployed && status != StatusDecommissioned { status = StatusDeployed // default to deployed if invalid } - return &DeploymentRecord{ + return &Record{ LogicalEnvironment: logicalEnv, PhysicalEnvironment: physicalEnv, Cluster: cluster, - DeploymentRecordBase: DeploymentRecordBase{ + BaseRecord: BaseRecord{ Name: name, Digest: digest, Version: version,