Skip to content

Commit d14ef7b

Browse files
znullCopilot
andcommitted
Forward panic handler through FilterError/IgnoreError wrappers
`efStage` (the wrapper returned by `FilterError` and `IgnoreError`) embeds the `Stage` interface, which only exposes the four Stage methods. So when `Pipeline.Start()` checks `if phs, ok := s.(StagePanicHandlerAware); ok`, the assertion silently fails for any wrapped stage — even if the underlying stage is a `goStage` that implements `SetPanicHandler`. The result: a configured `WithStagePanicHandler` is bypassed when the panicking Function is wrapped in `IgnoreError`. The goroutine inside `goStage.Start` sees `panicHandler == nil` and returns without calling `recover()`, letting the panic propagate up the runtime and crash the host process. This was a pre-existing bug in main (not introduced by the Stage interface redesign), but we're already touching the panic-handler surface to restore it for bare Function stages, so fix it in the same series. Implement `SetPanicHandler` on `efStage` and forward it to the embedded stage if it implements `StagePanicHandlerAware`. Add a `var _ StagePanicHandlerAware = efStage{}` compile-time assertion and a regression test that wraps a panicking Function in `IgnoreError` and verifies the handler runs. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 8114965 commit d14ef7b

2 files changed

Lines changed: 44 additions & 0 deletions

File tree

pipe/filter-error.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,21 @@ func (s efStage) Wait() error {
2626
return s.filter(s.Stage.Wait())
2727
}
2828

29+
// SetPanicHandler forwards the handler to the wrapped stage if it
30+
// implements `StagePanicHandlerAware`. Without this, wrapping a
31+
// panicking `Function` in `IgnoreError` / `FilterError` would
32+
// silently bypass `WithStagePanicHandler` (the type assertion in
33+
// `Pipeline.Start()` only sees this wrapper's methods, not the
34+
// embedded stage's `SetPanicHandler`), letting the panic propagate
35+
// out of the goroutine and crash the host process.
36+
func (s efStage) SetPanicHandler(ph StagePanicHandler) {
37+
if phs, ok := s.Stage.(StagePanicHandlerAware); ok {
38+
phs.SetPanicHandler(ph)
39+
}
40+
}
41+
42+
var _ StagePanicHandlerAware = efStage{}
43+
2944
// ErrorMatcher decides whether its argument matches some class of
3045
// errors (e.g., errors that we want to ignore). The function will
3146
// only be invoked for non-nil errors.

pipe/pipeline_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,35 @@ func TestFunction(t *testing.T) {
532532
assert.ErrorContains(t, err, "panic handled")
533533
assert.Empty(t, out)
534534
})
535+
536+
t.Run("panic with handler through IgnoreError", func(t *testing.T) {
537+
// Regression: efStage (the FilterError/IgnoreError wrapper)
538+
// previously did not implement StagePanicHandlerAware, so
539+
// the type assertion in Pipeline.Start() silently failed
540+
// and the wrapped goStage never received the panic handler.
541+
// The result was an unrecovered panic crashing the host.
542+
p := pipe.New(
543+
pipe.WithStagePanicHandler(func(p any) error {
544+
return fmt.Errorf("panic handled: %v", p)
545+
}),
546+
)
547+
p.Add(
548+
pipe.Print("hello world"),
549+
pipe.IgnoreError(
550+
pipe.Function(
551+
"farewell",
552+
func(_ context.Context, _ pipe.Env, _ io.Reader, _ io.Writer) error {
553+
panic("this is a panic")
554+
},
555+
),
556+
func(_ error) bool { return false },
557+
),
558+
)
559+
560+
out, err := p.Output(ctx)
561+
assert.ErrorContains(t, err, "panic handled")
562+
assert.Empty(t, out)
563+
})
535564
}
536565

537566
func TestPipelineWithFunction(t *testing.T) {

0 commit comments

Comments
 (0)