I still need to handle "prefetch"-style downloading, and some functions haven't been committed to force this issue.
"expvar"
"fmt"
"io"
+ "io/ioutil"
"log"
"math/big"
mathRand "math/rand"
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/sync"
"github.com/anacrolix/utp"
- "github.com/bradfitz/iter"
"github.com/edsrzf/mmap-go"
"github.com/anacrolix/torrent/bencode"
filePkg "github.com/anacrolix/torrent/data/file"
"github.com/anacrolix/torrent/dht"
- "github.com/anacrolix/torrent/internal/pieceordering"
"github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
return
}
-func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
- index := int(off / int64(t.usualPieceSize()))
- cl.raisePiecePriority(t, index, PiecePriorityNow)
- index++
- if index >= t.numPieces() {
- return
- }
- cl.raisePiecePriority(t, index, PiecePriorityNext)
- for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
- index++
- if index >= t.numPieces() {
- break
- }
- cl.raisePiecePriority(t, index, PiecePriorityReadahead)
- }
-}
-
-func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) {
- for n > 0 {
- req, ok := t.offsetRequest(off)
- if !ok {
- break
- }
- if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
- if t.urgent == nil {
- t.urgent = make(map[request]struct{}, (n+int(t.chunkSize)-1)/int(t.chunkSize))
- }
- t.urgent[req] = struct{}{}
- cl.event.Broadcast() // Why?
- index := int(req.Index)
- cl.queueFirstHash(t, index)
- cl.pieceChanged(t, index)
- }
- reqOff := t.requestOffset(req)
- n1 := req.Length - pp.Integer(off-reqOff)
- off += int64(n1)
- n -= int(n1)
- }
- // log.Print(t.urgent)
-}
-
func (cl *Client) configDir() string {
if cl.config.ConfigDir == "" {
return filepath.Join(os.Getenv("HOME"), ".config/torrent")
return cl.configDir()
}
-func (t *torrent) connPendPiece(c *connection, piece int) {
- c.pendPiece(piece, t.Pieces[piece].Priority, t)
-}
-
-func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
- if t.Pieces[piece].Priority < priority {
- cl.prioritizePiece(t, piece, priority)
- }
-}
-
-func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) {
- if t.havePiece(piece) {
- priority = PiecePriorityNone
- }
- if priority != PiecePriorityNone {
- cl.queueFirstHash(t, piece)
- }
- p := &t.Pieces[piece]
- if p.Priority != priority {
- p.Priority = priority
- cl.pieceChanged(t, piece)
- }
-}
-
func loadPackedBlocklist(filename string) (ret iplist.Ranger, err error) {
f, err := os.Open(filename)
if os.IsNotExist(err) {
go c.writer()
go c.writeOptimizer(time.Minute)
cl.sendInitialMessages(c, t)
- if t.haveInfo() {
- t.initRequestOrdering(c)
- }
err = cl.connectionLoop(t, c)
if err != nil {
err = fmt.Errorf("error during connection loop: %s", err)
}
}
-// Randomizes the piece order for this connection. Every connection will be
-// given a different ordering. Having it stored per connection saves having to
-// randomize during request filling, and constantly recalculate the ordering
-// based on piece priorities.
-func (t *torrent) initRequestOrdering(c *connection) {
- if c.pieceRequestOrder != nil || c.piecePriorities != nil {
- panic("double init of request ordering")
- }
- c.pieceRequestOrder = pieceordering.New()
- for i := range iter.N(t.Info.NumPieces()) {
- if !c.PeerHasPiece(i) {
- continue
- }
- if !t.wantPiece(i) {
- continue
- }
- t.connPendPiece(c, i)
- }
-}
-
func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error {
if !c.peerHasAll {
if t.haveInfo() {
c.PeerPieces[piece] = true
}
if t.wantPiece(piece) {
- t.connPendPiece(c, piece)
me.replenishConnRequests(t, c)
}
return nil
func (me *Client) dropConnection(t *torrent, c *connection) {
me.event.Broadcast()
c.Close()
- if c.piecePriorities != nil {
- t.connPiecePriorites.Put(c.piecePriorities)
- // I wonder if it's safe to set it to nil. Probably not. Since it's
- // only read, it doesn't particularly matter if a closing connection
- // shares the slice with another connection.
- }
if me.deleteConnection(t, c) {
me.openNewConns(t)
}
if !t.haveInfo() {
return true
}
- if len(t.urgent) != 0 {
- return true
- }
- for i := range t.Pieces {
- p := &t.Pieces[i]
- if p.Priority != PiecePriorityNone {
- return true
+ return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ for i := begin; i < end; i++ {
+ if !t.pieceComplete(i) {
+ return false
+ }
}
- }
- return false
+ return true
+ })
}
func (cl *Client) usefulConn(t *torrent, c *connection) bool {
return
}
-// Marks the pieces in the given region for download.
-func (t Torrent) SetRegionPriority(off, len int64) {
- t.cl.mu.Lock()
- defer t.cl.mu.Unlock()
- pieceSize := int64(t.torrent.usualPieceSize())
- for i := off / pieceSize; i*pieceSize < off+len; i++ {
- t.cl.raisePiecePriority(t.torrent, int(i), PiecePriorityNormal)
- }
-}
-
func (t Torrent) AddPeers(pp []Peer) error {
cl := t.cl
cl.mu.Lock()
return true
}
+func (me *Client) connAddRequest(c *connection, req request) (more bool) {
+ if len(c.Requests) >= 64 {
+ return false
+ }
+ more = c.Request(req)
+ return
+}
+
+func (me *Client) connRequestPiecePendingChunks(c *connection, t *torrent, piece int) (more bool) {
+ for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) {
+ req := request{pp.Integer(piece), cs}
+ if !me.connAddRequest(c, req) {
+ return false
+ }
+ }
+ return true
+}
+
func (me *Client) fillRequests(t *torrent, c *connection) {
if c.Interested {
if c.PeerChoked {
return
}
}
- addRequest := func(req request) (again bool) {
- // TODO: Couldn't this check also be done *after* the request?
- if len(c.Requests) >= 64 {
- return false
- }
- return c.Request(req)
- }
- for req := range t.urgent {
- if !addRequest(req) {
- return
+ if !t.forUrgentPieces(func(piece int) (again bool) {
+ if !c.PeerHasPiece(piece) {
+ return true
}
+ return me.connRequestPiecePendingChunks(c, t, piece)
+ }) {
+ return
}
- for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
- pieceIndex := e.Piece()
- if !c.PeerHasPiece(pieceIndex) {
- panic("piece in request order but peer doesn't have it")
- }
- if !t.wantPiece(pieceIndex) {
- log.Printf("unwanted piece %d in connection request order\n%s", pieceIndex, c)
- c.pieceRequestOrder.DeletePiece(pieceIndex)
- continue
- }
- piece := &t.Pieces[pieceIndex]
- for _, cs := range piece.shuffledPendingChunkSpecs(t, pieceIndex) {
- r := request{pp.Integer(pieceIndex), cs}
- if !addRequest(r) {
- return
+ t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ for i := begin + 1; i < end; i++ {
+ if !c.PeerHasPiece(i) {
+ continue
+ }
+ if !me.connRequestPiecePendingChunks(c, t, i) {
+ return false
}
}
- }
- return
+ return true
+ })
}
func (me *Client) replenishConnRequests(t *torrent, c *connection) {
piece.pendingWrites++
piece.pendingWritesMutex.Unlock()
go func() {
+ defer me.event.Broadcast()
defer func() {
piece.pendingWritesMutex.Lock()
piece.pendingWrites--
defer t.publishPieceChange(int(req.Index))
// Record that we have the chunk.
piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
- delete(t.urgent, req)
// It's important that the piece is potentially queued before we check if
// the piece is still wanted, because if it is queued, it won't be wanted.
if t.pieceAllDirty(index) {
me.queuePieceCheck(t, int(req.Index))
}
- if !t.wantPiece(int(req.Index)) {
- for _, c := range t.Conns {
- c.pieceRequestOrder.DeletePiece(int(req.Index))
- }
- }
// Cancel pending requests for this chunk.
for _, c := range t.Conns {
func (me *Client) pieceChanged(t *torrent, piece int) {
correct := t.pieceComplete(piece)
- p := &t.Pieces[piece]
defer t.publishPieceChange(piece)
defer me.event.Broadcast()
- if correct {
- p.Priority = PiecePriorityNone
- for req := range t.urgent {
- if int(req.Index) == piece {
- delete(t.urgent, req)
- }
- }
- } else {
+ if !correct {
if t.pieceAllDirty(piece) {
t.pendAllChunkSpecs(piece)
}
conn.Cancel(r)
}
}
- conn.pieceRequestOrder.DeletePiece(int(piece))
me.upload(t, conn)
} else if t.wantPiece(piece) && conn.PeerHasPiece(piece) {
- t.connPendPiece(conn, int(piece))
me.replenishConnRequests(t, conn)
}
}
reader.SetReadahead(0)
reader.SetResponsive()
b := make([]byte, 2)
- _, err = reader.ReadAt(b, 3)
+ _, err = reader.Seek(3, os.SEEK_SET)
+ require.NoError(t, err)
+ _, err = io.ReadFull(reader, b)
assert.Nil(t, err)
assert.EqualValues(t, "lo", string(b))
- n, err := reader.ReadAt(b, 11)
+ _, err = reader.Seek(11, os.SEEK_SET)
+ require.NoError(t, err)
+ n, err := io.ReadFull(reader, b)
assert.Nil(t, err)
assert.EqualValues(t, 2, n)
assert.EqualValues(t, "d\n", string(b))
reader.SetReadahead(0)
reader.SetResponsive()
b := make([]byte, 2)
- _, err = reader.ReadAt(b, 3)
+ _, err = reader.Seek(3, os.SEEK_SET)
+ require.NoError(t, err)
+ _, err = io.ReadFull(reader, b)
assert.Nil(t, err)
assert.EqualValues(t, "lo", string(b))
go leecherTorrent.Drop()
- n, err := reader.ReadAt(b, 11)
+ _, err = reader.Seek(11, os.SEEK_SET)
+ require.NoError(t, err)
+ n, err := reader.Read(b)
assert.EqualError(t, err, "torrent closed")
assert.EqualValues(t, 0, n)
}
"time"
"github.com/anacrolix/torrent/bencode"
- "github.com/anacrolix/torrent/internal/pieceordering"
pp "github.com/anacrolix/torrent/peer_protocol"
)
post chan pp.Message
writeCh chan []byte
- // The connection's preferred order to download pieces. The index is the
- // piece, the value is its priority.
- piecePriorities []int
- // The piece request order based on piece priorities.
- pieceRequestOrder *pieceordering.Instance
-
UnwantedChunksReceived int
UsefulChunksReceived int
chunksSent int
return cn.conn.LocalAddr()
}
-// Adjust piece position in the request order for this connection based on the
-// given piece priority.
-func (cn *connection) pendPiece(piece int, priority piecePriority, t *torrent) {
- if priority == PiecePriorityNone {
- cn.pieceRequestOrder.DeletePiece(piece)
- return
- }
- if cn.piecePriorities == nil {
- cn.piecePriorities = t.newConnPiecePriorities()
- }
- pp := cn.piecePriorities[piece]
- // Priority regions not to scale. Within each region, piece is randomized
- // according to connection.
-
- // <-request first -- last->
- // [ Now ]
- // [ Next ]
- // [ Readahead ]
- // [ Normal ]
- key := func() int {
- switch priority {
- case PiecePriorityNow:
- return -3*len(cn.piecePriorities) + 3*pp
- case PiecePriorityNext:
- return -2*len(cn.piecePriorities) + 2*pp
- case PiecePriorityReadahead:
- return -len(cn.piecePriorities) + pp
- case PiecePriorityNormal:
- return pp
- default:
- panic(priority)
- }
- }()
- cn.pieceRequestOrder.SetPiece(piece, key)
-}
-
func (cn *connection) supportsExtension(ext string) bool {
_, ok := cn.PeerExtensionIDs[ext]
return ok
"testing"
"time"
- "github.com/bradfitz/iter"
"github.com/stretchr/testify/assert"
"github.com/anacrolix/torrent/internal/pieceordering"
func testRequestOrder(expected []int, ro *pieceordering.Instance, t *testing.T) {
assert.EqualValues(t, pieceOrderingAsSlice(ro), expected)
}
-
-// Tests the request ordering based on a connections priorities.
-func TestPieceRequestOrder(t *testing.T) {
- c := connection{
- pieceRequestOrder: pieceordering.New(),
- piecePriorities: []int{1, 4, 0, 3, 2},
- }
- testRequestOrder(nil, c.pieceRequestOrder, t)
- c.pendPiece(2, PiecePriorityNone, nil)
- testRequestOrder(nil, c.pieceRequestOrder, t)
- c.pendPiece(1, PiecePriorityNormal, nil)
- c.pendPiece(2, PiecePriorityNormal, nil)
- testRequestOrder([]int{2, 1}, c.pieceRequestOrder, t)
- c.pendPiece(0, PiecePriorityNormal, nil)
- testRequestOrder([]int{2, 0, 1}, c.pieceRequestOrder, t)
- c.pendPiece(1, PiecePriorityReadahead, nil)
- testRequestOrder([]int{1, 2, 0}, c.pieceRequestOrder, t)
- c.pendPiece(4, PiecePriorityNow, nil)
- // now(4), r(1), normal(0, 2)
- testRequestOrder([]int{4, 1, 2, 0}, c.pieceRequestOrder, t)
- c.pendPiece(2, PiecePriorityReadahead, nil)
- // N(4), R(1, 2), N(0)
- testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
- c.pendPiece(1, PiecePriorityNow, nil)
- // now(4, 1), readahead(2), normal(0)
- // in the same order, the keys will be: -15+6, -15+12, -5, 1
- // so we test that a very low priority (for this connection), "now"
- // piece has been placed after a readahead piece.
- testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
- // Note this intentially sets to None a piece that's not in the order.
- for i := range iter.N(5) {
- c.pendPiece(i, PiecePriorityNone, nil)
- }
- testRequestOrder(nil, c.pieceRequestOrder, t)
-}
Hashing bool
QueuedForHash bool
EverHashed bool
- Priority piecePriority
PublicPieceState PieceState
pendingWritesMutex sync.Mutex
"errors"
"io"
"os"
+ "sync"
)
// Accesses torrent data via a client.
type Reader struct {
- t *Torrent
+ t *Torrent
+
+ mu sync.Mutex
pos int64
responsive bool
readahead int64
// Configure the number of bytes ahead of a read that should also be
// prioritized in preparation for further reads.
func (r *Reader) SetReadahead(readahead int64) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
r.readahead = readahead
}
-func (r *Reader) raisePriorities(off int64, n int) {
- if r.responsive {
- r.t.cl.addUrgentRequests(r.t.torrent, off, n)
- }
- if !r.responsive || r.readahead != 0 {
- r.t.cl.readRaisePiecePriorities(r.t.torrent, off, int64(n)+r.readahead)
- }
-}
-
func (r *Reader) readable(off int64) (ret bool) {
// log.Println("readable", off)
// defer func() {
r.t.cl.event.Wait()
}
-func (r *Reader) ReadAt(b []byte, off int64) (n int, err error) {
- for {
- var n1 int
- n1, err = r.readAt(b, off)
- n += n1
- b = b[n1:]
- off += int64(n1)
- if err != nil || len(b) == 0 || n1 == 0 {
- return
- }
- }
-}
-
func (r *Reader) Read(b []byte) (n int, err error) {
- n, err = r.readAt(b, r.pos)
+ r.mu.Lock()
+ pos := r.pos
+ r.mu.Unlock()
+ n, err = r.readAt(b, pos)
+ r.mu.Lock()
r.pos += int64(n)
+ r.mu.Unlock()
+ r.posChanged()
return
}
}
again:
r.t.cl.mu.Lock()
- r.raisePriorities(pos, len(b))
for !r.readable(pos) {
- r.raisePriorities(pos, len(b))
r.waitReadable(pos)
}
avail := r.available(pos, int64(len(b)))
}
func (r *Reader) Close() error {
+ r.t.deleteReader(r)
r.t = nil
return nil
}
+func (r *Reader) posChanged() {
+ r.t.cl.mu.Lock()
+ defer r.t.cl.mu.Unlock()
+ r.t.torrent.readersChanged(r.t.cl)
+}
+
func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
+ r.mu.Lock()
switch whence {
case os.SEEK_SET:
r.pos = off
err = errors.New("bad whence")
}
ret = r.pos
+ r.mu.Unlock()
+ r.posChanged()
return
}
t: &t,
readahead: 5 * 1024 * 1024,
}
+ t.addReader(ret)
return
}
defer t.cl.mu.Unlock()
return t.torrent.MetaInfo()
}
+
+func (t Torrent) addReader(r *Reader) {
+ t.cl.mu.Lock()
+ defer t.cl.mu.Unlock()
+ if t.torrent.readers == nil {
+ t.torrent.readers = make(map[*Reader]struct{})
+ }
+ t.torrent.readers[r] = struct{}{}
+ t.torrent.readersChanged(t.cl)
+}
+
+func (t Torrent) deleteReader(r *Reader) {
+ t.cl.mu.Lock()
+ defer t.cl.mu.Unlock()
+ delete(t.torrent.readers, r)
+ t.torrent.readersChanged(t.cl)
+}
"fmt"
"io"
"log"
- "math/rand"
"net"
"sort"
"sync"
// Values are the piece indices that changed.
pieceStateChanges *pubsub.PubSub
chunkSize pp.Integer
- // Chunks that are wanted before all others. This is for
- // responsive/streaming readers that want to unblock ASAP.
- urgent map[request]struct{}
// Total length of the torrent in bytes. Stored because it's not O(1) to
// get this from the info dict.
length int64
// Closed when .Info is set.
gotMetainfo chan struct{}
- connPiecePriorites sync.Pool
+ readers map[*Reader]struct{}
}
var (
t.displayName = dn
}
-func (t *torrent) newConnPiecePriorities() []int {
- _ret := t.connPiecePriorites.Get()
- if _ret != nil {
- piecePrioritiesReused.Add(1)
- return _ret.([]int)
- }
- piecePrioritiesNew.Add(1)
- return rand.Perm(t.numPieces())
-}
-
func (t *torrent) pieceComplete(piece int) bool {
// TODO: This is called when setting metadata, and before storage is
// assigned, which doesn't seem right.
missinggo.CopyExact(piece.Hash[:], hash)
}
for _, conn := range t.Conns {
- t.initRequestOrdering(conn)
if err := conn.setNumPieces(t.numPieces()); err != nil {
log.Printf("closing connection: %s", err)
conn.Close()
func (t *torrent) pieceState(index int) (ret PieceState) {
p := &t.Pieces[index]
- ret.Priority = p.Priority
+ ret.Priority = t.piecePriority(index)
if t.pieceComplete(index) {
ret.Complete = true
}
fmt.Fprintln(w)
}
fmt.Fprintf(w, "Urgent:")
- for req := range t.urgent {
- fmt.Fprintf(w, " %v", req)
- }
+ t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ fmt.Fprintf(w, " %d:%d", begin, end)
+ return true
+ })
fmt.Fprintln(w)
fmt.Fprintf(w, "Trackers: ")
for _, tier := range t.Trackers {
if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
return true
}
- _, ok := t.urgent[r]
- return ok
-}
-
-func (t *torrent) urgentChunkInPiece(piece int) bool {
- p := pp.Integer(piece)
- for req := range t.urgent {
- if req.Index == p {
- return true
- }
- }
+ // TODO: What about pieces that were wanted, but aren't now, and aren't
+ // completed either? That used to be done here.
return false
}
if p.Hashing {
return false
}
- if p.Priority == PiecePriorityNone {
- if !t.urgentChunkInPiece(index) {
- return false
- }
- }
+
// Put piece complete check last, since it's the slowest as it can involve
// calling out into external data stores.
return !t.pieceComplete(index)
}
func (t *torrent) connHasWantedPieces(c *connection) bool {
- return c.pieceRequestOrder != nil && !c.pieceRequestOrder.Empty()
+ return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ for i := begin; i < end; i++ {
+ if c.PeerHasPiece(i) {
+ return false
+ }
+ }
+ return true
+ })
}
func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
}
return true
}
+
+func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
+ return t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ if begin < end {
+ if !f(begin) {
+ return false
+ }
+ }
+ return true
+ })
+}
+
+func (t *torrent) readersChanged(cl *Client) {
+ for _, c := range t.Conns {
+ cl.replenishConnRequests(t, c)
+ }
+ cl.openNewConns(t)
+}
+
+func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
+ if off >= t.length {
+ return
+ }
+ if off < 0 {
+ size += off
+ off = 0
+ }
+ if size <= 0 {
+ return
+ }
+ begin = int(off / t.Info.PieceLength)
+ end = int((off + size + t.Info.PieceLength - 1) / t.Info.PieceLength)
+ if end > t.Info.NumPieces() {
+ end = t.Info.NumPieces()
+ }
+ return
+}
+
+func (t *torrent) forReaderWantedRegionPieces(f func(begin, end int) (more bool)) (all bool) {
+ for r := range t.readers {
+ r.mu.Lock()
+ pos, readahead := r.pos, r.readahead
+ r.mu.Unlock()
+ if readahead < 1 {
+ readahead = 1
+ }
+ begin, end := t.byteRegionPieces(pos, readahead)
+ if begin >= end {
+ continue
+ }
+ if !f(begin, end) {
+ return false
+ }
+ }
+ return true
+}
+
+func (t *torrent) piecePriority(piece int) (ret piecePriority) {
+ ret = PiecePriorityNone
+ if t.pieceComplete(piece) {
+ return
+ }
+ raiseRet := func(prio piecePriority) {
+ if prio > ret {
+ ret = prio
+ }
+ }
+ t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ if piece == begin {
+ raiseRet(PiecePriorityNow)
+ }
+ if begin <= piece && piece < end {
+ raiseRet(PiecePriorityReadahead)
+ }
+ return true
+ })
+ return
+}