From f20ef3d4efa62600cd075494fe2f35a0a50e27aa Mon Sep 17 00:00:00 2001 From: Alexandre Mutel Date: Wed, 6 May 2026 22:45:38 +0200 Subject: [PATCH 1/2] Coordinate Copilot CLI stderr pump cleanup --- dotnet/src/Client.cs | 426 ++++++++++++++++++++++++++++++------------- 1 file changed, 298 insertions(+), 128 deletions(-) diff --git a/dotnet/src/Client.cs b/dotnet/src/Client.cs index 9608d98c3..a97eaf36e 100644 --- a/dotnet/src/Client.cs +++ b/dotnet/src/Client.cs @@ -57,6 +57,7 @@ public sealed partial class CopilotClient : IDisposable, IAsyncDisposable /// Minimum protocol version this SDK can communicate with. /// private const int MinProtocolVersion = 3; + private static readonly TimeSpan StderrPumpShutdownTimeout = TimeSpan.FromSeconds(5); /// /// Provides a thread-safe collection of active Copilot sessions, indexed by session identifier. @@ -235,6 +236,7 @@ async Task StartCoreAsync(CancellationToken ct) var startTimestamp = Stopwatch.GetTimestamp(); Connection? connection = null; Process? cliProcess = null; + ProcessStderrPump? stderrPump = null; try { @@ -247,10 +249,11 @@ async Task StartCoreAsync(CancellationToken ct) else { // Child process (stdio or TCP) - var (startedProcess, portOrNull, stderrBuffer) = await StartCliServerAsync(ct); + var (startedProcess, portOrNull, startedStderrPump) = await StartCliServerAsync(ct); cliProcess = startedProcess; + stderrPump = startedStderrPump; _actualPort = portOrNull; - connection = await ConnectToServerAsync(cliProcess, portOrNull is null ? null : "localhost", portOrNull, stderrBuffer, ct); + connection = await ConnectToServerAsync(cliProcess, portOrNull is null ? null : "localhost", portOrNull, stderrPump, ct); } LoggingHelpers.LogTiming(_logger, LogLevel.Debug, null, @@ -279,20 +282,33 @@ async Task StartCoreAsync(CancellationToken ct) } catch (Exception ex) { - if (ex is not OperationCanceledException) + var cleanupErrors = new List(); + try { - LoggingHelpers.LogTiming(_logger, LogLevel.Warning, ex, - "CopilotClient.StartAsync failed. Elapsed={Elapsed}", - startTimestamp); - } + if (ex is not OperationCanceledException) + { + LoggingHelpers.LogTiming(_logger, LogLevel.Warning, ex, + "CopilotClient.StartAsync failed. Elapsed={Elapsed}", + startTimestamp); + } - if (connection is not null) - { - await CleanupConnectionAsync(connection, errors: null); + if (connection is not null) + { + await CleanupConnectionAsync(connection, cleanupErrors); + } + else if (cliProcess is not null) + { + await CleanupCliProcessAsync(cliProcess, stderrPump, cleanupErrors, _logger); + } + + foreach (var cleanupError in cleanupErrors) + { + _logger.LogDebug(cleanupError, "Failed to clean up Copilot client connection after startup failure"); + } } - else if (cliProcess is not null) + finally { - await CleanupCliProcessAsync(cliProcess, errors: null, _logger); + _connectionTask = null; } throw; @@ -403,8 +419,6 @@ private async Task CleanupConnectionAsync(List? errors) return; } - _connectionTask = null; - Connection ctx; try { @@ -415,6 +429,13 @@ private async Task CleanupConnectionAsync(List? errors) _logger.LogDebug(ex, "Ignoring failed Copilot client startup during cleanup"); return; } + finally + { + if (ReferenceEquals(_connectionTask, connectionTask)) + { + _connectionTask = null; + } + } await CleanupConnectionAsync(ctx, errors); } @@ -436,31 +457,60 @@ private async Task CleanupConnectionAsync(Connection ctx, List? error if (ctx.CliProcess is { } childProcess) { - await CleanupCliProcessAsync(childProcess, errors, _logger); + await CleanupCliProcessAsync(childProcess, ctx.StderrPump, errors, _logger); } } - private static async Task CleanupCliProcessAsync(Process childProcess, List? errors, ILogger? logger) + private static async Task CleanupCliProcessAsync(Process childProcess, ProcessStderrPump? stderrPump, List? errors, ILogger? logger) { + stderrPump?.Cancel(); + try { + if (!childProcess.HasExited) + { + childProcess.Kill(entireProcessTree: true); + // Kill is asynchronous; wait for the root CLI process to exit so cleanup callers + // do not observe StopAsync/DisposeAsync completion while it is still tearing down. + await childProcess.WaitForExitAsync(); + } + } + catch (Exception ex) + { + AddCleanupError(errors, ex, logger); + } + + if (stderrPump is not null) + { + var stderrPumpWaitTimestamp = Stopwatch.GetTimestamp(); try { - if (!childProcess.HasExited) + await stderrPump.WaitForCompletionAsync(StderrPumpShutdownTimeout); + } + catch (TimeoutException ex) + { + if (logger is not null) { - childProcess.Kill(entireProcessTree: true); - await childProcess.WaitForExitAsync(); + LoggingHelpers.LogTiming(logger, LogLevel.Debug, ex, + "Timed out waiting for CLI stderr pump to stop. Elapsed={Elapsed}, Timeout={Timeout}", + stderrPumpWaitTimestamp, + StderrPumpShutdownTimeout); } + + AddCleanupError(errors, ex, logger); + } + catch (Exception ex) + { + AddCleanupError(errors, ex, logger); } finally { - childProcess.Dispose(); + stderrPump.Dispose(); } } - catch (Exception ex) - { - AddCleanupError(errors, ex, logger); - } + + try { childProcess.Dispose(); } + catch (Exception ex) { AddCleanupError(errors, ex, logger); } } private static void AddCleanupError(List? errors, Exception ex, ILogger? logger) @@ -1655,7 +1705,7 @@ private static bool IsUnsupportedConnectMethod(RemoteRpcException ex) || string.Equals(ex.Message, "Unhandled method connect", StringComparison.Ordinal); } - private async Task<(Process Process, int? DetectedLocalhostTcpPort, StringBuilder StderrBuffer)> StartCliServerAsync(CancellationToken cancellationToken) + private async Task<(Process Process, int? DetectedLocalhostTcpPort, ProcessStderrPump StderrPump)> StartCliServerAsync(CancellationToken cancellationToken) { var options = _options; var logger = _logger; @@ -1779,38 +1829,29 @@ private static bool IsUnsupportedConnectMethod(RemoteRpcException ex) if (telemetry.CaptureContent is { } capture) startInfo.Environment["OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"] = capture ? "true" : "false"; } - Process? cliProcess = null; + var cliProcess = new Process { StartInfo = startInfo }; try { - cliProcess = new Process { StartInfo = startInfo }; var spawnTimestamp = Stopwatch.GetTimestamp(); cliProcess.Start(); LoggingHelpers.LogTiming(logger, LogLevel.Debug, null, "CopilotClient.StartCliServerAsync subprocess spawned. Elapsed={Elapsed}", spawnTimestamp); + } + catch + { + cliProcess.Dispose(); + throw; + } - // Capture stderr for error messages and forward to logger - var stderrBuffer = new StringBuilder(); - var stderrReader = Task.Run(async () => - { - while (true) - { - var line = await cliProcess.StandardError.ReadLineAsync(cancellationToken); - if (line is null) - { - break; - } - - lock (stderrBuffer) - { - stderrBuffer.AppendLine(line); - } - - logger.LogWarning("[CLI] {Line}", line); - } - }, cancellationToken); + // Capture stderr for error messages and forward to logger. + // The pump has its own lifetime token and is later cancelled/observed + // by the owning Connection before the process is disposed. + var stderrPump = ProcessStderrPump.Start(cliProcess, logger); - var detectedLocalhostTcpPort = (int?)null; + var detectedLocalhostTcpPort = (int?)null; + try + { if (!useStdio) { // Wait for port announcement @@ -1818,39 +1859,47 @@ private static bool IsUnsupportedConnectMethod(RemoteRpcException ex) using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(TimeSpan.FromSeconds(30)); - while (!cts.Token.IsCancellationRequested) + try { - var line = await cliProcess.StandardOutput.ReadLineAsync(cts.Token); - if (line is null) - { - await stderrReader; - throw CreateCliExitedException("Runtime process exited unexpectedly", stderrBuffer); - } - - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug("[CLI] {Line}", line); - } - - if (ListeningOnPortRegex().Match(line) is { Success: true } match) + while (true) { - detectedLocalhostTcpPort = int.Parse(match.Groups[1].Value, CultureInfo.InvariantCulture); - LoggingHelpers.LogTiming(logger, LogLevel.Debug, null, - "CopilotClient.StartCliServerAsync TCP port wait complete. Elapsed={Elapsed}, Port={Port}", - portWaitTimestamp, - detectedLocalhostTcpPort.Value); - break; + var line = await cliProcess.StandardOutput.ReadLineAsync(cts.Token); + if (line is null) + { + throw CreateCliExitedException("CLI process exited unexpectedly", stderrPump.Buffer); + } + + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("[CLI] {Line}", line); + } + + if (ListeningOnPortRegex().Match(line) is { Success: true } match) + { + detectedLocalhostTcpPort = int.Parse(match.Groups[1].Value, CultureInfo.InvariantCulture); + LoggingHelpers.LogTiming(logger, LogLevel.Debug, null, + "CopilotClient.StartCliServerAsync TCP port wait complete. Elapsed={Elapsed}, Port={Port}", + portWaitTimestamp, + detectedLocalhostTcpPort.Value); + break; + } } } + catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested && cts.IsCancellationRequested) + { + throw CreateCliExitedException("Timed out waiting for Copilot CLI to report its TCP listening port.", stderrPump.Buffer); + } } - return (cliProcess, detectedLocalhostTcpPort, stderrBuffer); + return (cliProcess, detectedLocalhostTcpPort, stderrPump); } catch { - if (cliProcess is not null) + var cleanupErrors = new List(); + await CleanupCliProcessAsync(cliProcess, stderrPump, cleanupErrors, logger); + foreach (var cleanupError in cleanupErrors) { - await CleanupCliProcessAsync(cliProcess, errors: null, logger); + logger.LogDebug(cleanupError, "Failed to clean up Copilot CLI process after startup failure"); } throw; @@ -1898,77 +1947,94 @@ private static (string FileName, IEnumerable Args) ResolveCliCommand(str return (cliPath, args); } - private async Task ConnectToServerAsync(Process? cliProcess, string? tcpHost, int? tcpPort, StringBuilder? stderrBuffer, CancellationToken cancellationToken) + private async Task ConnectToServerAsync(Process? cliProcess, string? tcpHost, int? tcpPort, ProcessStderrPump? stderrPump, CancellationToken cancellationToken) { var setupTimestamp = Stopwatch.GetTimestamp(); - Stream inputStream, outputStream; NetworkStream? networkStream = null; + JsonRpc? rpc = null; - if (_connection is StdioRuntimeConnection) + try { - if (cliProcess == null) - { - throw new InvalidOperationException("Runtime process not started"); - } + Stream inputStream, outputStream; - inputStream = cliProcess.StandardOutput.BaseStream; - outputStream = cliProcess.StandardInput.BaseStream; - } - else - { - if (tcpHost is null || tcpPort is null) + if (_connection is StdioRuntimeConnection) { - throw new InvalidOperationException("Cannot connect because TCP host or port are not available"); - } + if (cliProcess == null) + { + throw new InvalidOperationException("Runtime process not started"); + } - var socket = new Socket(SocketType.Stream, ProtocolType.Tcp); - try - { - var tcpConnectTimestamp = Stopwatch.GetTimestamp(); - LogConnectingToCliServer(_logger, tcpHost, tcpPort.Value); - await socket.ConnectAsync(tcpHost, tcpPort.Value, cancellationToken); - LoggingHelpers.LogTiming(_logger, LogLevel.Debug, null, - "CopilotClient.ConnectToServerAsync TCP connect complete. Elapsed={Elapsed}, Host={Host}, Port={Port}", - tcpConnectTimestamp, - tcpHost, - tcpPort.Value); + inputStream = cliProcess.StandardOutput.BaseStream; + outputStream = cliProcess.StandardInput.BaseStream; } - catch + else { - socket.Dispose(); - throw; + if (tcpHost is null || tcpPort is null) + { + throw new InvalidOperationException("Cannot connect because TCP host or port are not available"); + } + + var socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + try + { + var tcpConnectTimestamp = Stopwatch.GetTimestamp(); + LogConnectingToCliServer(_logger, tcpHost, tcpPort.Value); + await socket.ConnectAsync(tcpHost, tcpPort.Value, cancellationToken); + LoggingHelpers.LogTiming(_logger, LogLevel.Debug, null, + "CopilotClient.ConnectToServerAsync TCP connect complete. Elapsed={Elapsed}, Host={Host}, Port={Port}", + tcpConnectTimestamp, + tcpHost, + tcpPort.Value); + } + catch + { + socket.Dispose(); + throw; + } + + inputStream = outputStream = networkStream = new NetworkStream(socket, ownsSocket: true); } - inputStream = outputStream = networkStream = new NetworkStream(socket, ownsSocket: true); - } + rpc = new JsonRpc( + outputStream, + inputStream, + SerializerOptionsForMessageFormatter, + _logger); + + var handler = new RpcHandler(this); + rpc.SetLocalRpcMethod("session.event", handler.OnSessionEvent); + rpc.SetLocalRpcMethod("session.lifecycle", handler.OnSessionLifecycle); + rpc.SetLocalRpcMethod("userInput.request", handler.OnUserInputRequest); + rpc.SetLocalRpcMethod("exitPlanMode.request", handler.OnExitPlanModeRequest); + rpc.SetLocalRpcMethod("autoModeSwitch.request", handler.OnAutoModeSwitchRequest); + rpc.SetLocalRpcMethod("hooks.invoke", handler.OnHooksInvoke); + rpc.SetLocalRpcMethod("systemMessage.transform", handler.OnSystemMessageTransform); + ClientSessionApiRegistration.RegisterClientSessionApiHandlers(rpc, sessionId => + { + var session = GetSession(sessionId) ?? throw new ArgumentException($"Unknown session {sessionId}"); + return session.ClientSessionApis; + }); + rpc.StartListening(); + LoggingHelpers.LogTiming(_logger, LogLevel.Debug, null, + "CopilotClient.ConnectToServerAsync transport setup complete. Elapsed={Elapsed}", + setupTimestamp); - var rpc = new JsonRpc( - outputStream, - inputStream, - SerializerOptionsForMessageFormatter, - _logger); + _serverRpc = new ServerRpc(rpc); - var handler = new RpcHandler(this); - rpc.SetLocalRpcMethod("session.event", handler.OnSessionEvent); - rpc.SetLocalRpcMethod("session.lifecycle", handler.OnSessionLifecycle); - rpc.SetLocalRpcMethod("userInput.request", handler.OnUserInputRequest); - rpc.SetLocalRpcMethod("exitPlanMode.request", handler.OnExitPlanModeRequest); - rpc.SetLocalRpcMethod("autoModeSwitch.request", handler.OnAutoModeSwitchRequest); - rpc.SetLocalRpcMethod("hooks.invoke", handler.OnHooksInvoke); - rpc.SetLocalRpcMethod("systemMessage.transform", handler.OnSystemMessageTransform); - ClientSessionApiRegistration.RegisterClientSessionApiHandlers(rpc, sessionId => + return new Connection(rpc, cliProcess, networkStream, stderrPump); + } + catch { - var session = GetSession(sessionId) ?? throw new ArgumentException($"Unknown session {sessionId}"); - return session.ClientSessionApis; - }); - rpc.StartListening(); - LoggingHelpers.LogTiming(_logger, LogLevel.Debug, null, - "CopilotClient.ConnectToServerAsync transport setup complete. Elapsed={Elapsed}", - setupTimestamp); - - _serverRpc = new ServerRpc(rpc); + try { rpc?.Dispose(); } + catch (Exception ex) { _logger.LogDebug(ex, "Failed to dispose JSON-RPC connection after startup failure"); } - return new Connection(rpc, cliProcess, networkStream, stderrBuffer); + if (networkStream is not null) + { + try { await networkStream.DisposeAsync(); } + catch (Exception ex) { _logger.LogDebug(ex, "Failed to dispose TCP stream after startup failure"); } + } + throw; + } } private static JsonSerializerOptions SerializerOptionsForMessageFormatter { get; } = CreateSerializerOptions(); @@ -2155,12 +2221,116 @@ private class Connection( JsonRpc rpc, Process? cliProcess, // Set if we created the child process NetworkStream? networkStream, // Set if using TCP - StringBuilder? stderrBuffer = null) // Captures stderr for error messages + ProcessStderrPump? stderrPump = null) // Captures stderr for error messages { public Process? CliProcess => cliProcess; public JsonRpc Rpc => rpc; public NetworkStream? NetworkStream => networkStream; - public StringBuilder? StderrBuffer => stderrBuffer; + public ProcessStderrPump? StderrPump => stderrPump; + public StringBuilder? StderrBuffer => stderrPump?.Buffer; + } + + private sealed class ProcessStderrPump : IDisposable + { + private readonly CancellationTokenSource _cancellationTokenSource = new(); + private readonly Task _completion; + private int _disposeRequested; + + private ProcessStderrPump(Process process, ILogger logger) + { + _completion = Task.Run(() => PumpAsync(process, logger, _cancellationTokenSource.Token)); + } + + public StringBuilder Buffer { get; } = new(); + + public static ProcessStderrPump Start(Process process, ILogger logger) + { + return new ProcessStderrPump(process, logger); + } + + public void Cancel() + { + try + { + _cancellationTokenSource.Cancel(); + } + catch (ObjectDisposedException) + { + } + } + + public async Task WaitForCompletionAsync(TimeSpan timeout) + { + var completedTask = await Task.WhenAny(_completion, Task.Delay(timeout)); + if (!ReferenceEquals(completedTask, _completion)) + { + throw new TimeoutException(); + } + + await _completion; + } + + public void Dispose() + { + if (Interlocked.Exchange(ref _disposeRequested, 1) != 0) + { + return; + } + + Cancel(); + + if (_completion.IsCompleted) + { + _cancellationTokenSource.Dispose(); + } + else + { + _ = _completion.ContinueWith( + static (_, state) => ((CancellationTokenSource)state!).Dispose(), + _cancellationTokenSource, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + } + } + + private async Task PumpAsync(Process process, ILogger logger, CancellationToken cancellationToken) + { + try + { + while (true) + { + var line = await process.StandardError.ReadLineAsync(cancellationToken); + if (line is null) + { + break; + } + + lock (Buffer) + { + Buffer.AppendLine(line); + } + + logger.LogWarning("[CLI] {Line}", line); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + } + catch (InvalidOperationException) when (cancellationToken.IsCancellationRequested) + { + } + catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested) + { + } + catch (IOException) when (cancellationToken.IsCancellationRequested) + { + } + catch (Exception ex) + { + logger.LogDebug(ex, "CLI stderr pump stopped unexpectedly"); + } + } } private static class ProcessArgumentEscaper From c3641ed9c315f2daa01a281d717b1980747548fd Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Thu, 28 May 2026 21:42:46 -0400 Subject: [PATCH 2/2] Address review feedback for stderr pump cleanup Iteratively refined the PR per inline review: Client.cs - Simplify ProcessStderrPump: drop IDisposable, remove _disposeRequested field, deferred-dispose ContinueWith machinery, and the try/catch (ObjectDisposedException) in Cancel(). The CancellationTokenSource has no CancelAfter and no registrations after ReadLineAsync exits, so GC with the pump is sufficient. Drop the matching finally { stderrPump.Dispose(); } block in CleanupCliProcessAsync. - Consolidate PumpAsync catches into one `catch (Exception e) when (cancellationToken.IsCancellationRequested && e is ...)`. - Refactor both read loops to `while (await ... is string line)`: - PumpAsync (stderr pump) - StartCliServerAsync port-wait loop; EOF handling moves out of the loop body and triggers via `detectedLocalhostTcpPort is null`. - Revert error text to `Runtime process exited unexpectedly` and the three `CLI stderr pump` log messages to `runtime stderr pump` to match origin/main wording. - Replace `var detectedLocalhostTcpPort = (int?)null;` with `int? detectedLocalhostTcpPort = null;`. Polyfills/DownlevelExtensions.cs - Add DownlevelTaskExtensions with WaitAsync on Task and Task. Use a single linked CancellationTokenSource (no separate timeoutCts) and `CancellationToken cancellationToken = default` to avoid an extra overload. The Task overload delegates to the Task overload via `((Task)task).WaitAsync(...)`. test/Polyfills/TaskExtensions.cs - Delete the test-only polyfill; the src polyfill now covers Task for net472 (test pulls Polyfills/*.cs into the test assembly). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- dotnet/src/Client.cs | 166 ++++++-------------- dotnet/src/Polyfills/DownlevelExtensions.cs | 31 ++++ dotnet/test/Polyfills/TaskExtensions.cs | 73 --------- 3 files changed, 75 insertions(+), 195 deletions(-) delete mode 100644 dotnet/test/Polyfills/TaskExtensions.cs diff --git a/dotnet/src/Client.cs b/dotnet/src/Client.cs index a97eaf36e..03f43c9a2 100644 --- a/dotnet/src/Client.cs +++ b/dotnet/src/Client.cs @@ -57,7 +57,7 @@ public sealed partial class CopilotClient : IDisposable, IAsyncDisposable /// Minimum protocol version this SDK can communicate with. /// private const int MinProtocolVersion = 3; - private static readonly TimeSpan StderrPumpShutdownTimeout = TimeSpan.FromSeconds(5); + private static readonly TimeSpan s_stderrPumpShutdownTimeout = TimeSpan.FromSeconds(5); /// /// Provides a thread-safe collection of active Copilot sessions, indexed by session identifier. @@ -282,33 +282,20 @@ async Task StartCoreAsync(CancellationToken ct) } catch (Exception ex) { - var cleanupErrors = new List(); - try + if (ex is not OperationCanceledException) { - if (ex is not OperationCanceledException) - { - LoggingHelpers.LogTiming(_logger, LogLevel.Warning, ex, - "CopilotClient.StartAsync failed. Elapsed={Elapsed}", - startTimestamp); - } - - if (connection is not null) - { - await CleanupConnectionAsync(connection, cleanupErrors); - } - else if (cliProcess is not null) - { - await CleanupCliProcessAsync(cliProcess, stderrPump, cleanupErrors, _logger); - } + LoggingHelpers.LogTiming(_logger, LogLevel.Warning, ex, + "CopilotClient.StartAsync failed. Elapsed={Elapsed}", + startTimestamp); + } - foreach (var cleanupError in cleanupErrors) - { - _logger.LogDebug(cleanupError, "Failed to clean up Copilot client connection after startup failure"); - } + if (connection is not null) + { + await CleanupConnectionAsync(connection, errors: null); } - finally + else if (cliProcess is not null) { - _connectionTask = null; + await CleanupCliProcessAsync(cliProcess, stderrPump, errors: null, _logger); } throw; @@ -419,6 +406,8 @@ private async Task CleanupConnectionAsync(List? errors) return; } + _connectionTask = null; + Connection ctx; try { @@ -429,13 +418,6 @@ private async Task CleanupConnectionAsync(List? errors) _logger.LogDebug(ex, "Ignoring failed Copilot client startup during cleanup"); return; } - finally - { - if (ReferenceEquals(_connectionTask, connectionTask)) - { - _connectionTask = null; - } - } await CleanupConnectionAsync(ctx, errors); } @@ -485,16 +467,16 @@ private static async Task CleanupCliProcessAsync(Process childProcess, ProcessSt var stderrPumpWaitTimestamp = Stopwatch.GetTimestamp(); try { - await stderrPump.WaitForCompletionAsync(StderrPumpShutdownTimeout); + await stderrPump.Completion.WaitAsync(s_stderrPumpShutdownTimeout); } catch (TimeoutException ex) { if (logger is not null) { LoggingHelpers.LogTiming(logger, LogLevel.Debug, ex, - "Timed out waiting for CLI stderr pump to stop. Elapsed={Elapsed}, Timeout={Timeout}", + "Timed out waiting for runtime stderr pump to stop. Elapsed={Elapsed}, Timeout={Timeout}", stderrPumpWaitTimestamp, - StderrPumpShutdownTimeout); + s_stderrPumpShutdownTimeout); } AddCleanupError(errors, ex, logger); @@ -503,10 +485,6 @@ private static async Task CleanupCliProcessAsync(Process childProcess, ProcessSt { AddCleanupError(errors, ex, logger); } - finally - { - stderrPump.Dispose(); - } } try { childProcess.Dispose(); } @@ -1844,14 +1822,15 @@ private static bool IsUnsupportedConnectMethod(RemoteRpcException ex) throw; } - // Capture stderr for error messages and forward to logger. - // The pump has its own lifetime token and is later cancelled/observed - // by the owning Connection before the process is disposed. - var stderrPump = ProcessStderrPump.Start(cliProcess, logger); - - var detectedLocalhostTcpPort = (int?)null; + ProcessStderrPump? stderrPump = null; + int? detectedLocalhostTcpPort = null; try { + // Capture stderr for error messages and forward to logger. + // The pump has its own lifetime token and is later cancelled/observed + // by the owning Connection before the process is disposed. + stderrPump = ProcessStderrPump.Start(cliProcess, logger); + if (!useStdio) { // Wait for port announcement @@ -1861,14 +1840,8 @@ private static bool IsUnsupportedConnectMethod(RemoteRpcException ex) try { - while (true) + while (await cliProcess.StandardOutput.ReadLineAsync(cts.Token) is string line) { - var line = await cliProcess.StandardOutput.ReadLineAsync(cts.Token); - if (line is null) - { - throw CreateCliExitedException("CLI process exited unexpectedly", stderrPump.Buffer); - } - if (logger.IsEnabled(LogLevel.Debug)) { logger.LogDebug("[CLI] {Line}", line); @@ -1884,6 +1857,17 @@ private static bool IsUnsupportedConnectMethod(RemoteRpcException ex) break; } } + + if (detectedLocalhostTcpPort is null) + { + // The CLI's stdout closed (process exited). Drain stderr + // before throwing so the surfaced exception includes the + // final diagnostic lines. + try { await stderrPump.Completion.WaitAsync(s_stderrPumpShutdownTimeout, CancellationToken.None); } + catch (TimeoutException) { /* best-effort: include whatever was captured */ } + catch (Exception ex) { logger.LogDebug(ex, "Runtime stderr pump faulted while draining"); } + throw CreateCliExitedException("Runtime process exited unexpectedly", stderrPump.Buffer); + } } catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested && cts.IsCancellationRequested) { @@ -1895,12 +1879,7 @@ private static bool IsUnsupportedConnectMethod(RemoteRpcException ex) } catch { - var cleanupErrors = new List(); - await CleanupCliProcessAsync(cliProcess, stderrPump, cleanupErrors, logger); - foreach (var cleanupError in cleanupErrors) - { - logger.LogDebug(cleanupError, "Failed to clean up Copilot CLI process after startup failure"); - } + await CleanupCliProcessAsync(cliProcess, stderrPump, errors: null, logger); throw; } @@ -2230,11 +2209,10 @@ private class Connection( public StringBuilder? StderrBuffer => stderrPump?.Buffer; } - private sealed class ProcessStderrPump : IDisposable + private sealed class ProcessStderrPump { private readonly CancellationTokenSource _cancellationTokenSource = new(); private readonly Task _completion; - private int _disposeRequested; private ProcessStderrPump(Process process, ILogger logger) { @@ -2243,69 +2221,21 @@ private ProcessStderrPump(Process process, ILogger logger) public StringBuilder Buffer { get; } = new(); + public Task Completion => _completion; + public static ProcessStderrPump Start(Process process, ILogger logger) { return new ProcessStderrPump(process, logger); } - public void Cancel() - { - try - { - _cancellationTokenSource.Cancel(); - } - catch (ObjectDisposedException) - { - } - } - - public async Task WaitForCompletionAsync(TimeSpan timeout) - { - var completedTask = await Task.WhenAny(_completion, Task.Delay(timeout)); - if (!ReferenceEquals(completedTask, _completion)) - { - throw new TimeoutException(); - } - - await _completion; - } - - public void Dispose() - { - if (Interlocked.Exchange(ref _disposeRequested, 1) != 0) - { - return; - } - - Cancel(); - - if (_completion.IsCompleted) - { - _cancellationTokenSource.Dispose(); - } - else - { - _ = _completion.ContinueWith( - static (_, state) => ((CancellationTokenSource)state!).Dispose(), - _cancellationTokenSource, - CancellationToken.None, - TaskContinuationOptions.ExecuteSynchronously, - TaskScheduler.Default); - } - } + public void Cancel() => _cancellationTokenSource.Cancel(); private async Task PumpAsync(Process process, ILogger logger, CancellationToken cancellationToken) { try { - while (true) + while (await process.StandardError.ReadLineAsync(cancellationToken) is string line) { - var line = await process.StandardError.ReadLineAsync(cancellationToken); - if (line is null) - { - break; - } - lock (Buffer) { Buffer.AppendLine(line); @@ -2314,21 +2244,13 @@ private async Task PumpAsync(Process process, ILogger logger, CancellationToken logger.LogWarning("[CLI] {Line}", line); } } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) - { - } - catch (InvalidOperationException) when (cancellationToken.IsCancellationRequested) - { - } - catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested) - { - } - catch (IOException) when (cancellationToken.IsCancellationRequested) + catch (Exception e) when (cancellationToken.IsCancellationRequested + && e is OperationCanceledException or InvalidOperationException or ObjectDisposedException or IOException) { } catch (Exception ex) { - logger.LogDebug(ex, "CLI stderr pump stopped unexpectedly"); + logger.LogDebug(ex, "Runtime stderr pump stopped unexpectedly"); } } } diff --git a/dotnet/src/Polyfills/DownlevelExtensions.cs b/dotnet/src/Polyfills/DownlevelExtensions.cs index 0fdf70f3e..17c98643e 100644 --- a/dotnet/src/Polyfills/DownlevelExtensions.cs +++ b/dotnet/src/Polyfills/DownlevelExtensions.cs @@ -614,4 +614,35 @@ internal static class DownlevelValueTaskExtensions public static ValueTask FromResult(T result) => new(result); } } + + internal static class DownlevelTaskExtensions + { + extension(Task task) + { + public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + var completed = await Task.WhenAny(task, Task.Delay(timeout, delayCts.Token)).ConfigureAwait(false); + if (!ReferenceEquals(completed, task)) + { + cancellationToken.ThrowIfCancellationRequested(); + throw new TimeoutException(); + } + + delayCts.Cancel(); + await task.ConfigureAwait(false); + } + } + + extension(Task task) + { + public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken = default) + { + await ((Task)task).WaitAsync(timeout, cancellationToken).ConfigureAwait(false); + return await task.ConfigureAwait(false); + } + } + } } diff --git a/dotnet/test/Polyfills/TaskExtensions.cs b/dotnet/test/Polyfills/TaskExtensions.cs deleted file mode 100644 index 04096e81d..000000000 --- a/dotnet/test/Polyfills/TaskExtensions.cs +++ /dev/null @@ -1,73 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) Microsoft Corporation. All rights reserved. - *--------------------------------------------------------------------------------------------*/ - -// Polyfills for Task APIs not available on .NET Framework. -// These are test-only and not optimized for production use. - -#if !NET8_0_OR_GREATER - -using System.Threading; - -namespace System.Threading.Tasks; - -internal static class TestDownlevelTaskExtensions -{ - extension(Task task) - { - public Task WaitAsync(TimeSpan timeout) - { - if (task.IsCompleted) - { - return task; - } - - return WaitAsyncCore(task, timeout); - } - } - - extension(Task task) - { - public Task WaitAsync(TimeSpan timeout) - { - if (task.IsCompleted) - { - return task; - } - - return WaitAsyncCoreGeneric(task, timeout); - } - } - - private static async Task WaitAsyncCore(Task task, TimeSpan timeout) - { - using var cts = new CancellationTokenSource(); - var delayTask = Task.Delay(timeout, cts.Token); - var completedTask = await Task.WhenAny(task, delayTask).ConfigureAwait(false); - if (completedTask == task) - { - cts.Cancel(); - await task.ConfigureAwait(false); - } - else - { - throw new TimeoutException(); - } - } - - private static async Task WaitAsyncCoreGeneric(Task task, TimeSpan timeout) - { - using var cts = new CancellationTokenSource(); - var delayTask = Task.Delay(timeout, cts.Token); - var completedTask = await Task.WhenAny(task, delayTask).ConfigureAwait(false); - if (completedTask == task) - { - cts.Cancel(); - return await task.ConfigureAwait(false); - } - - throw new TimeoutException(); - } -} - -#endif