From b31565d8aabfdedead0831d579b44805060741f8 Mon Sep 17 00:00:00 2001 From: polinasok <51177946+polinasok@users.noreply.github.com> Date: Mon, 25 Oct 2021 12:39:28 -0700 Subject: [PATCH] service/dap: support JSON-RPC and DAP on the same port from "dlv debug/exec/test/attach" (#2755) * Support JSON-RPC and DAP on the same port * Fix test Co-authored-by: Polina Sokolova --- cmd/dlv/dlv_test.go | 182 +++++++++++++++++++++++++++++++++- service/dap/daptest/client.go | 31 +++++- service/dap/server.go | 33 +++--- service/dap/server_test.go | 23 +---- service/rpccommon/server.go | 26 ++++- 5 files changed, 249 insertions(+), 46 deletions(-) diff --git a/cmd/dlv/dlv_test.go b/cmd/dlv/dlv_test.go index 7320ec59..565d868c 100644 --- a/cmd/dlv/dlv_test.go +++ b/cmd/dlv/dlv_test.go @@ -24,8 +24,10 @@ import ( "github.com/go-delve/delve/pkg/goversion" protest "github.com/go-delve/delve/pkg/proc/test" "github.com/go-delve/delve/pkg/terminal" + "github.com/go-delve/delve/service/dap" "github.com/go-delve/delve/service/dap/daptest" "github.com/go-delve/delve/service/rpc2" + godap "github.com/google/go-dap" "golang.org/x/tools/go/packages" ) @@ -52,6 +54,7 @@ func TestMain(m *testing.M) { } func assertNoError(err error, t testing.TB, s string) { + t.Helper() if err != nil { _, file, line, _ := runtime.Caller(1) fname := filepath.Base(file) @@ -647,8 +650,8 @@ func TestTypecheckRPC(t *testing.T) { } } -// TestDap verifies that a dap server can be started and shut down. -func TestDap(t *testing.T) { +// TestDAPCmd verifies that a dap server can be started and shut down. +func TestDAPCmd(t *testing.T) { const listenAddr = "127.0.0.1:40575" dlvbin, tmpdir := getDlvBin(t) @@ -691,8 +694,179 @@ func TestDap(t *testing.T) { cmd.Wait() } -// TestDapWithClient tests dlv dap --client-addr can be started and shut down. -func TestDapWithClient(t *testing.T) { +func newDAPRemoteClient(t *testing.T, addr string) *daptest.Client { + c := daptest.NewClient(addr) + c.AttachRequest(map[string]interface{}{"mode": "remote", "stopOnEntry": true}) + c.ExpectInitializedEvent(t) + c.ExpectAttachResponse(t) + c.ConfigurationDoneRequest() + c.ExpectStoppedEvent(t) + c.ExpectConfigurationDoneResponse(t) + return c +} + +func TestRemoteDAPClient(t *testing.T) { + const listenAddr = "127.0.0.1:40576" + + dlvbin, tmpdir := getDlvBin(t) + defer os.RemoveAll(tmpdir) + + buildtestdir := filepath.Join(protest.FindFixturesDir(), "buildtest") + cmd := exec.Command(dlvbin, "debug", "--headless", "--log-output=dap", "--log", "--listen", listenAddr) + cmd.Dir = buildtestdir + stdout, err := cmd.StdoutPipe() + assertNoError(err, t, "stdout pipe") + defer stdout.Close() + stderr, err := cmd.StderrPipe() + assertNoError(err, t, "stderr pipe") + defer stderr.Close() + assertNoError(cmd.Start(), t, "start headless instance") + + scanOut := bufio.NewScanner(stdout) + scanErr := bufio.NewScanner(stderr) + // Wait for the debug server to start + scanOut.Scan() + t.Log(scanOut.Text()) + go func() { // Capture logging + for scanErr.Scan() { + t.Log(scanErr.Text()) + } + }() + + client := newDAPRemoteClient(t, listenAddr) + client.ContinueRequest(1) + client.ExpectContinueResponse(t) + client.ExpectTerminatedEvent(t) + + client.DisconnectRequest() + client.ExpectOutputEventProcessExited(t, 0) + client.ExpectOutputEventDetaching(t) + client.ExpectDisconnectResponse(t) + client.ExpectTerminatedEvent(t) + if _, err := client.ReadMessage(); err == nil { + t.Error("expected read error upon shutdown") + } + client.Close() + cmd.Wait() +} + +func closeDAPRemoteMultiClient(t *testing.T, c *daptest.Client) { + c.DisconnectRequest() + c.ExpectOutputEventClosingClient(t) + c.ExpectDisconnectResponse(t) + c.ExpectTerminatedEvent(t) + c.Close() + time.Sleep(10 * time.Millisecond) +} + +func TestRemoteDAPClientMulti(t *testing.T) { + const listenAddr = "127.0.0.1:40577" + + dlvbin, tmpdir := getDlvBin(t) + defer os.RemoveAll(tmpdir) + + buildtestdir := filepath.Join(protest.FindFixturesDir(), "buildtest") + cmd := exec.Command(dlvbin, "debug", "--headless", "--accept-multiclient", "--log-output=debugger", "--log", "--listen", listenAddr) + cmd.Dir = buildtestdir + stdout, err := cmd.StdoutPipe() + assertNoError(err, t, "stdout pipe") + defer stdout.Close() + stderr, err := cmd.StderrPipe() + assertNoError(err, t, "stderr pipe") + defer stderr.Close() + assertNoError(cmd.Start(), t, "start headless instance") + + scanOut := bufio.NewScanner(stdout) + scanErr := bufio.NewScanner(stderr) + // Wait for the debug server to start + scanOut.Scan() + t.Log(scanOut.Text()) + go func() { // Capture logging + for scanErr.Scan() { + t.Log(scanErr.Text()) + } + }() + + // Client 1 connects and continues to main.main + dapclient := newDAPRemoteClient(t, listenAddr) + dapclient.SetFunctionBreakpointsRequest([]godap.FunctionBreakpoint{{Name: "main.main"}}) + dapclient.ExpectSetFunctionBreakpointsResponse(t) + dapclient.ContinueRequest(1) + dapclient.ExpectContinueResponse(t) + dapclient.ExpectStoppedEvent(t) + dapclient.CheckStopLocation(t, 1, "main.main", 5) + closeDAPRemoteMultiClient(t, dapclient) + + // Client 2 reconnects at main.main and continues to process exit + dapclient2 := newDAPRemoteClient(t, listenAddr) + dapclient2.CheckStopLocation(t, 1, "main.main", 5) + dapclient2.ContinueRequest(1) + dapclient2.ExpectContinueResponse(t) + dapclient2.ExpectTerminatedEvent(t) + closeDAPRemoteMultiClient(t, dapclient2) + + // Attach to exited processs is an error + dapclient3 := daptest.NewClient(listenAddr) + dapclient3.AttachRequest(map[string]interface{}{"mode": "remote", "stopOnEntry": true}) + dapclient3.ExpectErrorResponseWith(t, dap.FailedToAttach, `Process \d+ has exited with status 0`, true) + closeDAPRemoteMultiClient(t, dapclient3) + + // But rpc clients can still connect and restart + rpcclient := rpc2.NewClient(listenAddr) + if _, err := rpcclient.Restart(false); err != nil { + t.Errorf("error restarting with rpc client: %v", err) + } + if err := rpcclient.Detach(true); err != nil { + t.Fatalf("error detaching from headless instance: %v", err) + } + cmd.Wait() +} + +func TestRemoteDAPClientAfterContinue(t *testing.T) { + const listenAddr = "127.0.0.1:40578" + + dlvbin, tmpdir := getDlvBin(t) + defer os.RemoveAll(tmpdir) + + fixture := protest.BuildFixture("loopprog", 0) + cmd := exec.Command(dlvbin, "exec", fixture.Path, "--headless", "--continue", "--accept-multiclient", "--log-output=debugger,dap", "--log", "--listen", listenAddr) + stdout, err := cmd.StdoutPipe() + assertNoError(err, t, "stdout pipe") + defer stdout.Close() + stderr, err := cmd.StderrPipe() + assertNoError(err, t, "stderr pipe") + defer stderr.Close() + assertNoError(cmd.Start(), t, "start headless instance") + + scanOut := bufio.NewScanner(stdout) + scanErr := bufio.NewScanner(stderr) + // Wait for the debug server to start + scanOut.Scan() // "API server listening..."" + t.Log(scanOut.Text()) + // Wait for the program to start + scanOut.Scan() // "past main" + t.Log(scanOut.Text()) + + go func() { // Capture logging + for scanErr.Scan() { + t.Log(scanErr.Text()) + } + }() + + c := newDAPRemoteClient(t, listenAddr) + c.DisconnectRequestWithKillOption(true) + c.ExpectOutputEventDetachingKill(t) + c.ExpectDisconnectResponse(t) + c.ExpectTerminatedEvent(t) + if _, err := c.ReadMessage(); err == nil { + t.Error("expected read error upon shutdown") + } + c.Close() + cmd.Wait() +} + +// TestDAPCmdWithClient tests dlv dap --client-addr can be started and shut down. +func TestDAPCmdWithClient(t *testing.T) { listener, err := net.Listen("tcp", ":0") if err != nil { t.Fatalf("cannot setup listener required for testing: %v", err) diff --git a/service/dap/daptest/client.go b/service/dap/daptest/client.go index bf2143f6..0dfb1e23 100644 --- a/service/dap/daptest/client.go +++ b/service/dap/daptest/client.go @@ -88,11 +88,11 @@ func (c *Client) ExpectVisibleErrorResponse(t *testing.T) *dap.ErrorResponse { return er } -func (c *Client) expectErrorResponse(t *testing.T, id int, message string) *dap.ErrorResponse { +func (c *Client) ExpectErrorResponseWith(t *testing.T, id int, message string, showUser bool) *dap.ErrorResponse { t.Helper() er := c.ExpectErrorResponse(t) - if er.Body.Error.Id != id || er.Message != message { - t.Errorf("\ngot %#v\nwant Id=%d Message=%q", er, id, message) + if matched, _ := regexp.MatchString(message, er.Body.Error.Format); !matched || er.Body.Error.Id != id || er.Body.Error.ShowUser != showUser { + t.Errorf("got %#v, want Id=%d Format=%q ShowUser=%v", er, id, message, showUser) } return er } @@ -129,12 +129,12 @@ func pretty(v interface{}) string { func (c *Client) ExpectNotYetImplementedErrorResponse(t *testing.T) *dap.ErrorResponse { t.Helper() - return c.expectErrorResponse(t, 7777, "Not yet implemented") + return c.ExpectErrorResponseWith(t, 7777, "Not yet implemented", false) } func (c *Client) ExpectUnsupportedCommandErrorResponse(t *testing.T) *dap.ErrorResponse { t.Helper() - return c.expectErrorResponse(t, 9999, "Unsupported command") + return c.ExpectErrorResponseWith(t, 9999, "Unsupported command", false) } func (c *Client) ExpectOutputEventRegex(t *testing.T, want string) *dap.OutputEvent { @@ -173,6 +173,27 @@ func (c *Client) ExpectOutputEventTerminating(t *testing.T) *dap.OutputEvent { return c.ExpectOutputEventRegex(t, `Terminating process [0-9]+\n`) } +func (c *Client) ExpectOutputEventClosingClient(t *testing.T) *dap.OutputEvent { + t.Helper() + return c.ExpectOutputEventRegex(t, `Closing client session, but leaving multi-client DAP server running at .+:[0-9]+\n`) +} + +func (c *Client) CheckStopLocation(t *testing.T, thread int, name string, line int) { + t.Helper() + c.StackTraceRequest(thread, 0, 20) + st := c.ExpectStackTraceResponse(t) + if len(st.Body.StackFrames) < 1 { + t.Errorf("\ngot %#v\nwant len(stackframes) => 1", st) + } else { + if line != -1 && st.Body.StackFrames[0].Line != line { + t.Errorf("\ngot %#v\nwant Line=%d", st, line) + } + if st.Body.StackFrames[0].Name != name { + t.Errorf("\ngot %#v\nwant Name=%q", st, name) + } + } +} + // InitializeRequest sends an 'initialize' request. func (c *Client) InitializeRequest() { request := &dap.InitializeRequest{Request: *c.newRequest("initialize")} diff --git a/service/dap/server.go b/service/dap/server.go index b185f7fd..b7c1c7c1 100644 --- a/service/dap/server.go +++ b/service/dap/server.go @@ -166,9 +166,9 @@ type Config struct { // log is used for structured logging. log *logrus.Entry - // stopTriggered is closed when the server is Stop()-ed. + // StopTriggered is closed when the server is Stop()-ed. // Can be used to safeguard against duplicate shutdown sequences. - stopTriggered chan struct{} + StopTriggered chan struct{} } type process struct { @@ -277,7 +277,7 @@ func NewServer(config *service.Config) *Server { config: &Config{ Config: config, log: logger, - stopTriggered: make(chan struct{}), + StopTriggered: make(chan struct{}), }, listener: config.Listener, } @@ -286,7 +286,7 @@ func NewServer(config *service.Config) *Server { // NewSession creates a new client session that can handle DAP traffic. // It takes an open connection and provides a Close() method to shut it // down when the DAP session disconnects or a connection error occurs. -func NewSession(conn io.ReadWriteCloser, config *Config) *Session { +func NewSession(conn io.ReadWriteCloser, config *Config, debugger *debugger.Debugger) *Session { if config.log == nil { config.log = logflags.DAPLogger() } @@ -298,6 +298,7 @@ func NewSession(conn io.ReadWriteCloser, config *Config) *Session { variableHandles: newVariablesHandlesMap(), args: defaultArgs, exceptionErr: nil, + debugger: debugger, } } @@ -328,11 +329,11 @@ func (s *Session) setLaunchAttachArgs(args LaunchAttachCommonConfig) error { // connection. It shuts down the underlying debugger and kills the target // process if it was launched by it or stops the noDebug process. // This method mustn't be called more than once. -// stopTriggered notifies other goroutines that stop is in progreess. +// StopTriggered notifies other goroutines that stop is in progreess. func (s *Server) Stop() { s.config.log.Debug("DAP server stopping...") defer s.config.log.Debug("DAP server stopped") - close(s.config.stopTriggered) + close(s.config.StopTriggered) if s.listener != nil { // If run goroutine is blocked on accept, this will unblock it. @@ -365,7 +366,7 @@ func (s *Session) Close() { } // Close client connection last, so other shutdown stages // can send client notifications. - // Unless Stop() was called after read loop in serveDAPCodec() + // Unless Stop() was called after read loop in ServeDAPCodec() // returned, this will result in a closed connection error // on next read, breaking out the read loop andd // allowing the run goroutinee to exit. @@ -387,8 +388,8 @@ func (c *Config) triggerServerStop() { // -- run goroutine: calls triggerServerStop() // -- main goroutine: calls Stop() // -- main goroutine: Stop() closes client connection (or client closed it) - // -- run goroutine: serveDAPCodec() gets "closed network connection" - // -- run goroutine: serveDAPCodec() returns and calls triggerServerStop() + // -- run goroutine: ServeDAPCodec() gets "closed network connection" + // -- run goroutine: ServeDAPCodec() returns and calls triggerServerStop() if c.DisconnectChan != nil { close(c.DisconnectChan) c.DisconnectChan = nil @@ -418,7 +419,7 @@ func (s *Server) Run() { conn, err := s.listener.Accept() // listener is closed in Stop() if err != nil { select { - case <-s.config.stopTriggered: + case <-s.config.StopTriggered: default: s.config.log.Errorf("Error accepting client connection: %s\n", err) s.config.triggerServerStop() @@ -438,9 +439,9 @@ func (s *Server) Run() { func (s *Server) runSession(conn io.ReadWriteCloser) { s.sessionMu.Lock() - s.session = NewSession(conn, s.config) // closed in Stop() + s.session = NewSession(conn, s.config, nil) // closed in Stop() s.sessionMu.Unlock() - s.session.serveDAPCodec() + s.session.ServeDAPCodec() } // RunWithClient is similar to Run but works only with an already established @@ -456,10 +457,10 @@ func (s *Server) RunWithClient(conn net.Conn) { go s.runSession(conn) } -// serveDAPCodec reads and decodes requests from the client +// ServeDAPCodec reads and decodes requests from the client // until it encounters an error or EOF, when it sends // a disconnect signal and returns. -func (s *Session) serveDAPCodec() { +func (s *Session) ServeDAPCodec() { // TODO(polina): defer-close conn/session like in serveJSONCodec reader := bufio.NewReader(s.conn) for { @@ -475,7 +476,7 @@ func (s *Session) serveDAPCodec() { if err != nil { s.config.log.Debug("DAP error: ", err) select { - case <-s.config.stopTriggered: + case <-s.config.StopTriggered: default: if err != io.EOF { // EOF means client closed connection if decodeErr, ok := err.(*dap.DecodeProtocolMessageFieldError); ok { @@ -1238,10 +1239,12 @@ func (s *Session) stopDebugSession(killProcess bool) error { // halt sends a halt request if the debuggee is running. // changeStateMu should be held when calling (*Server).halt. func (s *Session) halt() (*api.DebuggerState, error) { + s.config.log.Debug("halting") // Only send a halt request if the debuggee is running. if s.debugger.IsRunning() { return s.debugger.Command(&api.DebuggerCommand{Name: api.Halt}, nil) } + s.config.log.Debug("process not running") return s.debugger.State(false) } diff --git a/service/dap/server_test.go b/service/dap/server_test.go index 810d2a8d..7a36679c 100644 --- a/service/dap/server_test.go +++ b/service/dap/server_test.go @@ -4598,23 +4598,6 @@ func TestFatalThrowBreakpoint(t *testing.T) { }) } -func verifyStopLocation(t *testing.T, client *daptest.Client, thread int, name string, line int) { - t.Helper() - - client.StackTraceRequest(thread, 0, 20) - st := client.ExpectStackTraceResponse(t) - if len(st.Body.StackFrames) < 1 { - t.Errorf("\ngot %#v\nwant len(stackframes) => 1", st) - } else { - if line != -1 && st.Body.StackFrames[0].Line != line { - t.Errorf("\ngot %#v\nwant Line=%d", st, line) - } - if st.Body.StackFrames[0].Name != name { - t.Errorf("\ngot %#v\nwant Name=%q", st, name) - } - } -} - // checkStop covers the standard sequence of requests issued by // a client at a breakpoint or another non-terminal stop event. // The details have been tested by other tests, @@ -4625,7 +4608,7 @@ func checkStop(t *testing.T, client *daptest.Client, thread int, fname string, l client.ThreadsRequest() client.ExpectThreadsResponse(t) - verifyStopLocation(t, client, thread, fname, line) + client.CheckStopLocation(t, thread, fname, line) client.ScopesRequest(1000) client.ExpectScopesResponse(t) @@ -5150,7 +5133,7 @@ func TestPauseAndContinue(t *testing.T) { fixture.Source, []int{6}, []onBreakpoint{{ execute: func() { - verifyStopLocation(t, client, 1, "main.loop", 6) + client.CheckStopLocation(t, 1, "main.loop", 6) // Continue resumes all goroutines, so thread id is ignored client.ContinueRequest(12345) @@ -5967,7 +5950,6 @@ func runTestWithDebugger(t *testing.T, dbg *debugger.Debugger, test func(c *dapt server, _ := startDAPServer(t, serverStopped) client := daptest.NewClient(server.listener.Addr().String()) time.Sleep(100 * time.Millisecond) // Give time for connection to be set as dap.Session - // TODO(polina): update once the server interface is refactored to take debugger as arg server.sessionMu.Lock() if server.session == nil { t.Fatal("DAP session is not ready") @@ -6085,7 +6067,6 @@ func TestAttachRemoteMultiClient(t *testing.T) { // DAP server doesn't support accept-multiclient, but we can use this // hack to test the inner connection logic that can be used by a server that does. server.session.config.AcceptMulti = true - // TODO(polina): update once the server interface is refactored to take debugger as arg _, server.session.debugger = launchDebuggerWithTargetHalted(t, "increment") server.sessionMu.Unlock() diff --git a/service/rpccommon/server.go b/service/rpccommon/server.go index db88b9d5..328885fc 100644 --- a/service/rpccommon/server.go +++ b/service/rpccommon/server.go @@ -1,6 +1,7 @@ package rpccommon import ( + "bufio" "bytes" "encoding/json" "fmt" @@ -19,6 +20,7 @@ import ( "github.com/go-delve/delve/pkg/version" "github.com/go-delve/delve/service" "github.com/go-delve/delve/service/api" + "github.com/go-delve/delve/service/dap" "github.com/go-delve/delve/service/debugger" "github.com/go-delve/delve/service/internal/sameuser" "github.com/go-delve/delve/service/rpc1" @@ -156,7 +158,7 @@ func (s *ServerImpl) Run() error { } } - go s.serveJSONCodec(c) + go s.serveConnectionDemux(c) if !s.config.AcceptMulti { break } @@ -165,6 +167,28 @@ func (s *ServerImpl) Run() error { return nil } +type bufReadWriteCloser struct { + *bufio.Reader + io.WriteCloser +} + +func (s *ServerImpl) serveConnectionDemux(c io.ReadWriteCloser) { + conn := &bufReadWriteCloser{bufio.NewReader(c), c} + b, err := conn.Peek(1) + if err != nil { + s.log.Warnf("error determining new connection protocol: %v", err) + return + } + if b[0] == 'C' { // C is for DAP's Content-Length + s.log.Debugf("serving DAP on new connection") + ds := dap.NewSession(conn, &dap.Config{Config: s.config, StopTriggered: s.stopChan}, s.debugger) + go ds.ServeDAPCodec() + } else { + s.log.Debugf("serving JSON-RPC on new connection") + go s.serveJSONCodec(conn) + } +} + // Precompute the reflect type for error. Can't use error directly // because Typeof takes an empty interface value. This is annoying. var typeOfError = reflect.TypeOf((*error)(nil)).Elem()