]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Store request heat in the download strategy
authorMatt Joiner <anacrolix@gmail.com>
Fri, 23 May 2014 11:01:05 +0000 (21:01 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 23 May 2014 11:01:05 +0000 (21:01 +1000)
This avoids recalculating it for every FillRequest

client.go
connection.go
torrent.go

index 16c434a27986547369279795eba06e8f81986370..7b5d6715a218ca116dc7b29224c4871a7fc3918d 100644 (file)
--- a/client.go
+++ b/client.go
@@ -170,7 +170,7 @@ func (c *Client) Start() {
        }
        c.quit = make(chan struct{})
        if c.DownloadStrategy == nil {
-               c.DownloadStrategy = DefaultDownloadStrategy{}
+               c.DownloadStrategy = &DefaultDownloadStrategy{}
        }
        if c.Listener != nil {
                go c.acceptConnections()
@@ -363,6 +363,22 @@ func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
        me.replenishConnRequests(torrent, conn)
 }
 
+func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
+       ok = cn.Cancel(r)
+       if ok {
+               cl.DownloadStrategy.DeleteRequest(t, r)
+       }
+       return
+}
+
+func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
+       if !cn.RequestPending(r) {
+               return
+       }
+       cl.DownloadStrategy.DeleteRequest(t, r)
+       delete(cn.Requests, r)
+}
+
 func (me *Client) connectionLoop(t *torrent, c *connection) error {
        decoder := pp.Decoder{
                R:         bufio.NewReader(c.Socket),
@@ -385,7 +401,9 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                switch msg.Type {
                case pp.Choke:
                        c.PeerChoked = true
-                       c.Requests = nil
+                       for r := range c.Requests {
+                               me.connDeleteRequest(t, c, r)
+                       }
                case pp.Unchoke:
                        c.PeerChoked = false
                        me.peerUnchoked(t, c)
@@ -453,6 +471,9 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
 
 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
        conn.Socket.Close()
+       for r := range conn.Requests {
+               me.connDeleteRequest(torrent, conn, r)
+       }
        for i0, c := range torrent.Conns {
                if c != conn {
                        continue
@@ -562,6 +583,7 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
                return torrent.Close()
        }
        me.torrents[torrent.InfoHash] = torrent
+       me.DownloadStrategy.TorrentStarted(torrent)
        if !me.DisableTrackers {
                go me.announceTorrent(torrent)
        }
@@ -668,14 +690,39 @@ func (me *Client) WaitAll() bool {
 
 type DownloadStrategy interface {
        FillRequests(t *torrent, c *connection)
+       TorrentStarted(t *torrent)
+       TorrentStopped(t *torrent)
+       DeleteRequest(t *torrent, r request)
 }
 
-type DefaultDownloadStrategy struct{}
+type DefaultDownloadStrategy struct {
+       heat map[*torrent]map[request]int
+}
 
-func (DefaultDownloadStrategy) FillRequests(torrent *torrent, conn *connection) {
-       requestHeatMap := torrent.requestHeat()
+func (cl *Client) assertRequestHeat() {
+       dds, ok := cl.DownloadStrategy.(*DefaultDownloadStrategy)
+       if !ok {
+               return
+       }
+       for _, t := range cl.torrents {
+               m := make(map[request]int, 3000)
+               for _, cn := range t.Conns {
+                       for r := range cn.Requests {
+                               m[r]++
+                       }
+               }
+               for r, h := range dds.heat[t] {
+                       if m[r] != h {
+                               panic(fmt.Sprintln(m[r], h))
+                       }
+               }
+       }
+}
+
+func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
+       th := s.heat[t]
        addRequest := func(req request) (again bool) {
-               piece := torrent.Pieces[req.Index]
+               piece := t.Pieces[req.Index]
                if piece.Hashing {
                        // We can't be sure we want this.
                        return true
@@ -684,30 +731,69 @@ func (DefaultDownloadStrategy) FillRequests(torrent *torrent, conn *connection)
                        // We already have this.
                        return true
                }
-               if requestHeatMap[req] > 0 {
-                       // We've already requested this.
+               if c.RequestPending(req) {
                        return true
                }
-               return conn.Request(req)
+               again = c.Request(req)
+               if c.RequestPending(req) {
+                       th[req]++
+               }
+               return
        }
        // First request prioritized chunks.
-       for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
+       for e := t.Priorities.Front(); e != nil; e = e.Next() {
                if !addRequest(e.Value.(request)) {
                        return
                }
        }
+       ppbs := t.piecesByPendingBytes()
        // Then finish off incomplete pieces in order of bytes remaining.
-       for _, index := range torrent.piecesByPendingBytes() {
-               for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
-                       if !addRequest(request{index, chunkSpec}) {
-                               return
+       for _, heatThreshold := range []int{0, 4, 100} {
+               for _, pieceIndex := range ppbs {
+                       for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
+                               r := request{pieceIndex, chunkSpec}
+                               if th[r] > heatThreshold {
+                                       continue
+                               }
+                               if !addRequest(request{pieceIndex, chunkSpec}) {
+                                       return
+                               }
                        }
                }
        }
 }
 
+func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {
+       if s.heat[t] != nil {
+               panic("torrent already started")
+       }
+       if s.heat == nil {
+               s.heat = make(map[*torrent]map[request]int, 10)
+       }
+       s.heat[t] = make(map[request]int, t.ChunkCount())
+}
+
+func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
+       if _, ok := s.heat[t]; !ok {
+               panic("torrent not yet started")
+       }
+       delete(s.heat, t)
+}
+
+func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
+       m := s.heat[t]
+       if m[r] <= 0 {
+               panic("no pending requests")
+       }
+       m[r]--
+}
+
 type ResponsiveDownloadStrategy struct{}
 
+func (ResponsiveDownloadStrategy) TorrentStarted(*torrent)         {}
+func (ResponsiveDownloadStrategy) TorrentStopped(*torrent)         {}
+func (ResponsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
+
 func (ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
        for e := t.Priorities.Front(); e != nil; e = e.Next() {
                if !c.Request(e.Value.(request)) {
@@ -745,7 +831,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
 
        // Request has been satisfied.
-       delete(c.Requests, req)
+       me.connDeleteRequest(t, c, req)
 
        defer me.replenishConnRequests(t, c)
 
@@ -763,7 +849,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
 
        // Cancel pending requests for this chunk.
        for _, c := range t.Conns {
-               if c.Cancel(req) {
+               if me.connCancel(t, c, req) {
                        me.replenishConnRequests(t, c)
                }
        }
index 96a1d2154d9676370fe45af843a9f33d82ccaf9b..6bb5f5b2bbb18688d6794de693415b5a0aa4f435 100644 (file)
@@ -62,6 +62,11 @@ func (c *connection) Post(msg encoding.BinaryMarshaler) {
        c.post <- msg
 }
 
+func (c *connection) RequestPending(r request) bool {
+       _, ok := c.Requests[r]
+       return ok
+}
+
 // Returns true if more requests can be sent.
 func (c *connection) Request(chunk request) bool {
        if len(c.Requests) >= c.PeerMaxRequests {
@@ -74,18 +79,19 @@ func (c *connection) Request(chunk request) bool {
        if c.PeerChoked {
                return false
        }
-       if _, ok := c.Requests[chunk]; !ok {
-               c.Post(peer_protocol.Message{
-                       Type:   peer_protocol.Request,
-                       Index:  chunk.Index,
-                       Begin:  chunk.Begin,
-                       Length: chunk.Length,
-               })
+       if c.RequestPending(chunk) {
+               return true
        }
        if c.Requests == nil {
                c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
        }
        c.Requests[chunk] = struct{}{}
+       c.Post(peer_protocol.Message{
+               Type:   peer_protocol.Request,
+               Index:  chunk.Index,
+               Begin:  chunk.Begin,
+               Length: chunk.Length,
+       })
        return true
 }
 
index 8a1da7c6517d4c92cabf976cdf6f76a3587109ce..dfe2431433e109cbda4dcc2114148f37704a5e63 100644 (file)
@@ -49,6 +49,20 @@ func (t *torrent) BytesLeft() (left int64) {
        return
 }
 
+func NumChunksForPiece(chunkSize int, pieceSize int) int {
+       return (pieceSize + chunkSize - 1) / chunkSize
+}
+
+func (t *torrent) ChunkCount() (num int) {
+       num += (t.NumPieces() - 1) * NumChunksForPiece(chunkSize, int(t.PieceLength(0)))
+       num += NumChunksForPiece(chunkSize, int(t.PieceLength(pp.Integer(t.NumPieces()-1))))
+       return
+}
+
+func (t *torrent) LastPieceSize() int {
+       return int(t.PieceLength(pp.Integer(t.NumPieces() - 1)))
+}
+
 func (t *torrent) NumPieces() int {
        return len(t.MetaInfo.Pieces) / pieceHash.Size()
 }
@@ -156,16 +170,6 @@ func (t *torrent) pendAllChunkSpecs(index pp.Integer) {
        return
 }
 
-func (t *torrent) requestHeat() (ret map[request]int) {
-       ret = make(map[request]int)
-       for _, conn := range t.Conns {
-               for req, _ := range conn.Requests {
-                       ret[req]++
-               }
-       }
-       return
-}
-
 type Peer struct {
        Id   [20]byte
        IP   net.IP