From 456a2f7c5d47567467451feef5a386722fb4b6bf Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 28 Jan 2021 14:23:22 +1100 Subject: [PATCH] Expose more callbacks and Request and ChunkSpec --- callbacks.go | 14 +++++++- client.go | 5 ++- client_test.go | 2 +- misc.go | 24 ++++++------- misc_test.go | 4 +-- peer-impl.go | 6 ++-- peerconn.go | 68 ++++++++++++++++++++---------------- peerconn_test.go | 2 +- piece.go | 8 ++--- protocol.go | 2 +- request-strategy-defaults.go | 8 ++--- request-strategy.go | 24 ++++++------- torrent.go | 31 +++++++++------- torrent_test.go | 12 +++---- webseed-peer.go | 14 ++++---- 15 files changed, 127 insertions(+), 97 deletions(-) diff --git a/callbacks.go b/callbacks.go index f78c5854..f9ba131b 100644 --- a/callbacks.go +++ b/callbacks.go @@ -20,9 +20,21 @@ type Callbacks struct { ReceiveEncryptedHandshakeSkeys mse.SecretKeyIter ReceivedUsefulData []func(ReceivedUsefulDataEvent) + ReceivedRequested []func(PeerMessageEvent) + DeletedRequest []func(PeerRequestEvent) + SentRequest []func(PeerRequestEvent) + PeerClosed []func(*Peer) + NewPeer []func(*Peer) } -type ReceivedUsefulDataEvent struct { +type ReceivedUsefulDataEvent = PeerMessageEvent + +type PeerMessageEvent struct { Peer *Peer Message *pp.Message } + +type PeerRequestEvent struct { + Peer *Peer + Request +} diff --git a/client.go b/client.go index e66d1022..cc59dd34 100644 --- a/client.go +++ b/client.go @@ -1380,11 +1380,11 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot RemoteAddr: remoteAddr, Network: network, + callbacks: &cl.config.Callbacks, }, connString: connString, conn: nc, writeBuffer: new(bytes.Buffer), - callbacks: &cl.config.Callbacks, } c.peerImpl = c c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c) @@ -1395,6 +1395,9 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot r: c.r, } c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing) + for _, f := range cl.config.Callbacks.NewPeer { + f(&c.Peer) + } return } diff --git a/client_test.go b/client_test.go index 24820ac4..f4080139 100644 --- a/client_test.go +++ b/client_test.go @@ -108,7 +108,7 @@ func TestTorrentInitialState(t *testing.T) { tor.cl.lock() assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0)) tor.cl.unlock() - assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize)) + assert.EqualValues(t, ChunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize)) } func TestReducedDialTimeout(t *testing.T) { diff --git a/misc.go b/misc.go index 9cc33bca..5ad01d85 100644 --- a/misc.go +++ b/misc.go @@ -11,16 +11,16 @@ import ( pp "github.com/anacrolix/torrent/peer_protocol" ) -type chunkSpec struct { +type ChunkSpec struct { Begin, Length pp.Integer } -type request struct { +type Request struct { Index pp.Integer - chunkSpec + ChunkSpec } -func (r request) ToMsg(mt pp.MessageType) pp.Message { +func (r Request) ToMsg(mt pp.MessageType) pp.Message { return pp.Message{ Type: mt, Index: r.Index, @@ -29,11 +29,11 @@ func (r request) ToMsg(mt pp.MessageType) pp.Message { } } -func newRequest(index, begin, length pp.Integer) request { - return request{index, chunkSpec{begin, length}} +func newRequest(index, begin, length pp.Integer) Request { + return Request{index, ChunkSpec{begin, length}} } -func newRequestFromMessage(msg *pp.Message) request { +func newRequestFromMessage(msg *pp.Message) Request { switch msg.Type { case pp.Request, pp.Cancel, pp.Reject: return newRequest(msg.Index, msg.Begin, msg.Length) @@ -55,7 +55,7 @@ func metadataPieceSize(totalSize int, piece int) int { // Return the request that would include the given offset into the torrent data. func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) ( - r request, ok bool) { + r Request, ok bool) { if offset < 0 || offset >= torrentLength { return } @@ -74,10 +74,10 @@ func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) ( return } -func torrentRequestOffset(torrentLength, pieceSize int64, r request) (off int64) { +func torrentRequestOffset(torrentLength, pieceSize int64, r Request) (off int64) { off = int64(r.Index)*pieceSize + int64(r.Begin) if off < 0 || off >= torrentLength { - panic("invalid request") + panic("invalid Request") } return } @@ -98,8 +98,8 @@ func validateInfo(info *metainfo.Info) error { return nil } -func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) chunkSpec { - ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize} +func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) ChunkSpec { + ret := ChunkSpec{pp.Integer(index) * chunkSize, chunkSize} if ret.Begin+ret.Length > pieceLength { ret.Length = pieceLength - ret.Begin } diff --git a/misc_test.go b/misc_test.go index a5ff2fea..419ec4d0 100644 --- a/misc_test.go +++ b/misc_test.go @@ -12,7 +12,7 @@ import ( ) func TestTorrentOffsetRequest(t *testing.T) { - check := func(tl, ps, off int64, expected request, ok bool) { + check := func(tl, ps, off int64, expected Request, ok bool) { req, _ok := torrentOffsetRequest(tl, ps, defaultChunkSize, off) assert.Equal(t, _ok, ok) assert.Equal(t, req, expected) @@ -20,7 +20,7 @@ func TestTorrentOffsetRequest(t *testing.T) { check(13, 5, 0, newRequest(0, 0, 5), true) check(13, 5, 3, newRequest(0, 0, 5), true) check(13, 5, 11, newRequest(2, 0, 3), true) - check(13, 5, 13, request{}, false) + check(13, 5, 13, Request{}, false) } func TestIterBitmapsDistinct(t *testing.T) { diff --git a/peer-impl.go b/peer-impl.go index 81ec26ef..a04a160b 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -10,12 +10,12 @@ import ( type peerImpl interface { updateRequests() writeInterested(interested bool) bool - cancel(request) bool + cancel(Request) bool // Return true if there's room for more activity. - request(request) bool + request(Request) bool connectionFlags() string onClose() - _postCancel(request) + _postCancel(Request) onGotInfo(*metainfo.Info) drop() String() string diff --git a/peerconn.go b/peerconn.go index b5ff73f2..ac43e8d5 100644 --- a/peerconn.go +++ b/peerconn.go @@ -53,6 +53,7 @@ type Peer struct { t *Torrent peerImpl + callbacks *Callbacks outgoing bool Network string @@ -82,12 +83,12 @@ type Peer struct { _chunksReceivedWhileExpecting int64 choking bool - requests map[request]struct{} + requests map[Request]struct{} requestsLowWater int // Chunks that we might reasonably expect to receive from the peer. Due to // latency, buffering, and implementation differences, we may receive // chunks that are no longer in the set of requests actually want. - validReceiveChunks map[request]int + validReceiveChunks map[Request]int // Indexed by metadata piece, set to true if posted and pending a // response. metadataRequests []bool @@ -96,7 +97,7 @@ type Peer struct { // Stuff controlled by the remote peer. peerInterested bool peerChoking bool - peerRequests map[request]*peerRequestState + peerRequests map[Request]*peerRequestState PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake PeerListenPort int // The pieces the peer has claimed to have. @@ -146,8 +147,6 @@ type PeerConn struct { writerCond sync.Cond pex pexConnState - - callbacks *Callbacks } func (cn *PeerConn) connStatusString() string { @@ -354,6 +353,9 @@ func (cn *Peer) close() { cn.discardPieceInclination() cn._pieceRequestOrder.Clear() cn.peerImpl.onClose() + for _, f := range cn.callbacks.PeerClosed { + f(cn) + } } func (cn *PeerConn) onClose() { @@ -451,7 +453,7 @@ func (cn *Peer) totalExpectingTime() (ret time.Duration) { } -func (cn *PeerConn) onPeerSentCancel(r request) { +func (cn *PeerConn) onPeerSentCancel(r Request) { if _, ok := cn.peerRequests[r]; !ok { torrent.Add("unexpected cancels received", 1) return @@ -523,7 +525,7 @@ func (pc *PeerConn) writeInterested(interested bool) bool { // are okay. type messageWriter func(pp.Message) bool -func (cn *Peer) request(r request) bool { +func (cn *Peer) request(r Request) bool { if _, ok := cn.requests[r]; ok { panic("chunk already requested") } @@ -550,20 +552,23 @@ func (cn *Peer) request(r request) bool { panic("piece is queued for hash") } if cn.requests == nil { - cn.requests = make(map[request]struct{}) + cn.requests = make(map[Request]struct{}) } cn.requests[r] = struct{}{} if cn.validReceiveChunks == nil { - cn.validReceiveChunks = make(map[request]int) + cn.validReceiveChunks = make(map[Request]int) } cn.validReceiveChunks[r]++ cn.t.pendingRequests[r]++ cn.t.requestStrategy.hooks().sentRequest(r) cn.updateExpectingChunks() + for _, f := range cn.callbacks.SentRequest { + f(PeerRequestEvent{cn, r}) + } return cn.peerImpl.request(r) } -func (me *PeerConn) request(r request) bool { +func (me *PeerConn) request(r Request) bool { return me.write(pp.Message{ Type: pp.Request, Index: r.Index, @@ -572,7 +577,7 @@ func (me *PeerConn) request(r request) bool { }) } -func (me *PeerConn) cancel(r request) bool { +func (me *PeerConn) cancel(r Request) bool { return me.write(makeCancelMessage(r)) } @@ -593,7 +598,7 @@ func (cn *Peer) doRequestState() bool { } else if len(cn.requests) <= cn.requestsLowWater { filledBuffer := false cn.iterPendingPieces(func(pieceIndex pieceIndex) bool { - cn.iterPendingRequests(pieceIndex, func(r request) bool { + cn.iterPendingRequests(pieceIndex, func(r Request) bool { if !cn.setInterested(true) { filledBuffer = true return false @@ -789,11 +794,11 @@ func (cn *Peer) iterPendingPiecesUntyped(f iter.Callback) { cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) }) } -func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(request) bool) bool { +func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(Request) bool) bool { return cn.t.requestStrategy.iterUndirtiedChunks( cn.t.piece(piece).requestStrategyPiece(), - func(cs chunkSpec) bool { - return f(request{pp.Integer(piece), cs}) + func(cs ChunkSpec) bool { + return f(Request{pp.Integer(piece), cs}) }, ) } @@ -1003,7 +1008,7 @@ func (c *PeerConn) fastEnabled() bool { return c.PeerExtensionBytes.SupportsFast() && c.t.cl.config.Extensions.SupportsFast() } -func (c *PeerConn) reject(r request) { +func (c *PeerConn) reject(r Request) { if !c.fastEnabled() { panic("fast not enabled") } @@ -1011,7 +1016,7 @@ func (c *PeerConn) reject(r request) { delete(c.peerRequests, r) } -func (c *PeerConn) onReadRequest(r request) error { +func (c *PeerConn) onReadRequest(r Request) error { requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1) if _, ok := c.peerRequests[r]; ok { torrent.Add("duplicate requests received", 1) @@ -1043,10 +1048,10 @@ func (c *PeerConn) onReadRequest(r request) error { // Check this after we know we have the piece, so that the piece length will be known. if r.Begin+r.Length > c.t.pieceLength(pieceIndex(r.Index)) { torrent.Add("bad requests received", 1) - return errors.New("bad request") + return errors.New("bad Request") } if c.peerRequests == nil { - c.peerRequests = make(map[request]*peerRequestState, maxRequests) + c.peerRequests = make(map[Request]*peerRequestState, maxRequests) } value := &peerRequestState{} c.peerRequests[r] = value @@ -1055,7 +1060,7 @@ func (c *PeerConn) onReadRequest(r request) error { return nil } -func (c *PeerConn) peerRequestDataReader(r request, prs *peerRequestState) { +func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) { b, err := readPeerRequestData(r, c) c.locker().Lock() defer c.locker().Unlock() @@ -1072,8 +1077,8 @@ func (c *PeerConn) peerRequestDataReader(r request, prs *peerRequestState) { // If this is maintained correctly, we might be able to support optional synchronous reading for // chunk sending, the way it used to work. -func (c *PeerConn) peerRequestDataReadFailed(err error, r request) { - c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer request %v: %v", r, err) +func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) { + c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer Request %v: %v", r, err) i := pieceIndex(r.Index) if c.t.pieceComplete(i) { // There used to be more code here that just duplicated the following break. Piece @@ -1092,7 +1097,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r request) { c.choke(c.post) } -func readPeerRequestData(r request, c *PeerConn) ([]byte, error) { +func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) { b := make([]byte, r.Length) p := c.t.info.Piece(int(r.Index)) n, err := c.t.readAt(b, p.Offset()+int64(r.Begin)) @@ -1241,13 +1246,13 @@ func (c *PeerConn) mainReadLoop() (err error) { } } -func (c *Peer) remoteRejectedRequest(r request) { +func (c *Peer) remoteRejectedRequest(r Request) { if c.deleteRequest(r) { c.decExpectedChunkReceive(r) } } -func (c *Peer) decExpectedChunkReceive(r request) { +func (c *Peer) decExpectedChunkReceive(r Request) { count := c.validReceiveChunks[r] if count == 1 { delete(c.validReceiveChunks, r) @@ -1396,7 +1401,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { piece.incrementPendingWrites() // Record that we have the chunk, so we aren't trying to download it while // waiting for it to be written to storage. - piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize)) + piece.unpendChunkIndex(chunkIndex(req.ChunkSpec, t.chunkSize)) // Cancel pending requests for this chunk. for c := range t.conns { @@ -1537,11 +1542,14 @@ func (c *Peer) numLocalRequests() int { return len(c.requests) } -func (c *Peer) deleteRequest(r request) bool { +func (c *Peer) deleteRequest(r Request) bool { if _, ok := c.requests[r]; !ok { return false } delete(c.requests, r) + for _, f := range c.callbacks.DeletedRequest { + f(PeerRequestEvent{c, r}) + } c.updateExpectingChunks() c.t.requestStrategy.hooks().deletedRequest(r) pr := c.t.pendingRequests @@ -1594,7 +1602,7 @@ func (c *PeerConn) tickleWriter() { c.writerCond.Broadcast() } -func (c *Peer) postCancel(r request) bool { +func (c *Peer) postCancel(r Request) bool { if !c.deleteRequest(r) { return false } @@ -1602,11 +1610,11 @@ func (c *Peer) postCancel(r request) bool { return true } -func (c *PeerConn) _postCancel(r request) { +func (c *PeerConn) _postCancel(r Request) { c.post(makeCancelMessage(r)) } -func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool, state *peerRequestState) (more bool) { +func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) { c.lastChunkSent = time.Now() return msg(pp.Message{ Type: pp.Piece, diff --git a/peerconn_test.go b/peerconn_test.go index 5fbc565a..654f7689 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -134,7 +134,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { // The chunk must be written to storage everytime, to ensure the // writeSem is unlocked. t.pieces[0]._dirtyChunks.Clear() - cn.validReceiveChunks = map[request]int{newRequestFromMessage(&msg): 1} + cn.validReceiveChunks = map[Request]int{newRequestFromMessage(&msg): 1} cl.unlock() n, err := w.Write(wb) require.NoError(b, err) diff --git a/piece.go b/piece.go index e4236a7b..fb9c8056 100644 --- a/piece.go +++ b/piece.go @@ -82,7 +82,7 @@ func (p *Piece) pendingChunkIndex(chunkIndex int) bool { return !p._dirtyChunks.Contains(chunkIndex) } -func (p *Piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool { +func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool { return p.pendingChunkIndex(chunkIndex(cs, chunkSize)) } @@ -137,12 +137,12 @@ func (p *Piece) chunkIndexDirty(chunk pp.Integer) bool { return p._dirtyChunks.Contains(bitmap.BitIndex(chunk)) } -func (p *Piece) chunkIndexSpec(chunk pp.Integer) chunkSpec { +func (p *Piece) chunkIndexSpec(chunk pp.Integer) ChunkSpec { return chunkIndexSpec(chunk, p.length(), p.chunkSize()) } -func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) request { - return request{ +func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) Request { + return Request{ pp.Integer(p.index), chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()), } diff --git a/protocol.go b/protocol.go index 3faeb6ec..82e36d35 100644 --- a/protocol.go +++ b/protocol.go @@ -4,6 +4,6 @@ import ( pp "github.com/anacrolix/torrent/peer_protocol" ) -func makeCancelMessage(r request) pp.Message { +func makeCancelMessage(r Request) pp.Message { return pp.MakeCancelMessage(r.Index, r.Begin, r.Length) } diff --git a/request-strategy-defaults.go b/request-strategy-defaults.go index 6bfba8f4..1ece5fc6 100644 --- a/request-strategy-defaults.go +++ b/request-strategy-defaults.go @@ -11,12 +11,12 @@ type requestStrategyDefaults struct{} func (requestStrategyDefaults) hooks() requestStrategyHooks { return requestStrategyHooks{ - sentRequest: func(request) {}, - deletedRequest: func(request) {}, + sentRequest: func(Request) {}, + deletedRequest: func(Request) {}, } } -func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { +func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool { chunkIndices := p.dirtyChunks().Copy() chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks())) return iter.ForPerm(chunkIndices.Len(), func(i int) bool { @@ -24,7 +24,7 @@ func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f fun if err != nil { panic(err) } - return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec) + return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec) }) } diff --git a/request-strategy.go b/request-strategy.go index 07d0cce6..75cff963 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -14,7 +14,7 @@ import ( type requestStrategyPiece interface { numChunks() pp.Integer dirtyChunks() bitmap.Bitmap - chunkIndexRequest(i pp.Integer) request + chunkIndexRequest(i pp.Integer) Request } type requestStrategyTorrent interface { @@ -39,7 +39,7 @@ type requestStrategyConnection interface { type requestStrategy interface { iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool - iterUndirtiedChunks(requestStrategyPiece, func(chunkSpec) bool) bool + iterUndirtiedChunks(requestStrategyPiece, func(ChunkSpec) bool) bool nominalMaxRequests(requestStrategyConnection) int shouldRequestWithoutBias(requestStrategyConnection) bool piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int @@ -47,12 +47,12 @@ type requestStrategy interface { } type requestStrategyHooks struct { - sentRequest func(request) - deletedRequest func(request) + sentRequest func(Request) + deletedRequest func(Request) } type requestStrategyCallbacks interface { - requestTimedOut(request) + requestTimedOut(Request) } type requestStrategyFuzzing struct { @@ -103,7 +103,7 @@ type requestStrategyDuplicateRequestTimeout struct { // The last time we requested a chunk. Deleting the request from any connection will clear this // value. - lastRequested map[request]*time.Timer + lastRequested map[Request]*time.Timer // The lock to take when running a request timeout handler. timeoutLocker sync.Locker } @@ -118,7 +118,7 @@ func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duratio return requestStrategyDuplicateRequestTimeout{ duplicateRequestTimeout: duplicateRequestTimeout, callbacks: callbacks, - lastRequested: make(map[request]*time.Timer), + lastRequested: make(map[Request]*time.Timer), timeoutLocker: clientLocker, } } @@ -126,7 +126,7 @@ func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duratio func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks { return requestStrategyHooks{ - deletedRequest: func(r request) { + deletedRequest: func(r Request) { if t, ok := rs.lastRequested[r]; ok { t.Stop() delete(rs.lastRequested, r) @@ -136,7 +136,7 @@ func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks { } } -func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { +func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool { for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ { if p.dirtyChunks().Get(bitmap.BitIndex(i)) { continue @@ -145,7 +145,7 @@ func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestSt if rs.wouldDuplicateRecent(r) { continue } - if !f(r.chunkSpec) { + if !f(r.ChunkSpec) { return false } } @@ -185,7 +185,7 @@ func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, return defaultIterPendingPieces(rs, cn, cb) } -func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) { +func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r Request) { rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() { rs.timeoutLocker.Lock() delete(rs.lastRequested, r) @@ -215,7 +215,7 @@ func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestSt ), )) } -func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool { +func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r Request) bool { // This piece has been requested on another connection, and the duplicate request timer is still // running. _, ok := rs.lastRequested[r] diff --git a/torrent.go b/torrent.go index 9e36778b..059e5883 100644 --- a/torrent.go +++ b/torrent.go @@ -144,7 +144,7 @@ type Torrent struct { connPieceInclinationPool sync.Pool // Count of each request across active connections. - pendingRequests map[request]int + pendingRequests map[Request]int pex pexState } @@ -424,7 +424,7 @@ func (t *Torrent) onSetInfo() { t.cl.event.Broadcast() t.gotMetainfo.Set() t.updateWantPeersEvent() - t.pendingRequests = make(map[request]int) + t.pendingRequests = make(map[Request]int) t.tryCreateMorePieceHashers() } @@ -744,13 +744,13 @@ func (t *Torrent) close() (err error) { return } -func (t *Torrent) requestOffset(r request) int64 { +func (t *Torrent) requestOffset(r Request) int64 { return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r) } // Return the request that would include the given offset into the torrent data. Returns !ok if // there is no such request. -func (t *Torrent) offsetRequest(off int64) (req request, ok bool) { +func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) { return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off) } @@ -846,7 +846,7 @@ func (t *Torrent) maybeDropMutuallyCompletePeer( p.drop() } -func (t *Torrent) haveChunk(r request) (ret bool) { +func (t *Torrent) haveChunk(r Request) (ret bool) { // defer func() { // log.Println("have chunk", r, ret) // }() @@ -857,10 +857,10 @@ func (t *Torrent) haveChunk(r request) (ret bool) { return true } p := &t.pieces[r.Index] - return !p.pendingChunk(r.chunkSpec, t.chunkSize) + return !p.pendingChunk(r.ChunkSpec, t.chunkSize) } -func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int { +func chunkIndex(cs ChunkSpec, chunkSize pp.Integer) int { return int(cs.Begin / chunkSize) } @@ -1073,8 +1073,8 @@ func (t *Torrent) piecePriority(piece pieceIndex) piecePriority { return ret } -func (t *Torrent) pendRequest(req request) { - ci := chunkIndex(req.chunkSpec, t.chunkSize) +func (t *Torrent) pendRequest(req Request) { + ci := chunkIndex(req.ChunkSpec, t.chunkSize) t.pieces[req.Index].pendChunkIndex(ci) } @@ -2012,8 +2012,8 @@ type torrentRequestStrategyCallbacks struct { t *Torrent } -func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) { - torrent.Add("request timeouts", 1) +func (cb torrentRequestStrategyCallbacks) requestTimedOut(r Request) { + torrent.Add("Request timeouts", 1) cb.t.cl.lock() defer cb.t.cl.unlock() cb.t.iterPeers(func(cn *Peer) { @@ -2098,6 +2098,10 @@ func (t *Torrent) iterPeers(f func(*Peer)) { } } +func (t *Torrent) callbacks() *Callbacks { + return &t.cl.config.Callbacks +} + func (t *Torrent) addWebSeed(url string) { if t.cl.config.DisableWebseeds { return @@ -2120,7 +2124,10 @@ func (t *Torrent) addWebSeed(url string) { HttpClient: http.DefaultClient, Url: url, }, - requests: make(map[request]webseed.Request, maxRequests), + requests: make(map[Request]webseed.Request, maxRequests), + } + for _, f := range t.callbacks().NewPeer { + f(&ws.peer) } ws.peer.logger = t.logger.WithContextValue(&ws) ws.peer.peerImpl = &ws diff --git a/torrent_test.go b/torrent_test.go index 2812a067..e740ab22 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -19,8 +19,8 @@ import ( "github.com/anacrolix/torrent/storage" ) -func r(i, b, l pp.Integer) request { - return request{i, chunkSpec{b, l}} +func r(i, b, l pp.Integer) Request { + return Request{i, ChunkSpec{b, l}} } // Check the given request is correct for various torrent offsets. @@ -28,15 +28,15 @@ func TestTorrentRequest(t *testing.T) { const s = 472183431 // Length of torrent. for _, _case := range []struct { off int64 // An offset into the torrent. - req request // The expected request. The zero value means !ok. + req Request // The expected request. The zero value means !ok. }{ // Invalid offset. - {-1, request{}}, + {-1, Request{}}, {0, r(0, 0, 16384)}, // One before the end of a piece. {1<<18 - 1, r(0, 1<<18-16384, 16384)}, // Offset beyond torrent length. - {472 * 1 << 20, request{}}, + {472 * 1 << 20, Request{}}, // One before the end of the torrent. Complicates the chunk length. {s - 1, r((s-1)/(1<<18), (s-1)%(1<<18)/(16384)*(16384), 12935)}, {1, r(0, 0, 16384)}, @@ -46,7 +46,7 @@ func TestTorrentRequest(t *testing.T) { {16384, r(0, 16384, 16384)}, } { req, ok := torrentOffsetRequest(472183431, 1<<18, 16384, _case.off) - if (_case.req == request{}) == ok { + if (_case.req == Request{}) == ok { t.Fatalf("expected %v, got %v", _case.req, req) } if req != _case.req { diff --git a/webseed-peer.go b/webseed-peer.go index 162c7a16..1856a4d0 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -14,7 +14,7 @@ import ( type webseedPeer struct { client webseed.Client - requests map[request]webseed.Request + requests map[Request]webseed.Request peer Peer } @@ -33,7 +33,7 @@ func (ws *webseedPeer) onGotInfo(info *metainfo.Info) { ws.client.Info = info } -func (ws *webseedPeer) _postCancel(r request) { +func (ws *webseedPeer) _postCancel(r Request) { ws.cancel(r) } @@ -41,16 +41,16 @@ func (ws *webseedPeer) writeInterested(interested bool) bool { return true } -func (ws *webseedPeer) cancel(r request) bool { +func (ws *webseedPeer) cancel(r Request) bool { ws.requests[r].Cancel() return true } -func (ws *webseedPeer) intoSpec(r request) webseed.RequestSpec { +func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)} } -func (ws *webseedPeer) request(r request) bool { +func (ws *webseedPeer) request(r Request) bool { webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) ws.requests[r] = webseedRequest go ws.requestResultHandler(r, webseedRequest) @@ -71,12 +71,12 @@ func (ws *webseedPeer) updateRequests() { func (ws *webseedPeer) onClose() {} -func (ws *webseedPeer) requestResultHandler(r request, webseedRequest webseed.Request) { +func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) { result := <-webseedRequest.Result ws.peer.t.cl.lock() defer ws.peer.t.cl.unlock() if result.Err != nil { - ws.peer.logger.Printf("request %v rejected: %v", r, result.Err) + ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err) // Always close for now. We need to filter out temporary errors, but this is a nightmare in // Go. Currently a bad webseed URL can starve out the good ones due to the chunk selection // algorithm. -- 2.44.0