From: Matt Joiner Date: Fri, 26 Dec 2014 06:15:17 +0000 (+1100) Subject: Rip out the pieces by bytes left and responsive download strategy stuff X-Git-Tag: v1.0.0~1383 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=3a221dc57d4d0e82965693db6e577ca5dac09375;p=btrtrc.git Rip out the pieces by bytes left and responsive download strategy stuff --- diff --git a/client.go b/client.go index 59b85b7c..470a5f97 100644 --- a/client.go +++ b/client.go @@ -1969,14 +1969,7 @@ func (cl *Client) allTorrentsCompleted() bool { if !t.haveInfo() { return false } - for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { - i := e.Value.(int) - if t.Pieces[i].Complete() { - continue - } - // If the piece isn't complete, make sure it's not because it's - // never been hashed. - cl.queueFirstHash(t, i) + if t.NumPiecesCompleted() != t.NumPieces() { return false } } @@ -2043,7 +2036,6 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er } me.queuePieceCheck(t, req.Index) } - t.PieceBytesLeftChanged(int(req.Index)) // Unprioritize the chunk. me.downloadStrategy.TorrentGotChunk(t, req) @@ -2124,7 +2116,6 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { me.openNewConns(t) } } - t.PieceBytesLeftChanged(int(piece)) for _, conn := range t.Conns { if correct { conn.Post(pp.Message{ diff --git a/download_strategies.go b/download_strategies.go index d3f38e98..275b57e3 100644 --- a/download_strategies.go +++ b/download_strategies.go @@ -1,10 +1,7 @@ package torrent import ( - "container/heap" - "fmt" "io" - "math/rand" pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" ) @@ -75,266 +72,3 @@ func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) { func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {} func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {} func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {} - -func NewResponsiveDownloadStrategy(readahead int64) *responsiveDownloadStrategy { - return &responsiveDownloadStrategy{ - Readahead: readahead, - lastReadOffset: make(map[*torrent]int64), - priorities: make(map[*torrent]map[request]struct{}), - requestHeat: make(map[*torrent]map[request]int), - rand: rand.New(rand.NewSource(1337)), - } -} - -type responsiveDownloadStrategy struct { - // How many bytes to preemptively download starting at the beginning of - // the last piece read for a given torrent. - Readahead int64 - lastReadOffset map[*torrent]int64 - priorities map[*torrent]map[request]struct{} - requestHeat map[*torrent]map[request]int - rand *rand.Rand // Avoid global lock - dummyConn *connection -} - -func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) { - fmt.Fprintf(w, "Priorities:\n") - for t, pp := range me.priorities { - fmt.Fprintf(w, "\t%s:", t.Name()) - for r := range pp { - fmt.Fprintf(w, " %v", r) - } - fmt.Fprintln(w) - } -} - -func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) { - me.priorities[t] = make(map[request]struct{}) - me.requestHeat[t] = make(map[request]int) - me.dummyConn = &connection{} -} - -func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) { - delete(me.lastReadOffset, t) - delete(me.priorities, t) -} -func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) { - rh := me.requestHeat[t] - if rh[r] <= 0 { - panic("request heat invariant broken") - } - rh[r]-- -} - -type requestFiller struct { - c *connection - t *torrent - s *responsiveDownloadStrategy -} - -// Wrapper around connection.request that tracks request heat. -func (me *requestFiller) request(req request) bool { - if me.c.RequestPending(req) { - return true - } - if !me.t.wantChunk(req) { - return true - } - again := me.c.Request(req) - if me.c.RequestPending(req) { - me.s.requestHeat[me.t][req]++ - } - return again -} - -// Adds additional constraints around the request heat wrapper. -func (me *requestFiller) conservativelyRequest(req request) bool { - again := me.request(req) - if len(me.c.Requests) >= 50 { - return false - } - return again -} - -// Fill priority requests. -func (me *requestFiller) priorities() bool { - for req := range me.s.priorities[me.t] { - // TODO: Perhaps this filter should be applied to every request? - if _, ok := me.t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok { - panic(req) - } - if !me.request(req) { - return false - } - } - return true -} - -// Fill requests, with all contextual information available in the receiver. -func (me *requestFiller) Run() { - if !me.priorities() { - return - } - if len(me.c.Requests) > 25 { - return - } - if !me.readahead() { - return - } - if len(me.c.Requests) > 0 { - return - } - me.completePartial() -} - -// Request partial pieces that aren't in the readahead zone. -func (me *requestFiller) completePartial() bool { - t := me.t - th := me.s.requestHeat[t] - lro, lroOk := me.s.lastReadOffset[t] - for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { - p := e.Value.(int) - // Stop when we reach pieces that aren't partial and aren't smaller - // than usual. - if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() { - break - } - // Skip pieces that are entirely inside the readahead zone. - if lroOk { - pieceOff := int64(p) * int64(t.UsualPieceSize()) - pieceEndOff := pieceOff + int64(t.PieceLength(pp.Integer(p))) - if pieceOff >= lro && pieceEndOff < lro+me.s.Readahead { - continue - } - } - for chunkSpec := range t.Pieces[p].PendingChunkSpecs { - r := request{pp.Integer(p), chunkSpec} - if th[r] >= 1 { - continue - } - if lroOk { - off := me.t.requestOffset(r) - if off >= lro && off < lro+me.s.Readahead { - continue - } - } - if !me.conservativelyRequest(r) { - return false - } - } - } - return true -} - -// Returns all wanted chunk specs in the readahead zone. -func (me *requestFiller) pendingReadaheadChunks() (ret []request) { - t := me.t - lastReadOffset, ok := me.s.lastReadOffset[t] - if !ok { - return - } - ret = make([]request, 0, (me.s.Readahead+chunkSize-1)/chunkSize) - for pi := int(lastReadOffset / int64(t.UsualPieceSize())); pi < t.NumPieces() && int64(pi)*int64(t.UsualPieceSize()) < lastReadOffset+me.s.Readahead; pi++ { - if t.havePiece(pi) || !me.c.PeerHasPiece(pp.Integer(pi)) { - continue - } - for cs := range t.Pieces[pi].PendingChunkSpecs { - r := request{pp.Integer(pi), cs} - if _, ok := me.c.Requests[r]; ok { - continue - } - if off := t.requestOffset(r); off < lastReadOffset || off >= lastReadOffset+me.s.Readahead { - continue - } - ret = append(ret, r) - } - } - return -} - -// Min-heap of int. -type intHeap []int - -func (h intHeap) Len() int { return len(h) } -func (h intHeap) Less(i, j int) bool { return h[i] < h[j] } -func (h intHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) } -func (h *intHeap) Pop() interface{} { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} - -func (me *requestFiller) readahead() bool { - rr := me.pendingReadaheadChunks() - if len(rr) == 0 { - return true - } - // Produce a partially sorted random permutation into the readahead chunks - // to somewhat preserve order but reducing wasted chunks due to overlap - // with other peers. - ii := new(intHeap) - *ii = me.s.rand.Perm(len(rr)) - heap.Init(ii) - for _, i := range *ii { - if !me.conservativelyRequest(rr[i]) { - return false - } - } - return true -} - -func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { - rf := requestFiller{c: c, t: t, s: me} - rf.Run() - return -} - -func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) { - delete(me.priorities[t], req) -} - -func (me *responsiveDownloadStrategy) TorrentGotPiece(t *torrent, piece int) { - for _, cs := range t.pieceChunks(piece) { - delete(me.priorities[t], request{pp.Integer(piece), cs}) - } -} - -func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) { - s.lastReadOffset[t] = off - for _len > 0 { - req, ok := t.offsetRequest(off) - if !ok { - panic("bad offset") - } - reqOff := t.requestOffset(req) - // Gain the alignment adjustment. - _len += off - reqOff - // Lose the length of this block. - _len -= int64(req.Length) - off = reqOff + int64(req.Length) - if !t.haveChunk(req) { - s.priorities[t][req] = struct{}{} - } - } -} - -func (s *responsiveDownloadStrategy) AssertNotRequested(t *torrent, r request) { - if s.requestHeat[t][r] != 0 { - panic("outstanding requests invariant broken") - } -} - -func (me *responsiveDownloadStrategy) PendingData(t *torrent) bool { - if len(me.priorities[t]) != 0 { - return true - } - for index := range t.Pieces { - if t.wantPiece(index) { - return true - } - } - return false -} diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index 52591aed..a3297455 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -182,11 +182,10 @@ func TestDownloadOnDemand(t *testing.T) { t.Fatal(err) } leecher, err := torrent.NewClient(&torrent.Config{ - DataDir: filepath.Join(layout.BaseDir, "download"), - DownloadStrategy: torrent.NewResponsiveDownloadStrategy(0), - DisableTrackers: true, - NoDHT: true, - ListenAddr: ":0", + DataDir: filepath.Join(layout.BaseDir, "download"), + DisableTrackers: true, + NoDHT: true, + ListenAddr: ":0", NoDefaultBlocklist: true, diff --git a/torrent.go b/torrent.go index 469d2b15..f6ced0e7 100644 --- a/torrent.go +++ b/torrent.go @@ -58,10 +58,9 @@ type torrent struct { // announcing, and communicating with peers. ceasingNetworking chan struct{} - InfoHash InfoHash - Pieces []*torrentPiece - IncompletePiecesByBytesLeft *OrderedList - length int64 + InfoHash InfoHash + Pieces []*torrentPiece + length int64 // Prevent mutations to Data memory maps while in use as they're not safe. dataLock sync.RWMutex Data *mmap_span.MMapSpan @@ -124,34 +123,6 @@ func (t *torrent) CeaseNetworking() { } } -func (t *torrent) assertIncompletePiecesByBytesLeftOrdering() { - allIndexes := make(map[int]struct{}, t.NumPieces()) - for i := 0; i < t.NumPieces(); i++ { - allIndexes[i] = struct{}{} - } - var lastBytesLeft int - for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { - i := e.Value.(int) - if _, ok := allIndexes[i]; !ok { - panic("duplicate entry") - } - delete(allIndexes, i) - if t.Pieces[i].Complete() { - panic("complete piece") - } - bytesLeft := int(t.PieceNumPendingBytes(pp.Integer(i))) - if bytesLeft < lastBytesLeft { - panic("ordering broken") - } - lastBytesLeft = bytesLeft - } - for i := range allIndexes { - if !t.Pieces[i].Complete() { - panic("leaked incomplete piece") - } - } -} - func (t *torrent) AddPeers(pp []Peer) { for _, p := range pp { t.Peers[peersKey{string(p.IP), p.Port}] = p @@ -214,25 +185,12 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte return } t.length = t.Data.Size() - t.IncompletePiecesByBytesLeft = NewList(func(a, b interface{}) bool { - apb := t.PieceNumPendingBytes(pp.Integer(a.(int))) - bpb := t.PieceNumPendingBytes(pp.Integer(b.(int))) - if apb < bpb { - return true - } - if apb > bpb { - return false - } - return a.(int) < b.(int) - }) - for index, hash := range infoPieceHashes(&md) { + for _, hash := range infoPieceHashes(&md) { piece := &torrentPiece{} piece.Event.L = eventLocker util.CopyExact(piece.Hash[:], hash) t.Pieces = append(t.Pieces, piece) - piece.bytesLeftElement = t.IncompletePiecesByBytesLeft.Insert(index) } - t.assertIncompletePiecesByBytesLeftOrdering() for _, conn := range t.Conns { t.initRequestOrdering(conn) if err := conn.setNumPieces(t.NumPieces()); err != nil { @@ -578,19 +536,9 @@ func (t *torrent) pendAllChunkSpecs(index pp.Integer) { for _, cs := range t.pieceChunks(int(index)) { pcss[cs] = struct{}{} } - t.IncompletePiecesByBytesLeft.ValueChanged(piece.bytesLeftElement) return } -func (t *torrent) PieceBytesLeftChanged(index int) { - p := t.Pieces[index] - if p.Complete() { - t.IncompletePiecesByBytesLeft.Remove(p.bytesLeftElement) - } else { - t.IncompletePiecesByBytesLeft.ValueChanged(p.bytesLeftElement) - } -} - type Peer struct { Id [20]byte IP net.IP