]> Sergey Matveev's repositories - btrtrc.git/commitdiff
It's working and the tests are usually passing
authorMatt Joiner <anacrolix@gmail.com>
Mon, 18 Jan 2016 07:35:14 +0000 (18:35 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 18 Jan 2016 07:35:14 +0000 (18:35 +1100)
I still need to handle "prefetch"-style downloading, and some functions haven't been committed to force this issue.

client.go
client_test.go
connection.go
connection_test.go
piece.go
reader.go
t.go
torrent.go

index bd6b8b61df07cb02ab2e7a63176658f300fdfec7..de9e4f6dcd221d7f4e92f08ce7f0be22cbb05c28 100644 (file)
--- a/client.go
+++ b/client.go
@@ -10,6 +10,7 @@ import (
        "expvar"
        "fmt"
        "io"
+       "io/ioutil"
        "log"
        "math/big"
        mathRand "math/rand"
@@ -28,13 +29,11 @@ import (
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/anacrolix/sync"
        "github.com/anacrolix/utp"
-       "github.com/bradfitz/iter"
        "github.com/edsrzf/mmap-go"
 
        "github.com/anacrolix/torrent/bencode"
        filePkg "github.com/anacrolix/torrent/data/file"
        "github.com/anacrolix/torrent/dht"
-       "github.com/anacrolix/torrent/internal/pieceordering"
        "github.com/anacrolix/torrent/iplist"
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/mse"
@@ -276,47 +275,6 @@ func readaheadPieces(readahead, pieceLength int64) (ret int) {
        return
 }
 
-func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
-       index := int(off / int64(t.usualPieceSize()))
-       cl.raisePiecePriority(t, index, PiecePriorityNow)
-       index++
-       if index >= t.numPieces() {
-               return
-       }
-       cl.raisePiecePriority(t, index, PiecePriorityNext)
-       for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
-               index++
-               if index >= t.numPieces() {
-                       break
-               }
-               cl.raisePiecePriority(t, index, PiecePriorityReadahead)
-       }
-}
-
-func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) {
-       for n > 0 {
-               req, ok := t.offsetRequest(off)
-               if !ok {
-                       break
-               }
-               if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
-                       if t.urgent == nil {
-                               t.urgent = make(map[request]struct{}, (n+int(t.chunkSize)-1)/int(t.chunkSize))
-                       }
-                       t.urgent[req] = struct{}{}
-                       cl.event.Broadcast() // Why?
-                       index := int(req.Index)
-                       cl.queueFirstHash(t, index)
-                       cl.pieceChanged(t, index)
-               }
-               reqOff := t.requestOffset(req)
-               n1 := req.Length - pp.Integer(off-reqOff)
-               off += int64(n1)
-               n -= int(n1)
-       }
-       // log.Print(t.urgent)
-}
-
 func (cl *Client) configDir() string {
        if cl.config.ConfigDir == "" {
                return filepath.Join(os.Getenv("HOME"), ".config/torrent")
@@ -330,30 +288,6 @@ func (cl *Client) ConfigDir() string {
        return cl.configDir()
 }
 
-func (t *torrent) connPendPiece(c *connection, piece int) {
-       c.pendPiece(piece, t.Pieces[piece].Priority, t)
-}
-
-func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
-       if t.Pieces[piece].Priority < priority {
-               cl.prioritizePiece(t, piece, priority)
-       }
-}
-
-func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) {
-       if t.havePiece(piece) {
-               priority = PiecePriorityNone
-       }
-       if priority != PiecePriorityNone {
-               cl.queueFirstHash(t, piece)
-       }
-       p := &t.Pieces[piece]
-       if p.Priority != priority {
-               p.Priority = priority
-               cl.pieceChanged(t, piece)
-       }
-}
-
 func loadPackedBlocklist(filename string) (ret iplist.Ranger, err error) {
        f, err := os.Open(filename)
        if os.IsNotExist(err) {
@@ -1169,9 +1103,6 @@ func (cl *Client) runHandshookConn(c *connection, t *torrent) (err error) {
        go c.writer()
        go c.writeOptimizer(time.Minute)
        cl.sendInitialMessages(c, t)
-       if t.haveInfo() {
-               t.initRequestOrdering(c)
-       }
        err = cl.connectionLoop(t, c)
        if err != nil {
                err = fmt.Errorf("error during connection loop: %s", err)
@@ -1237,26 +1168,6 @@ func (me *Client) sendInitialMessages(conn *connection, torrent *torrent) {
        }
 }
 
-// Randomizes the piece order for this connection. Every connection will be
-// given a different ordering. Having it stored per connection saves having to
-// randomize during request filling, and constantly recalculate the ordering
-// based on piece priorities.
-func (t *torrent) initRequestOrdering(c *connection) {
-       if c.pieceRequestOrder != nil || c.piecePriorities != nil {
-               panic("double init of request ordering")
-       }
-       c.pieceRequestOrder = pieceordering.New()
-       for i := range iter.N(t.Info.NumPieces()) {
-               if !c.PeerHasPiece(i) {
-                       continue
-               }
-               if !t.wantPiece(i) {
-                       continue
-               }
-               t.connPendPiece(c, i)
-       }
-}
-
 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error {
        if !c.peerHasAll {
                if t.haveInfo() {
@@ -1274,7 +1185,6 @@ func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error {
                c.PeerPieces[piece] = true
        }
        if t.wantPiece(piece) {
-               t.connPendPiece(c, piece)
                me.replenishConnRequests(t, c)
        }
        return nil
@@ -1721,12 +1631,6 @@ func (me *Client) deleteConnection(t *torrent, c *connection) bool {
 func (me *Client) dropConnection(t *torrent, c *connection) {
        me.event.Broadcast()
        c.Close()
-       if c.piecePriorities != nil {
-               t.connPiecePriorites.Put(c.piecePriorities)
-               // I wonder if it's safe to set it to nil. Probably not. Since it's
-               // only read, it doesn't particularly matter if a closing connection
-               // shares the slice with another connection.
-       }
        if me.deleteConnection(t, c) {
                me.openNewConns(t)
        }
@@ -1774,16 +1678,14 @@ func (t *torrent) needData() bool {
        if !t.haveInfo() {
                return true
        }
-       if len(t.urgent) != 0 {
-               return true
-       }
-       for i := range t.Pieces {
-               p := &t.Pieces[i]
-               if p.Priority != PiecePriorityNone {
-                       return true
+       return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+               for i := begin; i < end; i++ {
+                       if !t.pieceComplete(i) {
+                               return false
+                       }
                }
-       }
-       return false
+               return true
+       })
 }
 
 func (cl *Client) usefulConn(t *torrent, c *connection) bool {
@@ -2048,16 +1950,6 @@ func (t Torrent) Files() (ret []File) {
        return
 }
 
-// Marks the pieces in the given region for download.
-func (t Torrent) SetRegionPriority(off, len int64) {
-       t.cl.mu.Lock()
-       defer t.cl.mu.Unlock()
-       pieceSize := int64(t.torrent.usualPieceSize())
-       for i := off / pieceSize; i*pieceSize < off+len; i++ {
-               t.cl.raisePiecePriority(t.torrent, int(i), PiecePriorityNormal)
-       }
-}
-
 func (t Torrent) AddPeers(pp []Peer) error {
        cl := t.cl
        cl.mu.Lock()
@@ -2474,6 +2366,24 @@ func (me *Client) WaitAll() bool {
        return true
 }
 
+func (me *Client) connAddRequest(c *connection, req request) (more bool) {
+       if len(c.Requests) >= 64 {
+               return false
+       }
+       more = c.Request(req)
+       return
+}
+
+func (me *Client) connRequestPiecePendingChunks(c *connection, t *torrent, piece int) (more bool) {
+       for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) {
+               req := request{pp.Integer(piece), cs}
+               if !me.connAddRequest(c, req) {
+                       return false
+               }
+       }
+       return true
+}
+
 func (me *Client) fillRequests(t *torrent, c *connection) {
        if c.Interested {
                if c.PeerChoked {
@@ -2483,37 +2393,25 @@ func (me *Client) fillRequests(t *torrent, c *connection) {
                        return
                }
        }
-       addRequest := func(req request) (again bool) {
-               // TODO: Couldn't this check also be done *after* the request?
-               if len(c.Requests) >= 64 {
-                       return false
-               }
-               return c.Request(req)
-       }
-       for req := range t.urgent {
-               if !addRequest(req) {
-                       return
+       if !t.forUrgentPieces(func(piece int) (again bool) {
+               if !c.PeerHasPiece(piece) {
+                       return true
                }
+               return me.connRequestPiecePendingChunks(c, t, piece)
+       }) {
+               return
        }
-       for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
-               pieceIndex := e.Piece()
-               if !c.PeerHasPiece(pieceIndex) {
-                       panic("piece in request order but peer doesn't have it")
-               }
-               if !t.wantPiece(pieceIndex) {
-                       log.Printf("unwanted piece %d in connection request order\n%s", pieceIndex, c)
-                       c.pieceRequestOrder.DeletePiece(pieceIndex)
-                       continue
-               }
-               piece := &t.Pieces[pieceIndex]
-               for _, cs := range piece.shuffledPendingChunkSpecs(t, pieceIndex) {
-                       r := request{pp.Integer(pieceIndex), cs}
-                       if !addRequest(r) {
-                               return
+       t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+               for i := begin + 1; i < end; i++ {
+                       if !c.PeerHasPiece(i) {
+                               continue
+                       }
+                       if !me.connRequestPiecePendingChunks(c, t, i) {
+                               return false
                        }
                }
-       }
-       return
+               return true
+       })
 }
 
 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
@@ -2562,6 +2460,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        piece.pendingWrites++
        piece.pendingWritesMutex.Unlock()
        go func() {
+               defer me.event.Broadcast()
                defer func() {
                        piece.pendingWritesMutex.Lock()
                        piece.pendingWrites--
@@ -2591,17 +2490,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        defer t.publishPieceChange(int(req.Index))
        // Record that we have the chunk.
        piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
-       delete(t.urgent, req)
        // It's important that the piece is potentially queued before we check if
        // the piece is still wanted, because if it is queued, it won't be wanted.
        if t.pieceAllDirty(index) {
                me.queuePieceCheck(t, int(req.Index))
        }
-       if !t.wantPiece(int(req.Index)) {
-               for _, c := range t.Conns {
-                       c.pieceRequestOrder.DeletePiece(int(req.Index))
-               }
-       }
 
        // Cancel pending requests for this chunk.
        for _, c := range t.Conns {
@@ -2656,17 +2549,9 @@ func (me *Client) pieceHashed(t *torrent, piece int, correct bool) {
 
 func (me *Client) pieceChanged(t *torrent, piece int) {
        correct := t.pieceComplete(piece)
-       p := &t.Pieces[piece]
        defer t.publishPieceChange(piece)
        defer me.event.Broadcast()
-       if correct {
-               p.Priority = PiecePriorityNone
-               for req := range t.urgent {
-                       if int(req.Index) == piece {
-                               delete(t.urgent, req)
-                       }
-               }
-       } else {
+       if !correct {
                if t.pieceAllDirty(piece) {
                        t.pendAllChunkSpecs(piece)
                }
@@ -2682,10 +2567,8 @@ func (me *Client) pieceChanged(t *torrent, piece int) {
                                        conn.Cancel(r)
                                }
                        }
-                       conn.pieceRequestOrder.DeletePiece(int(piece))
                        me.upload(t, conn)
                } else if t.wantPiece(piece) && conn.PeerHasPiece(piece) {
-                       t.connPendPiece(conn, int(piece))
                        me.replenishConnRequests(t, conn)
                }
        }
index 25242a1c1c3a2b789c4e151290697d95f9b730af..754bbcc21d54db48d0f5e2e0cee0f18753959371 100644 (file)
@@ -529,10 +529,14 @@ func TestResponsive(t *testing.T) {
        reader.SetReadahead(0)
        reader.SetResponsive()
        b := make([]byte, 2)
-       _, err = reader.ReadAt(b, 3)
+       _, err = reader.Seek(3, os.SEEK_SET)
+       require.NoError(t, err)
+       _, err = io.ReadFull(reader, b)
        assert.Nil(t, err)
        assert.EqualValues(t, "lo", string(b))
-       n, err := reader.ReadAt(b, 11)
+       _, err = reader.Seek(11, os.SEEK_SET)
+       require.NoError(t, err)
+       n, err := io.ReadFull(reader, b)
        assert.Nil(t, err)
        assert.EqualValues(t, 2, n)
        assert.EqualValues(t, "d\n", string(b))
@@ -571,11 +575,15 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
        reader.SetReadahead(0)
        reader.SetResponsive()
        b := make([]byte, 2)
-       _, err = reader.ReadAt(b, 3)
+       _, err = reader.Seek(3, os.SEEK_SET)
+       require.NoError(t, err)
+       _, err = io.ReadFull(reader, b)
        assert.Nil(t, err)
        assert.EqualValues(t, "lo", string(b))
        go leecherTorrent.Drop()
-       n, err := reader.ReadAt(b, 11)
+       _, err = reader.Seek(11, os.SEEK_SET)
+       require.NoError(t, err)
+       n, err := reader.Read(b)
        assert.EqualError(t, err, "torrent closed")
        assert.EqualValues(t, 0, n)
 }
index 14c24c85bce03838b36bc3f1acac8b6483997c8e..9a88191845abfadd328e30fba8e3730f6d02e431 100644 (file)
@@ -14,7 +14,6 @@ import (
        "time"
 
        "github.com/anacrolix/torrent/bencode"
-       "github.com/anacrolix/torrent/internal/pieceordering"
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
@@ -41,12 +40,6 @@ type connection struct {
        post      chan pp.Message
        writeCh   chan []byte
 
-       // The connection's preferred order to download pieces. The index is the
-       // piece, the value is its priority.
-       piecePriorities []int
-       // The piece request order based on piece priorities.
-       pieceRequestOrder *pieceordering.Instance
-
        UnwantedChunksReceived int
        UsefulChunksReceived   int
        chunksSent             int
@@ -105,42 +98,6 @@ func (cn *connection) localAddr() net.Addr {
        return cn.conn.LocalAddr()
 }
 
-// Adjust piece position in the request order for this connection based on the
-// given piece priority.
-func (cn *connection) pendPiece(piece int, priority piecePriority, t *torrent) {
-       if priority == PiecePriorityNone {
-               cn.pieceRequestOrder.DeletePiece(piece)
-               return
-       }
-       if cn.piecePriorities == nil {
-               cn.piecePriorities = t.newConnPiecePriorities()
-       }
-       pp := cn.piecePriorities[piece]
-       // Priority regions not to scale. Within each region, piece is randomized
-       // according to connection.
-
-       // <-request first -- last->
-       // [ Now         ]
-       //  [ Next       ]
-       //   [ Readahead ]
-       //                [ Normal ]
-       key := func() int {
-               switch priority {
-               case PiecePriorityNow:
-                       return -3*len(cn.piecePriorities) + 3*pp
-               case PiecePriorityNext:
-                       return -2*len(cn.piecePriorities) + 2*pp
-               case PiecePriorityReadahead:
-                       return -len(cn.piecePriorities) + pp
-               case PiecePriorityNormal:
-                       return pp
-               default:
-                       panic(priority)
-               }
-       }()
-       cn.pieceRequestOrder.SetPiece(piece, key)
-}
-
 func (cn *connection) supportsExtension(ext string) bool {
        _, ok := cn.PeerExtensionIDs[ext]
        return ok
index 36832b9f1c93bb4b746ef540d419580c034a58fe..77dbb53b50551b43b7e0dcb175cfa72763ca7e86 100644 (file)
@@ -4,7 +4,6 @@ import (
        "testing"
        "time"
 
-       "github.com/bradfitz/iter"
        "github.com/stretchr/testify/assert"
 
        "github.com/anacrolix/torrent/internal/pieceordering"
@@ -63,38 +62,3 @@ func pieceOrderingAsSlice(po *pieceordering.Instance) (ret []int) {
 func testRequestOrder(expected []int, ro *pieceordering.Instance, t *testing.T) {
        assert.EqualValues(t, pieceOrderingAsSlice(ro), expected)
 }
-
-// Tests the request ordering based on a connections priorities.
-func TestPieceRequestOrder(t *testing.T) {
-       c := connection{
-               pieceRequestOrder: pieceordering.New(),
-               piecePriorities:   []int{1, 4, 0, 3, 2},
-       }
-       testRequestOrder(nil, c.pieceRequestOrder, t)
-       c.pendPiece(2, PiecePriorityNone, nil)
-       testRequestOrder(nil, c.pieceRequestOrder, t)
-       c.pendPiece(1, PiecePriorityNormal, nil)
-       c.pendPiece(2, PiecePriorityNormal, nil)
-       testRequestOrder([]int{2, 1}, c.pieceRequestOrder, t)
-       c.pendPiece(0, PiecePriorityNormal, nil)
-       testRequestOrder([]int{2, 0, 1}, c.pieceRequestOrder, t)
-       c.pendPiece(1, PiecePriorityReadahead, nil)
-       testRequestOrder([]int{1, 2, 0}, c.pieceRequestOrder, t)
-       c.pendPiece(4, PiecePriorityNow, nil)
-       // now(4), r(1), normal(0, 2)
-       testRequestOrder([]int{4, 1, 2, 0}, c.pieceRequestOrder, t)
-       c.pendPiece(2, PiecePriorityReadahead, nil)
-       // N(4), R(1, 2), N(0)
-       testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
-       c.pendPiece(1, PiecePriorityNow, nil)
-       // now(4, 1), readahead(2), normal(0)
-       // in the same order, the keys will be: -15+6, -15+12, -5, 1
-       // so we test that a very low priority (for this connection), "now"
-       // piece has been placed after a readahead piece.
-       testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
-       // Note this intentially sets to None a piece that's not in the order.
-       for i := range iter.N(5) {
-               c.pendPiece(i, PiecePriorityNone, nil)
-       }
-       testRequestOrder(nil, c.pieceRequestOrder, t)
-}
index c25bd2e41362cf6de9bd2c8a6e371a66d85e0b0d..518e76bbf486d4772e81a167303c2518cad450a1 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -30,7 +30,6 @@ type piece struct {
        Hashing          bool
        QueuedForHash    bool
        EverHashed       bool
-       Priority         piecePriority
        PublicPieceState PieceState
 
        pendingWritesMutex sync.Mutex
index ed2701b813df7aeff596e7cffa0fc8c84facb15b..3d04b35ca53512ade112a0cbe8e938467965c241 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -4,11 +4,14 @@ import (
        "errors"
        "io"
        "os"
+       "sync"
 )
 
 // Accesses torrent data via a client.
 type Reader struct {
-       t          *Torrent
+       t *Torrent
+
+       mu         sync.Mutex
        pos        int64
        responsive bool
        readahead  int64
@@ -25,18 +28,11 @@ func (r *Reader) SetResponsive() {
 // Configure the number of bytes ahead of a read that should also be
 // prioritized in preparation for further reads.
 func (r *Reader) SetReadahead(readahead int64) {
+       r.mu.Lock()
+       defer r.mu.Unlock()
        r.readahead = readahead
 }
 
-func (r *Reader) raisePriorities(off int64, n int) {
-       if r.responsive {
-               r.t.cl.addUrgentRequests(r.t.torrent, off, n)
-       }
-       if !r.responsive || r.readahead != 0 {
-               r.t.cl.readRaisePiecePriorities(r.t.torrent, off, int64(n)+r.readahead)
-       }
-}
-
 func (r *Reader) readable(off int64) (ret bool) {
        // log.Println("readable", off)
        // defer func() {
@@ -81,22 +77,15 @@ func (r *Reader) waitReadable(off int64) {
        r.t.cl.event.Wait()
 }
 
-func (r *Reader) ReadAt(b []byte, off int64) (n int, err error) {
-       for {
-               var n1 int
-               n1, err = r.readAt(b, off)
-               n += n1
-               b = b[n1:]
-               off += int64(n1)
-               if err != nil || len(b) == 0 || n1 == 0 {
-                       return
-               }
-       }
-}
-
 func (r *Reader) Read(b []byte) (n int, err error) {
-       n, err = r.readAt(b, r.pos)
+       r.mu.Lock()
+       pos := r.pos
+       r.mu.Unlock()
+       n, err = r.readAt(b, pos)
+       r.mu.Lock()
        r.pos += int64(n)
+       r.mu.Unlock()
+       r.posChanged()
        return
 }
 
@@ -115,9 +104,7 @@ func (r *Reader) readAt(b []byte, pos int64) (n int, err error) {
        }
 again:
        r.t.cl.mu.Lock()
-       r.raisePriorities(pos, len(b))
        for !r.readable(pos) {
-               r.raisePriorities(pos, len(b))
                r.waitReadable(pos)
        }
        avail := r.available(pos, int64(len(b)))
@@ -154,11 +141,19 @@ again:
 }
 
 func (r *Reader) Close() error {
+       r.t.deleteReader(r)
        r.t = nil
        return nil
 }
 
+func (r *Reader) posChanged() {
+       r.t.cl.mu.Lock()
+       defer r.t.cl.mu.Unlock()
+       r.t.torrent.readersChanged(r.t.cl)
+}
+
 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
+       r.mu.Lock()
        switch whence {
        case os.SEEK_SET:
                r.pos = off
@@ -170,5 +165,7 @@ func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
                err = errors.New("bad whence")
        }
        ret = r.pos
+       r.mu.Unlock()
+       r.posChanged()
        return
 }
diff --git a/t.go b/t.go
index 23085d1b0bcdc9759bdc4ce64caca7466910c635..bf2b9e2245b797529951eabe340cf332a48b422d 100644 (file)
--- a/t.go
+++ b/t.go
@@ -41,6 +41,7 @@ func (t Torrent) NewReader() (ret *Reader) {
                t:         &t,
                readahead: 5 * 1024 * 1024,
        }
+       t.addReader(ret)
        return
 }
 
@@ -119,3 +120,20 @@ func (t Torrent) MetaInfo() *metainfo.MetaInfo {
        defer t.cl.mu.Unlock()
        return t.torrent.MetaInfo()
 }
+
+func (t Torrent) addReader(r *Reader) {
+       t.cl.mu.Lock()
+       defer t.cl.mu.Unlock()
+       if t.torrent.readers == nil {
+               t.torrent.readers = make(map[*Reader]struct{})
+       }
+       t.torrent.readers[r] = struct{}{}
+       t.torrent.readersChanged(t.cl)
+}
+
+func (t Torrent) deleteReader(r *Reader) {
+       t.cl.mu.Lock()
+       defer t.cl.mu.Unlock()
+       delete(t.torrent.readers, r)
+       t.torrent.readersChanged(t.cl)
+}
index c74476eb8d7612350f422f47db283586a3801fc4..2933c809aec794ac13639b25d401ae7a62e91e74 100644 (file)
@@ -6,7 +6,6 @@ import (
        "fmt"
        "io"
        "log"
-       "math/rand"
        "net"
        "sort"
        "sync"
@@ -62,9 +61,6 @@ type torrent struct {
        // Values are the piece indices that changed.
        pieceStateChanges *pubsub.PubSub
        chunkSize         pp.Integer
-       // Chunks that are wanted before all others. This is for
-       // responsive/streaming readers that want to unblock ASAP.
-       urgent map[request]struct{}
        // Total length of the torrent in bytes. Stored because it's not O(1) to
        // get this from the info dict.
        length int64
@@ -99,7 +95,7 @@ type torrent struct {
        // Closed when .Info is set.
        gotMetainfo chan struct{}
 
-       connPiecePriorites sync.Pool
+       readers map[*Reader]struct{}
 }
 
 var (
@@ -111,16 +107,6 @@ func (t *torrent) setDisplayName(dn string) {
        t.displayName = dn
 }
 
-func (t *torrent) newConnPiecePriorities() []int {
-       _ret := t.connPiecePriorites.Get()
-       if _ret != nil {
-               piecePrioritiesReused.Add(1)
-               return _ret.([]int)
-       }
-       piecePrioritiesNew.Add(1)
-       return rand.Perm(t.numPieces())
-}
-
 func (t *torrent) pieceComplete(piece int) bool {
        // TODO: This is called when setting metadata, and before storage is
        // assigned, which doesn't seem right.
@@ -261,7 +247,6 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
                missinggo.CopyExact(piece.Hash[:], hash)
        }
        for _, conn := range t.Conns {
-               t.initRequestOrdering(conn)
                if err := conn.setNumPieces(t.numPieces()); err != nil {
                        log.Printf("closing connection: %s", err)
                        conn.Close()
@@ -324,7 +309,7 @@ func (t *torrent) Name() string {
 
 func (t *torrent) pieceState(index int) (ret PieceState) {
        p := &t.Pieces[index]
-       ret.Priority = p.Priority
+       ret.Priority = t.piecePriority(index)
        if t.pieceComplete(index) {
                ret.Complete = true
        }
@@ -436,9 +421,10 @@ func (t *torrent) writeStatus(w io.Writer, cl *Client) {
                fmt.Fprintln(w)
        }
        fmt.Fprintf(w, "Urgent:")
-       for req := range t.urgent {
-               fmt.Fprintf(w, " %v", req)
-       }
+       t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+               fmt.Fprintf(w, " %d:%d", begin, end)
+               return true
+       })
        fmt.Fprintln(w)
        fmt.Fprintf(w, "Trackers: ")
        for _, tier := range t.Trackers {
@@ -728,17 +714,8 @@ func (t *torrent) wantChunk(r request) bool {
        if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
                return true
        }
-       _, ok := t.urgent[r]
-       return ok
-}
-
-func (t *torrent) urgentChunkInPiece(piece int) bool {
-       p := pp.Integer(piece)
-       for req := range t.urgent {
-               if req.Index == p {
-                       return true
-               }
-       }
+       // TODO: What about pieces that were wanted, but aren't now, and aren't
+       // completed either? That used to be done here.
        return false
 }
 
@@ -754,18 +731,21 @@ func (t *torrent) wantPiece(index int) bool {
        if p.Hashing {
                return false
        }
-       if p.Priority == PiecePriorityNone {
-               if !t.urgentChunkInPiece(index) {
-                       return false
-               }
-       }
+
        // Put piece complete check last, since it's the slowest as it can involve
        // calling out into external data stores.
        return !t.pieceComplete(index)
 }
 
 func (t *torrent) connHasWantedPieces(c *connection) bool {
-       return c.pieceRequestOrder != nil && !c.pieceRequestOrder.Empty()
+       return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+               for i := begin; i < end; i++ {
+                       if c.PeerHasPiece(i) {
+                               return false
+                       }
+               }
+               return true
+       })
 }
 
 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
@@ -818,3 +798,81 @@ func (t *torrent) pieceAllDirty(piece int) bool {
        }
        return true
 }
+
+func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
+       return t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+               if begin < end {
+                       if !f(begin) {
+                               return false
+                       }
+               }
+               return true
+       })
+}
+
+func (t *torrent) readersChanged(cl *Client) {
+       for _, c := range t.Conns {
+               cl.replenishConnRequests(t, c)
+       }
+       cl.openNewConns(t)
+}
+
+func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
+       if off >= t.length {
+               return
+       }
+       if off < 0 {
+               size += off
+               off = 0
+       }
+       if size <= 0 {
+               return
+       }
+       begin = int(off / t.Info.PieceLength)
+       end = int((off + size + t.Info.PieceLength - 1) / t.Info.PieceLength)
+       if end > t.Info.NumPieces() {
+               end = t.Info.NumPieces()
+       }
+       return
+}
+
+func (t *torrent) forReaderWantedRegionPieces(f func(begin, end int) (more bool)) (all bool) {
+       for r := range t.readers {
+               r.mu.Lock()
+               pos, readahead := r.pos, r.readahead
+               r.mu.Unlock()
+               if readahead < 1 {
+                       readahead = 1
+               }
+               begin, end := t.byteRegionPieces(pos, readahead)
+               if begin >= end {
+                       continue
+               }
+               if !f(begin, end) {
+                       return false
+               }
+       }
+       return true
+}
+
+func (t *torrent) piecePriority(piece int) (ret piecePriority) {
+       ret = PiecePriorityNone
+       if t.pieceComplete(piece) {
+               return
+       }
+       raiseRet := func(prio piecePriority) {
+               if prio > ret {
+                       ret = prio
+               }
+       }
+       t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+               if piece == begin {
+                       raiseRet(PiecePriorityNow)
+               }
+               if begin <= piece && piece < end {
+                       raiseRet(PiecePriorityReadahead)
+               }
+               return true
+       })
+       return
+}