client.go | 35 ++++------------------------------- connection.go | 26 +++++++++++++++++++------- download_strategies.go | 109 +++++++++++++++-------------------------------------- diff --git a/client.go b/client.go index c886a0ef6e97b835b87424bd216e62b3bedf0a83..741416272435286f44e8b4ec3a09a1423da1dbf9 100644 --- a/client.go +++ b/client.go @@ -836,6 +836,9 @@ Type: pp.Port, Port: uint16(addr.Port), }) } + if torrent.haveInfo() { + conn.initPieceOrder(torrent.NumPieces()) + } err = me.connectionLoop(torrent, conn) if err != nil { err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err) @@ -1758,36 +1761,11 @@ } return true } -func (cl *Client) assertRequestHeat() { - dds, ok := cl.downloadStrategy.(*DefaultDownloadStrategy) - if !ok { - return - } - for _, t := range cl.torrents { - m := make(map[request]int, 3000) - for _, cn := range t.Conns { - for r := range cn.Requests { - m[r]++ - } - } - for r, h := range dds.heat[t] { - if m[r] != h { - panic(fmt.Sprintln(m[r], h)) - } - } - } -} - func (me *Client) replenishConnRequests(t *torrent, c *connection) { if !t.haveInfo() { return } - for _, p := range me.downloadStrategy.FillRequests(t, c) { - // Make sure the state of pieces that would have been requested is - // known. - me.queueFirstHash(t, p) - } - //me.assertRequestHeat() + me.downloadStrategy.FillRequests(t, c) if len(c.Requests) == 0 && !c.PeerChoked { c.SetInterested(false) } @@ -1915,11 +1893,6 @@ if r.Index == piece { panic("wat") } } - } - // Do this even if the piece is correct because new first-hashings may - // need to be scheduled. - if conn.PeerHasPiece(piece) { - me.replenishConnRequests(t, conn) } } if t.haveAllPieces() && me.noUpload { diff --git a/connection.go b/connection.go index 1a75e17a6f0d15129aef0580d70420fc82ec103c..464f14be3dea2c7fd8c25a8a17fde3f06dac608f 100644 --- a/connection.go +++ b/connection.go @@ -8,6 +8,7 @@ "errors" "expvar" "fmt" "io" + "math/rand" "net" "sync" "time" @@ -27,13 +28,14 @@ ) // Maintains the state of a connection with a peer. type connection struct { - Socket net.Conn - Discovery peerSource - uTP bool - closing chan struct{} - mu sync.Mutex // Only for closing. - post chan pp.Message - writeCh chan []byte + Socket net.Conn + Discovery peerSource + uTP bool + closing chan struct{} + mu sync.Mutex // Only for closing. + post chan pp.Message + writeCh chan []byte + pieceOrder []int UnwantedChunksReceived int UsefulChunksReceived int @@ -109,6 +111,7 @@ // Correct the PeerPieces slice length. Return false if the existing slice is // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE // messages. func (cn *connection) setNumPieces(num int) error { + cn.initPieceOrder(num) if cn.PeerPieces == nil { return nil } @@ -129,6 +132,15 @@ if len(cn.PeerPieces) != num { panic("wat") } return nil +} + +func (cn *connection) initPieceOrder(numPieces int) { + if cn.pieceOrder == nil { + cn.pieceOrder = rand.Perm(numPieces) + } + if len(cn.pieceOrder) != numPieces { + panic("piece order initialized with wrong length") + } } func eventAgeString(t time.Time) string { diff --git a/download_strategies.go b/download_strategies.go index 5317cbf9cbac6dd2bf9b3fbde859405ad73d6f6f..0b85d190a5d4a78c7523c56c0de329d5761c324a 100644 --- a/download_strategies.go +++ b/download_strategies.go @@ -10,10 +10,8 @@ pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" ) type DownloadStrategy interface { - // Tops up the outgoing pending requests. Returns the indices of pieces - // that would be requested. This is used to determine if pieces require - // hashing so the completed state is known. - FillRequests(*torrent, *connection) (pieces []int) + // Tops up the outgoing pending requests. + FillRequests(*torrent, *connection) TorrentStarted(*torrent) TorrentStopped(*torrent) DeleteRequest(*torrent, request) @@ -25,97 +23,53 @@ AssertNotRequested(*torrent, request) PendingData(*torrent) bool } -type DefaultDownloadStrategy struct { - heat map[*torrent]map[request]int -} +type DefaultDownloadStrategy struct{} func (me *DefaultDownloadStrategy) PendingData(t *torrent) bool { return !t.haveAllPieces() } -func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) { - if me.heat[t][r] != 0 { - panic("outstanding requests break invariant") - } -} +func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {} func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {} -func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) { +func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { if c.Interested { if c.PeerChoked { return } - if len(c.Requests) >= (c.PeerMaxRequests+1)/2 { + if len(c.Requests) != 0 { return } } - th := s.heat[t] addRequest := func(req request) (again bool) { - piece := t.Pieces[req.Index] - if piece.Hashing || piece.QueuedForHash { - // We can't be sure we want this. - return true + return c.Request(req) + } + for i := range t.Pieces { + pieceIndex := c.pieceOrder[i] + if !c.PeerHasPiece(pp.Integer(pieceIndex)) { + continue } - if piece.Complete() { - // We already have this. - return true + if !t.wantPiece(pieceIndex) { + continue } - if c.RequestPending(req) { - return true - } - again = c.Request(req) - if c.RequestPending(req) { - th[req]++ - } - return - } - // Then finish off incomplete pieces in order of bytes remaining. - for _, heatThreshold := range []int{1, 4, 15, 60} { - for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { - pieceIndex := pp.Integer(e.Value.(int)) - piece := t.Pieces[pieceIndex] - if !piece.EverHashed { - pieces = append(pieces, int(pieceIndex)) - continue - } - for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs { - r := request{pieceIndex, chunkSpec} - if th[r] >= heatThreshold { - continue - } - if !addRequest(r) { - return - } + piece := t.Pieces[pieceIndex] + for _, cs := range piece.shuffledPendingChunkSpecs() { + r := request{pp.Integer(pieceIndex), cs} + if !addRequest(r) { + return } } } return } -func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) { - if s.heat[t] != nil { - panic("torrent already started") - } - if s.heat == nil { - s.heat = make(map[*torrent]map[request]int, 10) - } - s.heat[t] = make(map[request]int, t.ChunkCount()) -} +func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {} func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) { - if _, ok := s.heat[t]; !ok { - panic("torrent not yet started") - } - delete(s.heat, t) } func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) { - m := s.heat[t] - if m[r] <= 0 { - panic("no pending requests") - } - m[r]-- } func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {} @@ -176,18 +130,10 @@ type requestFiller struct { c *connection t *torrent s *responsiveDownloadStrategy - - // The set of pieces that were considered for requesting. - pieces map[int]struct{} } // Wrapper around connection.request that tracks request heat. func (me *requestFiller) request(req request) bool { - if me.pieces == nil { - me.pieces = make(map[int]struct{}) - } - // log.Print(req) - me.pieces[int(req.Index)] = struct{}{} if me.c.RequestPending(req) { return true } @@ -340,12 +286,9 @@ } return true } -func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) { +func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { rf := requestFiller{c: c, t: t, s: me} rf.Run() - for p := range rf.pieces { - pieces = append(pieces, p) - } return } @@ -385,5 +328,13 @@ } } func (me *responsiveDownloadStrategy) PendingData(t *torrent) bool { - return len(me.FillRequests(t, me.dummyConn)) != 0 + if len(me.priorities[t]) != 0 { + return true + } + for index := range t.Pieces { + if t.wantPiece(index) { + return true + } + } + return false }