mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-19 23:36:07 +08:00
Merge pull request #1507 from ipfs/fix/http-client-close
attempt at properly closing http response bodies
This commit is contained in:
@ -102,12 +102,14 @@ func (c *client) Send(req cmds.Request) (cmds.Response, error) {
|
|||||||
ec <- err
|
ec <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// using the overridden JSON encoding in request
|
// using the overridden JSON encoding in request
|
||||||
res, err := getResponse(httpRes, req)
|
res, err := getResponse(httpRes, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ec <- err
|
ec <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rc <- res
|
rc <- res
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -179,6 +181,8 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
|
|||||||
res.SetLength(length)
|
res.SetLength(length)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
res.SetCloser(httpRes.Body)
|
||||||
|
|
||||||
if len(httpRes.Header.Get(streamHeader)) > 0 {
|
if len(httpRes.Header.Get(streamHeader)) > 0 {
|
||||||
// if output is a stream, we can just use the body reader
|
// if output is a stream, we can just use the body reader
|
||||||
res.SetOutput(httpRes.Body)
|
res.SetOutput(httpRes.Body)
|
||||||
@ -202,8 +206,15 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
|
|||||||
} else {
|
} else {
|
||||||
err = dec.Decode(&v)
|
err = dec.Decode(&v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// since we are just looping reading on the response, the only way to
|
||||||
|
// know we are 'done' is for the consumer to close the response body.
|
||||||
|
// doing so doesnt throw an io.EOF, but we want to treat it like one.
|
||||||
|
if err != nil && strings.Contains(err.Error(), "read on closed response body") {
|
||||||
|
err = io.EOF
|
||||||
|
}
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
fmt.Println(err.Error())
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,12 +171,15 @@ func (i internalHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
// w.WriteHeader(200)
|
// w.WriteHeader(200)
|
||||||
err = copyChunks(applicationJson, w, out)
|
err = copyChunks(applicationJson, w, out)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug(err)
|
log.Debug("copy chunks error: ", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
flushCopy(w, out)
|
err = flushCopy(w, out)
|
||||||
|
if err != nil {
|
||||||
|
log.Debug("Flush copy returned an error: ", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
@ -191,8 +194,8 @@ func flushCopy(w http.ResponseWriter, out io.Reader) error {
|
|||||||
return copyChunks("", w, out)
|
return copyChunks("", w, out)
|
||||||
}
|
}
|
||||||
|
|
||||||
io.Copy(&flushResponse{w}, out)
|
_, err := io.Copy(&flushResponse{w}, out)
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copies from an io.Reader to a http.ResponseWriter.
|
// Copies from an io.Reader to a http.ResponseWriter.
|
||||||
|
@ -101,6 +101,10 @@ type Response interface {
|
|||||||
SetLength(uint64)
|
SetLength(uint64)
|
||||||
Length() uint64
|
Length() uint64
|
||||||
|
|
||||||
|
// underlying http connections need to be cleaned up, this is for that
|
||||||
|
Close() error
|
||||||
|
SetCloser(io.Closer)
|
||||||
|
|
||||||
// Marshal marshals out the response into a buffer. It uses the EncodingType
|
// Marshal marshals out the response into a buffer. It uses the EncodingType
|
||||||
// on the Request to chose a Marshaler (Codec).
|
// on the Request to chose a Marshaler (Codec).
|
||||||
Marshal() (io.Reader, error)
|
Marshal() (io.Reader, error)
|
||||||
@ -121,6 +125,7 @@ type response struct {
|
|||||||
length uint64
|
length uint64
|
||||||
stdout io.Writer
|
stdout io.Writer
|
||||||
stderr io.Writer
|
stderr io.Writer
|
||||||
|
closer io.Closer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *response) Request() Request {
|
func (r *response) Request() Request {
|
||||||
@ -214,6 +219,17 @@ func (r *response) Reader() (io.Reader, error) {
|
|||||||
return r.out, nil
|
return r.out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *response) Close() error {
|
||||||
|
if r.closer != nil {
|
||||||
|
return r.closer.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *response) SetCloser(c io.Closer) {
|
||||||
|
r.closer = c
|
||||||
|
}
|
||||||
|
|
||||||
func (r *response) Stdout() io.Writer {
|
func (r *response) Stdout() io.Writer {
|
||||||
return r.stdout
|
return r.stdout
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user