mirror of
https://github.com/ipfs/kubo.git
synced 2025-05-17 15:06:47 +08:00
client/rpc: use streaming pin listing
This also fix a leaking goroutine bug on client/rpc.PinAPI.Ls, we would deadlock if context was canceled while writing the keys.
This commit is contained in:
@ -29,15 +29,15 @@ type pin struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (p *pin) Err() error {
|
||||
func (p pin) Err() error {
|
||||
return p.err
|
||||
}
|
||||
|
||||
func (p *pin) Path() path.Resolved {
|
||||
func (p pin) Path() path.Resolved {
|
||||
return p.path
|
||||
}
|
||||
|
||||
func (p *pin) Type() string {
|
||||
func (p pin) Type() string {
|
||||
return p.typ
|
||||
}
|
||||
|
||||
@ -51,29 +51,61 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp
|
||||
Option("recursive", options.Recursive).Exec(ctx, nil)
|
||||
}
|
||||
|
||||
type pinLsObject struct {
|
||||
Cid string
|
||||
Type string
|
||||
}
|
||||
|
||||
func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, error) {
|
||||
options, err := caopts.PinLsOptions(opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var out pinRefKeyList
|
||||
err = api.core().Request("pin/ls").
|
||||
Option("type", options.Type).Exec(ctx, &out)
|
||||
res, err := api.core().Request("pin/ls").
|
||||
Option("type", options.Type).
|
||||
Option("stream", true).
|
||||
Send(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pins := make(chan iface.Pin)
|
||||
go func(ch chan<- iface.Pin) {
|
||||
defer res.Output.Close()
|
||||
defer close(ch)
|
||||
for hash, p := range out.Keys {
|
||||
c, e := cid.Parse(hash)
|
||||
if e != nil {
|
||||
ch <- &pin{typ: p.Type, err: e}
|
||||
|
||||
dec := json.NewDecoder(res.Output)
|
||||
var out pinLsObject
|
||||
for {
|
||||
switch err := dec.Decode(&out); err {
|
||||
case nil:
|
||||
case io.EOF:
|
||||
return
|
||||
default:
|
||||
select {
|
||||
case ch <- pin{err: err}:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
c, err := cid.Parse(out.Cid)
|
||||
if err != nil {
|
||||
select {
|
||||
case ch <- pin{err: err}:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- pin{typ: out.Type, path: path.IpldPath(c)}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
ch <- &pin{typ: p.Type, path: path.IpldPath(c), err: e}
|
||||
}
|
||||
}(pins)
|
||||
return pins, nil
|
||||
|
Reference in New Issue
Block a user