Skip to content

Commit 4764d95

Browse files
znullCopilot
andcommitted
Restore identity-copy behavior for empty pipelines
Prior to the Stage interface redesign, an empty pipeline with a configured `stdout` ran a synthetic ioCopier that copied `p.stdin` (if any) to `p.stdout` and closed the destination if it came from `WithStdoutCloser()`. Restore that behavior by synthesizing an identity-copy Function stage when the pipeline has no stages but does have a configured output. The empty/no-output case remains a no-op as before. This affects callers like `pipe.New(WithStdin(r)).Output(ctx)`. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent d69a9a1 commit 4764d95

2 files changed

Lines changed: 76 additions & 0 deletions

File tree

pipe/pipeline.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,28 @@ func (p *Pipeline) Start(ctx context.Context) error {
269269
atomic.StoreUint32(&p.started, 1)
270270
ctx, p.cancel = context.WithCancel(ctx)
271271

272+
if len(p.stages) == 0 {
273+
if p.stdout == nil {
274+
// No stages and no destination: there is nothing to do
275+
// and nowhere to put `p.stdin` even if it was set.
276+
return nil
277+
}
278+
// No stages but a destination was configured: synthesize an
279+
// identity-copy stage so that `WithStdin()` is drained into
280+
// `WithStdout()`/`WithStdoutCloser()` and the destination
281+
// closer (if any) is invoked.
282+
p.stages = append(p.stages, Function(
283+
"identity",
284+
func(_ context.Context, _ Env, stdin io.Reader, stdout io.Writer) error {
285+
if stdin == nil {
286+
return nil
287+
}
288+
_, err := io.Copy(stdout, stdin)
289+
return err
290+
},
291+
))
292+
}
293+
272294
// We need to decide how to start the stages, especially what
273295
// pipes to use to connect adjacent stages (`os.Pipe()` vs.
274296
// `io.Pipe()`) based on the two stages' preferences.

pipe/pipeline_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,60 @@ func TestMain(m *testing.M) {
2626
goleak.VerifyTestMain(m)
2727
}
2828

29+
func TestPipelineEmpty(t *testing.T) {
30+
t.Parallel()
31+
p := pipe.New()
32+
assert.NoError(t, p.Run(context.Background()))
33+
}
34+
35+
func TestPipelineEmptyWithStdinAndStdout(t *testing.T) {
36+
t.Parallel()
37+
ctx := context.Background()
38+
stdout := &bytes.Buffer{}
39+
p := pipe.New(
40+
pipe.WithStdin(strings.NewReader("hello world\n")),
41+
pipe.WithStdout(stdout),
42+
)
43+
if assert.NoError(t, p.Run(ctx)) {
44+
assert.Equal(t, "hello world\n", stdout.String())
45+
}
46+
}
47+
48+
func TestPipelineEmptyOutput(t *testing.T) {
49+
t.Parallel()
50+
ctx := context.Background()
51+
p := pipe.New(pipe.WithStdin(strings.NewReader("hello world\n")))
52+
out, err := p.Output(ctx)
53+
if assert.NoError(t, err) {
54+
assert.Equal(t, "hello world\n", string(out))
55+
}
56+
}
57+
58+
func TestPipelineEmptyWithStdoutCloser(t *testing.T) {
59+
t.Parallel()
60+
ctx := context.Background()
61+
stdout := &closeTrackingWriter{}
62+
p := pipe.New(
63+
pipe.WithStdin(strings.NewReader("hello world\n")),
64+
pipe.WithStdoutCloser(stdout),
65+
)
66+
if assert.NoError(t, p.Run(ctx)) {
67+
assert.Equal(t, "hello world\n", stdout.buf.String())
68+
assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed")
69+
}
70+
}
71+
72+
type closeTrackingWriter struct {
73+
buf bytes.Buffer
74+
closed bool
75+
}
76+
77+
func (w *closeTrackingWriter) Write(p []byte) (int, error) { return w.buf.Write(p) }
78+
func (w *closeTrackingWriter) Close() error {
79+
w.closed = true
80+
return nil
81+
}
82+
2983
func TestPipelineFirstStageFailsToStart(t *testing.T) {
3084
t.Parallel()
3185
ctx := context.Background()

0 commit comments

Comments
 (0)