Port: uint16(addr.Port),
})
}
+ if torrent.haveInfo() {
+ conn.initPieceOrder(torrent.NumPieces())
+ }
err = me.connectionLoop(torrent, conn)
if err != nil {
err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
return true
}
-func (cl *Client) assertRequestHeat() {
- dds, ok := cl.downloadStrategy.(*DefaultDownloadStrategy)
- if !ok {
- return
- }
- for _, t := range cl.torrents {
- m := make(map[request]int, 3000)
- for _, cn := range t.Conns {
- for r := range cn.Requests {
- m[r]++
- }
- }
- for r, h := range dds.heat[t] {
- if m[r] != h {
- panic(fmt.Sprintln(m[r], h))
- }
- }
- }
-}
-
func (me *Client) replenishConnRequests(t *torrent, c *connection) {
if !t.haveInfo() {
return
}
- for _, p := range me.downloadStrategy.FillRequests(t, c) {
- // Make sure the state of pieces that would have been requested is
- // known.
- me.queueFirstHash(t, p)
- }
- //me.assertRequestHeat()
+ me.downloadStrategy.FillRequests(t, c)
if len(c.Requests) == 0 && !c.PeerChoked {
c.SetInterested(false)
}
}
}
}
- // Do this even if the piece is correct because new first-hashings may
- // need to be scheduled.
- if conn.PeerHasPiece(piece) {
- me.replenishConnRequests(t, conn)
- }
}
if t.haveAllPieces() && me.noUpload {
t.CeaseNetworking()
)
type DownloadStrategy interface {
- // Tops up the outgoing pending requests. Returns the indices of pieces
- // that would be requested. This is used to determine if pieces require
- // hashing so the completed state is known.
- FillRequests(*torrent, *connection) (pieces []int)
+ // Tops up the outgoing pending requests.
+ FillRequests(*torrent, *connection)
TorrentStarted(*torrent)
TorrentStopped(*torrent)
DeleteRequest(*torrent, request)
PendingData(*torrent) bool
}
-type DefaultDownloadStrategy struct {
- heat map[*torrent]map[request]int
-}
+type DefaultDownloadStrategy struct{}
func (me *DefaultDownloadStrategy) PendingData(t *torrent) bool {
return !t.haveAllPieces()
}
-func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {
- if me.heat[t][r] != 0 {
- panic("outstanding requests break invariant")
- }
-}
+func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {}
func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {}
-func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) {
+func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
if c.Interested {
if c.PeerChoked {
return
}
- if len(c.Requests) >= (c.PeerMaxRequests+1)/2 {
+ if len(c.Requests) != 0 {
return
}
}
- th := s.heat[t]
addRequest := func(req request) (again bool) {
- piece := t.Pieces[req.Index]
- if piece.Hashing || piece.QueuedForHash {
- // We can't be sure we want this.
- return true
- }
- if piece.Complete() {
- // We already have this.
- return true
- }
- if c.RequestPending(req) {
- return true
+ return c.Request(req)
+ }
+ for i := range t.Pieces {
+ pieceIndex := c.pieceOrder[i]
+ if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
+ continue
}
- again = c.Request(req)
- if c.RequestPending(req) {
- th[req]++
+ if !t.wantPiece(pieceIndex) {
+ continue
}
- return
- }
- // Then finish off incomplete pieces in order of bytes remaining.
- for _, heatThreshold := range []int{1, 4, 15, 60} {
- for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
- pieceIndex := pp.Integer(e.Value.(int))
- piece := t.Pieces[pieceIndex]
- if !piece.EverHashed {
- pieces = append(pieces, int(pieceIndex))
- continue
- }
- for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
- r := request{pieceIndex, chunkSpec}
- if th[r] >= heatThreshold {
- continue
- }
- if !addRequest(r) {
- return
- }
+ piece := t.Pieces[pieceIndex]
+ for _, cs := range piece.shuffledPendingChunkSpecs() {
+ r := request{pp.Integer(pieceIndex), cs}
+ if !addRequest(r) {
+ return
}
}
}
return
}
-func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {
- if s.heat[t] != nil {
- panic("torrent already started")
- }
- if s.heat == nil {
- s.heat = make(map[*torrent]map[request]int, 10)
- }
- s.heat[t] = make(map[request]int, t.ChunkCount())
-}
+func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {}
func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
- if _, ok := s.heat[t]; !ok {
- panic("torrent not yet started")
- }
- delete(s.heat, t)
}
func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
- m := s.heat[t]
- if m[r] <= 0 {
- panic("no pending requests")
- }
- m[r]--
}
func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {}
c *connection
t *torrent
s *responsiveDownloadStrategy
-
- // The set of pieces that were considered for requesting.
- pieces map[int]struct{}
}
// Wrapper around connection.request that tracks request heat.
func (me *requestFiller) request(req request) bool {
- if me.pieces == nil {
- me.pieces = make(map[int]struct{})
- }
- // log.Print(req)
- me.pieces[int(req.Index)] = struct{}{}
if me.c.RequestPending(req) {
return true
}
return true
}
-func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) {
+func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
rf := requestFiller{c: c, t: t, s: me}
rf.Run()
- for p := range rf.pieces {
- pieces = append(pieces, p)
- }
return
}
}
func (me *responsiveDownloadStrategy) PendingData(t *torrent) bool {
- return len(me.FillRequests(t, me.dummyConn)) != 0
+ if len(me.priorities[t]) != 0 {
+ return true
+ }
+ for index := range t.Pieces {
+ if t.wantPiece(index) {
+ return true
+ }
+ }
+ return false
}