mirror of
https://github.com/ipfs/kubo.git
synced 2025-08-06 11:31:54 +08:00
enhance(cmd/verify): add goroutine count to improve verify speed
License: MIT Signed-off-by: chenminjian <727180553@qq.com>
This commit is contained in:

committed by
Steven Allen

parent
9bec503ac1
commit
17e7892036
@ -2,12 +2,15 @@ package commands
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"text/tabwriter"
|
||||
|
||||
oldcmds "github.com/ipfs/go-ipfs/commands"
|
||||
@ -276,6 +279,48 @@ type VerifyProgress struct {
|
||||
Progress int
|
||||
}
|
||||
|
||||
func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Cid, results chan<- string, bs bstore.Blockstore) {
|
||||
defer wg.Done()
|
||||
|
||||
for k := range keys {
|
||||
_, err := bs.Get(k)
|
||||
if err != nil {
|
||||
select {
|
||||
case results <- fmt.Sprintf("block %s was corrupt (%s)", k, err):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case results <- "":
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func verifyResultChan(ctx context.Context, keys <-chan cid.Cid, bs bstore.Blockstore) <-chan string {
|
||||
results := make(chan string)
|
||||
|
||||
go func() {
|
||||
defer close(results)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < runtime.NumCPU()*2; i++ {
|
||||
wg.Add(1)
|
||||
go verifyWorkerRun(ctx, &wg, keys, results, bs)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
var repoVerifyCmd = &oldcmds.Command{
|
||||
Helptext: cmdkit.HelpText{
|
||||
Tagline: "Verify all blocks in repo are not corrupted.",
|
||||
@ -300,15 +345,14 @@ var repoVerifyCmd = &oldcmds.Command{
|
||||
return
|
||||
}
|
||||
|
||||
results := verifyResultChan(req.Context(), keys, bs)
|
||||
|
||||
var fails int
|
||||
var i int
|
||||
for k := range keys {
|
||||
_, err := bs.Get(k)
|
||||
if err != nil {
|
||||
for msg := range results {
|
||||
if msg != "" {
|
||||
select {
|
||||
case out <- &VerifyProgress{
|
||||
Msg: fmt.Sprintf("block %s was corrupt (%s)", k, err),
|
||||
}:
|
||||
case out <- &VerifyProgress{Msg: msg}:
|
||||
case <-req.Context().Done():
|
||||
return
|
||||
}
|
||||
|
Reference in New Issue
Block a user