mirror of
https://github.com/ipfs/kubo.git
synced 2025-10-25 10:27:01 +08:00
refactor http client code
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
@ -184,55 +184,16 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
|
||||
|
||||
res.SetCloser(httpRes.Body)
|
||||
|
||||
if len(httpRes.Header.Get(streamHeader)) > 0 && contentType != "application/json" {
|
||||
// if output is a stream, we can just use the body reader
|
||||
if contentType != "application/json" {
|
||||
// for all non json output types, just stream back the output
|
||||
res.SetOutput(&httpResponseReader{httpRes})
|
||||
return res, nil
|
||||
|
||||
} else if len(httpRes.Header.Get(channelHeader)) > 0 {
|
||||
// if output is coming from a channel, decode each chunk
|
||||
outChan := make(chan interface{})
|
||||
go func() {
|
||||
dec := json.NewDecoder(&httpResponseReader{httpRes})
|
||||
outputType := reflect.TypeOf(req.Command().Type)
|
||||
|
||||
ctx := req.Context()
|
||||
|
||||
for {
|
||||
var v interface{}
|
||||
var err error
|
||||
if outputType != nil {
|
||||
v = reflect.New(outputType).Interface()
|
||||
err = dec.Decode(v)
|
||||
} else {
|
||||
err = dec.Decode(&v)
|
||||
}
|
||||
|
||||
// since we are just looping reading on the response, the only way to
|
||||
// know we are 'done' is for the consumer to close the response body.
|
||||
// doing so doesnt throw an io.EOF, but we want to treat it like one.
|
||||
if err != nil && strings.Contains(err.Error(), "read on closed response body") {
|
||||
err = io.EOF
|
||||
}
|
||||
if err != nil && err != io.EOF {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(outChan)
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if err == io.EOF {
|
||||
close(outChan)
|
||||
return
|
||||
}
|
||||
outChan <- v
|
||||
}
|
||||
}()
|
||||
go readStreamedJson(req, httpRes, outChan)
|
||||
|
||||
res.SetOutput((<-chan interface{})(outChan))
|
||||
return res, nil
|
||||
@ -240,22 +201,24 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
|
||||
|
||||
dec := json.NewDecoder(&httpResponseReader{httpRes})
|
||||
|
||||
// If we ran into an error
|
||||
if httpRes.StatusCode >= http.StatusBadRequest {
|
||||
e := cmds.Error{}
|
||||
|
||||
if httpRes.StatusCode == http.StatusNotFound {
|
||||
switch {
|
||||
case httpRes.StatusCode == http.StatusNotFound:
|
||||
// handle 404s
|
||||
e.Message = "Command not found."
|
||||
e.Code = cmds.ErrClient
|
||||
|
||||
} else if contentType == "text/plain" {
|
||||
case contentType == "text/plain":
|
||||
// handle non-marshalled errors
|
||||
buf := bytes.NewBuffer(nil)
|
||||
io.Copy(buf, httpRes.Body)
|
||||
e.Message = string(buf.Bytes())
|
||||
e.Code = cmds.ErrNormal
|
||||
|
||||
} else {
|
||||
default:
|
||||
// handle marshalled errors
|
||||
err = dec.Decode(&e)
|
||||
if err != nil {
|
||||
@ -265,27 +228,61 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
|
||||
|
||||
res.SetError(e, e.Code)
|
||||
|
||||
} else {
|
||||
outputType := reflect.TypeOf(req.Command().Type)
|
||||
var v interface{}
|
||||
|
||||
if outputType != nil {
|
||||
v = reflect.New(outputType).Interface()
|
||||
err = dec.Decode(v)
|
||||
} else {
|
||||
err = dec.Decode(&v)
|
||||
}
|
||||
if err != nil && err != io.EOF {
|
||||
return nil, err
|
||||
}
|
||||
if v != nil {
|
||||
res.SetOutput(v)
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
outputType := reflect.TypeOf(req.Command().Type)
|
||||
v, err := decodeTypedVal(outputType, dec)
|
||||
if err != nil && err != io.EOF {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res.SetOutput(v)
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func readStreamedJson(req cmds.Request, httpRes *http.Response, out chan<- interface{}) {
|
||||
defer close(out)
|
||||
dec := json.NewDecoder(&httpResponseReader{httpRes})
|
||||
outputType := reflect.TypeOf(req.Command().Type)
|
||||
|
||||
ctx := req.Context()
|
||||
|
||||
for {
|
||||
v, err := decodeTypedVal(outputType, dec)
|
||||
if err != nil {
|
||||
// since we are just looping reading on the response, the only way to
|
||||
// know we are 'done' is for the consumer to close the response body.
|
||||
// doing so doesnt throw an io.EOF, but we want to treat it like one.
|
||||
if !(strings.Contains(err.Error(), "read on closed response body") || err == io.EOF) {
|
||||
log.Error(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case out <- v:
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func decodeTypedVal(t reflect.Type, dec *json.Decoder) (interface{}, error) {
|
||||
var v interface{}
|
||||
var err error
|
||||
if t != nil {
|
||||
v = reflect.New(t).Interface()
|
||||
err = dec.Decode(v)
|
||||
} else {
|
||||
err = dec.Decode(&v)
|
||||
}
|
||||
|
||||
return v, err
|
||||
}
|
||||
|
||||
type httpResponseReader struct {
|
||||
resp *http.Response
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user