"errors"
"fmt"
"io"
+ "math"
"math/rand"
"net"
"strconv"
lastBecameInterested time.Time
priorInterest time.Duration
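+ // Timing state for how long this connection has been expecting to
+ // receive chunks, and how many chunks actually arrived while expecting.
+ // Used to size the request queue under request strategy 3.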
+ lastStartedExpectingToReceiveChunks time.Time
+ cumulativeExpectedToReceiveChunks time.Duration
+ chunksReceivedWhileExpecting int64
+
Choked bool
requests map[request]struct{}
requestsLowWater int
writerCond sync.Cond
}
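+// updateExpectingChunks starts or stops the timer tracking how long this
+// connection has been expecting to receive chunks.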
+func (cn *connection) updateExpectingChunks() {
+ if cn.expectingChunks() {
+ if cn.lastStartedExpectingToReceiveChunks.IsZero() {
+ cn.lastStartedExpectingToReceiveChunks = time.Now()
+ }
+ } else {
+ if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
+ cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
+ cn.lastStartedExpectingToReceiveChunks = time.Time{}
+ }
+ }
+}
+
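+// expectingChunks reports whether we currently expect the peer to send us
+// chunks: we are interested and the peer is not choking us.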
+func (cn *connection) expectingChunks() bool {
+ return cn.Interested && !cn.PeerChoked
+}
+
// Returns true if the connection is over IPv6.
func (cn *connection) ipv6() bool {
ip := missinggo.AddrIP(cn.remoteAddr())
func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
// \t isn't preserved in <pre> blocks?
fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
- fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s\n",
+ fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
eventAgeString(cn.lastMessageReceived),
eventAgeString(cn.completedHandshake),
eventAgeString(cn.lastHelpful()),
cn.cumInterest(),
+ cn.totalExpectingTime(),
)
fmt.Fprintf(w,
" %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
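+ // Under request strategy 3, derive the request queue size from the
+ // observed chunk receive rate, so that everything outstanding should be
+ // served within duplicateRequestTimeout.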
+ if cn.t.requestStrategy == 3 {
+ expectingTime := int64(cn.totalExpectingTime())
+ if expectingTime == 0 {
+ expectingTime = math.MaxInt64
+ }
+ return int(clamp(
+ 1,
+ int64(cn.PeerMaxRequests),
+ max(
+ // It makes sense to always pipeline at least one request,
+ // since latency must be non-zero.
+ 2,
+ // Request only as many as we expect to receive in the
+ // duplicateRequestTimeout window. We are trying to avoid having to
+ // duplicate requests.
+ cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime,
+ ),
+ ))
+ }
return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
}
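+// totalExpectingTime returns the cumulative time this connection has spent
+// expecting chunks, including the currently open interval, if any.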
+func (cn *connection) totalExpectingTime() (ret time.Duration) {
+ ret = cn.cumulativeExpectedToReceiveChunks
+ if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
+ ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
+ }
+ return
+}
+
func (cn *connection) onPeerSentCancel(r request) {
if _, ok := cn.PeerRequests[r]; !ok {
torrent.Add("unexpected cancels received", 1)
} else if !cn.lastBecameInterested.IsZero() {
cn.priorInterest += time.Since(cn.lastBecameInterested)
}
+ cn.updateExpectingChunks()
// log.Printf("%p: setting interest: %v", cn, interested)
return msg(pp.Message{
Type: func() pp.MessageType {
}
cn.requests[r] = struct{}{}
cn.t.pendingRequests[r]++
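+ // Record when this chunk was last requested so that strategy 3 can avoid
+ // duplicating the request before duplicateRequestTimeout elapses.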
+ cn.t.lastRequested[r] = time.Now()
+ cn.updateExpectingChunks()
return mw(pp.Message{
Type: pp.Request,
Index: r.Index,
}
func (cn *connection) pieceRequestOrderIter() iter.Func {
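+ // Request strategy 3 orders requests strictly by piece priority, without
+ // the per-connection bias applied by the other strategies.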
+ if cn.t.requestStrategy == 3 {
+ return cn.unbiasedPieceRequestOrder()
+ }
if cn.shouldRequestWithoutBias() {
return cn.unbiasedPieceRequestOrder()
} else {
piece := _piece.(int)
return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
r := request{pp.Integer(piece), cs}
- // log.Println(r, cn.t.pendingRequests[r], cn.requests)
- // if _, ok := cn.requests[r]; !ok && cn.t.pendingRequests[r] != 0 {
- // return true
- // }
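+ // Under strategy 3, skip chunks that were requested recently on any
+ // connection, and only duplicate a request once duplicateRequestTimeout
+ // has elapsed.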
+ if cn.t.requestStrategy == 3 {
+ lr := cn.t.lastRequested[r]
+ if !lr.IsZero() {
+ if time.Since(lr) < cn.t.duplicateRequestTimeout {
+ return true
+ } else {
+ torrent.Add("requests duplicated due to timeout", 1)
+ }
+ }
+ }
return f(r)
})
})
c.deleteAllRequests()
// We can then reset our interest.
c.updateRequests()
+ c.updateExpectingChunks()
case pp.Reject:
c.deleteRequest(newRequestFromMessage(&msg))
case pp.Unchoke:
c.PeerChoked = false
c.tickleWriter()
+ c.updateExpectingChunks()
case pp.Interested:
c.PeerInterested = true
c.tickleWriter()
// Request has been satisfied.
if c.deleteRequest(req) {
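+ // Count chunks that arrive while we expect them; strategy 3 uses this to
+ // size the request queue.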
+ if c.expectingChunks() {
+ c.chunksReceivedWhileExpecting++
+ }
c.updateRequests()
} else {
torrent.Add("chunks received unexpected", 1)
return false
}
delete(c.requests, r)
+ c.updateExpectingChunks()
+ delete(c.t.lastRequested, r)
pr := c.t.pendingRequests
pr[r]--
n := pr[r]
logger *log.Logger
networkingEnabled bool
+
// Determines what chunks to request from peers. 1: Favour higher priority
// pieces with some fuzzing to reduce overlaps and wastage across
// connections. 2: The fastest connection downloads strictly in order of
- // priority, while all others adher to their piece inclications.
+ // priority, while all others adhere to their piece inclinations. 3:
+ // Requests are strictly by piece priority, and not duplicated until
+ // duplicateRequestTimeout is reached.
requestStrategy int
+ // How long to avoid duplicating a pending request.
+ duplicateRequestTimeout time.Duration
closed missinggo.Event
infoHash metainfo.Hash
// Count of each request across active connections.
pendingRequests map[request]int
+ // The last time we requested a chunk. Deleting the request from any
+ // connection will clear this value.
+ lastRequested map[request]time.Time
}
func (t *Torrent) tickleReaders() {
t.gotMetainfo.Set()
t.updateWantPeersEvent()
t.pendingRequests = make(map[request]int)
+ t.lastRequested = make(map[request]time.Time)
}
// Called when metadata for a torrent becomes available.