.gitignore | 2 ++ client.go | 50 +++++++++++++++++++++++++++++++++++++++++++++----- client_test.go | 2 +- cmd/torrent/main.go | 41 +++++++++++++++++++++++++---------------- config.go | 2 ++ file.go | 2 +- go.mod | 6 +++--- go.sum | 10 ++++++---- issue97_test.go | 1 + peer-impl.go | 9 +++++---- peerconn.go | 235 ++++++++++++++++++++++++++++++----------------------- peerconn_test.go | 2 +- pending-requests.go | 33 +++++++++++++++++++++++++++++++++ pending-requests_test.go | 19 +++++++++++++++++++ piece.go | 31 +++++-------------------------- request-strategy/order.go | 2 +- request-strategy/piece.go | 2 +- request-strategy/torrent.go | 2 +- requesting.go | 307 +++++++++++++++++++++++++++++++---------------------- spec.go | 2 +- t.go | 10 +++++----- test/issue377_test.go | 3 +++ testing.go | 3 +++ torrent.go | 176 +++++++++++++++++++++++++---------------------------- torrent_test.go | 4 +++- webseed-peer.go | 27 +++++++++++++++++---------- webtorrent/tracker_client.go | 13 +++++++++++-- wstracker.go | 5 ++--- diff --git a/.gitignore b/.gitignore index 485dee64bcfb48793379b200a1afd14e85a8aaf4..d92c6d5bfd4a881ef48dd9bfd83aaee1415bcb21 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ .idea +*-run.gob +.envrc* diff --git a/client.go b/client.go index c55eb1a7a0e3342fecf0e6f5c4278fa1a4353d11..6e90fa7583bf3a13db0ee71f9d92ea4770689e14 100644 --- a/client.go +++ b/client.go @@ -6,8 +6,10 @@ "context" "crypto/rand" "encoding/binary" "errors" + "expvar" "fmt" "io" + "math" "net" "net/http" "sort" @@ -304,10 +306,6 @@ return } go t.onWebRtcConn(dc, dcc) }, - } - - if !peerRequesting { - go cl.requester() } return @@ -874,11 +872,20 @@ cl.unlock() return } +var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map + +func init() { + torrent.Set( + "successful_peer_wire_protocol_handshake_peer_reserved_bytes", + &successfulPeerWireProtocolHandshakePeerReservedBytes) +} + func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) { res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions) if err != nil { return } + successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1) ret = res.Hash c.PeerExtensionBytes = res.PeerExtensionBits c.PeerID = res.PeerID @@ -925,6 +932,11 @@ // Client lock must be held before entering this. func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { c.setTorrent(t) + for i, b := range cl.config.MinPeerExtensions { + if c.PeerExtensionBytes[i]&b != b { + return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes) + } + } if c.PeerID == cl.peerID { if c.outgoing { connsToSelf.Add(1) @@ -950,6 +962,7 @@ } defer t.dropConnection(c) c.startWriter() cl.sendInitialMessages(c, t) + c.initUpdateRequestsTimer() err := c.mainReadLoop() if err != nil { return fmt.Errorf("main read loop: %w", err) @@ -957,6 +970,34 @@ } return nil } +const check = false + +func (p *Peer) initUpdateRequestsTimer() { + if check { + if p.updateRequestsTimer != nil { + panic(p.updateRequestsTimer) + } + } + p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc) + p.updateRequestsTimer.Stop() +} + +func (c *Peer) updateRequestsTimerFunc() { + c.locker().Lock() + defer c.locker().Unlock() + if c.closed.IsSet() { + return + } + if c.needRequestUpdate != "" { + return + } + if c.isLowOnRequests() { + // If there are no outstanding requests, then a request update should have already run. 
+ return + } + c.updateRequests("updateRequestsTimer") +} + // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB // (1<<19) cached for sending, for 16KiB (1<<14) chunks. @@ -1135,7 +1176,6 @@ webSeeds: make(map[string]*Peer), gotMetainfoC: make(chan struct{}), } t.networkingEnabled.Set() - t._pendingPieces.NewSet = priorityBitmapStableNewSet t.logger = cl.logger.WithContextValue(t) if opts.ChunkSize == 0 { opts.ChunkSize = defaultChunkSize diff --git a/client_test.go b/client_test.go index 414aac0675a52a8d4390a2ed0bd5771f398ba686..7a04c59120f6bb8bde59ccf3ff78897d1b8b6241 100644 --- a/client_test.go +++ b/client_test.go @@ -483,7 +483,7 @@ leecherGreeting.cl.lock() leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces()) if ps.Cancel { - leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces()) + leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces(), "") } leecherGreeting.cl.unlock() done := make(chan struct{}) diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index 260f51d28931130598f9c84e7703d3d6baf2b5a3..50604cf689a66922cec489cf2830325ab09797d5 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -22,6 +22,7 @@ "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/tagflag" "github.com/anacrolix/torrent/bencode" + pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/version" "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" @@ -186,6 +187,9 @@ TcpPeers bool `default:"true"` UtpPeers bool `default:"true"` Webtorrent bool `default:"true"` DisableWebseeds bool + // Don't progress past handshake for peer connections where the peer doesn't offer the fast + // extension. 
+ RequireFastExtension bool Ipv4 bool `default:"true"` Ipv6 bool `default:"true"` @@ -219,7 +223,7 @@ func mainErr() error { defer envpprof.Stop() stdLog.SetFlags(stdLog.Flags() | stdLog.Lshortfile) debug := args.Flag(args.FlagOpt{Long: "debug"}) - p := args.ParseMain( + args.ParseMain( debug, args.Subcommand("metainfo", metainfoCmd), args.Subcommand("announce", func(p args.SubCmdCtx) error { @@ -233,17 +237,21 @@ } return announceErr(cmd) }), args.Subcommand("download", func(p args.SubCmdCtx) error { - var dlf DownloadCmd + var dlc DownloadCmd err := p.NewParser().AddParams( - append(args.FromStruct(&dlf), debug)..., + append(args.FromStruct(&dlc), debug)..., ).Parse() if err != nil { return err } - return downloadErr(downloadFlags{ + dlf := downloadFlags{ Debug: debug.Bool(), - DownloadCmd: dlf, + DownloadCmd: dlc, + } + p.Defer(func() error { + return downloadErr(dlf) }) + return nil }), args.Subcommand( "spew-bencoding", @@ -271,16 +279,6 @@ fmt.Printf("Torrent version prefix: %q\n", version.DefaultBep20Prefix) return nil }), ) - if p.Err != nil { - if errors.Is(p.Err, args.ErrHelped) { - return nil - } - return p.Err - } - if !p.RanSubCmd { - p.PrintChoices(os.Stderr) - args.FatalUsage() - } return nil } @@ -321,6 +319,9 @@ clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(*flags.DownloadRate), 1<<20) } if flags.Quiet { clientConfig.Logger = log.Discard + } + if flags.RequireFastExtension { + clientConfig.MinPeerExtensions.SetBit(pp.ExtensionBitFast, true) } clientConfig.MaxUnverifiedBytes = flags.MaxUnverifiedBytes.Int64() @@ -347,6 +348,7 @@ http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { client.WriteStatus(w) }) err = addTorrents(client, flags) + started := time.Now() if err != nil { return fmt.Errorf("adding torrents: %w", err) } @@ -356,6 +358,13 @@ log.Print("downloaded ALL the torrents") } else { err = errors.New("y u no complete torrents?!") } + clientConnStats := client.ConnStats() + log.Printf("average download rate: %v", + humanize.Bytes(uint64( + time.Duration( + clientConnStats.BytesReadUsefulData.Int64(), + )*time.Second/time.Since(started), + ))) if flags.Seed { if len(client.Torrents()) == 0 { log.Print("no torrents to seed") @@ -369,7 +378,7 @@ spew.Dump(client.ConnStats()) clStats := client.ConnStats() sentOverhead := clStats.BytesWritten.Int64() - clStats.BytesWrittenData.Int64() log.Printf( - "client read %v, %v was useful data. sent %v non-data bytes", + "client read %v, %.1f%% was useful data. sent %v non-data bytes", humanize.Bytes(uint64(clStats.BytesRead.Int64())), 100*float64(clStats.BytesReadUsefulData.Int64())/float64(clStats.BytesRead.Int64()), humanize.Bytes(uint64(sentOverhead))) diff --git a/config.go b/config.go index 14c129cf977b9226f0b49f69ba41f38b7d98c198..3d370d55fca2f38c182f34f94b0b15e2607c124c 100644 --- a/config.go +++ b/config.go @@ -146,6 +146,8 @@ // OnQuery hook func DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool) Extensions PeerExtensionBits + // Bits that peers must have set to proceed past handshakes. 
+ MinPeerExtensions PeerExtensionBits DisableWebtorrent bool DisableWebseeds bool diff --git a/file.go b/file.go index 35fe9239aaaba3a0191d220f46d0e84fe79e1aa5..01515187d0d1aa11f03e3efa8ae3e59bebeb35be 100644 --- a/file.go +++ b/file.go @@ -149,7 +149,7 @@ func (f *File) SetPriority(prio piecePriority) { f.t.cl.lock() if prio != f.prio { f.prio = prio - f.t.updatePiecePriorities(f.firstPieceIndex(), f.endPieceIndex()) + f.t.updatePiecePriorities(f.firstPieceIndex(), f.endPieceIndex(), "File.SetPriority") } f.t.cl.unlock() } diff --git a/go.mod b/go.mod index f4c21341346e711bc6452571e6f52e8b8ed09d44..a00591869b5b2c995889875a5e090286704c2728 100644 --- a/go.mod +++ b/go.mod @@ -7,13 +7,13 @@ bazil.org/fuse v0.0.0-20200407214033-5883e5a4b512 crawshaw.io/sqlite v0.3.3-0.20210127221821-98b1f83c5508 github.com/RoaringBitmap/roaring v0.9.4 github.com/alexflint/go-arg v1.4.2 - github.com/anacrolix/args v0.1.1-0.20210917054839-725094dd33fe + github.com/anacrolix/args v0.1.1-0.20211020052733-53ed238acbd4 github.com/anacrolix/chansync v0.3.0-0.0.20211007004133-3f72684c4a93 github.com/anacrolix/confluence v1.8.0 // indirect github.com/anacrolix/dht/v2 v2.10.6-0.20211007004332-99263ec9c1c8 github.com/anacrolix/envpprof v1.1.1 github.com/anacrolix/go-libutp v1.0.4 - github.com/anacrolix/log v0.9.0 + github.com/anacrolix/log v0.10.0 github.com/anacrolix/missinggo v1.3.0 github.com/anacrolix/missinggo/perf v1.0.0 github.com/anacrolix/missinggo/v2 v2.5.2 @@ -28,7 +28,7 @@ github.com/davecgh/go-spew v1.1.1 github.com/dustin/go-humanize v1.0.0 github.com/edsrzf/mmap-go v1.0.0 github.com/elliotchance/orderedmap v1.4.0 - github.com/frankban/quicktest v1.13.1 + github.com/frankban/quicktest v1.14.0 github.com/fsnotify/fsnotify v1.5.1 github.com/google/btree v1.0.1 github.com/google/go-cmp v0.5.6 diff --git a/go.sum b/go.sum index 79b80c611a059d0bd01147e5a7c950e4bca973e9..4ed7c1cbed651aa24ac644d87af2777db0c0b165 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,8 @@ github.com/alexflint/go-arg v1.4.2/go.mod h1:9iRbDxne7LcR/GSvEr7ma++GLpdIU1zrghf2y2768kM= github.com/alexflint/go-scalar v1.0.0/go.mod h1:GpHzbCOZXEKMEcygYQ5n/aa4Aq84zbxjy3MxYW0gjYw= github.com/alexflint/go-scalar v1.1.0 h1:aaAouLLzI9TChcPXotr6gUhq+Scr8rl0P9P4PnltbhM= github.com/alexflint/go-scalar v1.1.0/go.mod h1:LoFvNMqS1CPrMVltza4LvnGKhaSpc3oyLEBUZVhhS2o= -github.com/anacrolix/args v0.1.1-0.20210917054839-725094dd33fe h1:YvHyQkMwxP4OdUnudTu6HaU6WqW25ShNgXtUEWjN/tc= -github.com/anacrolix/args v0.1.1-0.20210917054839-725094dd33fe/go.mod h1:RCPBt2vU1GJn4gG9rL+fuYu7ivnE9tmK2pHm63t3yO0= +github.com/anacrolix/args v0.1.1-0.20211020052733-53ed238acbd4 h1:s6KNsoIo4VlU5fqoTVczWqDyM163HzyqoPY0hQJS+9U= +github.com/anacrolix/args v0.1.1-0.20211020052733-53ed238acbd4/go.mod h1:41JBnF8sKExNVLHPkCdL74jkZc3dSxAkGsk1TuKOUFI= github.com/anacrolix/chansync v0.0.0-20210524073341-a336ebc2de92/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k= github.com/anacrolix/chansync v0.3.0-0.0.20211007004133-3f72684c4a93 h1:sQ8igc3anitrtKPEHRK+RBvuNZP0+DRAa6jskKlq4+k= github.com/anacrolix/chansync v0.3.0-0.0.20211007004133-3f72684c4a93/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k= @@ -87,8 +87,9 @@ github.com/anacrolix/log v0.6.0/go.mod h1:lWvLTqzAnCWPJA08T2HCstZi0L1y2Wyvm3FJgwU9jwU= github.com/anacrolix/log v0.6.1-0.20200416071330-f58a030e6149/go.mod h1:s5yBP/j046fm9odtUTbHOfDUq/zh1W8OkPpJtnX0oQI= github.com/anacrolix/log v0.7.1-0.20200604014615-c244de44fd2d/go.mod h1:s5yBP/j046fm9odtUTbHOfDUq/zh1W8OkPpJtnX0oQI= github.com/anacrolix/log v0.8.0/go.mod 
h1:s5yBP/j046fm9odtUTbHOfDUq/zh1W8OkPpJtnX0oQI= -github.com/anacrolix/log v0.9.0 h1:HD1Ml3WV/6lRbITKqi5EIS3e9rVOGHej5V9UaQA4cvY= github.com/anacrolix/log v0.9.0/go.mod h1:s5yBP/j046fm9odtUTbHOfDUq/zh1W8OkPpJtnX0oQI= +github.com/anacrolix/log v0.10.0 h1:uz9XDnmsw8ZEO/TTRU03lL7I74PlgVRFszYqPZ39WNY= +github.com/anacrolix/log v0.10.0/go.mod h1:s5yBP/j046fm9odtUTbHOfDUq/zh1W8OkPpJtnX0oQI= github.com/anacrolix/missinggo v0.0.0-20180522035225-b4a5853e62ff/go.mod h1:b0p+7cn+rWMIphK1gDH2hrDuwGOcbB6V4VXeSsEfHVk= github.com/anacrolix/missinggo v0.0.0-20180725070939-60ef2fbf63df/go.mod h1:kwGiTUTZ0+p4vAz3VbAI5a30t2YbvemcmspjKwrAz5s= github.com/anacrolix/missinggo v0.2.1-0.20190310234110-9fbdc9f242a8/go.mod h1:MBJu3Sk/k3ZfGYcS7z18gwfu72Ey/xopPFJJbTi5yIo= @@ -236,8 +237,9 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/frankban/quicktest v1.9.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= -github.com/frankban/quicktest v1.13.1 h1:xVm/f9seEhZFL9+n5kv5XLrGwy6elc4V9v/XFY2vmd8= github.com/frankban/quicktest v1.13.1/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= +github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= +github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= diff --git a/issue97_test.go b/issue97_test.go index 2091b758df633dd373665c8088456c715090a810..04a2c4be4d5cc0e4703240fd87846016dd389309 100644 --- a/issue97_test.go +++ b/issue97_test.go @@ -19,6 +19,7 @@ defer os.RemoveAll(td) tt := &Torrent{ storageOpener: storage.NewClient(storage.NewFile(td)), logger: log.Default, + chunkSize: defaultChunkSize, } mi := testutil.GreetingMetaInfo() info, err := mi.UnmarshalInfo() diff --git a/peer-impl.go b/peer-impl.go index 23c0fbb922413298e5ac548a245d01df26bcbb58..b5cf028ecd98b3a75d7d927b31c165b735ade44f 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -8,13 +8,15 @@ // Contains implementation details that differ between peer types, like Webseeds and regular // BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with // legacy PeerConn methods. type peerImpl interface { - onNextRequestStateChanged() - updateRequests() + // Trigger the actual request state to get updated + handleUpdateRequests() + // Whether the outstanding local request cardinality is low enough to warrant an update. + isLowOnRequests() bool writeInterested(interested bool) bool // Neither of these return buffer room anymore, because they're currently both posted. There's // also PeerConn.writeBufferFull for when/where it matters. 
- _cancel(Request) bool + _cancel(RequestIndex) bool _request(Request) bool connectionFlags() string @@ -23,5 +25,4 @@ onGotInfo(*metainfo.Info) drop() String() string connStatusString() string - writeBufferFull() bool } diff --git a/peerconn.go b/peerconn.go index 749832ca14de9994b3f976afc0f6547d1bdcc6f5..91ef37e601f27bf37a51b886f1658b88eb4f5d6f 100644 --- a/peerconn.go +++ b/peerconn.go @@ -17,7 +17,6 @@ "github.com/RoaringBitmap/roaring" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/v2/bitmap" - "github.com/anacrolix/missinggo/v2/prioritybitmap" "github.com/anacrolix/multiless" "github.com/anacrolix/chansync" @@ -83,8 +82,10 @@ lastUsefulChunkReceived time.Time lastChunkSent time.Time // Stuff controlled by the local peer. - nextRequestState requestState + needRequestUpdate string actualRequestState requestState + updateRequestsTimer *time.Timer + cancelledRequests roaring.Bitmap lastBecameInterested time.Time priorInterest time.Duration @@ -126,9 +127,6 @@ PeerMaxRequests maxRequests // Maximum pending requests the peer allows. PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber PeerClientName string - - pieceInclination []int - _pieceRequestOrder prioritybitmap.PriorityBitmap logger log.Logger } @@ -245,7 +243,7 @@ } return roaring.Flip(&cn._peerPieces, 0, bitmap.BitRange(cn.t.numPieces())).IsEmpty(), true } -func (cn *PeerConn) locker() *lockWithDeferreds { +func (cn *Peer) locker() *lockWithDeferreds { return cn.t.cl.locker() } @@ -368,13 +366,14 @@ cn.cumInterest(), cn.totalExpectingTime(), ) fmt.Fprintf(w, - " %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: %d/(%d/%d)-%d/%d, flags: %s, dr: %.1f KiB/s\n", + " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d-%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n", cn.completedString(), len(cn.peerTouchedPieces), &cn._stats.ChunksReadUseful, &cn._stats.ChunksRead, &cn._stats.ChunksWritten, cn.actualRequestState.Requests.GetCardinality(), + cn.cancelledRequests.GetCardinality(), cn.nominalMaxRequests(), cn.PeerMaxRequests, len(cn.peerRequests), @@ -404,8 +403,9 @@ func (p *Peer) close() { if !p.closed.Set() { return } - p.discardPieceInclination() - p._pieceRequestOrder.Clear() + if p.updateRequestsTimer != nil { + p.updateRequestsTimer.Stop() + } p.peerImpl.onClose() if p.t != nil { p.t.decPeerPieceAvailability(p) @@ -472,7 +472,7 @@ } // The actual value to use as the maximum outbound requests. func (cn *Peer) nominalMaxRequests() (ret maxRequests) { - return maxRequests(clamp(1, int64(cn.PeerMaxRequests), 128)) + return maxRequests(clamp(1, int64(cn.PeerMaxRequests), 2048)) } func (cn *Peer) totalExpectingTime() (ret time.Duration) { @@ -556,6 +556,8 @@ // The function takes a message to be sent, and returns true if more messages // are okay. type messageWriter func(pp.Message) bool +// This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it +// when we want to go fast. func (cn *Peer) shouldRequest(r RequestIndex) error { pi := pieceIndex(r / cn.t.chunksPerRegularPiece()) if !cn.peerHasPiece(pi) { @@ -574,11 +576,23 @@ if cn.t.pieceQueuedForHash(pi) { panic("piece is queued for hash") } if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) { - panic("peer choking and piece not allowed fast") + // This could occur if we made a request with the fast extension, and then got choked and + // haven't had the request rejected yet. 
+ if !cn.actualRequestState.Requests.Contains(r) { + panic("peer choking and piece not allowed fast") + } } return nil } +func (cn *Peer) mustRequest(r RequestIndex) bool { + more, err := cn.request(r) + if err != nil { + panic(err) + } + return more +} + func (cn *Peer) request(r RequestIndex) (more bool, err error) { if err := cn.shouldRequest(r); err != nil { panic(err) @@ -594,7 +608,7 @@ if cn.validReceiveChunks == nil { cn.validReceiveChunks = make(map[RequestIndex]int) } cn.validReceiveChunks[r]++ - cn.t.pendingRequests[r]++ + cn.t.pendingRequests.Inc(r) cn.updateExpectingChunks() ppReq := cn.t.requestIndexToRequest(r) for _, f := range cn.callbacks.SentRequest { @@ -613,18 +627,32 @@ }) } func (me *Peer) cancel(r RequestIndex) bool { - if me.deleteRequest(r) { - return me.peerImpl._cancel(me.t.requestIndexToRequest(r)) + if !me.actualRequestState.Requests.Contains(r) { + return true } - return true + return me._cancel(r) } -func (me *PeerConn) _cancel(r Request) bool { - return me.write(makeCancelMessage(r)) +func (me *PeerConn) _cancel(r RequestIndex) bool { + if me.cancelledRequests.Contains(r) { + // Already cancelled and waiting for a response. + return true + } + if me.fastEnabled() { + me.cancelledRequests.Add(r) + } else { + if !me.deleteRequest(r) { + panic("request not existing should have been guarded") + } + if me.isLowOnRequests() { + me.updateRequests("Peer.cancel") + } + } + return me.write(makeCancelMessage(me.t.requestIndexToRequest(r))) } func (cn *PeerConn) fillWriteBuffer() { - if !cn.applyNextRequestState() { + if !cn.maybeUpdateActualRequestState() { return } if cn.pex.IsEnabled() { @@ -660,15 +688,18 @@ }) cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()} } -func (cn *PeerConn) updateRequests() { - if peerRequesting { - if cn.actualRequestState.Requests.GetCardinality() != 0 { - return - } - cn.tickleWriter() +// Sets a reason to update requests, and if there wasn't already one, handle it. +func (cn *Peer) updateRequests(reason string) { + if cn.needRequestUpdate != "" { return } - cn.t.cl.tickleRequester() + cn.needRequestUpdate = reason + cn.handleUpdateRequests() +} + +func (cn *PeerConn) handleUpdateRequests() { + // The writer determines the request state as needed when it can write. + cn.tickleWriter() } // Emits the indices in the Bitmaps bms in order, never repeating any index. @@ -694,54 +725,7 @@ } } } -// check callers updaterequests -func (cn *Peer) stopRequestingPiece(piece pieceIndex) bool { - return cn._pieceRequestOrder.Remove(piece) -} - -// This is distinct from Torrent piece priority, which is the user's -// preference. Connection piece priority is specific to a connection and is -// used to pseudorandomly avoid connections always requesting the same pieces -// and thus wasting effort. 
-func (cn *Peer) updatePiecePriority(piece pieceIndex) bool { - tpp := cn.t.piecePriority(piece) - if !cn.peerHasPiece(piece) { - tpp = PiecePriorityNone - } - if tpp == PiecePriorityNone { - return cn.stopRequestingPiece(piece) - } - prio := cn.getPieceInclination()[piece] - return cn._pieceRequestOrder.Set(piece, prio) -} - -func (cn *Peer) getPieceInclination() []int { - if cn.pieceInclination == nil { - cn.pieceInclination = cn.t.getConnPieceInclination() - } - return cn.pieceInclination -} - -func (cn *Peer) discardPieceInclination() { - if cn.pieceInclination == nil { - return - } - cn.t.putPieceInclination(cn.pieceInclination) - cn.pieceInclination = nil -} - func (cn *Peer) peerPiecesChanged() { - if cn.t.haveInfo() { - prioritiesChanged := false - for i := pieceIndex(0); i < cn.t.numPieces(); i++ { - if cn.updatePiecePriority(i) { - prioritiesChanged = true - } - } - if prioritiesChanged { - cn.updateRequests() - } - } cn.t.maybeDropMutuallyCompletePeer(cn) } @@ -763,10 +747,10 @@ if !cn.peerHasPiece(piece) { cn.t.incPieceAvailability(piece) } cn._peerPieces.Add(uint32(piece)) - cn.t.maybeDropMutuallyCompletePeer(&cn.Peer) - if cn.updatePiecePriority(piece) { - cn.updateRequests() + if cn.t.wantPieceIndex(piece) { + cn.updateRequests("have") } + cn.peerPiecesChanged() return nil } @@ -795,6 +779,9 @@ } } if have { cn._peerPieces.Add(uint32(i)) + if cn.t.wantPieceIndex(i) { + cn.updateRequests("bitfield") + } } else { cn._peerPieces.Remove(uint32(i)) } @@ -815,6 +802,9 @@ } } cn.peerSentHaveAll = true cn._peerPieces.Clear() + if !cn.t._pendingPieces.IsEmpty() { + cn.updateRequests("Peer.onPeerHasAllPieces") + } cn.peerPiecesChanged() } @@ -887,7 +877,7 @@ func (cn *PeerConn) wroteBytes(n int64) { cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten })) } -func (cn *PeerConn) readBytes(n int64) { +func (cn *Peer) readBytes(n int64) { cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead })) } @@ -1034,6 +1024,12 @@ go f() } else { f() } +} + +func (c *PeerConn) logProtocolBehaviour(level log.Level, format string, arg ...interface{}) { + c.logger.WithLevel(level).WithContextText(fmt.Sprintf( + "peer id %q, ext v %q", c.PeerID, c.PeerClientName, + )).SkipCallers(1).Printf(format, arg...) } // Processes incoming BitTorrent wire-protocol messages. The client lock is held upon entry and @@ -1082,16 +1078,49 @@ return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type) } switch msg.Type { case pp.Choke: - c.peerChoking = true + if c.peerChoking { + break + } if !c.fastEnabled() { c.deleteAllRequests() + } else { + // We don't decrement pending requests here, let's wait for the peer to either + // reject or satisfy the outstanding requests. Additionally some peers may unchoke + // us and resume where they left off, we don't want to have piled on to those chunks + // in the meanwhile. I think a peers ability to abuse this should be limited: they + // could let us request a lot of stuff, then choke us and never reject, but they're + // only a single peer, our chunk balancing should smooth over this abuse. } - // We can then reset our interest. - c.updateRequests() + c.peerChoking = true + // We can now reset our interest. I think we do this after setting the flag in case the + // peerImpl updates synchronously (webseeds?). + c.updateRequests("choked") c.updateExpectingChunks() case pp.Unchoke: + if !c.peerChoking { + // Some clients do this for some reason. 
Transmission doesn't error on this, so we + // won't for consistency. + c.logProtocolBehaviour(log.Debug, "received unchoke when already unchoked") + break + } c.peerChoking = false - c.updateRequests() + preservedCount := 0 + c.actualRequestState.Requests.Iterate(func(x uint32) bool { + if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) { + preservedCount++ + } + return true + }) + if preservedCount != 0 { + // TODO: Yes this is a debug log but I'm not happy with the state of the logging lib + // right now. + c.logger.WithLevel(log.Debug).Printf( + "%v requests were preserved while being choked (fast=%v)", + preservedCount, + c.fastEnabled()) + torrent.Add("requestsPreservedThroughChoking", int64(preservedCount)) + } + c.updateRequests("unchoked") c.updateExpectingChunks() case pp.Interested: c.peerInterested = true @@ -1138,7 +1167,7 @@ }) case pp.Suggest: torrent.Add("suggests received", 1) log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).SetLevel(log.Debug).Log(c.t.logger) - c.updateRequests() + c.updateRequests("suggested") case pp.HaveAll: err = c.onPeerSentHaveAll() case pp.HaveNone: @@ -1148,8 +1177,7 @@ c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg))) case pp.AllowedFast: torrent.Add("allowed fasts received", 1) log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger) - c.peerAllowedFast.Add(bitmap.BitIndex(msg.Index)) - c.updateRequests() + c.updateRequests("PeerConn.mainReadLoop allowed fast") case pp.Extended: err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload) default: @@ -1163,6 +1191,9 @@ } func (c *Peer) remoteRejectedRequest(r RequestIndex) { if c.deleteRequest(r) { + if c.isLowOnRequests() { + c.updateRequests("Peer.remoteRejectedRequest") + } c.decExpectedChunkReceive(r) } } @@ -1299,6 +1330,9 @@ deletedRequest = true if !c.peerChoking { c._chunksReceivedWhileExpecting++ } + if c.isLowOnRequests() { + c.updateRequests("Peer.receiveChunk deleted request") + } } else { chunksReceived.Add("unwanted", 1) } @@ -1309,6 +1343,7 @@ cl := t.cl // Do we actually want this chunk? if t.haveChunk(ppReq) { + //panic(fmt.Sprintf("%+v", ppReq)) chunksReceived.Add("wasted", 1) c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) return nil @@ -1320,7 +1355,6 @@ c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) if deletedRequest { c.piecesReceivedSinceLastRequestUpdate++ - c.updateRequests() c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) } for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData { @@ -1361,7 +1395,10 @@ if err != nil { c.logger.WithDefaultLevel(log.Error).Printf("writing received chunk %v: %v", req, err) t.pendRequest(req) - //t.updatePieceCompletion(pieceIndex(msg.Index)) + // Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a + // request update runs while we're writing the chunk that just failed. Then we never do a + // fresh update after pending the failed request. 
+ c.updateRequests("Peer.receiveChunk error writing chunk") t.onWriteChunkErr(err) return nil } @@ -1470,27 +1507,25 @@ return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64() } func (c *Peer) peerHasWantedPieces() bool { - return !c._pieceRequestOrder.IsEmpty() + if c.peerSentHaveAll { + return !c.t.haveAllPieces() + } + if !c.t.haveInfo() { + return !c._peerPieces.IsEmpty() + } + return c._peerPieces.Intersects(&c.t._pendingPieces) } func (c *Peer) deleteRequest(r RequestIndex) bool { - c.nextRequestState.Requests.Remove(r) if !c.actualRequestState.Requests.CheckedRemove(r) { return false } + c.cancelledRequests.Remove(r) for _, f := range c.callbacks.DeletedRequest { f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) } c.updateExpectingChunks() - pr := c.t.pendingRequests - pr[r]-- - n := pr[r] - if n == 0 { - delete(pr, r) - } - if n < 0 { - panic(n) - } + c.t.pendingRequests.Dec(r) return true } @@ -1502,10 +1537,6 @@ }) if !c.actualRequestState.Requests.IsEmpty() { panic(c.actualRequestState.Requests.GetCardinality()) } - c.nextRequestState.Requests.Clear() - // for c := range c.t.conns { - // c.tickleWriter() - // } } // This is called when something has changed that should wake the writer, such as putting stuff into @@ -1633,6 +1664,6 @@ pc, ok := p.peerImpl.(*PeerConn) return pc, ok } -func (p *PeerConn) onNextRequestStateChanged() { - p.tickleWriter() +func (pc *PeerConn) isLowOnRequests() bool { + return pc.actualRequestState.Requests.IsEmpty() } diff --git a/peerconn_test.go b/peerconn_test.go index be88bf47b27d2d83e566748d4b444fa30a9d2dad..b6ad410c103a855eb329c11381be0757f062c22e 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -104,7 +104,7 @@ Length: 1 << 20, PieceLength: 1 << 20, })) t.setChunkSize(defaultChunkSize) - t._pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority()) + t._pendingPieces.Add(0) r, w := net.Pipe() cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r)) cn.setTorrent(t) diff --git a/pending-requests.go b/pending-requests.go new file mode 100644 index 0000000000000000000000000000000000000000..3d0e4ca61670197b989ce64629ccbb10ac2723fb --- /dev/null +++ b/pending-requests.go @@ -0,0 +1,33 @@ +package torrent + +type pendingRequests struct { + m []int +} + +func (p *pendingRequests) Dec(r RequestIndex) { + prev := p.m[r] + if prev <= 0 { + panic(prev) + } + p.m[r]-- +} + +func (p *pendingRequests) Inc(r RequestIndex) { + p.m[r]++ +} + +func (p *pendingRequests) Init(maxIndex RequestIndex) { + p.m = make([]int, maxIndex) +} + +func (p *pendingRequests) AssertEmpty() { + for _, count := range p.m { + if count != 0 { + panic(count) + } + } +} + +func (p *pendingRequests) Get(r RequestIndex) int { + return p.m[r] +} diff --git a/pending-requests_test.go b/pending-requests_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5f0debdca0408fe42926f1328627f286ca75e3bd --- /dev/null +++ b/pending-requests_test.go @@ -0,0 +1,19 @@ +package torrent + +import ( + "testing" + + qt "github.com/frankban/quicktest" + "github.com/google/go-cmp/cmp" +) + +// Ensure that cmp.Diff will detect errors as required. 
+func TestPendingRequestsDiff(t *testing.T) { + var a, b pendingRequests + c := qt.New(t) + diff := func() string { return cmp.Diff(a.m, b.m) } + c.Check(diff(), qt.ContentEquals, "") + a.m = []int{1, 3} + b.m = []int{1, 2, 3} + c.Check(diff(), qt.Not(qt.Equals), "") +} diff --git a/piece.go b/piece.go index 20c5c19a28399b8e97d6eaf7febbae76071f0f24..6b70a5b56407c4b6dc287f5d819d98599a47c4cc 100644 --- a/piece.go +++ b/piece.go @@ -8,7 +8,6 @@ "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" "github.com/anacrolix/missinggo/v2/bitmap" - request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" @@ -41,6 +40,8 @@ // Connections that have written data to this piece since its last check. // This can include connections that have closed. dirtiers map[*Peer]struct{} + + undirtiedChunksIter undirtiedChunksIter } func (p *Piece) String() string { @@ -191,7 +192,7 @@ func (p *Piece) SetPriority(prio piecePriority) { p.t.cl.lock() defer p.t.cl.unlock() p.priority = prio - p.t.updatePiecePriority(p.index) + p.t.updatePiecePriority(p.index, "Piece.SetPriority") } func (p *Piece) purePriority() (ret piecePriority) { @@ -244,13 +245,14 @@ func init() { gob.Register(undirtiedChunksIter{}) } +// Use an iterator to jump between dirty bits. type undirtiedChunksIter struct { TorrentDirtyChunks *roaring.Bitmap StartRequestIndex RequestIndex EndRequestIndex RequestIndex } -func (me undirtiedChunksIter) Iter(f func(chunkIndexType)) { +func (me *undirtiedChunksIter) Iter(f func(chunkIndexType)) { it := me.TorrentDirtyChunks.Iterator() startIndex := me.StartRequestIndex endIndex := me.EndRequestIndex @@ -270,29 +272,6 @@ for index := lastDirty + 1; index < endIndex; index++ { f(index - startIndex) } return -} - -func (p *Piece) undirtiedChunksIter() request_strategy.ChunksIter { - // Use an iterator to jump between dirty bits. - return undirtiedChunksIter{ - TorrentDirtyChunks: &p.t.dirtyChunks, - StartRequestIndex: p.requestIndexOffset(), - EndRequestIndex: p.requestIndexOffset() + p.numChunks(), - } -} - -func (p *Piece) iterUndirtiedChunks(f func(chunkIndexType)) { - if true { - p.undirtiedChunksIter().Iter(f) - return - } - // The original implementation. 
- for i := chunkIndexType(0); i < p.numChunks(); i++ { - if p.chunkIndexDirty(i) { - continue - } - f(i) - } } func (p *Piece) requestIndexOffset() RequestIndex { diff --git a/request-strategy/order.go b/request-strategy/order.go index 82cb504872d69596e65529c7edc7541f3c815fc6..7456649bb6c42da042f6250ff2f23a3a411fcd26 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -88,7 +88,7 @@ IterPendingChunks ChunksIterFunc } func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex { - return RequestIndex(p.t.ChunksPerPiece*p.index) + RequestIndex(c) + return p.t.ChunksPerPiece*uint32(p.index) + c } type filterPiece struct { diff --git a/request-strategy/piece.go b/request-strategy/piece.go index dfc2d928a323710add9c010bbc66858a19246e24..8a038e67dafe8a9ea73531085e6057f0cf500077 100644 --- a/request-strategy/piece.go +++ b/request-strategy/piece.go @@ -3,7 +3,7 @@ type ChunksIterFunc func(func(ChunkIndex)) type ChunksIter interface { - Iter(func(ChunkIndex)) + Iter(func(ci ChunkIndex)) } type Piece struct { diff --git a/request-strategy/torrent.go b/request-strategy/torrent.go index 262ae9656682cc9f04bc14aa3c1dd0666aeddf37..ff0261a3eb6944a3c99f6fee98f74cb23a54fee0 100644 --- a/request-strategy/torrent.go +++ b/request-strategy/torrent.go @@ -12,7 +12,7 @@ // Unclosed Peers. Not necessary for getting requestable piece ordering. Peers []Peer // Some value that's unique and stable between runs. Could even use the infohash? InfoHash metainfo.Hash - ChunksPerPiece int + ChunksPerPiece uint32 MaxUnverifiedBytes int64 } diff --git a/requesting.go b/requesting.go index 82ad60de55f0386b478180138475b021e7d21c41..01c513aebb9839adbc563070f45fdfd95ec5ad04 100644 --- a/requesting.go +++ b/requesting.go @@ -1,47 +1,20 @@ package torrent import ( + "container/heap" + "context" "encoding/gob" "reflect" + "runtime/pprof" "time" "unsafe" - "github.com/RoaringBitmap/roaring" - "github.com/anacrolix/chansync/events" "github.com/anacrolix/log" - "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/multiless" request_strategy "github.com/anacrolix/torrent/request-strategy" ) -// Calculate requests individually for each peer. 
-const peerRequesting = false - -func (cl *Client) requester() { - for { - update := func() events.Signaled { - cl.lock() - defer cl.unlock() - cl.doRequests() - return cl.updateRequests.Signaled() - }() - minWait := time.After(100 * time.Millisecond) - maxWait := time.After(1000 * time.Millisecond) - select { - case <-cl.closed.Done(): - return - case <-minWait: - case <-maxWait: - } - select { - case <-cl.closed.Done(): - return - case <-update: - case <-maxWait: - } - } -} - func (cl *Client) tickleRequester() { cl.updateRequests.Broadcast() } @@ -57,7 +30,7 @@ continue } rst := request_strategy.Torrent{ InfoHash: t.infoHash, - ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize), + ChunksPerPiece: t.chunksPerRegularPiece(), } if t.storage != nil { rst.Capacity = t.storage.Capacity @@ -72,7 +45,7 @@ Partial: t.piecePartiallyDownloaded(i), Availability: p.availability, Length: int64(p.length()), NumPendingChunks: int(t.pieceNumPendingChunks(i)), - IterPendingChunks: p.undirtiedChunksIter(), + IterPendingChunks: &p.undirtiedChunksIter, }) } t.iterPeers(func(p *Peer) { @@ -105,14 +78,6 @@ MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes, } } -func (cl *Client) doRequests() { - input := cl.getRequestStrategyInput() - nextPeerStates := request_strategy.Run(input) - for p, state := range nextPeerStates { - setPeerNextRequestState(p, state) - } -} - func init() { gob.Register(peerId{}) } @@ -151,94 +116,163 @@ copy(*(*[]byte)(unsafe.Pointer(&dst)), b) return nil } -func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) { - p := _p.(peerId).Peer - p.nextRequestState = rp - p.onNextRequestStateChanged() -} - type RequestIndex = request_strategy.RequestIndex type chunkIndexType = request_strategy.ChunkIndex -func (p *Peer) applyNextRequestState() bool { - if peerRequesting { - if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) { - return true +type peerRequests struct { + requestIndexes []RequestIndex + peer *Peer + torrentStrategyInput request_strategy.Torrent +} + +func (p *peerRequests) Len() int { + return len(p.requestIndexes) +} + +func (p *peerRequests) Less(i, j int) bool { + leftRequest := p.requestIndexes[i] + rightRequest := p.requestIndexes[j] + t := p.peer.t + leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece + rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece + leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest) + rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest) + pending := func(index RequestIndex, current bool) int { + ret := t.pendingRequests.Get(index) + if current { + ret-- } - type piece struct { - index int - endGame bool + // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be + // resolved. + if ret < 0 { + panic(ret) } - var pieceOrder []piece - request_strategy.GetRequestablePieces( - p.t.cl.getRequestStrategyInput(), - func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) { - if t.InfoHash != p.t.infoHash { - return - } - if !p.peerHasPiece(pieceIndex) { - return - } - pieceOrder = append(pieceOrder, piece{ - index: pieceIndex, - endGame: rsp.Priority == PiecePriorityNow, - }) - }, + return ret + } + ml := multiless.New() + // Push requests that can't be served right now to the end. But we don't throw them away unless + // there's a better alternative. 
This is for when we're using the fast extension and get choked + // but our requests could still be good when we get unchoked. + if p.peer.peerChoking { + ml = ml.Bool( + !p.peer.peerAllowedFast.Contains(leftPieceIndex), + !p.peer.peerAllowedFast.Contains(rightPieceIndex), ) - more := true - interested := false - for _, endGameIter := range []bool{false, true} { - for _, piece := range pieceOrder { - tp := p.t.piece(piece.index) - tp.iterUndirtiedChunks(func(cs chunkIndexType) { - req := cs + tp.requestIndexOffset() - if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 { + } + ml = ml.Int( + pending(leftRequest, leftCurrent), + pending(rightRequest, rightCurrent)) + ml = ml.Bool(!leftCurrent, !rightCurrent) + ml = ml.Int( + -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority), + -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority), + ) + ml = ml.Int( + int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability), + int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability)) + ml = ml.Uint32(leftPieceIndex, rightPieceIndex) + ml = ml.Uint32(leftRequest, rightRequest) + return ml.MustLess() +} + +func (p *peerRequests) Swap(i, j int) { + p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i] +} + +func (p *peerRequests) Push(x interface{}) { + p.requestIndexes = append(p.requestIndexes, x.(RequestIndex)) +} + +func (p *peerRequests) Pop() interface{} { + last := len(p.requestIndexes) - 1 + x := p.requestIndexes[last] + p.requestIndexes = p.requestIndexes[:last] + return x +} + +type desiredRequestState struct { + Requests []RequestIndex + Interested bool +} + +func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { + input := p.t.cl.getRequestStrategyInput() + requestHeap := peerRequests{ + peer: p, + } + for _, t := range input.Torrents { + if t.InfoHash == p.t.infoHash { + requestHeap.torrentStrategyInput = t + break + } + } + request_strategy.GetRequestablePieces( + input, + func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) { + if t.InfoHash != p.t.infoHash { + return + } + if !p.peerHasPiece(pieceIndex) { + return + } + allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex) + rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) { + r := p.t.pieceRequestIndexOffset(pieceIndex) + ci + //if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) { + // return + //} + if !allowedFast { + // We must signal interest to request this + desired.Interested = true + // We can make or will allow sustaining a request here if we're not choked, or + // have made the request previously (presumably while unchoked), and haven't had + // the peer respond yet (and the request was retained because we are using the + // fast extension). + if p.peerChoking && !p.actualRequestState.Requests.Contains(r) { + // We can't request this right now. 
return } - interested = true - more = p.setInterested(true) - if !more { - return - } - if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() { - return - } - if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) { - return - } - var err error - more, err = p.request(req) - if err != nil { - panic(err) - } - }) - if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() { - break } - if !more { - break - } - } - if !more { - break - } - } - if !more { - return false - } - if !interested { - p.setInterested(false) - } - return more + requestHeap.requestIndexes = append(requestHeap.requestIndexes, r) + }) + }, + ) + p.t.assertPendingRequests() + heap.Init(&requestHeap) + for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() { + requestIndex := heap.Pop(&requestHeap).(RequestIndex) + desired.Requests = append(desired.Requests, requestIndex) + } + return +} + +func (p *Peer) maybeUpdateActualRequestState() bool { + if p.needRequestUpdate == "" { + return true } + var more bool + pprof.Do( + context.Background(), + pprof.Labels("update request", p.needRequestUpdate), + func(_ context.Context) { + next := p.getDesiredRequestState() + more = p.applyRequestState(next) + }, + ) + return more +} - next := p.nextRequestState - current := p.actualRequestState +// Transmit/action the request state to the peer. +func (p *Peer) applyRequestState(next desiredRequestState) bool { + current := &p.actualRequestState if !p.setInterested(next.Interested) { return false } more := true - cancel := roaring.AndNot(¤t.Requests, &next.Requests) + cancel := current.Requests.Clone() + for _, ri := range next.Requests { + cancel.Remove(ri) + } cancel.Iterate(func(req uint32) bool { more = p.cancel(req) return more @@ -246,20 +280,37 @@ }) if !more { return false } - next.Requests.Iterate(func(req uint32) bool { - // This could happen if the peer chokes us between the next state being generated, and us - // trying to transmit the state. - if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) { - return true + for _, req := range next.Requests { + if p.cancelledRequests.Contains(req) { + // Waiting for a reject or piece message, which will suitably trigger us to update our + // requests, so we can skip this one with no additional consideration. + continue } - var err error - more, err = p.request(req) - if err != nil { - panic(err) - } /* else { - log.Print(req) - } */ - return more - }) + // The cardinality of our desired requests shouldn't exceed the max requests since it's used + // in the calculation of the requests. However, if we cancelled requests and they haven't + // been rejected or serviced yet with the fast extension enabled, we can end up with more + // extra outstanding requests. We could subtract the number of outstanding cancels from the + // next request cardinality, but peers might not like that. 
+ if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() { + //log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]", + // next.Requests.GetCardinality(), + // p.cancelledRequests.GetCardinality(), + // current.Requests.GetCardinality(), + // p.nominalMaxRequests(), + //) + break + } + more = p.mustRequest(req) + if !more { + break + } + } + p.updateRequestsTimer.Stop() + if more { + p.needRequestUpdate = "" + if !current.Requests.IsEmpty() { + p.updateRequestsTimer.Reset(3 * time.Second) + } + } return more } diff --git a/spec.go b/spec.go index a69e2093d9e386d406f9c95831f17ee00876e874..60c774cf7db9ad62d005b0ac384f6178aac24869 100644 --- a/spec.go +++ b/spec.go @@ -30,7 +30,7 @@ // for new Torrents. TODO: Move into a "new" Torrent opt type. ChunkSize pp.Integer // TODO: Move into a "new" Torrent opt type. Storage storage.ClientImpl - + DisableInitialPieceCheck bool // Whether to allow data download or upload diff --git a/t.go b/t.go index 042a5262560a33375fd052c73bdc015f8a331067..78c500dc438faf33b9d6f3a12dd669ff5fa5566c 100644 --- a/t.go +++ b/t.go @@ -185,25 +185,25 @@ func (t *Torrent) downloadPiecesLocked(begin, end pieceIndex) { for i := begin; i < end; i++ { if t.pieces[i].priority.Raise(PiecePriorityNormal) { - t.updatePiecePriority(i) + t.updatePiecePriority(i, "Torrent.DownloadPieces") } } } -func (t *Torrent) CancelPieces(begin, end pieceIndex) { +func (t *Torrent) CancelPieces(begin, end pieceIndex, reason string) { t.cl.lock() - t.cancelPiecesLocked(begin, end) + t.cancelPiecesLocked(begin, end, "Torrent.CancelPieces") t.cl.unlock() } -func (t *Torrent) cancelPiecesLocked(begin, end pieceIndex) { +func (t *Torrent) cancelPiecesLocked(begin, end pieceIndex, reason string) { for i := begin; i < end; i++ { p := &t.pieces[i] if p.priority == PiecePriorityNone { continue } p.priority = PiecePriorityNone - t.updatePiecePriority(i) + t.updatePiecePriority(i, reason) } } diff --git a/test/issue377_test.go b/test/issue377_test.go index 7456e9c452901bdfed35d0e9a15ca6ffa5690784..01caaed680ca783a1dc05c2e40eddce31d903d4f 100644 --- a/test/issue377_test.go +++ b/test/issue377_test.go @@ -51,6 +51,9 @@ defer seederClient.Close() defer testutil.ExportStatusWriter(seederClient, "s", t)() leecherClientConfig := torrent.TestingConfig(t) leecherClientConfig.Debug = true + // Don't require fast extension, whether the seeder will provide it or not (so we can test mixed + // cases). 
+ leecherClientConfig.MinPeerExtensions.SetBit(pp.ExtensionBitFast, false) justOneNetwork(leecherClientConfig) leecherClient, err := torrent.NewClient(leecherClientConfig) require.NoError(t, err) diff --git a/testing.go b/testing.go index 6a9d892d4dbeaad495c9ac08a8b2d535a028f4eb..d681796d911b8248355945953b2558a1128cd4ff 100644 --- a/testing.go +++ b/testing.go @@ -3,6 +3,8 @@ import ( "testing" "time" + + pp "github.com/anacrolix/torrent/peer_protocol" ) func TestingConfig(t testing.TB) *ClientConfig { @@ -15,6 +17,7 @@ cfg.NoDefaultPortForwarding = true cfg.DisableAcceptRateLimiting = true cfg.ListenPort = 0 cfg.KeepAliveTimeout = time.Millisecond + cfg.MinPeerExtensions.SetBit(pp.ExtensionBitFast, true) //cfg.Debug = true //cfg.Logger = cfg.Logger.WithText(func(m log.Msg) string { // t := m.Text() diff --git a/torrent.go b/torrent.go index 32b1d720d3a1ab266619c1d5bb83ac7894f83911..5f18df1f3e9bacfd9dc9707fdbb6a4fc66f16bb5 100644 --- a/torrent.go +++ b/torrent.go @@ -27,10 +27,10 @@ "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/slices" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/bitmap" - "github.com/anacrolix/missinggo/v2/prioritybitmap" "github.com/anacrolix/multiless" "github.com/anacrolix/sync" "github.com/davecgh/go-spew/spew" + "github.com/google/go-cmp/cmp" "github.com/pion/datachannel" "github.com/anacrolix/torrent/bencode" @@ -131,7 +131,7 @@ _readerReadaheadPieces bitmap.Bitmap // A cache of pieces we need to get. Calculated from various piece and // file priorities and completion states elsewhere. - _pendingPieces prioritybitmap.PriorityBitmap + _pendingPieces roaring.Bitmap // A cache of completed piece indices. _completedPieces roaring.Bitmap // Pieces that need to be hashed. @@ -145,7 +145,7 @@ // different pieces. connPieceInclinationPool sync.Pool // Count of each request across active connections. - pendingRequests map[RequestIndex]int + pendingRequests pendingRequests // Chunks we've written to since the corresponding piece was last checked. dirtyChunks roaring.Bitmap @@ -193,10 +193,6 @@ } func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool { return !t.wantPieceIndex(i) -} - -func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap { - return &t._pendingPieces } // Returns a channel that is closed when the Torrent is closed. @@ -368,6 +364,11 @@ files := *t.files beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files) endFile := pieceEndFileIndex(piece.torrentEndOffset(), files) piece.files = files[beginFile:endFile] + piece.undirtiedChunksIter = undirtiedChunksIter{ + TorrentDirtyChunks: &t.dirtyChunks, + StartRequestIndex: piece.requestIndexOffset(), + EndRequestIndex: piece.requestIndexOffset() + piece.numChunks(), + } } } @@ -428,9 +429,6 @@ } // This seems to be all the follow-up tasks after info is set, that can't fail. func (t *Torrent) onSetInfo() { - t.iterPeers(func(p *Peer) { - p.onGotInfo(t.info) - }) for i := range t.pieces { p := &t.pieces[i] // Need to add availability before updating piece completion, as that may result in conns @@ -448,8 +446,12 @@ } t.cl.event.Broadcast() close(t.gotMetainfoC) t.updateWantPeersEvent() - t.pendingRequests = make(map[RequestIndex]int) + t.pendingRequests.Init(t.numRequests()) t.tryCreateMorePieceHashers() + t.iterPeers(func(p *Peer) { + p.onGotInfo(t.info) + p.updateRequests("onSetInfo") + }) } // Called when metadata for a torrent becomes available. 
@@ -860,6 +862,13 @@ func (t *Torrent) chunksPerRegularPiece() uint32 { return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize) } +func (t *Torrent) numRequests() RequestIndex { + if t.numPieces() == 0 { + return 0 + } + return uint32(t.numPieces()-1)*t.chunksPerRegularPiece() + t.pieceNumChunks(t.numPieces()-1) +} + func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) { t.dirtyChunks.RemoveRange( uint64(t.pieceRequestIndexOffset(pieceIndex)), @@ -923,7 +932,8 @@ return t.haveInfo() && t.pieceComplete(index) } func (t *Torrent) maybeDropMutuallyCompletePeer( - // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's okay? + // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's + // okay? p *Peer, ) { if !t.cl.config.DropMutuallyCompletePeers { @@ -961,32 +971,7 @@ return chunkIndexType(cs.Begin / chunkSize) } func (t *Torrent) wantPieceIndex(index pieceIndex) bool { - // TODO: Are these overly conservative, should we be guarding this here? - { - if !t.haveInfo() { - return false - } - if index < 0 || index >= t.numPieces() { - return false - } - } - p := &t.pieces[index] - if p.queuedForHash() { - return false - } - if p.hashing { - return false - } - if t.pieceComplete(index) { - return false - } - if t._pendingPieces.Contains(int(index)) { - return true - } - // t.logger.Printf("piece %d not pending", index) - return !t.forReaderOffsetPieces(func(begin, end pieceIndex) bool { - return index < begin || index >= end - }) + return t._pendingPieces.Contains(uint32(index)) } // A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent @@ -1058,7 +1043,7 @@ } func (t *Torrent) readersChanged() { t.updateReaderPieces() - t.updateAllPiecePriorities() + t.updateAllPiecePriorities("Torrent.readersChanged") } func (t *Torrent) updateReaderPieces() { @@ -1077,15 +1062,15 @@ l, h = h, l } if l.end < h.begin { // Two distinct ranges. - t.updatePiecePriorities(l.begin, l.end) - t.updatePiecePriorities(h.begin, h.end) + t.updatePiecePriorities(l.begin, l.end, "Torrent.readerPosChanged") + t.updatePiecePriorities(h.begin, h.end, "Torrent.readerPosChanged") } else { // Ranges overlap. 
end := l.end if h.end > end { end = h.end } - t.updatePiecePriorities(l.begin, end) + t.updatePiecePriorities(l.begin, end, "Torrent.readerPosChanged") } } @@ -1095,43 +1080,50 @@ t.cl.event.Broadcast() t.openNewConns() } -func (t *Torrent) piecePriorityChanged(piece pieceIndex) { - // t.logger.Printf("piece %d priority changed", piece) - t.iterPeers(func(c *Peer) { - if c.updatePiecePriority(piece) { - // log.Print("conn piece priority changed") - c.updateRequests() - } - }) +func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) { + if t._pendingPieces.Contains(uint32(piece)) { + t.iterPeers(func(c *Peer) { + if c.actualRequestState.Interested { + return + } + if !c.isLowOnRequests() { + return + } + if !c.peerHasPiece(piece) { + return + } + c.updateRequests(reason) + }) + } t.maybeNewConns() t.publishPieceChange(piece) } -func (t *Torrent) updatePiecePriority(piece pieceIndex) { +func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) { p := &t.pieces[piece] newPrio := p.uncachedPriority() // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio) if newPrio == PiecePriorityNone { - if !t._pendingPieces.Remove(int(piece)) { + if !t._pendingPieces.CheckedRemove(uint32(piece)) { return } } else { - if !t._pendingPieces.Set(int(piece), newPrio.BitmapPriority()) { + if !t._pendingPieces.CheckedAdd(uint32(piece)) { return } } - t.piecePriorityChanged(piece) + t.piecePriorityChanged(piece, reason) } -func (t *Torrent) updateAllPiecePriorities() { - t.updatePiecePriorities(0, t.numPieces()) +func (t *Torrent) updateAllPiecePriorities(reason string) { + t.updatePiecePriorities(0, t.numPieces(), reason) } // Update all piece priorities in one hit. This function should have the same // output as updatePiecePriority, but across all pieces. 
-func (t *Torrent) updatePiecePriorities(begin, end pieceIndex) { +func (t *Torrent) updatePiecePriorities(begin, end pieceIndex, reason string) { for i := begin; i < end; i++ { - t.updatePiecePriority(i) + t.updatePiecePriority(i, reason) } } @@ -1172,32 +1164,21 @@ return true } func (t *Torrent) piecePriority(piece pieceIndex) piecePriority { - prio, ok := t._pendingPieces.GetPriority(piece) - if !ok { - return PiecePriorityNone - } - if prio > 0 { - panic(prio) - } - ret := piecePriority(-prio) - if ret == PiecePriorityNone { - panic(piece) - } - return ret + return t.piece(piece).uncachedPriority() } func (t *Torrent) pendRequest(req RequestIndex) { t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece()) } -func (t *Torrent) pieceCompletionChanged(piece pieceIndex) { +func (t *Torrent) pieceCompletionChanged(piece pieceIndex, reason string) { t.cl.event.Broadcast() if t.pieceComplete(piece) { t.onPieceCompleted(piece) } else { t.onIncompletePiece(piece) } - t.updatePiecePriority(piece) + t.updatePiecePriority(piece, reason) } func (t *Torrent) numReceivedConns() (ret int) { @@ -1275,7 +1256,7 @@ t.logger.Printf("marked piece %v complete but still has dirtiers", piece) } if changed { log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).SetLevel(log.Debug).Log(t.logger) - t.pieceCompletionChanged(piece) + t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion") } return changed } @@ -1338,7 +1319,7 @@ } if !t.haveInfo() { return true } - return t._pendingPieces.Len() != 0 + return !t._pendingPieces.IsEmpty() } func appendMissingStrings(old, new []string) (ret []string) { @@ -1405,9 +1386,7 @@ } } torrent.Add("deleted connections", 1) c.deleteAllRequests() - if t.numActivePeers() == 0 { - t.assertNoPendingRequests() - } + t.assertPendingRequests() return } @@ -1428,13 +1407,24 @@ }) return } -func (t *Torrent) assertNoPendingRequests() { - if len(t.pendingRequests) != 0 { - panic(t.pendingRequests) +func (t *Torrent) assertPendingRequests() { + if !check { + return + } + var actual pendingRequests + if t.haveInfo() { + actual.m = make([]int, t.numRequests()) + } + t.iterPeers(func(p *Peer) { + p.actualRequestState.Requests.Iterate(func(x uint32) bool { + actual.Inc(x) + return true + }) + }) + diff := cmp.Diff(actual.m, t.pendingRequests.m) + if diff != "" { + panic(diff) } - //if len(t.lastRequested) != 0 { - // panic(t.lastRequested) - //} } func (t *Torrent) dropConnection(c *PeerConn) { @@ -1524,7 +1514,7 @@ } } func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) { - t.logRunHandshookConn(pc, false, log.Warning) + t.logRunHandshookConn(pc, false, log.Debug) } func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer { @@ -2008,7 +1998,7 @@ // } // } t.iterPeers(func(conn *Peer) { if conn.peerHasPiece(piece) { - conn.updateRequests() + conn.updateRequests("piece incomplete") } }) } @@ -2030,7 +2020,7 @@ p := t.piece(pi) t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi)) p.hashing = true t.publishPieceChange(pi) - t.updatePiecePriority(pi) + t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher") t.storageLock.RLock() t.activePieceHashes++ go t.pieceHasher(pi) @@ -2063,7 +2053,7 @@ t.cl.lock() defer t.cl.unlock() p.hashing = false t.pieceHashed(index, correct, copyErr) - t.updatePiecePriority(index) + t.updatePiecePriority(index, "Torrent.pieceHasher") t.activePieceHashes-- t.tryCreateMorePieceHashers() } @@ -2091,7 +2081,7 @@ return } t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex)) 
 	t.publishPieceChange(pieceIndex)
-	t.updatePiecePriority(pieceIndex)
+	t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck")
 	t.tryCreateMorePieceHashers()
 }
 
@@ -2184,7 +2174,7 @@ t.cl.lock()
 	defer t.cl.unlock()
 	t.dataUploadDisallowed = false
 	for c := range t.conns {
-		c.updateRequests()
+		c.updateRequests("allow data upload")
 	}
 }
 
@@ -2194,7 +2184,7 @@ t.cl.lock()
 	defer t.cl.unlock()
 	t.dataUploadDisallowed = true
 	for c := range t.conns {
-		c.updateRequests()
+		c.updateRequests("disallow data upload")
 	}
 }
 
@@ -2250,7 +2240,9 @@ HttpClient: WebseedHttpClient,
 			Url: url,
 		},
 		activeRequests: make(map[Request]webseed.Request, maxRequests),
+		maxRequests: maxRequests,
 	}
+	ws.peer.initUpdateRequestsTimer()
 	ws.requesterCond.L = t.cl.locker()
 	for i := 0; i < maxRequests; i += 1 {
 		go ws.requester()
 	}
@@ -2286,10 +2278,6 @@ }
 
 func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
 	return t.pieceRequestIndexOffset(pieceIndex(r.Index)) + uint32(r.Begin/t.chunkSize)
-}
-
-func (t *Torrent) numChunks() RequestIndex {
-	return RequestIndex((t.Length() + int64(t.chunkSize) - 1) / int64(t.chunkSize))
 }
 
 func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
diff --git a/torrent_test.go b/torrent_test.go
index 64d90b691c5d4cc53fbc0529cdbd902af4945b21..2b96f19157421ebab0c5eb8979bd8b44bb4e2774 100644
--- a/torrent_test.go
+++ b/torrent_test.go
@@ -101,7 +101,7 @@ t._completedPieces.Add(bitmap.BitIndex(i))
 	}
 	t.DownloadPieces(0, t.numPieces())
 	for i := 0; i < b.N; i += 1 {
-		t.updateAllPiecePriorities()
+		t.updateAllPiecePriorities("")
 	}
 }
 
@@ -163,6 +163,8 @@ // Check the behaviour of Torrent.Metainfo when metadata is not completed.
 func TestTorrentMetainfoIncompleteMetadata(t *testing.T) {
 	cfg := TestingConfig(t)
 	cfg.Debug = true
+	// Disable this just because we manually initiate a connection without it.
+	cfg.MinPeerExtensions.SetBit(pp.ExtensionBitFast, false)
 	cl, err := NewClient(cfg)
 	require.NoError(t, err)
 	defer cl.Close()
diff --git a/webseed-peer.go b/webseed-peer.go
index 9263aa46227f7f356717da7e1f5e36d889fff73c..71cdfcb4c7ce43f62b34ad9dc36b0ad9d375b0ed 100644
--- a/webseed-peer.go
+++ b/webseed-peer.go
@@ -8,6 +8,7 @@ "net/http"
 	"strings"
 	"sync"
 
+	"github.com/anacrolix/log"
 	"github.com/anacrolix/torrent/common"
 	"github.com/anacrolix/torrent/metainfo"
 	pp "github.com/anacrolix/torrent/peer_protocol"
@@ -20,13 +21,11 @@ client webseed.Client
 	activeRequests map[Request]webseed.Request
 	requesterCond sync.Cond
 	peer Peer
+	// Number of requester routines.
+	maxRequests int
 }
 
 var _ peerImpl = (*webseedPeer)(nil)
-
-func (me *webseedPeer) writeBufferFull() bool {
-	return false
-}
 
 func (me *webseedPeer) connStatusString() string {
 	return me.client.Url
@@ -45,10 +44,16 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
 	return true
 }
 
-func (ws *webseedPeer) _cancel(r Request) bool {
-	active, ok := ws.activeRequests[r]
+func (ws *webseedPeer) _cancel(r RequestIndex) bool {
+	active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
 	if ok {
 		active.Cancel()
+		if !ws.peer.deleteRequest(r) {
+			panic("cancelled webseed request should exist")
+		}
+		if ws.peer.isLowOnRequests() {
+			ws.peer.updateRequests("webseedPeer._cancel")
+		}
 	}
 	return true
 }
@@ -103,11 +108,12 @@ // TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. We could
 // return bool if this is even possible, and if it isn't, skip to the next drop candidate.
 func (ws *webseedPeer) drop() {}
 
-func (ws *webseedPeer) updateRequests() {
+func (ws *webseedPeer) handleUpdateRequests() {
+	ws.peer.maybeUpdateActualRequestState()
 }
 
 func (ws *webseedPeer) onClose() {
-	ws.peer.logger.Print("closing")
+	ws.peer.logger.WithLevel(log.Debug).Print("closing")
 	for _, r := range ws.activeRequests {
 		r.Cancel()
 	}
@@ -120,6 +126,7 @@ // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
 	// sure if we can divine which errors indicate cancellation on our end without hitting the
 	// network though.
 	ws.peer.doChunkReadStats(int64(len(result.Bytes)))
+	ws.peer.readBytes(int64(len(result.Bytes)))
 	ws.peer.t.cl.lock()
 	defer ws.peer.t.cl.unlock()
 	if result.Err != nil {
@@ -155,6 +162,6 @@ }
 	}
 }
 
-func (me *webseedPeer) onNextRequestStateChanged() {
-	me.peer.applyNextRequestState()
+func (me *webseedPeer) isLowOnRequests() bool {
+	return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests)
 }
diff --git a/webtorrent/tracker_client.go b/webtorrent/tracker_client.go
index 7972fb08beb7ca751762bd932c9b0e88364447cb..d75b9f9bbd5973335acc7ea1cbfec84530028669 100644
--- a/webtorrent/tracker_client.go
+++ b/webtorrent/tracker_client.go
@@ -106,9 +106,18 @@ tc.mu.Unlock()
 	return err
 }
 
-func (tc *TrackerClient) Run() error {
+// Finishes initialization and spawns the run routine, calling onStop when it completes with the
+// result. We don't let the caller just spawn the runner directly, since then we can race against
+// .Close to finish initialization.
+func (tc *TrackerClient) Start(onStop func(error)) {
 	tc.pingTicker = time.NewTicker(60 * time.Second)
 	tc.cond.L = &tc.mu
+	go func() {
+		onStop(tc.run())
+	}()
+}
+
+func (tc *TrackerClient) run() error {
 	tc.mu.Lock()
 	for !tc.closed {
 		tc.mu.Unlock()
@@ -325,7 +334,7 @@ tc.mu.Lock()
 	defer tc.mu.Unlock()
 	offer, ok := tc.outboundOffers[offerId]
 	if !ok {
-		tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
+		tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %+q", offerId)
 		return
 	}
 	//tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
diff --git a/wstracker.go b/wstracker.go
index 4e83ca5fe790c793d8497e271997f83282ca6ade..f93f784a7f3c87c0bdc458d2d5f3fef38b4e29df 100644
--- a/wstracker.go
+++ b/wstracker.go
@@ -55,12 +55,11 @@ return fmt.Sprintf("tracker client for %q: %v", url, m)
 			}),
 		},
 	}
-	go func() {
-		err := value.TrackerClient.Run()
+	value.TrackerClient.Start(func(err error) {
 		if err != nil {
 			me.Logger.Printf("error running tracker client for %q: %v", url, err)
 		}
-	}()
+	})
 	if me.clients == nil {
 		me.clients = make(map[string]*refCountedWebtorrentTrackerClient)
 	}
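A note on the assertPendingRequests check added in torrent.go above: it only runs when the package-level check constant is enabled, rebuilds the expected per-chunk reference counts from every peer's actualRequestState, and compares them against Torrent.pendingRequests with cmp.Diff. The counter type itself is added in pending-requests.go, which is not shown in this section; the sketch below is only a guess at its shape, inferred from the m field and the Inc call used here, and is not the actual implementation.

package torrent

// pendingRequestsSketch is a hypothetical stand-in for the pendingRequests
// type in pending-requests.go: m[i] counts how many peers currently have the
// chunk with request index i outstanding.
type pendingRequestsSketch struct {
	m []int
}

// Inc records that one more peer holds request index r, matching the
// actual.Inc(x) call in assertPendingRequests above.
func (p *pendingRequestsSketch) Inc(r uint32) {
	p.m[r]++
}

// Dec would be the counterpart used when a request is deleted; it is assumed
// here rather than shown in this part of the diff.
func (p *pendingRequestsSketch) Dec(r uint32) {
	if p.m[r] <= 0 {
		panic("pending request count underflow")
	}
	p.m[r]--
}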
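A note on requestIndexFromRequest and pieceRequestIndexOffset, kept in torrent.go above while numChunks is removed: a RequestIndex is a flat chunk number, computed as the piece's chunk offset plus Begin divided by the chunk size. Below is a standalone sketch of that arithmetic; the 16 KiB chunk size and 256 KiB piece length are assumed example values, not taken from this patch.

package main

import "fmt"

// Assumed example values for illustration; not taken from this patch.
const (
	chunkSize             = 1 << 14                  // 16 KiB
	pieceLength           = 1 << 18                  // 256 KiB
	chunksPerRegularPiece = pieceLength / chunkSize  // 16 chunks per piece
)

// requestIndex mirrors pieceRequestIndexOffset(piece) + begin/chunkSize from
// torrent.go: a flat chunk number across the whole torrent.
func requestIndex(piece, begin uint32) uint32 {
	return piece*chunksPerRegularPiece + begin/chunkSize
}

func main() {
	// The chunk starting 32 KiB into piece 3 is chunk 3*16 + 2 = 50 overall.
	fmt.Println(requestIndex(3, 2*chunkSize))
}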
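A note on the webtorrent.TrackerClient change above: Run is split so that Start finishes initializing the ticker and condition variable synchronously and only then spawns the run goroutine, reporting its result through onStop; that ordering is what stops a concurrent Close from observing a half-initialized client. Below is a minimal sketch of the same start/stop pattern using invented names (worker, closed), purely to illustrate the shape rather than the library's actual fields.

package main

import "time"

// worker is a hypothetical stand-in for TrackerClient; field and method names
// are invented for illustration.
type worker struct {
	closed     chan struct{}
	pingTicker *time.Ticker
}

// Start finishes initialization before spawning the run goroutine and reports
// the runner's result through onStop. Once Start has returned, everything
// Close touches is already set, so there is no race on partially built state.
func (w *worker) Start(onStop func(error)) {
	w.closed = make(chan struct{})
	w.pingTicker = time.NewTicker(60 * time.Second)
	go func() { onStop(w.run()) }()
}

func (w *worker) run() error {
	defer w.pingTicker.Stop()
	for {
		select {
		case <-w.closed:
			return nil
		case <-w.pingTicker.C:
			// Periodic work (e.g. a keep-alive ping) would go here.
		}
	}
}

// Close stops the runner. Call it at most once, after Start has returned.
func (w *worker) Close() {
	close(w.closed)
}

func main() {
	var w worker
	done := make(chan error, 1)
	w.Start(func(err error) { done <- err })
	w.Close()
	<-done
}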