]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Expose more callbacks and Request and ChunkSpec
authorMatt Joiner <anacrolix@gmail.com>
Thu, 28 Jan 2021 03:23:22 +0000 (14:23 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 28 Jan 2021 03:23:22 +0000 (14:23 +1100)
15 files changed:
callbacks.go
client.go
client_test.go
misc.go
misc_test.go
peer-impl.go
peerconn.go
peerconn_test.go
piece.go
protocol.go
request-strategy-defaults.go
request-strategy.go
torrent.go
torrent_test.go
webseed-peer.go

index f78c58545e4d41c75daf3aa8d7cc57e957d858a0..f9ba131b136e8859a801837c275f4c247ae60904 100644 (file)
@@ -20,9 +20,21 @@ type Callbacks struct {
        ReceiveEncryptedHandshakeSkeys mse.SecretKeyIter
 
        ReceivedUsefulData []func(ReceivedUsefulDataEvent)
+       ReceivedRequested  []func(PeerMessageEvent)
+       DeletedRequest     []func(PeerRequestEvent)
+       SentRequest        []func(PeerRequestEvent)
+       PeerClosed         []func(*Peer)
+       NewPeer            []func(*Peer)
 }
 
-type ReceivedUsefulDataEvent struct {
+type ReceivedUsefulDataEvent = PeerMessageEvent
+
+type PeerMessageEvent struct {
        Peer    *Peer
        Message *pp.Message
 }
+
+type PeerRequestEvent struct {
+       Peer *Peer
+       Request
+}
index e66d1022ec931b1466c1ccb9adc370faa9840b13..cc59dd34924defcc6c255c012b44c91652bedef0 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1380,11 +1380,11 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot
 
                        RemoteAddr: remoteAddr,
                        Network:    network,
+                       callbacks:  &cl.config.Callbacks,
                },
                connString:  connString,
                conn:        nc,
                writeBuffer: new(bytes.Buffer),
-               callbacks:   &cl.config.Callbacks,
        }
        c.peerImpl = c
        c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
@@ -1395,6 +1395,9 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot
                r: c.r,
        }
        c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
+       for _, f := range cl.config.Callbacks.NewPeer {
+               f(&c.Peer)
+       }
        return
 }
 
