client.go | 7 +++++++ torrent.go | 5 +++++ webseed-peer.go | 61 +++++++++++++++++++++++++++++++++++++++++++---------- webseed_test.go | 55 ++++++++++++++--------------------------------------- diff --git a/client.go b/client.go index 66a826d3477218f53adcea4b8c19b5b285552cab..1770146644e1f1712f5ee3b7369bf5205e0fbd82 100644 --- a/client.go +++ b/client.go @@ -1965,6 +1965,13 @@ panicif.Zero(key) return cl.numWebSeedRequests[key] < webseedHostRequestConcurrency } +// webseedHostLowOnRequests reports whether a webseed host's active request count is low enough +// to warrant scheduling more requests (at zero or at/below the low-water mark). +func (cl *Client) webseedHostLowOnRequests(key webseedHostKeyHandle) bool { + n := cl.numWebSeedRequests[key] + return n == 0 || n <= webseedHostRequestConcurrency/2 +} + // Check for bad arrangements. This is a candidate for an error state check method. func (cl *Client) checkConfig() error { if EffectiveDownloadRateLimit(cl.config.DownloadRateLimiter) == 0 { diff --git a/torrent.go b/torrent.go index 2e39ec1275ad7b7a5c52e2a3b4ad5739b2421b55..52b08a26aeab8746ffc1595467f65147d411b6f4 100644 --- a/torrent.go +++ b/torrent.go @@ -1548,6 +1548,11 @@ } c.onNeedUpdateRequests(reason) }() } + for _, ws := range t.webSeeds { + if ws.peer.peerHasPiece(piece) { + ws.peer.onNeedUpdateRequests(reason) + } + } } // Stuff to do when pending pieces changes. We avoid running this in some benchmarks. diff --git a/webseed-peer.go b/webseed-peer.go index 222679fde18ccf71ce01859d40fe38637b91829f..8657a1a1404e9b49ec1e0df9ae8cade1ef7b58c5 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -51,6 +51,32 @@ me.lastCrime = err me.penanceComplete = time.Now().Add(term) } +// webseedFileUnavailable reports whether an HTTP status code means this webseed +// does not (and will not) serve the requested file. Pieces for such files should +// be removed from the webseed's bitmap rather than convicting the whole peer. +func webseedFileUnavailable(statusCode int) bool { + switch statusCode { + case 403, // Forbidden + 404, // Not Found + 410, // Gone + 451: // Unavailable For Legal Reasons + return true + } + return false +} + +// removeFilePieces removes pieces for the given file index from this webseed's bitmap. +// Must be called with the client lock held. +// TODO: Re-add pieces after an interval in case the file becomes available later. +func (ws *webseedPeer) removeFilePieces(fi int) { + for pi := range ws.peer.t.Files()[fi].PieceIndices() { + if ws.client.Pieces.CheckedRemove(uint32(pi)) { + ws.peer.t.decPieceAvailability(pieceIndex(pi)) + } + } + ws.peer.cl.updateWebseedRequestsWithReason("file unavailable on webseed") +} + func (*webseedPeer) allConnStatsImplField(stats *AllConnStats) *ConnStats { return &stats.WebSeeds } @@ -70,11 +96,11 @@ // Updates globally instead. return false } -// Webseed requests are issued globally so per-connection reasons or handling make no sense. +// Trigger the webseed request scheduler if this webseed's URL is under-utilized. func (me *webseedPeer) onNeedUpdateRequests(reason updateRequestReason) { - // Too many reasons here: Can't predictably determine when we need to rerun updates. - // TODO: Can trigger this when we have Client-level active-requests map. - //me.peer.cl.scheduleImmediateWebseedRequestUpdate(reason) + if me.peer.cl.webseedHostLowOnRequests(me.hostKey) { + me.peer.cl.updateWebseedRequestsWithReason(reason) + } } func (me *webseedPeer) expectingChunks() bool { @@ -257,10 +283,8 @@ locker.Lock() // Delete this entry after waiting above on an error, to prevent more requests. ws.deleteActiveRequest(webseedRequest) cl := ws.peer.cl - if err == nil && cl.numWebSeedRequests[ws.hostKey] == webseedHostRequestConcurrency/2 { - cl.updateWebseedRequestsWithReason("webseedPeer.runRequest low water") - } else if cl.numWebSeedRequests[ws.hostKey] == 0 { - cl.updateWebseedRequestsWithReason("webseedPeer.runRequest zero requests") + if cl.webseedHostLowOnRequests(ws.hostKey) { + cl.updateWebseedRequestsWithReason("webseedPeer.runRequest low on requests") } locker.Unlock() } @@ -341,10 +365,25 @@ } // We need this early for the convict call. ws.peer.locker().Lock() if err != nil { - // TODO: Pick out missing files or associate error with file. See also - // webseed.ReadRequestPartError. if !wr.cancelled.Load() { - ws.convict(err, time.Minute) + var rpe webseed.ReadRequestPartError + if errors.As(err, &rpe) { + var badResp webseed.ErrBadResponse + if errors.As(rpe.Err, &badResp) && webseedFileUnavailable(badResp.Response.StatusCode) { + ws.slogger().Info( + "file not available on webseed, removing pieces", + "fileIndex", rpe.FileIndex, + "status", badResp.Response.StatusCode, + ) + ws.removeFilePieces(rpe.FileIndex) + ws.peer.locker().Unlock() + err = nil + return + } + } + if err != nil { + ws.convict(err, time.Minute) + } } ws.peer.locker().Unlock() err = fmt.Errorf("reading chunk: %w", err) diff --git a/webseed_test.go b/webseed_test.go index e709f36e7c74030d40449d8ed060a61cd5c6704c..7dff304bbd2d3fdb7974b9430bac38096526bb75 100644 --- a/webseed_test.go +++ b/webseed_test.go @@ -7,26 +7,19 @@ "net/http" "net/http/httptest" "os" "path/filepath" - "strings" "testing" - "github.com/stretchr/testify/require" + "github.com/go-quicktest/qt" "github.com/anacrolix/torrent/internal/testutil" ) // Tests that the client can download a multi-file torrent from two webseeds simultaneously when -// each webseed only has one of the two files (non-overlapping data). The piece bitmaps are -// restricted to match the files each server has, and webseedRequestChunkSize is set to pieceLen so -// each piece is its own webseed slice (preventing requests from spanning into the other server's -// data due to the alignment of getWebseedRequestEnd). +// each webseed only has one of the two files (non-overlapping data). When a webseed receives a 404 +// for a file it doesn't have, the pieces for that file are removed from its bitmap and the +// scheduler reassigns them to the other webseed. func TestDownloadFromTwoNonOverlappingWebseeds(t *testing.T) { - // Override webseedRequestChunkSize so each piece maps to its own slice, ensuring requests stay - // within the pieces each webseed has. const pieceLen = 2 * defaultChunkSize // 32 KiB; two 16 KiB chunks per piece - old := webseedRequestChunkSize - webseedRequestChunkSize = uint64(pieceLen) - defer func() { webseedRequestChunkSize = old }() // Two files, each spanning exactly 2 pieces. fileLen := 2 * pieceLen @@ -42,27 +35,25 @@ {Name: "a.bin", Data: string(dataA)}, {Name: "b.bin", Data: string(dataB)}, }, } - mi, info := tu.Generate(int64(pieceLen)) - numPieces := info.NumPieces() // 4 - mid := numPieces / 2 // 2; pieces 0-1 = a.bin, pieces 2-3 = b.bin + mi, _ := tu.Generate(int64(pieceLen)) // Server 1: serves only a.bin; b.bin returns 404 naturally. dir1 := t.TempDir() - require.NoError(t, os.MkdirAll(filepath.Join(dir1, "testdata"), 0o755)) - require.NoError(t, os.WriteFile(filepath.Join(dir1, "testdata", "a.bin"), dataA, 0o644)) + qt.Assert(t, qt.IsNil(os.MkdirAll(filepath.Join(dir1, "testdata"), 0o755))) + qt.Assert(t, qt.IsNil(os.WriteFile(filepath.Join(dir1, "testdata", "a.bin"), dataA, 0o644))) srv1 := httptest.NewServer(http.FileServer(http.Dir(dir1))) defer srv1.Close() // Server 2: serves only b.bin; a.bin returns 404 naturally. dir2 := t.TempDir() - require.NoError(t, os.MkdirAll(filepath.Join(dir2, "testdata"), 0o755)) - require.NoError(t, os.WriteFile(filepath.Join(dir2, "testdata", "b.bin"), dataB, 0o644)) + qt.Assert(t, qt.IsNil(os.MkdirAll(filepath.Join(dir2, "testdata"), 0o755))) + qt.Assert(t, qt.IsNil(os.WriteFile(filepath.Join(dir2, "testdata", "b.bin"), dataB, 0o644))) srv2 := httptest.NewServer(http.FileServer(http.Dir(dir2))) defer srv2.Close() cfg := TestingConfig(t) cl, err := NewClient(cfg) - require.NoError(t, err) + qt.Assert(t, qt.IsNil(err)) defer cl.Close() // BEP 19 multi-file webseeds use a trailing slash; the file path is appended automatically. @@ -73,32 +64,14 @@ InfoBytes: mi.InfoBytes, }, Webseeds: []string{srv1.URL + "/", srv2.URL + "/"}, }) - require.NoError(t, err) - - // Restrict each webseed to only the pieces of its file. Remove pieces that the webseed doesn't - // have and decrement their availability counts, so the scheduler doesn't assign wrong pieces - // (which would cause a conviction when the missing file returns 404). - cl.lock() - for _, ws := range tt.webSeeds { - var removeStart, removeEnd int - if strings.HasPrefix(ws.client.Url, srv1.URL) { - removeStart, removeEnd = mid, numPieces // keep [0, mid) - } else { - removeStart, removeEnd = 0, mid // keep [mid, numPieces) - } - for i := removeStart; i < removeEnd; i++ { - ws.client.Pieces.Remove(uint32(i)) - tt.decPieceAvailability(pieceIndex(i)) - } - } - cl.unlock() + qt.Assert(t, qt.IsNil(err)) tt.DownloadAll() - require.True(t, cl.WaitAll()) + qt.Assert(t, qt.IsTrue(cl.WaitAll())) r := tt.NewReader() defer r.Close() got, err := io.ReadAll(r) - require.NoError(t, err) - require.Equal(t, append(dataA, dataB...), got) + qt.Assert(t, qt.IsNil(err)) + qt.Assert(t, qt.DeepEquals(got, append(dataA, dataB...))) }