return nil
}
+// If peer requests are buffered on read, this instructs the amount of memory that might be used to
+// cache pending writes. Assuming 512KiB cached for sending, for 16KiB chunks.
+// (1<<5 = 32 outstanding requests; 32 * 16KiB = 512KiB.) Advertised to peers as
+// our Reqq in the extended handshake.
+const localClientReqq = 1 << 5
+
// See the order given in Transmission's tr_peerMsgsNew.
func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
M: map[pp.ExtensionName]pp.ExtensionNumber{
pp.ExtensionNameMetadata: metadataExtendedId,
},
- V: cl.config.ExtendedHandshakeClientVersion,
- // If peer requests are buffered on read, this instructs the amount of memory
- // that might be used to cache pending writes. Assuming 512KiB cached for
- // sending, for 16KiB chunks.
- Reqq: 1 << 5,
+ V: cl.config.ExtendedHandshakeClientVersion,
+ Reqq: localClientReqq,
YourIp: pp.CompactIp(conn.remoteIp()),
Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
Port: cl.incomingPeerPort(),
BytesWritten Count
BytesWrittenData Count
- BytesRead Count
- BytesReadData Count
- BytesReadUsefulData Count
+ BytesRead Count
+ BytesReadData Count
+ BytesReadUsefulData Count
+ BytesReadUsefulIntendedData Count
ChunksWritten Count
}
func (cn *Peer) expectingChunks() bool {
- return len(cn.requests) != 0 && !cn.peerChoking
+ // Nothing outstanding: nothing to expect.
+ if len(cn.requests) == 0 {
+ return false
+ }
+ // NOTE(review): presumably a peer won't serve requests unless we're interested —
+ // confirm this matches the request/interest handling elsewhere.
+ if !cn.interested {
+ return false
+ }
+ if !cn.peerChoking {
+ return true
+ }
+ // Even while choked, requests for pieces in the peer's allowed-fast set may
+ // still be served (Fast Extension).
+ for r := range cn.requests {
+ if cn.peerAllowedFast.Contains(bitmap.BitIndex(r.Index)) {
+ return true
+ }
+ }
+ return false
}
// Returns true if the connection is over IPv6.
return
}
-// func (cn *connection) String() string {
-// var buf bytes.Buffer
-// cn.writeStatus(&buf, nil)
-// return buf.String()
-// }
-
func (cn *Peer) downloadRate() float64 {
- return float64(cn._stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
+ num := cn._stats.BytesReadUsefulData.Int64()
+ // Guard avoids 0/0 = NaN before anything useful has been read.
+ if num == 0 {
+ return 0
+ }
+ // Rate is now measured against time actually spent expecting chunks rather
+ // than cumulative interest time.
+ return float64(num) / cn.totalExpectingTime().Seconds()
+}
+
+// numRequestsByPiece returns this peer's outstanding request counts, grouped by
+// piece index.
+func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
+ ret = make(map[pieceIndex]int)
+ for r := range cn.requests {
+ ret[pieceIndex(r.Index)]++
+ }
+ return
}
func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
fmt.Fprint(w, "CLOSED: ")
}
fmt.Fprintln(w, cn.connStatusString())
+ prio, err := cn.peerPriority()
+ prioStr := fmt.Sprintf("%08x", prio)
+ if err != nil {
+ prioStr += ": " + err.Error()
+ }
+ fmt.Fprintf(w, " bep40-prio: %v\n", prioStr)
fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
eventAgeString(cn.lastMessageReceived),
eventAgeString(cn.completedHandshake),
cn.totalExpectingTime(),
)
fmt.Fprintf(w,
- " %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
+ " %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: %d/(%d/%d)-%d/%d, flags: %s, dr: %.1f KiB/s\n",
cn.completedString(),
len(cn.peerTouchedPieces),
&cn._stats.ChunksReadUseful,
&cn._stats.ChunksRead,
&cn._stats.ChunksWritten,
- cn.requestsLowWater,
cn.numLocalRequests(),
cn.nominalMaxRequests(),
+ cn.PeerMaxRequests,
len(cn.peerRequests),
+ localClientReqq,
cn.statusFlags(),
cn.downloadRate()/(1<<10),
)
- //fmt.Fprintf(w, " next pieces: %v%s\n",
- // iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
- // func() string {
- // if cn == t.fastestPeer {
- // return " (fastest)"
- // } else {
- // return ""
- // }
- // }(),
- //)
+ fmt.Fprintf(w, " requested pieces:")
+ for piece, count := range cn.numRequestsByPiece() {
+ fmt.Fprintf(w, " %v (%v)", piece, count)
+ }
+ fmt.Fprintf(w, "\n")
}
func (cn *Peer) close() {
// The actual value to use as the maximum outbound requests.
func (cn *Peer) nominalMaxRequests() (ret int) {
- return cn.PeerMaxRequests
+ // Clamp the peer-advertised value into [1, 64]: never zero (we could never
+ // request anything), and capped so a peer advertising a huge Reqq can't make
+ // us queue excessively. NOTE(review): confirm 64 is the intended fixed upper
+ // bound rather than a tunable.
+ return int(clamp(1, int64(cn.PeerMaxRequests), 64))
}
func (cn *Peer) totalExpectingTime() (ret time.Duration) {
chunksReceived.Add("due to allowed fast", 1)
}
- // TODO: This needs to happen immediately, to prevent cancels occurring asynchronously when have
- // actually already received the piece, while we have the Client unlocked to write the data out.
+ // The request needs to be deleted immediately to prevent cancels occurring asynchronously when
+ // have actually already received the piece, while we have the Client unlocked to write the data
+ // out.
+ deletedRequest := false
{
if _, ok := c.requests[req]; ok {
for _, f := range c.callbacks.ReceivedRequested {
}
// Request has been satisfied.
if c.deleteRequest(req) {
- if c.expectingChunks() {
+ deletedRequest = true
+ if !c.peerChoking {
c._chunksReceivedWhileExpecting++
}
} else {
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
+ if deletedRequest {
+ c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
+ }
for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
f(ReceivedUsefulDataEvent{c, msg})
}
c.lastUsefulChunkReceived = time.Now()
- // if t.fastestPeer != c {
- // log.Printf("setting fastest connection %p", c)
- // }
- t.fastestPeer = c
// Need to record that it hasn't been written yet, before we attempt to do
// anything with it.
return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less()
}
-func (cn *Peer) chunksReceivedWhileExpecting() int64 {
- return cn._chunksReceivedWhileExpecting
-}
-
-func (cn *Peer) fastest() bool {
- return cn == cn.t.fastestPeer
-}
-
+// peerMaxRequests returns the peer's advertised maximum outstanding requests,
+// unclamped; see nominalMaxRequests for the value actually used when requesting.
func (cn *Peer) peerMaxRequests() int {
 return cn.PeerMaxRequests
}
import (
"sort"
"time"
+ "unsafe"
"github.com/anacrolix/multiless"
pp "github.com/anacrolix/torrent/peer_protocol"
func (me clientPieceRequestOrder) less(_i, _j int) bool {
 i := me.pieces[_i]
 j := me.pieces[_j]
- ml := multiless.New()
- ml.Int(int(j.prio), int(i.prio))
- ml.Bool(j.partial, i.partial)
- ml.Int(i.availability, j.availability)
- return ml.Less()
+ // NOTE(review): the removed form discarded each comparator's return value —
+ // presumably multiless comparators return a new computation rather than
+ // mutating, so the old code compared nothing. The chained form below applies
+ // them: higher prio first, then partial pieces first, then lower availability.
+ return multiless.New().Int(
+ int(j.prio), int(i.prio),
+ ).Bool(
+ j.partial, i.partial,
+ ).Int(
+ i.availability, j.availability,
+ ).Less()
}
func (cl *Client) requester() {
func (cl *Client) doRequests() {
requestOrder := clientPieceRequestOrder{}
allPeers := make(map[*Torrent][]*Peer)
- storageCapacity := make(map[*Torrent]*int64)
+ // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
+ // TorrentImpl.
+ storageLeft := make(map[*func() *int64]*int64)
for _, t := range cl.torrents {
// TODO: We could do metainfo requests here.
if t.haveInfo() {
- value := int64(t.usualPieceSize())
- storageCapacity[t] = &value
+ if t.storage.Capacity != nil {
+ if _, ok := storageLeft[t.storage.Capacity]; !ok {
+ storageLeft[t.storage.Capacity] = (*t.storage.Capacity)()
+ }
+ }
requestOrder.addPieces(t, t.numPieces())
}
var peers []*Peer
t.iterPeers(func(p *Peer) {
- peers = append(peers, p)
+ if !p.closed.IsSet() {
+ peers = append(peers, p)
+ }
+ })
+ // Sort in *desc* order, approximately the reverse of worseConn where appropriate.
+ sort.Slice(peers, func(i, j int) bool {
+ return multiless.New().Float64(
+ peers[j].downloadRate(), peers[i].downloadRate(),
+ ).Uintptr(
+ uintptr(unsafe.Pointer(peers[j])), uintptr(unsafe.Pointer(peers[i]))).Less()
})
allPeers[t] = peers
}
requestOrder.update()
requestOrder.sort()
+ // For a given piece, the set of allPeers indices that absorbed requests for the piece.
+ contributed := make(map[int]struct{})
for _, p := range requestOrder.pieces {
if p.t.ignorePieceForRequests(p.index) {
continue
}
peers := allPeers[p.t]
torrentPiece := p.t.piece(p.index)
- if left := storageCapacity[p.t]; left != nil {
+ if left := storageLeft[p.t.storage.Capacity]; left != nil {
if *left < int64(torrentPiece.length()) {
continue
}
*left -= int64(torrentPiece.length())
}
p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
- for _, peer := range peers {
- req := Request{pp.Integer(p.index), chunk}
- _, err := peer.request(req)
- if err == nil {
- //log.Printf("requested %v", req)
- break
+ req := Request{pp.Integer(p.index), chunk}
+ const skipAlreadyRequested = false
+ if skipAlreadyRequested {
+ alreadyRequested := false
+ p.t.iterPeers(func(p *Peer) {
+ if _, ok := p.requests[req]; ok {
+ alreadyRequested = true
+ }
+ })
+ if alreadyRequested {
+ return true
+ }
+ }
+ alreadyRequested := false
+ for peerIndex, peer := range peers {
+ if alreadyRequested {
+ // Cancel all requests from "slower" peers after the one that requested it.
+ peer.cancel(req)
+ } else {
+ err := peer.request(req)
+ if err == nil {
+ contributed[peerIndex] = struct{}{}
+ alreadyRequested = true
+ //log.Printf("requested %v", req)
+ }
}
}
return true
})
+ // Move requestees for this piece to the back.
+ lastIndex := len(peers) - 1
+ for peerIndex := range contributed {
+ peers[peerIndex], peers[lastIndex] = peers[lastIndex], peers[peerIndex]
+ delete(contributed, peerIndex)
+ lastIndex--
+ }
}
for _, t := range cl.torrents {
t.iterPeers(func(p *Peer) {
"time"
"unsafe"
+ "github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/webseed"
maxEstablishedConns int
// Set of addrs to which we're attempting to connect. Connections are
// half-open until all handshakes are completed.
- halfOpen map[string]PeerInfo
- fastestPeer *Peer
+ halfOpen map[string]PeerInfo
// Reserve of peers to connect to. A peer can be both here and in the
// active connections if were told about the peer after connecting with
return
}
-func (t *Torrent) sortPieceRequestOrder(sl []pieceIndex) {
- if len(sl) != t.numPieces() {
- panic(len(sl))
- }
- availability := make([]int, len(sl))
- t.iterPeers(func(peer *Peer) {
- for i := range availability {
- if peer.peerHasPiece(i) {
- availability[i]++
- }
- }
- })
-}
-
+// numConns returns the number of entries in t.conns.
func (t *Torrent) numConns() int {
 return len(t.conns)
}
spew.Fdump(w, t.statsLocked())
peers := t.peersAsSlice()
- sort.Slice(peers, func(i, j int) bool {
- return worseConn(peers[i], peers[j])
+ sort.Slice(peers, func(_i, _j int) bool {
+ i := peers[_i]
+ j := peers[_j]
+ if less, ok := multiless.New().EagerSameLess(
+ i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(),
+ ).LessOk(); ok {
+ return less
+ }
+ return worseConn(i, j)
})
for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1)