From 06445f2a1eef768fbc238b218ffd79539286aa84 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 18 Jan 2016 18:35:14 +1100 Subject: [PATCH] It's working and the tests are usually passing I still need to handle "prefetch"-style downloading, and some functions haven't been committed to force this issue. --- client.go | 203 ++++++++++----------------------------------- client_test.go | 16 +++- connection.go | 43 ---------- connection_test.go | 36 -------- piece.go | 1 - reader.go | 49 +++++------ t.go | 18 ++++ torrent.go | 132 ++++++++++++++++++++--------- 8 files changed, 191 insertions(+), 307 deletions(-) diff --git a/client.go b/client.go index bd6b8b61..de9e4f6d 100644 --- a/client.go +++ b/client.go @@ -10,6 +10,7 @@ import ( "expvar" "fmt" "io" + "io/ioutil" "log" "math/big" mathRand "math/rand" @@ -28,13 +29,11 @@ import ( "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/sync" "github.com/anacrolix/utp" - "github.com/bradfitz/iter" "github.com/edsrzf/mmap-go" "github.com/anacrolix/torrent/bencode" filePkg "github.com/anacrolix/torrent/data/file" "github.com/anacrolix/torrent/dht" - "github.com/anacrolix/torrent/internal/pieceordering" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" @@ -276,47 +275,6 @@ func readaheadPieces(readahead, pieceLength int64) (ret int) { return } -func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) { - index := int(off / int64(t.usualPieceSize())) - cl.raisePiecePriority(t, index, PiecePriorityNow) - index++ - if index >= t.numPieces() { - return - } - cl.raisePiecePriority(t, index, PiecePriorityNext) - for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) { - index++ - if index >= t.numPieces() { - break - } - cl.raisePiecePriority(t, index, PiecePriorityReadahead) - } -} - -func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) { - for n > 0 { - req, ok := t.offsetRequest(off) - if !ok { - break - } - if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) { - if t.urgent == nil { - t.urgent = make(map[request]struct{}, (n+int(t.chunkSize)-1)/int(t.chunkSize)) - } - t.urgent[req] = struct{}{} - cl.event.Broadcast() // Why? - index := int(req.Index) - cl.queueFirstHash(t, index) - cl.pieceChanged(t, index) - } - reqOff := t.requestOffset(req) - n1 := req.Length - pp.Integer(off-reqOff) - off += int64(n1) - n -= int(n1) - } - // log.Print(t.urgent) -} - func (cl *Client) configDir() string { if cl.config.ConfigDir == "" { return filepath.Join(os.Getenv("HOME"), ".config/torrent") @@ -330,30 +288,6 @@ func (cl *Client) ConfigDir() string { return cl.configDir() } -func (t *torrent) connPendPiece(c *connection, piece int) { - c.pendPiece(piece, t.Pieces[piece].Priority, t) -} - -func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) { - if t.Pieces[piece].Priority < priority { - cl.prioritizePiece(t, piece, priority) - } -} - -func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) { - if t.havePiece(piece) { - priority = PiecePriorityNone - } - if priority != PiecePriorityNone { - cl.queueFirstHash(t, piece) - } - p := &t.Pieces[piece] - if p.Priority != priority { - p.Priority = priority - cl.pieceChanged(t, piece) - } -} - func loadPackedBlocklist(filename string) (ret iplist.Ranger, err error) { f, err := os.Open(filename) if os.IsNotExist(err) { @@ -1169,9 +1103,6 @@ func (cl *Client) runHandshookConn(c *connection, t *torrent) (err error) { go c.writer() go c.writeOptimizer(time.Minute) cl.sendInitialMessages(c, t) - if t.haveInfo() { - t.initRequestOrdering(c) - } err = cl.connectionLoop(t, c) if err != nil { err = fmt.Errorf("error during connection loop: %s", err) @@ -1237,26 +1168,6 @@ func (me *Client) sendInitialMessages(conn *connection, torrent *torrent) { } } -// Randomizes the piece order for this connection. Every connection will be -// given a different ordering. Having it stored per connection saves having to -// randomize during request filling, and constantly recalculate the ordering -// based on piece priorities. -func (t *torrent) initRequestOrdering(c *connection) { - if c.pieceRequestOrder != nil || c.piecePriorities != nil { - panic("double init of request ordering") - } - c.pieceRequestOrder = pieceordering.New() - for i := range iter.N(t.Info.NumPieces()) { - if !c.PeerHasPiece(i) { - continue - } - if !t.wantPiece(i) { - continue - } - t.connPendPiece(c, i) - } -} - func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error { if !c.peerHasAll { if t.haveInfo() { @@ -1274,7 +1185,6 @@ func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error { c.PeerPieces[piece] = true } if t.wantPiece(piece) { - t.connPendPiece(c, piece) me.replenishConnRequests(t, c) } return nil @@ -1721,12 +1631,6 @@ func (me *Client) deleteConnection(t *torrent, c *connection) bool { func (me *Client) dropConnection(t *torrent, c *connection) { me.event.Broadcast() c.Close() - if c.piecePriorities != nil { - t.connPiecePriorites.Put(c.piecePriorities) - // I wonder if it's safe to set it to nil. Probably not. Since it's - // only read, it doesn't particularly matter if a closing connection - // shares the slice with another connection. - } if me.deleteConnection(t, c) { me.openNewConns(t) } @@ -1774,16 +1678,14 @@ func (t *torrent) needData() bool { if !t.haveInfo() { return true } - if len(t.urgent) != 0 { - return true - } - for i := range t.Pieces { - p := &t.Pieces[i] - if p.Priority != PiecePriorityNone { - return true + return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + for i := begin; i < end; i++ { + if !t.pieceComplete(i) { + return false + } } - } - return false + return true + }) } func (cl *Client) usefulConn(t *torrent, c *connection) bool { @@ -2048,16 +1950,6 @@ func (t Torrent) Files() (ret []File) { return } -// Marks the pieces in the given region for download. -func (t Torrent) SetRegionPriority(off, len int64) { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() - pieceSize := int64(t.torrent.usualPieceSize()) - for i := off / pieceSize; i*pieceSize < off+len; i++ { - t.cl.raisePiecePriority(t.torrent, int(i), PiecePriorityNormal) - } -} - func (t Torrent) AddPeers(pp []Peer) error { cl := t.cl cl.mu.Lock() @@ -2474,6 +2366,24 @@ func (me *Client) WaitAll() bool { return true } +func (me *Client) connAddRequest(c *connection, req request) (more bool) { + if len(c.Requests) >= 64 { + return false + } + more = c.Request(req) + return +} + +func (me *Client) connRequestPiecePendingChunks(c *connection, t *torrent, piece int) (more bool) { + for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) { + req := request{pp.Integer(piece), cs} + if !me.connAddRequest(c, req) { + return false + } + } + return true +} + func (me *Client) fillRequests(t *torrent, c *connection) { if c.Interested { if c.PeerChoked { @@ -2483,37 +2393,25 @@ func (me *Client) fillRequests(t *torrent, c *connection) { return } } - addRequest := func(req request) (again bool) { - // TODO: Couldn't this check also be done *after* the request? - if len(c.Requests) >= 64 { - return false - } - return c.Request(req) - } - for req := range t.urgent { - if !addRequest(req) { - return + if !t.forUrgentPieces(func(piece int) (again bool) { + if !c.PeerHasPiece(piece) { + return true } + return me.connRequestPiecePendingChunks(c, t, piece) + }) { + return } - for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() { - pieceIndex := e.Piece() - if !c.PeerHasPiece(pieceIndex) { - panic("piece in request order but peer doesn't have it") - } - if !t.wantPiece(pieceIndex) { - log.Printf("unwanted piece %d in connection request order\n%s", pieceIndex, c) - c.pieceRequestOrder.DeletePiece(pieceIndex) - continue - } - piece := &t.Pieces[pieceIndex] - for _, cs := range piece.shuffledPendingChunkSpecs(t, pieceIndex) { - r := request{pp.Integer(pieceIndex), cs} - if !addRequest(r) { - return + t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + for i := begin + 1; i < end; i++ { + if !c.PeerHasPiece(i) { + continue + } + if !me.connRequestPiecePendingChunks(c, t, i) { + return false } } - } - return + return true + }) } func (me *Client) replenishConnRequests(t *torrent, c *connection) { @@ -2562,6 +2460,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er piece.pendingWrites++ piece.pendingWritesMutex.Unlock() go func() { + defer me.event.Broadcast() defer func() { piece.pendingWritesMutex.Lock() piece.pendingWrites-- @@ -2591,17 +2490,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er defer t.publishPieceChange(int(req.Index)) // Record that we have the chunk. piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize)) - delete(t.urgent, req) // It's important that the piece is potentially queued before we check if // the piece is still wanted, because if it is queued, it won't be wanted. if t.pieceAllDirty(index) { me.queuePieceCheck(t, int(req.Index)) } - if !t.wantPiece(int(req.Index)) { - for _, c := range t.Conns { - c.pieceRequestOrder.DeletePiece(int(req.Index)) - } - } // Cancel pending requests for this chunk. for _, c := range t.Conns { @@ -2656,17 +2549,9 @@ func (me *Client) pieceHashed(t *torrent, piece int, correct bool) { func (me *Client) pieceChanged(t *torrent, piece int) { correct := t.pieceComplete(piece) - p := &t.Pieces[piece] defer t.publishPieceChange(piece) defer me.event.Broadcast() - if correct { - p.Priority = PiecePriorityNone - for req := range t.urgent { - if int(req.Index) == piece { - delete(t.urgent, req) - } - } - } else { + if !correct { if t.pieceAllDirty(piece) { t.pendAllChunkSpecs(piece) } @@ -2682,10 +2567,8 @@ func (me *Client) pieceChanged(t *torrent, piece int) { conn.Cancel(r) } } - conn.pieceRequestOrder.DeletePiece(int(piece)) me.upload(t, conn) } else if t.wantPiece(piece) && conn.PeerHasPiece(piece) { - t.connPendPiece(conn, int(piece)) me.replenishConnRequests(t, conn) } } diff --git a/client_test.go b/client_test.go index 25242a1c..754bbcc2 100644 --- a/client_test.go +++ b/client_test.go @@ -529,10 +529,14 @@ func TestResponsive(t *testing.T) { reader.SetReadahead(0) reader.SetResponsive() b := make([]byte, 2) - _, err = reader.ReadAt(b, 3) + _, err = reader.Seek(3, os.SEEK_SET) + require.NoError(t, err) + _, err = io.ReadFull(reader, b) assert.Nil(t, err) assert.EqualValues(t, "lo", string(b)) - n, err := reader.ReadAt(b, 11) + _, err = reader.Seek(11, os.SEEK_SET) + require.NoError(t, err) + n, err := io.ReadFull(reader, b) assert.Nil(t, err) assert.EqualValues(t, 2, n) assert.EqualValues(t, "d\n", string(b)) @@ -571,11 +575,15 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) { reader.SetReadahead(0) reader.SetResponsive() b := make([]byte, 2) - _, err = reader.ReadAt(b, 3) + _, err = reader.Seek(3, os.SEEK_SET) + require.NoError(t, err) + _, err = io.ReadFull(reader, b) assert.Nil(t, err) assert.EqualValues(t, "lo", string(b)) go leecherTorrent.Drop() - n, err := reader.ReadAt(b, 11) + _, err = reader.Seek(11, os.SEEK_SET) + require.NoError(t, err) + n, err := reader.Read(b) assert.EqualError(t, err, "torrent closed") assert.EqualValues(t, 0, n) } diff --git a/connection.go b/connection.go index 14c24c85..9a881918 100644 --- a/connection.go +++ b/connection.go @@ -14,7 +14,6 @@ import ( "time" "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/internal/pieceordering" pp "github.com/anacrolix/torrent/peer_protocol" ) @@ -41,12 +40,6 @@ type connection struct { post chan pp.Message writeCh chan []byte - // The connection's preferred order to download pieces. The index is the - // piece, the value is its priority. - piecePriorities []int - // The piece request order based on piece priorities. - pieceRequestOrder *pieceordering.Instance - UnwantedChunksReceived int UsefulChunksReceived int chunksSent int @@ -105,42 +98,6 @@ func (cn *connection) localAddr() net.Addr { return cn.conn.LocalAddr() } -// Adjust piece position in the request order for this connection based on the -// given piece priority. -func (cn *connection) pendPiece(piece int, priority piecePriority, t *torrent) { - if priority == PiecePriorityNone { - cn.pieceRequestOrder.DeletePiece(piece) - return - } - if cn.piecePriorities == nil { - cn.piecePriorities = t.newConnPiecePriorities() - } - pp := cn.piecePriorities[piece] - // Priority regions not to scale. Within each region, piece is randomized - // according to connection. - - // <-request first -- last-> - // [ Now ] - // [ Next ] - // [ Readahead ] - // [ Normal ] - key := func() int { - switch priority { - case PiecePriorityNow: - return -3*len(cn.piecePriorities) + 3*pp - case PiecePriorityNext: - return -2*len(cn.piecePriorities) + 2*pp - case PiecePriorityReadahead: - return -len(cn.piecePriorities) + pp - case PiecePriorityNormal: - return pp - default: - panic(priority) - } - }() - cn.pieceRequestOrder.SetPiece(piece, key) -} - func (cn *connection) supportsExtension(ext string) bool { _, ok := cn.PeerExtensionIDs[ext] return ok diff --git a/connection_test.go b/connection_test.go index 36832b9f..77dbb53b 100644 --- a/connection_test.go +++ b/connection_test.go @@ -4,7 +4,6 @@ import ( "testing" "time" - "github.com/bradfitz/iter" "github.com/stretchr/testify/assert" "github.com/anacrolix/torrent/internal/pieceordering" @@ -63,38 +62,3 @@ func pieceOrderingAsSlice(po *pieceordering.Instance) (ret []int) { func testRequestOrder(expected []int, ro *pieceordering.Instance, t *testing.T) { assert.EqualValues(t, pieceOrderingAsSlice(ro), expected) } - -// Tests the request ordering based on a connections priorities. -func TestPieceRequestOrder(t *testing.T) { - c := connection{ - pieceRequestOrder: pieceordering.New(), - piecePriorities: []int{1, 4, 0, 3, 2}, - } - testRequestOrder(nil, c.pieceRequestOrder, t) - c.pendPiece(2, PiecePriorityNone, nil) - testRequestOrder(nil, c.pieceRequestOrder, t) - c.pendPiece(1, PiecePriorityNormal, nil) - c.pendPiece(2, PiecePriorityNormal, nil) - testRequestOrder([]int{2, 1}, c.pieceRequestOrder, t) - c.pendPiece(0, PiecePriorityNormal, nil) - testRequestOrder([]int{2, 0, 1}, c.pieceRequestOrder, t) - c.pendPiece(1, PiecePriorityReadahead, nil) - testRequestOrder([]int{1, 2, 0}, c.pieceRequestOrder, t) - c.pendPiece(4, PiecePriorityNow, nil) - // now(4), r(1), normal(0, 2) - testRequestOrder([]int{4, 1, 2, 0}, c.pieceRequestOrder, t) - c.pendPiece(2, PiecePriorityReadahead, nil) - // N(4), R(1, 2), N(0) - testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t) - c.pendPiece(1, PiecePriorityNow, nil) - // now(4, 1), readahead(2), normal(0) - // in the same order, the keys will be: -15+6, -15+12, -5, 1 - // so we test that a very low priority (for this connection), "now" - // piece has been placed after a readahead piece. - testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t) - // Note this intentially sets to None a piece that's not in the order. - for i := range iter.N(5) { - c.pendPiece(i, PiecePriorityNone, nil) - } - testRequestOrder(nil, c.pieceRequestOrder, t) -} diff --git a/piece.go b/piece.go index c25bd2e4..518e76bb 100644 --- a/piece.go +++ b/piece.go @@ -30,7 +30,6 @@ type piece struct { Hashing bool QueuedForHash bool EverHashed bool - Priority piecePriority PublicPieceState PieceState pendingWritesMutex sync.Mutex diff --git a/reader.go b/reader.go index ed2701b8..3d04b35c 100644 --- a/reader.go +++ b/reader.go @@ -4,11 +4,14 @@ import ( "errors" "io" "os" + "sync" ) // Accesses torrent data via a client. type Reader struct { - t *Torrent + t *Torrent + + mu sync.Mutex pos int64 responsive bool readahead int64 @@ -25,18 +28,11 @@ func (r *Reader) SetResponsive() { // Configure the number of bytes ahead of a read that should also be // prioritized in preparation for further reads. func (r *Reader) SetReadahead(readahead int64) { + r.mu.Lock() + defer r.mu.Unlock() r.readahead = readahead } -func (r *Reader) raisePriorities(off int64, n int) { - if r.responsive { - r.t.cl.addUrgentRequests(r.t.torrent, off, n) - } - if !r.responsive || r.readahead != 0 { - r.t.cl.readRaisePiecePriorities(r.t.torrent, off, int64(n)+r.readahead) - } -} - func (r *Reader) readable(off int64) (ret bool) { // log.Println("readable", off) // defer func() { @@ -81,22 +77,15 @@ func (r *Reader) waitReadable(off int64) { r.t.cl.event.Wait() } -func (r *Reader) ReadAt(b []byte, off int64) (n int, err error) { - for { - var n1 int - n1, err = r.readAt(b, off) - n += n1 - b = b[n1:] - off += int64(n1) - if err != nil || len(b) == 0 || n1 == 0 { - return - } - } -} - func (r *Reader) Read(b []byte) (n int, err error) { - n, err = r.readAt(b, r.pos) + r.mu.Lock() + pos := r.pos + r.mu.Unlock() + n, err = r.readAt(b, pos) + r.mu.Lock() r.pos += int64(n) + r.mu.Unlock() + r.posChanged() return } @@ -115,9 +104,7 @@ func (r *Reader) readAt(b []byte, pos int64) (n int, err error) { } again: r.t.cl.mu.Lock() - r.raisePriorities(pos, len(b)) for !r.readable(pos) { - r.raisePriorities(pos, len(b)) r.waitReadable(pos) } avail := r.available(pos, int64(len(b))) @@ -154,11 +141,19 @@ again: } func (r *Reader) Close() error { + r.t.deleteReader(r) r.t = nil return nil } +func (r *Reader) posChanged() { + r.t.cl.mu.Lock() + defer r.t.cl.mu.Unlock() + r.t.torrent.readersChanged(r.t.cl) +} + func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { + r.mu.Lock() switch whence { case os.SEEK_SET: r.pos = off @@ -170,5 +165,7 @@ func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { err = errors.New("bad whence") } ret = r.pos + r.mu.Unlock() + r.posChanged() return } diff --git a/t.go b/t.go index 23085d1b..bf2b9e22 100644 --- a/t.go +++ b/t.go @@ -41,6 +41,7 @@ func (t Torrent) NewReader() (ret *Reader) { t: &t, readahead: 5 * 1024 * 1024, } + t.addReader(ret) return } @@ -119,3 +120,20 @@ func (t Torrent) MetaInfo() *metainfo.MetaInfo { defer t.cl.mu.Unlock() return t.torrent.MetaInfo() } + +func (t Torrent) addReader(r *Reader) { + t.cl.mu.Lock() + defer t.cl.mu.Unlock() + if t.torrent.readers == nil { + t.torrent.readers = make(map[*Reader]struct{}) + } + t.torrent.readers[r] = struct{}{} + t.torrent.readersChanged(t.cl) +} + +func (t Torrent) deleteReader(r *Reader) { + t.cl.mu.Lock() + defer t.cl.mu.Unlock() + delete(t.torrent.readers, r) + t.torrent.readersChanged(t.cl) +} diff --git a/torrent.go b/torrent.go index c74476eb..2933c809 100644 --- a/torrent.go +++ b/torrent.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "log" - "math/rand" "net" "sort" "sync" @@ -62,9 +61,6 @@ type torrent struct { // Values are the piece indices that changed. pieceStateChanges *pubsub.PubSub chunkSize pp.Integer - // Chunks that are wanted before all others. This is for - // responsive/streaming readers that want to unblock ASAP. - urgent map[request]struct{} // Total length of the torrent in bytes. Stored because it's not O(1) to // get this from the info dict. length int64 @@ -99,7 +95,7 @@ type torrent struct { // Closed when .Info is set. gotMetainfo chan struct{} - connPiecePriorites sync.Pool + readers map[*Reader]struct{} } var ( @@ -111,16 +107,6 @@ func (t *torrent) setDisplayName(dn string) { t.displayName = dn } -func (t *torrent) newConnPiecePriorities() []int { - _ret := t.connPiecePriorites.Get() - if _ret != nil { - piecePrioritiesReused.Add(1) - return _ret.([]int) - } - piecePrioritiesNew.Add(1) - return rand.Perm(t.numPieces()) -} - func (t *torrent) pieceComplete(piece int) bool { // TODO: This is called when setting metadata, and before storage is // assigned, which doesn't seem right. @@ -261,7 +247,6 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) { missinggo.CopyExact(piece.Hash[:], hash) } for _, conn := range t.Conns { - t.initRequestOrdering(conn) if err := conn.setNumPieces(t.numPieces()); err != nil { log.Printf("closing connection: %s", err) conn.Close() @@ -324,7 +309,7 @@ func (t *torrent) Name() string { func (t *torrent) pieceState(index int) (ret PieceState) { p := &t.Pieces[index] - ret.Priority = p.Priority + ret.Priority = t.piecePriority(index) if t.pieceComplete(index) { ret.Complete = true } @@ -436,9 +421,10 @@ func (t *torrent) writeStatus(w io.Writer, cl *Client) { fmt.Fprintln(w) } fmt.Fprintf(w, "Urgent:") - for req := range t.urgent { - fmt.Fprintf(w, " %v", req) - } + t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + fmt.Fprintf(w, " %d:%d", begin, end) + return true + }) fmt.Fprintln(w) fmt.Fprintf(w, "Trackers: ") for _, tier := range t.Trackers { @@ -728,17 +714,8 @@ func (t *torrent) wantChunk(r request) bool { if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) { return true } - _, ok := t.urgent[r] - return ok -} - -func (t *torrent) urgentChunkInPiece(piece int) bool { - p := pp.Integer(piece) - for req := range t.urgent { - if req.Index == p { - return true - } - } + // TODO: What about pieces that were wanted, but aren't now, and aren't + // completed either? That used to be done here. return false } @@ -754,18 +731,21 @@ func (t *torrent) wantPiece(index int) bool { if p.Hashing { return false } - if p.Priority == PiecePriorityNone { - if !t.urgentChunkInPiece(index) { - return false - } - } + // Put piece complete check last, since it's the slowest as it can involve // calling out into external data stores. return !t.pieceComplete(index) } func (t *torrent) connHasWantedPieces(c *connection) bool { - return c.pieceRequestOrder != nil && !c.pieceRequestOrder.Empty() + return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + for i := begin; i < end; i++ { + if c.PeerHasPiece(i) { + return false + } + } + return true + }) } func (t *torrent) extentPieces(off, _len int64) (pieces []int) { @@ -818,3 +798,81 @@ func (t *torrent) pieceAllDirty(piece int) bool { } return true } + +func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) { + return t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + if begin < end { + if !f(begin) { + return false + } + } + return true + }) +} + +func (t *torrent) readersChanged(cl *Client) { + for _, c := range t.Conns { + cl.replenishConnRequests(t, c) + } + cl.openNewConns(t) +} + +func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) { + if off >= t.length { + return + } + if off < 0 { + size += off + off = 0 + } + if size <= 0 { + return + } + begin = int(off / t.Info.PieceLength) + end = int((off + size + t.Info.PieceLength - 1) / t.Info.PieceLength) + if end > t.Info.NumPieces() { + end = t.Info.NumPieces() + } + return +} + +func (t *torrent) forReaderWantedRegionPieces(f func(begin, end int) (more bool)) (all bool) { + for r := range t.readers { + r.mu.Lock() + pos, readahead := r.pos, r.readahead + r.mu.Unlock() + if readahead < 1 { + readahead = 1 + } + begin, end := t.byteRegionPieces(pos, readahead) + if begin >= end { + continue + } + if !f(begin, end) { + return false + } + } + return true +} + +func (t *torrent) piecePriority(piece int) (ret piecePriority) { + ret = PiecePriorityNone + if t.pieceComplete(piece) { + return + } + raiseRet := func(prio piecePriority) { + if prio > ret { + ret = prio + } + } + t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + if piece == begin { + raiseRet(PiecePriorityNow) + } + if begin <= piece && piece < end { + raiseRet(PiecePriorityReadahead) + } + return true + }) + return +} -- 2.44.0