index 24820ac49a07f41f520d507709332e33de577dac..f408013952aa2637650cf51ed1fb5759f4a2f948 100644 (file)
@@ -108,7 +108,7 @@ func TestTorrentInitialState(t *testing.T) {
        tor.cl.lock()
        assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
        tor.cl.unlock()
-       assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
+       assert.EqualValues(t, ChunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
 }
 
 func TestReducedDialTimeout(t *testing.T) {
diff --git a/misc.go b/misc.go
index 9cc33bcaf38a560b1662db759c59d8df36a5ff67..5ad01d8594b7ab037fbc080516d61fdf7ab4e3c2 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -11,16 +11,16 @@ import (
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
-type chunkSpec struct {
+type ChunkSpec struct {
        Begin, Length pp.Integer
 }
 
-type request struct {
+type Request struct {
        Index pp.Integer
-       chunkSpec
+       ChunkSpec
 }
 
-func (r request) ToMsg(mt pp.MessageType) pp.Message {
+func (r Request) ToMsg(mt pp.MessageType) pp.Message {
        return pp.Message{
                Type:   mt,
                Index:  r.Index,
@@ -29,11 +29,11 @@ func (r request) ToMsg(mt pp.MessageType) pp.Message {
        }
 }
 
-func newRequest(index, begin, length pp.Integer) request {
-       return request{index, chunkSpec{begin, length}}
+func newRequest(index, begin, length pp.Integer) Request {
+       return Request{index, ChunkSpec{begin, length}}
 }
 
-func newRequestFromMessage(msg *pp.Message) request {
+func newRequestFromMessage(msg *pp.Message) Request {
        switch msg.Type {
        case pp.Request, pp.Cancel, pp.Reject:
                return newRequest(msg.Index, msg.Begin, msg.Length)
@@ -55,7 +55,7 @@ func metadataPieceSize(totalSize int, piece int) int {
 
 // Return the request that would include the given offset into the torrent data.
 func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
-       r request, ok bool) {
+       r Request, ok bool) {
        if offset < 0 || offset >= torrentLength {
                return
        }
@@ -74,10 +74,10 @@ func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
        return
 }
 
-func torrentRequestOffset(torrentLength, pieceSize int64, r request) (off int64) {
+func torrentRequestOffset(torrentLength, pieceSize int64, r Request) (off int64) {
        off = int64(r.Index)*pieceSize + int64(r.Begin)
        if off < 0 || off >= torrentLength {
-               panic("invalid request")
+               panic("invalid Request")
        }
        return
 }
@@ -98,8 +98,8 @@ func validateInfo(info *metainfo.Info) error {
        return nil
 }
 
-func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) chunkSpec {
-       ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize}
+func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) ChunkSpec {
+       ret := ChunkSpec{pp.Integer(index) * chunkSize, chunkSize}
        if ret.Begin+ret.Length > pieceLength {
                ret.Length = pieceLength - ret.Begin
        }
index a5ff2fea353a92bcb90e666b2baed639219b41ad..419ec4d07411bdbdd14ae56f6d07604ab3aeddff 100644 (file)
@@ -12,7 +12,7 @@ import (
 )
 
 func TestTorrentOffsetRequest(t *testing.T) {
-       check := func(tl, ps, off int64, expected request, ok bool) {
+       check := func(tl, ps, off int64, expected Request, ok bool) {
                req, _ok := torrentOffsetRequest(tl, ps, defaultChunkSize, off)
                assert.Equal(t, _ok, ok)
                assert.Equal(t, req, expected)
@@ -20,7 +20,7 @@ func TestTorrentOffsetRequest(t *testing.T) {
        check(13, 5, 0, newRequest(0, 0, 5), true)
        check(13, 5, 3, newRequest(0, 0, 5), true)
        check(13, 5, 11, newRequest(2, 0, 3), true)
-       check(13, 5, 13, request{}, false)
+       check(13, 5, 13, Request{}, false)
 }
 
 func TestIterBitmapsDistinct(t *testing.T) {
index 81ec26ef25389ebd814f39fdef0a2cd813247633..a04a160b3b466f90bb4882464ab018de6773de63 100644 (file)
@@ -10,12 +10,12 @@ import (
 type peerImpl interface {
        updateRequests()
        writeInterested(interested bool) bool
-       cancel(request) bool
+       cancel(Request) bool
        // Return true if there's room for more activity.
-       request(request) bool
+       request(Request) bool
        connectionFlags() string
        onClose()
-       _postCancel(request)
+       _postCancel(Request)
        onGotInfo(*metainfo.Info)
        drop()
        String() string
index b5ff73f247f6a6880f8ff7cb62b22d3c8b53ca86..ac43e8d5b4517a53510be9fc509b8bd6bcc88bfc 100644 (file)
@@ -53,6 +53,7 @@ type Peer struct {
        t *Torrent
 
        peerImpl
+       callbacks *Callbacks
 
        outgoing   bool
        Network    string
@@ -82,12 +83,12 @@ type Peer struct {
        _chunksReceivedWhileExpecting       int64
 
        choking          bool
-       requests         map[request]struct{}
+       requests         map[Request]struct{}
        requestsLowWater int
        // Chunks that we might reasonably expect to receive from the peer. Due to
        // latency, buffering, and implementation differences, we may receive
        // chunks that are no longer in the set of requests actually want.
-       validReceiveChunks map[request]int
+       validReceiveChunks map[Request]int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
        metadataRequests []bool
@@ -96,7 +97,7 @@ type Peer struct {
        // Stuff controlled by the remote peer.
        peerInterested        bool
        peerChoking           bool
-       peerRequests          map[request]*peerRequestState
+       peerRequests          map[Request]*peerRequestState
        PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
        PeerListenPort        int
        // The pieces the peer has claimed to have.
@@ -146,8 +147,6 @@ type PeerConn struct {
        writerCond  sync.Cond
 
        pex pexConnState
-
-       callbacks *Callbacks
 }
 
 func (cn *PeerConn) connStatusString() string {
@@ -354,6 +353,9 @@ func (cn *Peer) close() {
        cn.discardPieceInclination()
        cn._pieceRequestOrder.Clear()
        cn.peerImpl.onClose()
+       for _, f := range cn.callbacks.PeerClosed {
+               f(cn)
+       }
 }
 
 func (cn *PeerConn) onClose() {
@@ -451,7 +453,7 @@ func (cn *Peer) totalExpectingTime() (ret time.Duration) {
 
 }
 
-func (cn *PeerConn) onPeerSentCancel(r request) {
+func (cn *PeerConn) onPeerSentCancel(r Request) {
        if _, ok := cn.peerRequests[r]; !ok {
                torrent.Add("unexpected cancels received", 1)
                return
@@ -523,7 +525,7 @@ func (pc *PeerConn) writeInterested(interested bool) bool {
 // are okay.
 type messageWriter func(pp.Message) bool
 
-func (cn *Peer) request(r request) bool {
+func (cn *Peer) request(r Request) bool {
        if _, ok := cn.requests[r]; ok {
                panic("chunk already requested")
        }
@@ -550,20 +552,23 @@ func (cn *Peer) request(r request) bool {
                panic("piece is queued for hash")
        }
        if cn.requests == nil {
-               cn.requests = make(map[request]struct{})
+               cn.requests = make(map[Request]struct{})
        }
        cn.requests[r] = struct{}{}
        if cn.validReceiveChunks == nil {
-               cn.validReceiveChunks = make(map[request]int)
+               cn.validReceiveChunks = make(map[Request]int)
        }
        cn.validReceiveChunks[r]++
        cn.t.pendingRequests[r]++
        cn.t.requestStrategy.hooks().sentRequest(r)
        cn.updateExpectingChunks()
+       for _, f := range cn.callbacks.SentRequest {
+               f(PeerRequestEvent{cn, r})
+       }
        return cn.peerImpl.request(r)
 }
 
-func (me *PeerConn) request(r request) bool {
+func (me *PeerConn) request(r Request) bool {
        return me.write(pp.Message{
                Type:   pp.Request,
                Index:  r.Index,
@@ -572,7 +577,7 @@ func (me *PeerConn) request(r request) bool {
        })
 }
 
-func (me *PeerConn) cancel(r request) bool {
+func (me *PeerConn) cancel(r Request) bool {
        return me.write(makeCancelMessage(r))
 }
 
@@ -593,7 +598,7 @@ func (cn *Peer) doRequestState() bool {
        } else if len(cn.requests) <= cn.requestsLowWater {
                filledBuffer := false
                cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
-                       cn.iterPendingRequests(pieceIndex, func(r request) bool {
+                       cn.iterPendingRequests(pieceIndex, func(r Request) bool {
                                if !cn.setInterested(true) {
                                        filledBuffer = true
                                        return false
@@ -789,11 +794,11 @@ func (cn *Peer) iterPendingPiecesUntyped(f iter.Callback) {
        cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
 }
 
-func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
+func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(Request) bool) bool {
        return cn.t.requestStrategy.iterUndirtiedChunks(
                cn.t.piece(piece).requestStrategyPiece(),
-               func(cs chunkSpec) bool {
-                       return f(request{pp.Integer(piece), cs})
+               func(cs ChunkSpec) bool {
+                       return f(Request{pp.Integer(piece), cs})
                },
        )
 }
@@ -1003,7 +1008,7 @@ func (c *PeerConn) fastEnabled() bool {
        return c.PeerExtensionBytes.SupportsFast() && c.t.cl.config.Extensions.SupportsFast()
 }
 
-func (c *PeerConn) reject(r request) {
+func (c *PeerConn) reject(r Request) {
        if !c.fastEnabled() {
                panic("fast not enabled")
        }
@@ -1011,7 +1016,7 @@ func (c *PeerConn) reject(r request) {
        delete(c.peerRequests, r)
 }
 
-func (c *PeerConn) onReadRequest(r request) error {
+func (c *PeerConn) onReadRequest(r Request) error {
        requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
        if _, ok := c.peerRequests[r]; ok {
                torrent.Add("duplicate requests received", 1)
@@ -1043,10 +1048,10 @@ func (c *PeerConn) onReadRequest(r request) error {
        // Check this after we know we have the piece, so that the piece length will be known.
        if r.Begin+r.Length > c.t.pieceLength(pieceIndex(r.Index)) {
                torrent.Add("bad requests received", 1)
-               return errors.New("bad request")
+               return errors.New("bad Request")
        }
        if c.peerRequests == nil {
-               c.peerRequests = make(map[request]*peerRequestState, maxRequests)
+               c.peerRequests = make(map[Request]*peerRequestState, maxRequests)
        }
        value := &peerRequestState{}
        c.peerRequests[r] = value
@@ -1055,7 +1060,7 @@ func (c *PeerConn) onReadRequest(r request) error {
        return nil
 }
 
-func (c *PeerConn) peerRequestDataReader(r request, prs *peerRequestState) {
+func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
        b, err := readPeerRequestData(r, c)
        c.locker().Lock()
        defer c.locker().Unlock()
@@ -1072,8 +1077,8 @@ func (c *PeerConn) peerRequestDataReader(r request, prs *peerRequestState) {
 
 // If this is maintained correctly, we might be able to support optional synchronous reading for
 // chunk sending, the way it used to work.
-func (c *PeerConn) peerRequestDataReadFailed(err error, r request) {
-       c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer request %v: %v", r, err)
+func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
+       c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer Request %v: %v", r, err)
        i := pieceIndex(r.Index)
        if c.t.pieceComplete(i) {
                // There used to be more code here that just duplicated the following break. Piece
@@ -1092,7 +1097,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r request) {
        c.choke(c.post)
 }
 
-func readPeerRequestData(r request, c *PeerConn) ([]byte, error) {
+func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) {
        b := make([]byte, r.Length)
        p := c.t.info.Piece(int(r.Index))
        n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
@@ -1241,13 +1246,13 @@ func (c *PeerConn) mainReadLoop() (err error) {
        }
 }
 
-func (c *Peer) remoteRejectedRequest(r request) {
+func (c *Peer) remoteRejectedRequest(r Request) {
        if c.deleteRequest(r) {
                c.decExpectedChunkReceive(r)
        }
 }
 
-func (c *Peer) decExpectedChunkReceive(r request) {
+func (c *Peer) decExpectedChunkReceive(r Request) {
        count := c.validReceiveChunks[r]
        if count == 1 {
                delete(c.validReceiveChunks, r)
@@ -1396,7 +1401,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        piece.incrementPendingWrites()
        // Record that we have the chunk, so we aren't trying to download it while
        // waiting for it to be written to storage.
-       piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
+       piece.unpendChunkIndex(chunkIndex(req.ChunkSpec, t.chunkSize))
 
        // Cancel pending requests for this chunk.
        for c := range t.conns {
@@ -1537,11 +1542,14 @@ func (c *Peer) numLocalRequests() int {
        return len(c.requests)
 }
 
-func (c *Peer) deleteRequest(r request) bool {
+func (c *Peer) deleteRequest(r Request) bool {
        if _, ok := c.requests[r]; !ok {
                return false
        }
        delete(c.requests, r)
+       for _, f := range c.callbacks.DeletedRequest {
+               f(PeerRequestEvent{c, r})
+       }
        c.updateExpectingChunks()
        c.t.requestStrategy.hooks().deletedRequest(r)
        pr := c.t.pendingRequests
@@ -1594,7 +1602,7 @@ func (c *PeerConn) tickleWriter() {
        c.writerCond.Broadcast()
 }
 
-func (c *Peer) postCancel(r request) bool {
+func (c *Peer) postCancel(r Request) bool {
        if !c.deleteRequest(r) {
                return false
        }
@@ -1602,11 +1610,11 @@ func (c *Peer) postCancel(r request) bool {
        return true
 }
 
-func (c *PeerConn) _postCancel(r request) {
+func (c *PeerConn) _postCancel(r Request) {
        c.post(makeCancelMessage(r))
 }
 
-func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
+func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
        c.lastChunkSent = time.Now()
        return msg(pp.Message{
                Type:  pp.Piece,
index 5fbc565a5284fe3a40df16b6065f08f72b3a8889..654f7689fcd1d9fc105e14ff34d6e02e53be5066 100644 (file)
@@ -134,7 +134,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
                        // The chunk must be written to storage everytime, to ensure the
                        // writeSem is unlocked.
                        t.pieces[0]._dirtyChunks.Clear()
-                       cn.validReceiveChunks = map[request]int{newRequestFromMessage(&msg): 1}
+                       cn.validReceiveChunks = map[Request]int{newRequestFromMessage(&msg): 1}
                        cl.unlock()
                        n, err := w.Write(wb)
                        require.NoError(b, err)
index e4236a7bb80d16d4efe433979bd6b8d14dfb7180..fb9c8056f7f6fda3b8df53bd9d2cca6223717047 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -82,7 +82,7 @@ func (p *Piece) pendingChunkIndex(chunkIndex int) bool {
        return !p._dirtyChunks.Contains(chunkIndex)
 }
 
-func (p *Piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool {
+func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool {
        return p.pendingChunkIndex(chunkIndex(cs, chunkSize))
 }
 
@@ -137,12 +137,12 @@ func (p *Piece) chunkIndexDirty(chunk pp.Integer) bool {
        return p._dirtyChunks.Contains(bitmap.BitIndex(chunk))
 }
 
-func (p *Piece) chunkIndexSpec(chunk pp.Integer) chunkSpec {
+func (p *Piece) chunkIndexSpec(chunk pp.Integer) ChunkSpec {
        return chunkIndexSpec(chunk, p.length(), p.chunkSize())
 }
 
-func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) request {
-       return request{
+func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) Request {
+       return Request{
                pp.Integer(p.index),
                chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()),
        }
index 3faeb6ecc37aa74bb0bb7774b3e438ef7bc4664f..82e36d35700e6a4a5ba6e6d258ad9a1cd6cc8cf3 100644 (file)
@@ -4,6 +4,6 @@ import (
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
-func makeCancelMessage(r request) pp.Message {
+func makeCancelMessage(r Request) pp.Message {
        return pp.MakeCancelMessage(r.Index, r.Begin, r.Length)
 }
index 6bfba8f4573de8f8c9b0d1b6da2dea37b909fd1f..1ece5fc69b52861afaad4831606ade05fe10127d 100644 (file)
@@ -11,12 +11,12 @@ type requestStrategyDefaults struct{}
 
 func (requestStrategyDefaults) hooks() requestStrategyHooks {
        return requestStrategyHooks{
-               sentRequest:    func(request) {},
-               deletedRequest: func(request) {},
+               sentRequest:    func(Request) {},
+               deletedRequest: func(Request) {},
        }
 }
 
-func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
        chunkIndices := p.dirtyChunks().Copy()
        chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
        return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
@@ -24,7 +24,7 @@ func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f fun
                if err != nil {
                        panic(err)
                }
-               return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
+               return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
        })
 }
 
index 07d0cce6005eaf3e399bda60afd6cd3701a27e85..75cff9637fcbdb957329ea09a2397a358d90442e 100644 (file)
@@ -14,7 +14,7 @@ import (
 type requestStrategyPiece interface {
        numChunks() pp.Integer
        dirtyChunks() bitmap.Bitmap
-       chunkIndexRequest(i pp.Integer) request
+       chunkIndexRequest(i pp.Integer) Request
 }
 
 type requestStrategyTorrent interface {
@@ -39,7 +39,7 @@ type requestStrategyConnection interface {
 
 type requestStrategy interface {
        iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool
-       iterUndirtiedChunks(requestStrategyPiece, func(chunkSpec) bool) bool
+       iterUndirtiedChunks(requestStrategyPiece, func(ChunkSpec) bool) bool
        nominalMaxRequests(requestStrategyConnection) int
        shouldRequestWithoutBias(requestStrategyConnection) bool
        piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
@@ -47,12 +47,12 @@ type requestStrategy interface {
 }
 
 type requestStrategyHooks struct {
-       sentRequest    func(request)
-       deletedRequest func(request)
+       sentRequest    func(Request)
+       deletedRequest func(Request)
 }
 
 type requestStrategyCallbacks interface {
-       requestTimedOut(request)
+       requestTimedOut(Request)
 }
 
 type requestStrategyFuzzing struct {
@@ -103,7 +103,7 @@ type requestStrategyDuplicateRequestTimeout struct {
 
        // The last time we requested a chunk. Deleting the request from any connection will clear this
        // value.
-       lastRequested map[request]*time.Timer
+       lastRequested map[Request]*time.Timer
        // The lock to take when running a request timeout handler.
        timeoutLocker sync.Locker
 }
@@ -118,7 +118,7 @@ func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duratio
                return requestStrategyDuplicateRequestTimeout{
                        duplicateRequestTimeout: duplicateRequestTimeout,
                        callbacks:               callbacks,
-                       lastRequested:           make(map[request]*time.Timer),
+                       lastRequested:           make(map[Request]*time.Timer),
                        timeoutLocker:           clientLocker,
                }
        }
@@ -126,7 +126,7 @@ func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duratio
 
 func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
        return requestStrategyHooks{
-               deletedRequest: func(r request) {
+               deletedRequest: func(r Request) {
                        if t, ok := rs.lastRequested[r]; ok {
                                t.Stop()
                                delete(rs.lastRequested, r)
@@ -136,7 +136,7 @@ func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
        }
 }
 
-func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
        for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
                if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
                        continue
@@ -145,7 +145,7 @@ func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestSt
                if rs.wouldDuplicateRecent(r) {
                        continue
                }
-               if !f(r.chunkSpec) {
+               if !f(r.ChunkSpec) {
                        return false
                }
        }
@@ -185,7 +185,7 @@ func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection,
        return defaultIterPendingPieces(rs, cn, cb)
 }
 
-func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) {
+func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r Request) {
        rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
                rs.timeoutLocker.Lock()
                delete(rs.lastRequested, r)
@@ -215,7 +215,7 @@ func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestSt
                ),
        ))
 }
-func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool {
+func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r Request) bool {
        // This piece has been requested on another connection, and the duplicate request timer is still
        // running.
        _, ok := rs.lastRequested[r]
index 9e36778baf107b5d69f3662ca2814df53c646e8f..059e5883bd6e806fbe55c0fbb5b744498c1f56dd 100644 (file)
@@ -144,7 +144,7 @@ type Torrent struct {
        connPieceInclinationPool sync.Pool
 
        // Count of each request across active connections.
-       pendingRequests map[request]int
+       pendingRequests map[Request]int
 
        pex pexState
 }
@@ -424,7 +424,7 @@ func (t *Torrent) onSetInfo() {
        t.cl.event.Broadcast()
        t.gotMetainfo.Set()
        t.updateWantPeersEvent()
-       t.pendingRequests = make(map[request]int)
+       t.pendingRequests = make(map[Request]int)
        t.tryCreateMorePieceHashers()
 }
 
@@ -744,13 +744,13 @@ func (t *Torrent) close() (err error) {
        return
 }
 
-func (t *Torrent) requestOffset(r request) int64 {
+func (t *Torrent) requestOffset(r Request) int64 {
        return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
 }
 
 // Return the request that would include the given offset into the torrent data. Returns !ok if
 // there is no such request.
-func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
+func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
        return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
 }
 
@@ -846,7 +846,7 @@ func (t *Torrent) maybeDropMutuallyCompletePeer(
        p.drop()
 }
 
-func (t *Torrent) haveChunk(r request) (ret bool) {
+func (t *Torrent) haveChunk(r Request) (ret bool) {
        // defer func() {
        //      log.Println("have chunk", r, ret)
        // }()
@@ -857,10 +857,10 @@ func (t *Torrent) haveChunk(r request) (ret bool) {
                return true
        }
        p := &t.pieces[r.Index]
-       return !p.pendingChunk(r.chunkSpec, t.chunkSize)
+       return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
 }
 
-func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
+func chunkIndex(cs ChunkSpec, chunkSize pp.Integer) int {
        return int(cs.Begin / chunkSize)
 }
 
@@ -1073,8 +1073,8 @@ func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
        return ret
 }
 
-func (t *Torrent) pendRequest(req request) {
-       ci := chunkIndex(req.chunkSpec, t.chunkSize)
+func (t *Torrent) pendRequest(req Request) {
+       ci := chunkIndex(req.ChunkSpec, t.chunkSize)
        t.pieces[req.Index].pendChunkIndex(ci)
 }
 
@@ -2012,8 +2012,8 @@ type torrentRequestStrategyCallbacks struct {
        t *Torrent
 }
 
-func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) {
-       torrent.Add("request timeouts", 1)
+func (cb torrentRequestStrategyCallbacks) requestTimedOut(r Request) {
+       torrent.Add("Request timeouts", 1)
        cb.t.cl.lock()
        defer cb.t.cl.unlock()
        cb.t.iterPeers(func(cn *Peer) {
@@ -2098,6 +2098,10 @@ func (t *Torrent) iterPeers(f func(*Peer)) {
        }
 }
 
+func (t *Torrent) callbacks() *Callbacks {
+       return &t.cl.config.Callbacks
+}
+
 func (t *Torrent) addWebSeed(url string) {
        if t.cl.config.DisableWebseeds {
                return
@@ -2120,7 +2124,10 @@ func (t *Torrent) addWebSeed(url string) {
                        HttpClient: http.DefaultClient,
                        Url:        url,
                },
-               requests: make(map[request]webseed.Request, maxRequests),
+               requests: make(map[Request]webseed.Request, maxRequests),
+       }
+       for _, f := range t.callbacks().NewPeer {
+               f(&ws.peer)
        }
        ws.peer.logger = t.logger.WithContextValue(&ws)
        ws.peer.peerImpl = &ws
index 2812a0671fc8b1124f50101d54122b1c0e248fd6..e740ab22184f226016cfbf95ed8fc5ebf6e79aef 100644 (file)
@@ -19,8 +19,8 @@ import (
        "github.com/anacrolix/torrent/storage"
 )
 
-func r(i, b, l pp.Integer) request {
-       return request{i, chunkSpec{b, l}}
+func r(i, b, l pp.Integer) Request {
+       return Request{i, ChunkSpec{b, l}}
 }
 
 // Check the given request is correct for various torrent offsets.
@@ -28,15 +28,15 @@ func TestTorrentRequest(t *testing.T) {
        const s = 472183431 // Length of torrent.
        for _, _case := range []struct {
                off int64   // An offset into the torrent.
-               req request // The expected request. The zero value means !ok.
+               req Request // The expected request. The zero value means !ok.
        }{
                // Invalid offset.
-               {-1, request{}},
+               {-1, Request{}},
                {0, r(0, 0, 16384)},
                // One before the end of a piece.
                {1<<18 - 1, r(0, 1<<18-16384, 16384)},
                // Offset beyond torrent length.
-               {472 * 1 << 20, request{}},
+               {472 * 1 << 20, Request{}},
                // One before the end of the torrent. Complicates the chunk length.
                {s - 1, r((s-1)/(1<<18), (s-1)%(1<<18)/(16384)*(16384), 12935)},
                {1, r(0, 0, 16384)},
@@ -46,7 +46,7 @@ func TestTorrentRequest(t *testing.T) {
                {16384, r(0, 16384, 16384)},
        } {
                req, ok := torrentOffsetRequest(472183431, 1<<18, 16384, _case.off)
-               if (_case.req == request{}) == ok {
+               if (_case.req == Request{}) == ok {
                        t.Fatalf("expected %v, got %v", _case.req, req)
                }
                if req != _case.req {
index 162c7a16e6b0e99fb560cefeeabfabb9a639d528..1856a4d0fcbe12e2e1219eea85f819414a8d53e7 100644 (file)
@@ -14,7 +14,7 @@ import (
 
 type webseedPeer struct {
        client   webseed.Client
-       requests map[request]webseed.Request
+       requests map[Request]webseed.Request
        peer     Peer
 }
 
@@ -33,7 +33,7 @@ func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
        ws.client.Info = info
 }
 
-func (ws *webseedPeer) _postCancel(r request) {
+func (ws *webseedPeer) _postCancel(r Request) {
        ws.cancel(r)
 }
 
@@ -41,16 +41,16 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
        return true
 }
 
-func (ws *webseedPeer) cancel(r request) bool {
+func (ws *webseedPeer) cancel(r Request) bool {
        ws.requests[r].Cancel()
        return true
 }
 
-func (ws *webseedPeer) intoSpec(r request) webseed.RequestSpec {
+func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
        return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
 }
 
-func (ws *webseedPeer) request(r request) bool {
+func (ws *webseedPeer) request(r Request) bool {
        webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
        ws.requests[r] = webseedRequest
        go ws.requestResultHandler(r, webseedRequest)
@@ -71,12 +71,12 @@ func (ws *webseedPeer) updateRequests() {
 
 func (ws *webseedPeer) onClose() {}
 
-func (ws *webseedPeer) requestResultHandler(r request, webseedRequest webseed.Request) {
+func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
        result := <-webseedRequest.Result
        ws.peer.t.cl.lock()
        defer ws.peer.t.cl.unlock()
        if result.Err != nil {
-               ws.peer.logger.Printf("request %v rejected: %v", r, result.Err)
+               ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
                // Always close for now. We need to filter out temporary errors, but this is a nightmare in
                // Go. Currently a bad webseed URL can starve out the good ones due to the chunk selection
                // algorithm.