Stage interface redesign: pipe type negotiation, pooled buffer copies#51

Open
znull wants to merge 21 commits into main from znull/stage-v2-clean

@znull znull commented Apr 9, 2026

Rebases and modernizes @mhagger's Stage interface redesign (#21) onto current main, with reconciliation commits filling the ~2-year gap and review-driven fixups.

What this does

Changes the Stage interface so stages declare I/O preferences and receive both stdin and stdout from the pipeline:

type Stage interface {
    Name() string
    Preferences() StagePreferences
    Start(ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser) error
    Wait() error
}

This lets Pipeline.Start() negotiate pipe types between adjacent stages:

  • os.Pipe() when either neighbor is a command (needs *os.File)
  • io.Pipe() when both neighbors are Go functions (cheaper, all userspace)
  • Pass Pipeline.stdout directly to the last stage — no more ioCopier
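
The selection rule in the list above can be sketched as a small pure function. This is only an illustration, not the PR's code: `ioPref`, `prefAny`, `prefFile`, and `choosePipe` are hypothetical names invented here, whereas the real package expresses these preferences through `StagePreferences`.

```go
package main

import "fmt"

// ioPref is a hypothetical stand-in for what a stage asks for on one
// side (stdin or stdout).
type ioPref int

const (
	prefAny  ioPref = iota // Go function: any io.Reader / io.Writer will do
	prefFile               // external command: wants an *os.File
)

// choosePipe returns the kind of pipe to create between two adjacent
// stages, given the upstream stage's stdout preference and the
// downstream stage's stdin preference.
func choosePipe(upstreamStdout, downstreamStdin ioPref) string {
	if upstreamStdout == prefFile || downstreamStdin == prefFile {
		return "os.Pipe" // at least one neighbor is a command
	}
	return "io.Pipe" // both neighbors are Go functions
}

func main() {
	fmt.Println(choosePipe(prefFile, prefAny)) // command -> function
	fmt.Println(choosePipe(prefAny, prefAny))  // function -> function
}
```

The third bullet needs no pipe at all: the last stage is simply handed `Pipeline.stdout` as its `stdout` argument.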

Performance characteristics preserved from #50

ioCopier is deleted, but the optimizations it carried are preserved in the new structure:

  • Sendfile / fd-pass for *os.File destinations is automatic now: when the last stage is a commandStage writing to an *os.File-backed destination, the fd is dup'd into the child process by exec.Cmd and the kernel handles the copy. Pinned in pipe/command_stdout_fastpath_test.go.
  • 32KB sync.Pool for copy buffers is preserved for the non-fast-path case: when a command's stdout is a non-*os.File writer, setupPooledStdout creates an os.Pipe() ourselves and runs the copy through a pooled buffer (or dst.ReadFrom when available). Without this, exec.Cmd's internal io.Copy would allocate a fresh 32KB per pipeline. See pipe/copy_pool.go.
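
A minimal sketch of the pooled-copy idea, assuming nothing about the actual contents of `pipe/copy_pool.go`: `bufPool`, `pooledCopy`, `plainWriter`, and `copyString` are hypothetical names. The destination's `ReadFrom` is preferred when it exists; otherwise a 32KB buffer is borrowed from a `sync.Pool` and returned afterwards.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// bufPool hands out reusable 32KB buffers. A *[]byte is stored so that
// putting the buffer back does not allocate.
var bufPool = sync.Pool{
	New: func() any {
		b := make([]byte, 32*1024)
		return &b
	},
}

// pooledCopy copies src to dst, using dst.ReadFrom when available and a
// pooled buffer otherwise.
func pooledCopy(dst io.Writer, src io.Reader) (int64, error) {
	if rf, ok := dst.(io.ReaderFrom); ok {
		return rf.ReadFrom(src)
	}
	bp := bufPool.Get().(*[]byte)
	defer bufPool.Put(bp)
	buf := *bp

	var total int64
	for {
		n, rerr := src.Read(buf)
		if n > 0 {
			m, werr := dst.Write(buf[:n])
			total += int64(m)
			if werr != nil {
				return total, werr
			}
			if m < n {
				return total, io.ErrShortWrite
			}
		}
		if rerr == io.EOF {
			return total, nil
		}
		if rerr != nil {
			return total, rerr
		}
	}
}

// plainWriter hides any ReadFrom method of the wrapped writer, which
// forces the pooled-buffer path.
type plainWriter struct{ w io.Writer }

func (p plainWriter) Write(b []byte) (int, error) { return p.w.Write(b) }

// copyString pushes s through pooledCopy and returns what arrived.
func copyString(s string, hideReaderFrom bool) string {
	var out bytes.Buffer
	var dst io.Writer = &out
	if hideReaderFrom {
		dst = plainWriter{w: &out}
	}
	if _, err := pooledCopy(dst, strings.NewReader(s)); err != nil {
		return "error: " + err.Error()
	}
	return out.String()
}

func main() {
	fmt.Println(copyString("via pooled buffer", true))
	fmt.Println(copyString("via ReadFrom", false))
}
```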

Commits

Cherry-picked from @mhagger's version-2 branch (authorship preserved):

  1. Shush the linter
  2. pipeline_test.go cleanup (squash of 3 originals)
  3. Add pipeline benchmarks
  4. Simplify NopClosers
  5. Stage: change the interface to make stdin/stdout handling more flexible
  6. Add pipe-matching tests

Reconciliation with main:

  7. Port MemoryLimitWithObserver to new Stage interface (added on main after #21 diverged)
  8. Restore panic handler for Function stages (dropped in original port)
  9. Fix memoryWatchStage.Wait() to always call stopWatching()
  10. Fix lint errors

Fixes surfaced by adversarial review:

  11. Restore identity-copy behavior for empty pipelines (regression caught by review)
  12. Add tests pinning command stdout fd-pass fast path
  13. Restore pooled-buffer copy for non-*os.File command stdout
  14. Forward panic handler through FilterError/IgnoreError wrappers (pre-existing bug)

Supersedes

The git-systems/pooled-copies branch (which carried #49 + #50) can be deleted after this merges.

/cc @mhagger @migue @carlosmn

@znull znull force-pushed the znull/stage-v2-clean branch from 409297e to fa6c12d Compare April 9, 2026 10:05
@znull znull changed the base branch from main to git-systems/pooled-copies April 9, 2026 10:06
@znull znull force-pushed the znull/stage-v2-clean branch 3 times, most recently from 79f4a1a to 911ed5b Compare April 9, 2026 13:37

mhagger commented Apr 27, 2026

This PR is still in draft mode. Do you want review/feedback already?

znull commented May 4, 2026

> This PR is still in draft mode. Do you want review/feedback already?

@mhagger I ran out of time to review the LLM output before vacation. I wanted to read through the changes more fully myself before inflicting them on anyone else so I left it in draft mode.

@znull znull force-pushed the znull/stage-v2-clean branch from 911ed5b to 4764d95 Compare May 28, 2026 17:29
mhagger added 4 commits May 28, 2026 20:23
Ported from version-2 branch commits:
- 95dc2e8 pipeline_test.go: get rid of a bunch of unnecessary tmpdirs
- 5fdc22a TestPipelineStdinThatIsNeverClosed(): create stdin more simply
- c2c9802 pipeline_test.go: use WithStdoutCloser() to close stdout pipes

Tests that don't run external commands (or whose commands don't
need a specific working directory) don't need t.TempDir().

Add some benchmarks that move MB-scale data through pipelines
consisting of alternating commands and functions, one in small writes,
and one buffered into larger writes, then processing it one line at a
time. This is not so efficient, because every transition from
`Function` → `Command` requires an extra (hidden) goroutine that
copies the data from an `io.Reader` to a `*os.File`.

We can make this faster!

* Rename
  * `newNopCloser()` → `newReaderNopCloser()`
  * `nopCloser` → `readerNopCloser`
  * `nopCloserWriterTo` → `readerWriterToNopCloser`
  * `nopWriteCloser` → `writerNopCloser`

  to help keep readers and writers straight and because only the
  `Close()` part is a NOP.

* Move `writerNopCloser` to `nop_closer.go` to be with its siblings.
@znull znull force-pushed the znull/stage-v2-clean branch from 4764d95 to f97fddd Compare May 28, 2026 18:46
@znull znull changed the base branch from git-systems/pooled-copies to main May 28, 2026 18:46
@znull znull changed the title from "Stage interface redesign: pipe type negotiation, eliminate ioCopier" to "Stage interface redesign: pipe type negotiation, pooled buffer copies" May 28, 2026
@znull znull force-pushed the znull/stage-v2-clean branch 2 times, most recently from 2ed608b to d14ef7b Compare May 29, 2026 11:33
@znull znull self-assigned this May 29, 2026
@znull znull force-pushed the znull/stage-v2-clean branch from 08a9cf4 to fca1bfc Compare May 29, 2026 13:56
@znull znull marked this pull request as ready for review May 29, 2026 14:39
@znull znull requested a review from a team as a code owner May 29, 2026 14:39
Copilot AI review requested due to automatic review settings May 29, 2026 14:39

Copilot AI left a comment

Pull request overview

This PR modernizes the pipeline stage contract for v2 by letting stages declare I/O preferences and receive both stdin and stdout from the pipeline, enabling better pipe selection and removing the synthetic ioCopier stage.

Changes:

  • Redesigns Stage with Preferences() and Start(..., stdin, stdout) so Pipeline.Start() can negotiate os.Pipe vs io.Pipe.
  • Reworks command/function/memory-limit stages for the new interface, including pooled stdout copies for non-file command destinations.
  • Updates module path to /v2 and adds regression/benchmark coverage for pipe matching, empty pipelines, fast-path stdout, and start-failure cleanup.
| File | Description |
| --- | --- |
| README.md | Updates documentation links for the v2 module path. |
| go.mod | Changes the module path to github.com/github/go-pipe/v2. |
| internal/ptree/ptree_test.go | Updates internal import path for v2. |
| pipe/stage.go | Redefines the public Stage interface and adds I/O preference types. |
| pipe/pipeline.go | Reworks pipeline startup to negotiate pipe types and pass stdout directly. |
| pipe/command.go | Adapts command stages to the new interface and adds pooled stdout copy handling. |
| pipe/function.go | Adapts function stages to receive caller-provided stdout and panic handling. |
| pipe/filter-error.go | Forwards panic handlers through error-filtering wrappers. |
| pipe/memorylimit.go | Ports memory-watching wrappers to the new stage interface. |
| pipe/nop_closer.go | Splits reader/writer nop closers and adds test unwrapping support. |
| pipe/copy_pool.go | Adds pooled-buffer copy helper with ReaderFrom fast-path support. |
| pipe/iocopier.go | Removes the old synthetic copier stage. |
| pipe/scanner.go | Simplifies scanner error return. |
| pipe/command_linux.go | Updates internal import path for v2. |
| pipe/command_test.go | Applies formatting cleanup. |
| pipe/command_nil_panic_test.go | Updates direct Start call for the new signature. |
| pipe/pipeline_test.go | Updates tests/benchmarks for v2 behavior, empty pipelines, and panic forwarding. |
| pipe/memorylimit_test.go | Reworks memory-limit tests for the new pipeline flow. |
| pipe/pipe_matching_test.go | Adds coverage for negotiated stdin/stdout pipe types. |
| pipe/export_test.go | Exposes nop-closer unwrapping for external package tests. |
| pipe/command_stdout_fastpath_test.go | Adds tests pinning direct `*os.File` stdout handoff. |
| pipe/command_starterror_test.go | Adds regression coverage for start-failure copy-goroutine cleanup. |

Copilot's findings

  • Files reviewed: 22/22 changed files
  • Comments generated: 2

Comment thread pipe/memorylimit.go
Comment thread pipe/export_test.go Outdated
@znull znull force-pushed the znull/stage-v2-clean branch from d7948da to 33ca03c Compare May 29, 2026 14:56
@znull znull requested a review from Copilot May 29, 2026 15:01

znull commented May 29, 2026

> @mhagger I ran out of time to review the LLM output before vacation. I wanted to read through the changes more fully myself before inflicting them on anyone else so I left it in draft mode.

I think this is actually worth taking a look at now.

@znull znull requested a review from mhagger May 29, 2026 15:03

Copilot AI left a comment

Copilot's findings

  • Files reviewed: 22/22 changed files
  • Comments generated: 1

Comment thread pipe/command.go Outdated

Copilot AI left a comment

Copilot's findings

  • Files reviewed: 22/22 changed files
  • Comments generated: 3

Comment thread pipe/function.go
Comment thread pipe/memorylimit_test.go
Comment thread pipe/memorylimit_test.go
mhagger previously approved these changes May 29, 2026

@mhagger mhagger left a comment

Looks good. I left a few optional comments (some of which would apply to #21 too).

Comment thread pipe/stage.go Outdated
Comment thread pipe/command.go Outdated
Comment thread pipe/command.go
Comment on lines +320 to 323
for _, closer := range s.lateClosers {
if closeErr := closer.Close(); closeErr != nil && err == nil {
err = closeErr
}
Member

What would you think about setting s.lateClosers = nil here, to allow those objects to be freed in case the stage object remains reachable for longer?

Contributor Author

Seems reasonable and low risk, I'll add it

Comment thread pipe/function.go Outdated
Comment on lines +76 to +90
defer close(s.done)
defer func() {
// Cleanup resources on exit
if err := w.Close(); err != nil && s.err == nil {
s.err = fmt.Errorf("error closing output pipe for stage %q: %w", s.Name(), err)
}
if stdin != nil {
if err := stdin.Close(); err != nil && s.err == nil {
s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err)
}
}
close(s.done)
}()

defer func() {
if stdout != nil {
if err := stdout.Close(); err != nil && s.err == nil {
s.err = fmt.Errorf("error closing stdout for stage %q: %w", s.Name(), err)
}
}
}()
Member

I don't understand the point of stacking the defers like this. Wouldn't it be equivalent to put them in a single defer, which one wouldn't have to read from back to front?

Suggested change
defer close(s.done)
defer func() {
// Cleanup resources on exit
if err := w.Close(); err != nil && s.err == nil {
s.err = fmt.Errorf("error closing output pipe for stage %q: %w", s.Name(), err)
}
if stdin != nil {
if err := stdin.Close(); err != nil && s.err == nil {
s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err)
}
}
close(s.done)
}()
defer func() {
if stdout != nil {
if err := stdout.Close(); err != nil && s.err == nil {
s.err = fmt.Errorf("error closing stdout for stage %q: %w", s.Name(), err)
}
}
}()
defer func() {
if stdout != nil {
if err := stdout.Close(); err != nil && s.err == nil {
s.err = fmt.Errorf("error closing stdout for stage %q: %w", s.Name(), err)
}
}
if stdin != nil {
if err := stdin.Close(); err != nil && s.err == nil {
s.err = fmt.Errorf("error closing stdin for stage %q: %w", s.Name(), err)
}
}
close(s.done)
}()

Maybe s.recoverPanic() could be put in the same function, though that one might be more readable as a separate defer.

Contributor Author

@znull znull May 29, 2026

It's not obvious to me how to put s.recoverPanic() in the single anonymous defer. Maybe it can be done safely? I don't understand recover() well enough tbh. It would have to run first in the single defer, but thinking about the panic vs. normal-exit gets murky.

I like the way you wrote it, because it makes the non-panic path read in order. The downside is that the body of the anonymous defer reads in forward order, but the outer scope (where it and the recoverPanic() defer are defined) runs in reverse order, so it's kind of confusing to have time going in both directions.

The advantage of the stacked-defer way of writing it is that time consistently reads in one direction (within the go func()), albeit bottom-to-top. On balance, I think I actually prefer the stacked defers, as weird as they look at first.

Contributor Author

OK, I looked up the arcane (to me) rules for using recover(), and reminded myself why stuff like this makes me nervous. What I was only vaguely remembering was:

  • recover() must be called directly by the deferred function, not another stack frame deeper
  • recover() is goroutine-local (I guess this should be fairly obvious but still)

So here's the single-defer, forward-reading version in d89ebc7.
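
For readers who share the uncertainty about `recover()`, here is a minimal standalone sketch of the single-defer, forward-reading shape. It is not the code from d89ebc7: `runStage` is a hypothetical helper, and the comment marks where the stdout/stdin cleanup would go.

```go
package main

import "fmt"

// runStage runs fn in its own goroutine and returns its error,
// converting a panic in fn into an error.
func runStage(fn func() error) error {
	var err error
	done := make(chan struct{})

	go func() {
		defer func() {
			// recover() is called directly by the deferred function,
			// in the goroutine that panicked, so it takes effect.
			if p := recover(); p != nil {
				err = fmt.Errorf("stage panicked: %v", p)
			}
			// Closing stdout and stdin would go here, in forward order.
			close(done) // last: lets the waiter observe err
		}()
		err = fn()
	}()

	<-done
	return err
}

func main() {
	fmt.Println(runStage(func() error { return nil }))
	fmt.Println(runStage(func() error { panic("boom") }))
}
```

Moving the `recover()` call into a helper that the deferred closure merely calls would make it return nil, which is why the recovery is inlined here.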

Comment thread pipe/memorylimit.go Outdated
Comment thread pipe/memorylimit.go Outdated
Comment on lines +278 to +290
// SetPanicHandler forwards the handler to the wrapped stage if it
// implements `StagePanicHandlerAware`. Without this, wrapping a
// panicking stage in `MemoryLimit` / `MemoryObserver` /
// `MemoryLimitWithObserver` would silently bypass
// `WithStagePanicHandler` (the type assertion in `Pipeline.Start()`
// only sees this wrapper's methods, not the wrapped stage's
// `SetPanicHandler`), letting the panic propagate out of the
// goroutine and crash the host process.
func (m *memoryWatchStage) SetPanicHandler(ph StagePanicHandler) {
	if phs, ok := m.stage.(StagePanicHandlerAware); ok {
		phs.SetPanicHandler(ph)
	}
}
Member

What happens if the wrapped stage doesn't implement StagePanicHandlerAware? Then this stage seems to claim to be installing a panic handler, but actually does nothing.

This is a general problem with Go interfaces and it is difficult to solve.

One way might be to require all stages to implement SetPanicHandler().

Another way would be to have two memoryWatchStage variants; one that is returned if the wrapped stage is StagePanicHandlerAware, and the other if not. This is not quite as awful as it sounds, because the first one could embed the second one. But it's still pretty awful, and obviously doesn't generalize gracefully to multiple optional interfaces.

It's too late for me to check whether this is really feasible, but maybe there would be a way to add panic handlers by wrapping stages with a thing that handles panics but defers to the main stage for everything else? Since this PR transitions the API to v2, other API changes are fair game 😉

I had one last random question regarding the PanicHandler interface: do you know if this is a standard way to implement a panic handler (e.g., used in other libraries)? Because ISTM that another possibility would be to define that interface as func() error, and have the panic handler call recover() itself. It would be a little bit more code, but the code would maybe look more familiar.

The problem that I started this comment with is not really terrible and maybe doesn't need to be fixed, but it awakened my curiosity.

mhagger and others added 8 commits May 29, 2026 21:47
The old `Stage` interface, and in particular its `Start()` method, was
not ideal. `Start()` was responsible for creating its own stdout,
without knowledge of what will be consuming it.

In practice, there are only two main stages:

* `commandStage` ultimately runs a subprocess, which needs an
  `*os.File` as both stdin and stdout. The old code created its stdout
  using `cmd.StdoutPipe()`, which creates an `*os.File`.

* `goStage` runs a Go function, which should be happy with any kind of
  `io.ReadCloser` / `io.WriteCloser` for its stdin and stdout. The old
  code created its stdout using `io.Pipe()`, which _doesn't_ return an
  `*os.File`.

There are some scenarios where the old behavior was not ideal:

1. If a `goStage` was followed by a `commandStage`, the `commandStage`
   had to consume the non-`*os.File` stdin that was created by
   the former. But since an external command requires an `*os.File`,
   `exec.Cmd` had to create an `os.Pipe()` internally and create an
   extra goroutine to copy from the `io.Reader` to the pipe. This is
   not only wasteful, but also meant that the `goStage` was not
   informed when the subprocess terminated or closed its stdin. (For
   example, the copy goroutine could block waiting to read from the
   `io.Reader`.)

2. If `Pipeline.stdout` was set, then an extra stage was always needed
   to copy from the output of the last stage to `Pipeline.stdout`. But:

   * If the last stage was a `commandStage` and `Pipeline.stdout` was
     an `*os.File`, then this copy was unnecessary; the subprocess
     could instead have written directly to the corresponding file
     descriptor. This was wasteful, and also led to cases where the
     subprocess couldn't detect that `Pipeline.stdout` had been
     closed.

   * If the last stage was a `goStage`, then the copy was also
     unnecessary; the stage could have written directly to
     `Pipeline.stdout` whatever its type.

Problem (1) could have been fixed by changing `goStage` to always use
`os.Pipe()` to create its stdout pipe. But that would be wasteful if
two `goStage`s were adjacent, in which case they could use a cheaper
`io.Pipe()` instead. And it wouldn't solve problem (2) at all.

Both problems can only be solved by considering both the producer
_and_ the consumer of the stdin and stdout of any stage. If either end
is a `commandStage`, then it is preferable to use `os.Pipe()`. If both
ends are `goStage`s, then it is preferable to use `io.Pipe()`. And if
`Pipeline.stdout` is set, the last stage should write directly into it
whenever possible.

This PR solves the problem by changing the `Stage` interface to add a
`Preferences()` method and change the signature of the `Start()`
method:

    Preferences() StagePreferences
    Start(
        ctx context.Context, env Env,
        stdin io.ReadCloser, stdout io.WriteCloser,
    ) error

The first indicates what kind of stdin/stdout the stage prefers, and
the second starts up the stage with a `stdin` and `stdout` that are
provided by the caller, rather than letting the stage return its own
stdout.

Now, when a stage is added to a `Pipeline`, then `Pipeline.Start()`
uses the first method to figure out what kind of pipes are preferred
between this stage and its neighbors, then the second is called to
start the stage with the preferred type of pipe if possible. It also
passes `Pipeline.stdout` into the last stage rather than copying the
data an extra time.

Note that this is a backwards-incompatible change, and thus will
require a change to v2. Any clients that implement their own `Stage`
will have to change their stage to conform to the new interface.
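
To give a feel for what such a migration involves, here is a self-contained sketch of a custom stage written against the four-method interface quoted in the PR description. `Env` and `StagePreferences` are declared locally as empty stand-ins for the package's real types, and `upperStage`, `bufCloser`, and `upperOf` are hypothetical names used only for this illustration.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
)

// Env and StagePreferences are empty stand-ins for the package's types.
type Env struct{}
type StagePreferences struct{}

// Stage mirrors the interface quoted in the PR description.
type Stage interface {
	Name() string
	Preferences() StagePreferences
	Start(ctx context.Context, env Env, stdin io.ReadCloser, stdout io.WriteCloser) error
	Wait() error
}

// upperStage upper-cases its input. It no longer creates and returns
// its own stdout; it writes to the stdout it is handed.
type upperStage struct {
	done chan struct{}
	err  error
}

func (s *upperStage) Name() string                  { return "upper" }
func (s *upperStage) Preferences() StagePreferences { return StagePreferences{} }

func (s *upperStage) Start(_ context.Context, _ Env, stdin io.ReadCloser, stdout io.WriteCloser) error {
	s.done = make(chan struct{})
	go func() {
		defer close(s.done)
		defer stdout.Close() // in this sketch the stage closes what it was given
		defer stdin.Close()
		data, err := io.ReadAll(stdin)
		if err == nil {
			_, err = stdout.Write(bytes.ToUpper(data))
		}
		s.err = err
	}()
	return nil
}

func (s *upperStage) Wait() error { <-s.done; return s.err }

// bufCloser is an in-memory io.WriteCloser for the demo.
type bufCloser struct{ bytes.Buffer }

func (*bufCloser) Close() error { return nil }

// upperOf drives the stage by hand, the way a pipeline would.
func upperOf(in string) string {
	var s Stage = &upperStage{}
	out := &bufCloser{}
	if err := s.Start(context.Background(), Env{}, io.NopCloser(strings.NewReader(in)), out); err != nil {
		return "error: " + err.Error()
	}
	if err := s.Wait(); err != nil {
		return "error: " + err.Error()
	}
	return out.String()
}

func main() {
	fmt.Println(upperOf("hello, pipe"))
}
```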

However, clients that only create stages using the functions in this
package (e.g., `pipe.Command()`, `pipe.CommandStage()`,
`pipe.Function()`, `pipe.LinewiseFunction()`, etc.) should continue to
work without changes, since those functions' signatures haven't
changed. Such clients will get the benefit of the new behavior. For
example, the benchmarks `BenchmarkMoreDataBuffered` and
`BenchmarkMoreDataUnbuffered` (admittedly, worst cases for the old
code) are sped up by roughly 2.25x and 6.6x, respectively:

```
snare:~/github/proj/go-pipe/git(main-bench)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20         	    8497	   1383275 ns/op
BenchmarkTenPrograms-20           	    2186	   5388075 ns/op
BenchmarkTenFunctions-20          	   37605	    324808 ns/op
BenchmarkTenMixedStages-20        	    3380	   3565218 ns/op
BenchmarkMoreDataUnbuffered-20    	      25	 423838490 ns/op
BenchmarkMoreDataBuffered-20      	      44	 261734773 ns/op
PASS
ok  	command-line-arguments	76.120s
172.91user 91.15system 1:16.56elapsed 344%CPU (0avgtext+0avgdata 114080maxresident)k
0inputs+7768outputs (40major+3819487minor)pagefaults 0swaps

snare:~/github/proj/go-pipe/git(version-2)$ /bin/time go test -bench=. -benchtime=10s ./pipe/pipeline_test.go
goos: linux
goarch: amd64
cpu: Intel(R) Xeon(R) W-2255 CPU @ 3.70GHz
BenchmarkSingleProgram-20         	    8458	   1366214 ns/op
BenchmarkTenPrograms-20           	    2233	   5296019 ns/op
BenchmarkTenFunctions-20          	   42453	    289761 ns/op
BenchmarkTenMixedStages-20        	    3398	   3497226 ns/op
BenchmarkMoreDataUnbuffered-20    	     177	  64410211 ns/op
BenchmarkMoreDataBuffered-20      	     100	 115728132 ns/op
PASS
ok  	command-line-arguments	82.751s
175.42user 142.81system 1:23.21elapsed 382%CPU (0avgtext+0avgdata 114080maxresident)k
0inputs+7776outputs (42major+3883888minor)pagefaults 0swaps
```

Also, look how much simpler `testMemoryLimit()` has become, since it
doesn't need the awkward workaround that was previously required.

In terms of backwards compatibility, some applications might notice a
difference with the new pipe structure. The difference should usually
be an improvement, for example lower resource consumption and less
risk of deadlock. It is conceivable that some applications were in
some way relying on the delayed completion of pipelines when an
`io.Pipe` was closed, though I'm having trouble imagining scenarios
like that in the real world.

The most complicated code dealing with the change to `Stage.Start()`
is the selection of which types of stdin/stdout to pass to stages, and
that's also the main advantage of the new interface. So add a bunch of
tests that the correct types (especially, `io.Pipe()` vs. `os.Pipe()`)
are indeed being selected.

MemoryLimitWithObserver was added to main (PR #48) after the version-2
branch diverged. Port it to the new Stage interface and add test coverage.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The version-2 Stage interface redesign dropped the panic recovery from
goStage (or preceded its addition). Add/restore it: add
SetPanicHandler(), recoverPanic(), and WithStagePanicHandler pipeline option.

The goroutine uses stacked defers (close(done) → close stdin → close
stdout → recoverPanic) so that when a Function panics, recoverPanic
fires first (sets s.err), then cleanup runs, then done closes — allowing
Wait() to return the caught panic error.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Wait() returned the inner stage's error immediately without calling
stopWatching(), which meant the memory watcher goroutine was never
cancelled when the stage exited with an error (e.g., from being killed
due to memory limit). This prevented the observer from logging peak
memory usage on kill.

Fix: always call stopWatching() before returning, regardless of whether
the inner stage returned an error.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
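
The shape of that fix can be sketched in isolation. This is a simplified illustration rather than the code in `pipe/memorylimit.go`: `watchedStage` and `watcherStopped` are hypothetical, and the two function fields stand in for the wrapped stage's `Wait()` and the watcher's cancel function.

```go
package main

import (
	"errors"
	"fmt"
)

// watchedStage is a hypothetical stand-in for a stage wrapper that runs
// a watcher goroutine alongside the wrapped stage.
type watchedStage struct {
	wait         func() error // the wrapped stage's Wait()
	stopWatching func()       // cancels the watcher goroutine
}

// Wait stops the watcher on every path, including the error path.
func (s *watchedStage) Wait() error {
	err := s.wait()
	s.stopWatching() // before the fix, an early return on err skipped this
	return err
}

// watcherStopped reports whether stopWatching ran for a stage that
// either succeeds or fails.
func watcherStopped(stageFails bool) bool {
	stopped := false
	s := &watchedStage{
		wait: func() error {
			if stageFails {
				return errors.New("killed: memory limit exceeded")
			}
			return nil
		},
		stopWatching: func() { stopped = true },
	}
	_ = s.Wait()
	return stopped
}

func main() {
	fmt.Println(watcherStopped(false), watcherStopped(true))
}
```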
- Remove loop variable copies that are unnecessary as of Go 1.22 (copyloopvar)
- Replace unused parameters with _ (revive)
- Add nolint directive for FinishEarly naming (staticcheck ST1012)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Prior to the Stage interface redesign, an empty pipeline with a
configured `stdout` ran a synthetic ioCopier that copied `p.stdin` (if
any) to `p.stdout` and closed the destination if it came from
`WithStdoutCloser()`.

Restore that behavior by synthesizing an identity-copy Function stage
when the pipeline has no stages but does have a configured output. The
empty/no-output case remains a no-op as before.

This affects callers like `pipe.New(WithStdin(r)).Output(ctx)`.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
One of the optimizations enabled by the Stage interface redesign in #21
is that when a commandStage at the end of a pipeline has its stdout
pointed at an *os.File, exec.Cmd dup's that fd straight into the child
process. The PR called out two benefits:

- the Go-side copy stage between subprocess and Pipeline.stdout
  becomes unnecessary (wasteful goroutine + buffer);
- the subprocess can detect when Pipeline.stdout is closed, which
  the old intermediate pipe hid from it.

Until now nothing asserted this contract directly. The pipe-type
negotiation tests in pipe_matching_test.go only check that the right
kind of pipe was chosen between mock stages; the existing Command +
WithStdoutCloser(*os.File) test would still pass even if a future
refactor silently wrapped stdout in a Go-side io.Copy.

Pin the contract with two tests (one direct, one through Pipeline) that
assert s.cmd.Stdout == userProvidedFile after Start() for both
WithStdoutCloser(*os.File) and WithStdout(*os.File) (writerNopCloser-
wrapped). A regression would now fail loudly with a message that names
the optimization.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@znull znull force-pushed the znull/stage-v2-clean branch from 02b2958 to 913011b Compare May 29, 2026 19:50
znull and others added 7 commits May 29, 2026 22:10
When a command stage's stdout is not an `*os.File` (e.g. a
`bytes.Buffer` via `WithStdout()` or a custom writer via
`WithStdoutCloser()`), the fd-pass fast path doesn't apply and the data
has to flow through Go. Left to its own devices, `exec.Cmd` would set up
an internal `os.Pipe()` and run `io.Copy` with a fresh 32KB buffer
allocated per invocation.

To reduce GC pressure, let's do that copy directly: create the
`os.Pipe()` ourselves, set the write end as `cmd.Stdout` (so exec.Cmd
still does an fd dup into the child), and run the copy from the read end
to the user's writer in our own goroutine, drawing the 32KB buffer from
a `sync.Pool` (`copy_pool.go`).

Destinations that implement `io.ReaderFrom` (`*net.TCPConn`, `*os.File`)
are still routed through `ReadFrom` so platform fast paths like splice
continue to apply. The pure `*os.File` and `writerNopCloser{*os.File}`
paths are unchanged: the fd is still passed directly to the child
process.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
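
A rough sketch of the "create the pipe ourselves" arrangement, under stated assumptions: `runWithOwnPipe` and `startFails` are hypothetical names, `io.Copy` stands in for the pooled-buffer copy, and the demo assumes an `echo` binary on `PATH`. The sketch also shows one way to avoid leaving a copy goroutine behind when `cmd.Start()` fails, namely by not starting it until the child is running.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"os/exec"
)

// runWithOwnPipe runs cmd and copies its stdout into dst through an
// os.Pipe() that we create, instead of letting exec.Cmd create one.
func runWithOwnPipe(cmd *exec.Cmd, dst io.Writer) error {
	r, w, err := os.Pipe()
	if err != nil {
		return err
	}
	// An *os.File stdout is handed to the child as a file descriptor,
	// so exec.Cmd does not start its own copy goroutine.
	cmd.Stdout = w

	if err := cmd.Start(); err != nil {
		// No child will ever write: release both ends and do not
		// start a copier.
		w.Close()
		r.Close()
		return err
	}
	// Drop the parent's copy of the write end so that the reader sees
	// EOF once the child exits.
	w.Close()

	copyErr := make(chan error, 1)
	go func() {
		defer r.Close()
		_, err := io.Copy(dst, r) // a pooled-buffer copy would go here
		copyErr <- err
	}()

	cerr := <-copyErr // drain the pipe before reaping the child
	if werr := cmd.Wait(); werr != nil {
		return werr
	}
	return cerr
}

// startFails reports whether a nonexistent command yields an error.
func startFails() bool {
	var sink bytes.Buffer
	return runWithOwnPipe(exec.Command("no-such-command-for-this-sketch"), &sink) != nil
}

func main() {
	var out bytes.Buffer
	if err := runWithOwnPipe(exec.Command("echo", "hello"), &out); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(out.String())
}
```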
`efStage` (the wrapper returned by `FilterError` and `IgnoreError`)
embeds the `Stage` interface, which only exposes the four Stage methods.
So when `Pipeline.Start()` checks `if phs, ok := s.(StagePanicHandlerAware); ok`,
the assertion silently fails for any wrapped stage — even if the
underlying stage is a `goStage` that implements `SetPanicHandler`.

This means a configured `WithStagePanicHandler` is bypassed when the
panicking Function is wrapped in `IgnoreError`. The goroutine inside
`goStage.Start` sees `panicHandler == nil` and returns without calling
`recover()`, letting the panic propagate up the runtime and crash the
host process.

memoryWatchStage had a similar issue.

This was a pre-existing bug in main (not introduced by the version-2
Stage interface redesign), but we're already touching the panic-handler,
so let's fix it here.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
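
The underlying Go behavior is easy to reproduce in isolation. In the sketch below every name is a simplified, hypothetical stand-in (`panicAware` for `StagePanicHandlerAware`, `wrapper` for `efStage`, and a plain `func(any)` for the handler type): embedding an interface promotes only that interface's methods, so the optional method vanishes unless the wrapper forwards it explicitly.

```go
package main

import "fmt"

// Stage is trimmed to one method for brevity.
type Stage interface{ Name() string }

// panicAware stands in for the optional interface.
type panicAware interface{ SetPanicHandler(h func(any)) }

// goStage implements both Stage and the optional interface.
type goStage struct{ handler func(any) }

func (s *goStage) Name() string                { return "fn" }
func (s *goStage) SetPanicHandler(h func(any)) { s.handler = h }

// wrapper embeds the Stage interface, so only Stage's methods are
// promoted; SetPanicHandler is not part of its method set.
type wrapper struct{ Stage }

// forwardingWrapper adds the explicit forwarding method.
type forwardingWrapper struct{ Stage }

func (w forwardingWrapper) SetPanicHandler(h func(any)) {
	if pa, ok := w.Stage.(panicAware); ok {
		pa.SetPanicHandler(h)
	}
}

// isPanicAware performs the same kind of type assertion that the
// pipeline does at start time.
func isPanicAware(s Stage) bool {
	_, ok := s.(panicAware)
	return ok
}

func main() {
	inner := &goStage{}
	fmt.Println(isPanicAware(inner))                    // true
	fmt.Println(isPanicAware(wrapper{inner}))           // false
	fmt.Println(isPanicAware(forwardingWrapper{inner})) // true
}
```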
Ensure that in case of cmd.Start() failure for pipelines that use pooled
buffers, we don't leak a pooled-buffer-copy goroutine.

This can be triggered and detected by running a command stage that
fails with command-not-found under the race detector.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The Stage interface in this series is not backwards compatible:

- Start()'s signature changed from:

    Start(ctx, env, stdin io.ReadCloser) (io.ReadCloser, error)

  to:

    Start(ctx, env, stdin io.ReadCloser, stdout io.WriteCloser) error

- Stage gained a new required method Preferences().

Callers that only construct stages via the package's exported
constructors (Command(), CommandStage(), Function(), ...) are
unaffected, but anyone implementing Stage themselves has to update their
implementation.

It doesn't really mean much of anything to specify IOPreferenceNil.
Nothing uses it, which isn't surprising because it's not clear what it
would even mean anywhere other than the beginning/end of a pipeline.

PR feedback: nil out lateClosers after use, so that closers can
potentially be garbage collected in cases where the stage hangs around
for a while.
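
A small sketch of that change, using a hypothetical `stage` type rather than the real `commandStage`: the loop keeps the first close error, and the slice is then set to nil so the closers become collectable even if the stage itself stays reachable.

```go
package main

import (
	"fmt"
	"io"
)

// countingCloser records how many times Close was called.
type countingCloser struct{ closed *int }

func (c countingCloser) Close() error { *c.closed++; return nil }

// stage is a hypothetical stand-in holding closers to run at the end.
type stage struct{ lateClosers []io.Closer }

// closeLate closes every late closer, keeps the first error, and then
// drops the references.
func (s *stage) closeLate() error {
	var err error
	for _, c := range s.lateClosers {
		if cerr := c.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}
	s.lateClosers = nil // release the closers for garbage collection
	return err
}

// closersReleased reports whether both closers ran and the slice was
// cleared afterwards.
func closersReleased() bool {
	n := 0
	s := &stage{lateClosers: []io.Closer{countingCloser{&n}, countingCloser{&n}}}
	err := s.closeLate()
	return err == nil && n == 2 && s.lateClosers == nil
}

func main() {
	fmt.Println(closersReleased())
}
```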
@znull znull force-pushed the znull/stage-v2-clean branch from 913011b to 5c2f257 Compare May 29, 2026 20:11
znull and others added 2 commits May 29, 2026 23:45
Based on PR feedback, rewrite the use of recover() for function stages
so that the code flows in forward order. Inline recoverPanic() to keep
the recover in the first stack frame.

Of all the different types of pipe.Stage, most don't need to have a
panic handler, because most are not running user functions. Yet we were
paying the price of having panic forwarding as part of the interface,
which was awkward and error-prone for the rest of the stages to
implement cleanly.

Instead, we can just pass the panic handler through Start. We
use a trailing StartOptions struct carrying PanicHandler in Stage.Start.
The StagePanicHandlerAware interface and its panic.go file are removed
(StagePanicHandler moves next to StartOptions in stage.go).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
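
A heavily simplified sketch of why an options struct removes the forwarding problem. Only the names `StartOptions`, `PanicHandler`, and `StagePanicHandler` come from the commit message; the handler's signature, the trimmed `Start(opts)` method (the real one also takes ctx, env, stdin, and stdout), and the `funcStage` / `wrapperStage` types are assumptions made for illustration.

```go
package main

import "fmt"

// StagePanicHandler's signature is an assumption made for this sketch.
type StagePanicHandler func(p any) error

// StartOptions carries optional settings into Start.
type StartOptions struct{ PanicHandler StagePanicHandler }

// stage is trimmed to the two methods that matter here.
type stage interface {
	Start(opts StartOptions)
	Wait() error
}

// funcStage runs a user function and is the only stage that looks at
// the panic handler.
type funcStage struct {
	fn   func() error
	done chan struct{}
	err  error
}

func (s *funcStage) Start(opts StartOptions) {
	s.done = make(chan struct{})
	go func() {
		defer func() {
			if opts.PanicHandler != nil {
				if p := recover(); p != nil {
					s.err = opts.PanicHandler(p)
				}
			}
			close(s.done)
		}()
		s.err = s.fn()
	}()
}

func (s *funcStage) Wait() error { <-s.done; return s.err }

// wrapperStage needs no panic-specific method: it passes opts through
// untouched, so the handler reaches the wrapped stage automatically.
type wrapperStage struct{ inner stage }

func (w wrapperStage) Start(opts StartOptions) { w.inner.Start(opts) }
func (w wrapperStage) Wait() error             { return w.inner.Wait() }

// wrappedPanicResult runs a panicking function behind a wrapper and
// returns the error text produced by the handler.
func wrappedPanicResult() string {
	s := wrapperStage{inner: &funcStage{fn: func() error { panic("boom") }}}
	s.Start(StartOptions{PanicHandler: func(p any) error {
		return fmt.Errorf("handled: %v", p)
	}})
	if err := s.Wait(); err != nil {
		return err.Error()
	}
	return ""
}

func main() {
	fmt.Println(wrappedPanicResult())
}
```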