mirror of
https://github.com/ipfs/kubo.git
synced 2026-03-13 09:53:17 +08:00
refactor: fix error handling and race conditions in dag import
Address code review feedback for PR #11069:

- fix: propagate decode errors in client/rpc dag import (errors were silently dropped)
- fix: acquire the pinlock before spawning the goroutine to prevent a race with GC
- fix: update the fast-provide test to always expect failure in an isolated environment
- test: add a proper JSON compatibility test for provide stats (replaces the compile-time check)
- docs: add educational comments explaining the batch config defaults
- style: standardize error messages on the consistent "failed to X: %w" pattern

The pinlock fix is critical: acquiring the lock before the goroutine is spawned prevents blocks from being garbage collected before the lock is held. The error handling fix ensures that RPC clients receive decode errors instead of empty results.
This commit is contained in:
@@ -180,7 +180,7 @@ func (api *HttpDagServ) Import(ctx context.Context, file files.File, opts ...opt
|
||||
if err := dec.Decode(&event); err != nil {
|
||||
if err != io.EOF {
|
||||
select {
|
||||
case out <- iface.DagImportResult{}:
|
||||
case out <- iface.DagImportResult{Err: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,31 +147,30 @@ func TestDagImport_OnlineWithFastProvideWait(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer carFile.Close()
|
||||
|
||||
// Import with fast-provide wait enabled in online mode
|
||||
// Import with fast-provide wait enabled in online mode.
|
||||
// This tests that FastProvideWait actually blocks (not fire-and-forget).
|
||||
// In isolated test environment (no DHT peers), the provide operation may:
|
||||
// 1. Succeed trivially (announced to randomly discovered peers), or
|
||||
// 2. Return an error (timeout/no peers)
|
||||
// Both outcomes prove blocking behavior works correctly.
|
||||
// In isolated test environment with no DHT peers, the blocking provide
|
||||
// operation should fail and propagate an error.
|
||||
results, err := api.Dag().Import(ctx, files.NewReaderFile(carFile),
|
||||
options.Dag.FastProvideRoot(true),
|
||||
options.Dag.FastProvideWait(true))
|
||||
|
||||
if err != nil {
|
||||
// Blocking wait detected provide failure (no DHT peers in isolated test)
|
||||
// This proves FastProvideWait actually blocked and error propagated
|
||||
// Initial call may succeed, but we should get error from results channel
|
||||
if err == nil {
|
||||
// Consume results until we hit the expected error
|
||||
var gotError bool
|
||||
for result := range results {
|
||||
if result.Err != nil {
|
||||
gotError = true
|
||||
require.Contains(t, result.Err.Error(), "fast-provide",
|
||||
"error should be from fast-provide operation")
|
||||
break
|
||||
}
|
||||
}
|
||||
require.True(t, gotError, "should receive fast-provide error in isolated test environment")
|
||||
} else {
|
||||
// Error returned directly (also acceptable)
|
||||
require.Contains(t, err.Error(), "fast-provide",
|
||||
"error should be from fast-provide operation")
|
||||
return // Test passed - blocking wait worked and returned error
|
||||
}
|
||||
|
||||
// No error - provide succeeded, verify we got results
|
||||
var roots []cid.Cid
|
||||
for result := range results {
|
||||
if result.Root != nil {
|
||||
roots = append(roots, result.Root.Cid)
|
||||
}
|
||||
}
|
||||
|
||||
require.Len(t, roots, 1, "should receive one root when provide succeeds")
|
||||
}
|
||||
|
||||
@@ -13,20 +13,34 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Compile-time check: ensure our response type is compatible with kubo's provideStats
|
||||
// This verifies that JSON marshaling/unmarshaling will work correctly
|
||||
var _ = func() {
|
||||
func TestProvideStats_JSONCompatibility(t *testing.T) {
|
||||
// Verify that command's provideStats structure is compatible with
|
||||
// iface.ProvideStatsResponse for JSON marshaling/unmarshaling.
|
||||
// This ensures RPC client can correctly decode responses from the command.
|
||||
|
||||
// Create instance of command's provideStats structure
|
||||
cmdStats := struct {
|
||||
Sweep *stats.Stats `json:"Sweep,omitempty"`
|
||||
Legacy *boxoprovider.ReproviderStats `json:"Legacy,omitempty"`
|
||||
FullRT bool `json:"FullRT,omitempty"`
|
||||
}{}
|
||||
}{
|
||||
Sweep: &stats.Stats{},
|
||||
FullRT: true,
|
||||
}
|
||||
|
||||
// Marshal and unmarshal to verify compatibility
|
||||
data, _ := json.Marshal(cmdStats)
|
||||
// Marshal command structure to JSON
|
||||
data, err := json.Marshal(cmdStats)
|
||||
require.NoError(t, err, "should marshal command stats")
|
||||
|
||||
// Unmarshal into interface type
|
||||
var ifaceStats iface.ProvideStatsResponse
|
||||
_ = json.Unmarshal(data, &ifaceStats)
|
||||
err = json.Unmarshal(data, &ifaceStats)
|
||||
require.NoError(t, err, "should unmarshal into interface stats")
|
||||
|
||||
// Verify fields transferred correctly
|
||||
require.NotNil(t, ifaceStats.Sweep, "Sweep field should be present")
|
||||
require.Nil(t, ifaceStats.Legacy, "Legacy field should be nil")
|
||||
require.True(t, ifaceStats.FullRT, "FullRT field should be true")
|
||||
}
|
||||
|
||||
// testProvideStats mirrors the subset of fields we verify in tests.
|
||||
|
||||
@@ -93,8 +93,10 @@ func (api *dagAPI) Import(ctx context.Context, file files.File, opts ...options.
|
||||
// Create block decoder for IPLD nodes
|
||||
blockDecoder := ipldlegacy.NewDecoder()
|
||||
|
||||
// Create batch for efficient block addition
|
||||
// Uses config values for batch size tuning
|
||||
// Create batch for efficient block addition.
|
||||
// Uses config values for batch size tuning:
|
||||
// - MaxNodes: Default 128 nodes per batch (128 file descriptors in flatfs)
|
||||
// - MaxSize: Default 100MiB per batch (with 256KiB blocks, hits node limit at ~32MiB)
|
||||
batch := ipld.NewBatch(ctx, api.DAGService,
|
||||
ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))),
|
||||
ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))),
|
||||
@@ -103,15 +105,19 @@ func (api *dagAPI) Import(ctx context.Context, file files.File, opts ...options.
|
||||
// Create output channel
|
||||
out := make(chan iface.DagImportResult)
|
||||
|
||||
// Acquire pinlock BEFORE spawning goroutine if pinning roots
|
||||
// This prevents race condition with GC (lock serves as both pin and GC lock)
|
||||
var pinUnlocker func(context.Context)
|
||||
if settings.PinRoots {
|
||||
pinUnlocker = api.core.blockstore.PinLock(ctx).Unlock
|
||||
}
|
||||
|
||||
// Process import in background
|
||||
go func() {
|
||||
defer close(out)
|
||||
defer file.Close()
|
||||
|
||||
// Acquire pinlock if pinning roots (also serves as GC lock)
|
||||
if settings.PinRoots {
|
||||
unlocker := api.core.blockstore.PinLock(ctx)
|
||||
defer unlocker.Unlock(ctx)
|
||||
if pinUnlocker != nil {
|
||||
defer pinUnlocker(ctx)
|
||||
}
|
||||
|
||||
// Track roots from CAR headers and stats
|
||||
@@ -137,9 +143,9 @@ func (api *dagAPI) Import(ctx context.Context, file files.File, opts ...options.
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
if previous != nil {
|
||||
out <- iface.DagImportResult{Err: fmt.Errorf("error reading block after %s: %w", previous.Cid(), err)}
|
||||
out <- iface.DagImportResult{Err: fmt.Errorf("failed to read block after %s: %w", previous.Cid(), err)}
|
||||
} else {
|
||||
out <- iface.DagImportResult{Err: fmt.Errorf("error reading CAR blocks: %w", err)}
|
||||
out <- iface.DagImportResult{Err: fmt.Errorf("failed to read CAR blocks: %w", err)}
|
||||
}
|
||||
}
|
||||
break
|
||||
@@ -223,7 +229,7 @@ func (api *dagAPI) Import(ctx context.Context, file files.File, opts ...options.
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
out <- iface.DagImportResult{Err: fmt.Errorf("error emitting roots: %w", err)}
|
||||
out <- iface.DagImportResult{Err: fmt.Errorf("failed to emit roots: %w", err)}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user