func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
index := int(off / int64(t.usualPieceSize()))
- cl.raisePiecePriority(t, index, piecePriorityNow)
+ cl.raisePiecePriority(t, index, PiecePriorityNow)
index++
if index >= t.numPieces() {
return
}
- cl.raisePiecePriority(t, index, piecePriorityNext)
+ cl.raisePiecePriority(t, index, PiecePriorityNext)
for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
index++
if index >= t.numPieces() {
break
}
- cl.raisePiecePriority(t, index, piecePriorityReadahead)
+ cl.raisePiecePriority(t, index, PiecePriorityReadahead)
}
}
if t.havePiece(piece) {
return
}
- cl.queueFirstHash(t, piece)
- t.Pieces[piece].Priority = priority
- cl.pieceChanged(t, piece)
+ if priority != PiecePriorityNone {
+ cl.queueFirstHash(t, piece)
+ }
+ p := t.Pieces[piece]
+ if p.Priority != priority {
+ p.Priority = priority
+ cl.pieceChanged(t, piece)
+ }
}
func (cl *Client) setEnvBlocklist() (err error) {
if !cfg.NoDHT {
dhtCfg := cfg.DHTConfig
if dhtCfg == nil {
- dhtCfg = &dht.ServerConfig{}
+ dhtCfg = &dht.ServerConfig{
+ IPBlocklist: cl.ipBlockList,
+ }
}
if dhtCfg.Addr == "" {
dhtCfg.Addr = listenAddr()
if err != nil {
return
}
- if cl.ipBlockList != nil {
- cl.dHT.SetIPBlockList(cl.ipBlockList)
- }
}
return
case pp.Have:
me.peerGotPiece(t, c, int(msg.Index))
case pp.Request:
- if me.config.NoUpload {
+ if c.Choked {
break
}
request := newRequest(msg.Index, msg.Begin, msg.Length)
// }
p := make([]byte, msg.Length)
n, err := dataReadAt(t.data, p, int64(t.pieceLength(0))*int64(msg.Index)+int64(msg.Begin))
+ // TODO: Failing to read for a request should not be fatal to the connection.
if err != nil {
return fmt.Errorf("reading t data to serve request %q: %s", request, err)
}
if me.ipBlockRange(p.IP) != nil {
continue
}
+ if p.Port == 0 {
+ log.Printf("got bad peer: %v", p)
+ continue
+ }
t.addPeer(p)
}
me.openNewConns(t)
return
}
+// Marks the pieces in the given region for download.
func (t Torrent) SetRegionPriority(off, len int64) {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
pieceSize := int64(t.usualPieceSize())
for i := off / pieceSize; i*pieceSize < off+len; i++ {
- t.cl.prioritizePiece(t.torrent, int(i), piecePriorityNormal)
+ t.cl.raisePiecePriority(t.torrent, int(i), PiecePriorityNormal)
}
}
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
for i := range iter.N(t.numPieces()) {
- t.cl.raisePiecePriority(t.torrent, i, piecePriorityNormal)
+ t.cl.raisePiecePriority(t.torrent, i, PiecePriorityNormal)
}
// Nice to have the first and last pieces sooner for various interactive
// purposes.
- t.cl.raisePiecePriority(t.torrent, 0, piecePriorityReadahead)
- t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead)
+ t.cl.raisePiecePriority(t.torrent, 0, PiecePriorityReadahead)
+ t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, PiecePriorityReadahead)
}
// Returns nil metainfo if it isn't in the cache. Checks that the retrieved
// Write the chunk out.
err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
if err != nil {
- return fmt.Errorf("error writing chunk: %s", err)
+ log.Printf("error writing chunk: %s", err)
+ return nil
}
// log.Println("got chunk", req)
}
p.EverHashed = true
if correct {
- if sd, ok := t.data.(StatefulData); ok {
- err := sd.PieceCompleted(int(piece))
- if err != nil {
- log.Printf("error completing piece: %s", err)
- correct = false
- }
+ err := t.data.PieceCompleted(int(piece))
+ if err != nil {
+ log.Printf("error completing piece: %s", err)
+ correct = false
}
}
me.pieceChanged(t, int(piece))
correct := t.pieceComplete(piece)
p := t.Pieces[piece]
if correct {
- p.Priority = piecePriorityNone
+ p.Priority = PiecePriorityNone
p.PendingChunkSpecs = nil
for req := range t.urgent {
if int(req.Index) == piece {
import (
"encoding/binary"
"fmt"
+ "io"
"io/ioutil"
"log"
"net"
"os"
+ "path/filepath"
"testing"
"time"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/data/blob"
+ "github.com/anacrolix/torrent/data"
"github.com/anacrolix/torrent/internal/testutil"
+ "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/util"
)
DisableTrackers: true,
NoDefaultBlocklist: true,
DisableMetainfoCache: true,
+ DataDir: filepath.Join(os.TempDir(), "anacrolix"),
}
func TestClientDefault(t *testing.T) {
c.Assert(T.Trackers[0][0].URL(), check.Equals, "http://a")
c.Assert(T.Trackers[1][0].URL(), check.Equals, "udp://b")
}
+
+type badData struct {
+}
+
+func (me badData) WriteAt(b []byte, off int64) (int, error) {
+ return 0, nil
+}
+
+func (me badData) WriteSectionTo(w io.Writer, off, n int64) (int64, error) {
+ return 0, nil
+}
+
+func (me badData) PieceComplete(piece int) bool {
+ return true
+}
+
+func (me badData) PieceCompleted(piece int) error {
+ return nil
+}
+
+func (me badData) ReadAt(b []byte, off int64) (n int, err error) {
+ if off >= 5 {
+ err = io.EOF
+ return
+ }
+ n = copy(b, []byte("hello")[off:])
+ return
+}
+
+var _ StatefulData = badData{}
+
+// We read from a piece which is marked completed, but is missing data.
+func TestCompletedPieceWrongSize(t *testing.T) {
+ cfg := TestingConfig
+ cfg.TorrentDataOpener = func(*metainfo.Info) data.Data {
+ return badData{}
+ }
+ cl, _ := NewClient(&cfg)
+ defer cl.Close()
+ tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
+ Info: &metainfo.InfoEx{
+ Info: metainfo.Info{
+ PieceLength: 15,
+ Pieces: make([]byte, 20),
+ Files: []metainfo.FileInfo{
+ metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
+ },
+ },
+ },
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !new {
+ t.Fatal("expected new")
+ }
+ r := tt.NewReader()
+ defer r.Close()
+ b := make([]byte, 20)
+ n, err := io.ReadFull(r, b)
+ if n != 5 || err != io.ErrUnexpectedEOF {
+ t.Fatal(n, err)
+ }
+ defer tt.Drop()
+}
post chan pp.Message
writeCh chan []byte
- // The connections preferred order to download pieces.
+ // The connection's preferred order to download pieces. The index is the
+ // piece, the value is its priority.
piecePriorities []int
// The piece request order based on piece priorities.
pieceRequestOrder *pieceordering.Instance
// Adjust piece position in the request order for this connection based on the
// given piece priority.
func (cn *connection) pendPiece(piece int, priority piecePriority) {
- if priority == piecePriorityNone {
+ if priority == PiecePriorityNone {
cn.pieceRequestOrder.DeletePiece(piece)
return
}
// Priority regions not to scale. Within each region, piece is randomized
// according to connection.
+ // <-request first -- last->
// [ Now ]
// [ Next ]
// [ Readahead ]
// [ Normal ]
key := func() int {
switch priority {
- case piecePriorityNow:
+ case PiecePriorityNow:
return -3*len(cn.piecePriorities) + 3*pp
- case piecePriorityNext:
+ case PiecePriorityNext:
return -2*len(cn.piecePriorities) + 2*pp
- case piecePriorityReadahead:
+ case PiecePriorityReadahead:
return -len(cn.piecePriorities) + pp
- case piecePriorityNormal:
+ case PiecePriorityNormal:
return pp
default:
panic(priority)
c.Post(pp.Message{
Type: pp.Choke,
})
+ c.PeerRequests = nil
c.Choked = true
}
piecePriorities: []int{1, 4, 0, 3, 2},
}
testRequestOrder(nil, c.pieceRequestOrder, t)
- c.pendPiece(2, piecePriorityNone)
+ c.pendPiece(2, PiecePriorityNone)
testRequestOrder(nil, c.pieceRequestOrder, t)
- c.pendPiece(1, piecePriorityNormal)
- c.pendPiece(2, piecePriorityNormal)
+ c.pendPiece(1, PiecePriorityNormal)
+ c.pendPiece(2, PiecePriorityNormal)
testRequestOrder([]int{2, 1}, c.pieceRequestOrder, t)
- c.pendPiece(0, piecePriorityNormal)
+ c.pendPiece(0, PiecePriorityNormal)
testRequestOrder([]int{2, 0, 1}, c.pieceRequestOrder, t)
- c.pendPiece(1, piecePriorityReadahead)
+ c.pendPiece(1, PiecePriorityReadahead)
testRequestOrder([]int{1, 2, 0}, c.pieceRequestOrder, t)
- c.pendPiece(4, piecePriorityNow)
+ c.pendPiece(4, PiecePriorityNow)
// now(4), r(1), normal(0, 2)
testRequestOrder([]int{4, 1, 2, 0}, c.pieceRequestOrder, t)
- c.pendPiece(2, piecePriorityReadahead)
+ c.pendPiece(2, PiecePriorityReadahead)
// N(4), R(1, 2), N(0)
testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
- c.pendPiece(1, piecePriorityNow)
+ c.pendPiece(1, PiecePriorityNow)
// now(4, 1), readahead(2), normal(0)
// in the same order, the keys will be: -15+6, -15+12, -5, 1
// so we test that a very low priority (for this connection), "now"
testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
// Note this intentially sets to None a piece that's not in the order.
for i := range iter.N(5) {
- c.pendPiece(i, piecePriorityNone)
+ c.pendPiece(i, PiecePriorityNone)
}
testRequestOrder(nil, c.pieceRequestOrder, t)
}
+++ /dev/null
-package blob
-
-import (
- "os"
- "syscall"
- "time"
-)
-
-func accessTime(fi os.FileInfo) time.Time {
- ts := fi.Sys().(*syscall.Stat_t).Atimespec
- return time.Unix(ts.Sec, ts.Nano())
-}
+++ /dev/null
-package blob
-
-import (
- "os"
- "syscall"
- "time"
-)
-
-func accessTime(fi os.FileInfo) time.Time {
- ts := fi.Sys().(*syscall.Stat_t).Atim
- return time.Unix(ts.Sec, ts.Nano())
-}
"sync"
"time"
+ "github.com/anacrolix/missinggo"
+
dataPkg "github.com/anacrolix/torrent/data"
"github.com/anacrolix/torrent/metainfo"
)
func lastTime(fi os.FileInfo) (ret time.Time) {
ret = fi.ModTime()
- atime := accessTime(fi)
+ atime := missinggo.FileInfoAccessTime(fi)
if atime.After(ret) {
ret = atime
}
// Disable the DHT security extension:
// http://www.libtorrent.org/dht_sec.html.
NoSecurity bool
+ // Initial IP blocklist to use. Applied before serving and bootstrapping
+ // begins.
+ IPBlocklist *iplist.IPList
}
type ServerStats struct {
c = &ServerConfig{}
}
s = &Server{
- config: *c,
+ config: *c,
+ ipBlockList: c.IPBlocklist,
}
if c.Conn != nil {
s.socket = c.Conn
logonce.Stderr.Printf("received dht packet exceeds buffer size")
continue
}
- if s.ipBlocked(util.AddrIP(addr)) {
+ s.mu.Lock()
+ blocked := s.ipBlocked(util.AddrIP(addr))
+ s.mu.Unlock()
+ if blocked {
continue
}
s.processPacket(b[:n], newDHTAddr(addr))
}
type FilePieceState struct {
- Length int64
- State byte
+ Bytes int64 // Bytes within the piece that are part of this File.
+ PieceState
}
-func (f *File) Progress() (ret []FilePieceState) {
+// Returns the state of pieces in this file.
+func (f *File) State() (ret []FilePieceState) {
+ f.t.cl.mu.Lock()
+ defer f.t.cl.mu.Unlock()
pieceSize := int64(f.t.usualPieceSize())
off := f.offset % pieceSize
remaining := f.length
if len1 > remaining {
len1 = remaining
}
- ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)})
+ ret = append(ret, FilePieceState{len1, f.t.pieceState(i)})
off = 0
remaining -= len1
}
TorrentOffset int64
}
-func (fn fileNode) Attr(attr *fuse.Attr) {
+func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Size = fn.size
attr.Mode = defaultMode
- return
+ return nil
}
func (n *node) fsPath() string {
return
}
-func (dn dirNode) Attr(attr *fuse.Attr) {
+func (dn dirNode) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Mode = os.ModeDir | defaultMode
- return
+ return nil
}
func (me rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
return
}
-func (rootNode) Attr(attr *fuse.Attr) {
+func (rootNode) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Mode = os.ModeDir
+ return nil
}
// TODO(anacrolix): Why should rootNode implement this?
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
- "golang.org/x/net/context"
+ netContext "golang.org/x/net/context"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/data"
fs := New(leecher)
defer fs.Destroy()
root, _ := fs.Root()
- node, _ := root.(fusefs.NodeStringLookuper).Lookup(context.Background(), "greeting")
+ node, _ := root.(fusefs.NodeStringLookuper).Lookup(netContext.Background(), "greeting")
var attr fuse.Attr
- node.Attr(&attr)
+ node.Attr(netContext.Background(), &attr)
size := attr.Size
resp := &fuse.ReadResponse{
Data: make([]byte, size),
}
- node.(fusefs.HandleReader).Read(context.Background(), &fuse.ReadRequest{
+ node.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{
Size: int(size),
}, resp)
content := resp.Data
// Maintains piece integers by their ascending assigned keys.
type Instance struct {
- sl *skiplist.SkipList
+ // Contains the ascending priority keys. The keys contain a slice of piece
+ // indices.
+ sl *skiplist.SkipList
+ // Maps from piece index back to its key, so that it can be remove
+ // efficiently from the skip list.
pieceKeys map[int]int
}
me.shuffleItem(key)
}
+// Shuffle the piece indices that share a given key.
func (me *Instance) shuffleItem(key int) {
_item, ok := me.sl.Get(key)
if !ok {
"fmt"
"time"
+ "github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
)
}
return
}
+
+func validateInfo(info *metainfo.Info) error {
+ if len(info.Pieces)%20 != 0 {
+ return errors.New("pieces has invalid length")
+ }
+ if int((info.TotalLength()+info.PieceLength-1)/info.PieceLength) != info.NumPieces() {
+ return errors.New("piece count and file lengths are at odds")
+ }
+ return nil
+}
pp "github.com/anacrolix/torrent/peer_protocol"
)
+// Piece priority describes the importance of obtaining a particular piece.
+
type piecePriority byte
const (
- piecePriorityNone piecePriority = iota
- piecePriorityNormal
- piecePriorityReadahead
- piecePriorityNext
- piecePriorityNow
+ PiecePriorityNone piecePriority = iota // Not wanted.
+ PiecePriorityNormal // Wanted.
+ PiecePriorityReadahead // May be required soon.
+ PiecePriorityNext // Succeeds a piece where a read occurred.
+ PiecePriorityNow // A read occurred in this piece.
)
type piece struct {
- Hash pieceSum
- // Chunks we don't have. The offset and length can be determined by our
+ Hash pieceSum // The completed piece SHA1 hash, from the metainfo "pieces" field.
+ // Chunks we don't have. The offset and length can be determined by the
// request chunkSize in use.
PendingChunkSpecs []bool
Hashing bool
--- /dev/null
+package torrent
+
+// The current state of a piece.
+type PieceState struct {
+ Priority piecePriority
+ // The piece is available in its entirety.
+ Complete bool
+ // The piece is being hashed, or is queued for hash.
+ Checking bool
+ // Some of the piece has been obtained.
+ Partial bool
+}
+
+// Represents a series of consecutive pieces with the same state.
+type PieceStateRun struct {
+ PieceState
+ Length int // How many consecutive pieces have this state.
+}
}
return
}
+
+// Returns the state of pieces of the torrent. They are grouped into runs of
+// same state. The sum of the state run lengths is the number of pieces
+// in the torrent.
+func (t *Torrent) PieceStateRuns() []PieceStateRun {
+ t.stateMu.Lock()
+ defer t.stateMu.Unlock()
+ return t.torrent.pieceStateRuns()
+}
"sync"
"time"
+ "github.com/anacrolix/missinggo"
"github.com/bradfitz/iter"
"github.com/anacrolix/torrent/bencode"
// Called when metadata for a torrent becomes available.
func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
+ err = validateInfo(md)
+ if err != nil {
+ err = fmt.Errorf("bad info: %s", err)
+ return
+ }
t.Info = md
t.length = 0
for _, f := range t.Info.UpvertedFiles() {
return ""
}
-func (t *torrent) pieceStatusChar(index int) byte {
+func (t *torrent) pieceState(index int) (ret PieceState) {
p := t.Pieces[index]
- switch {
- case t.pieceComplete(index):
- return 'C'
- case p.QueuedForHash:
- return 'Q'
- case p.Hashing:
- return 'H'
- case !p.EverHashed:
- return '?'
- case t.piecePartiallyDownloaded(index):
- switch p.Priority {
- case piecePriorityNone:
- return 'F' // Forgotten
- default:
- return 'P'
- }
- default:
- switch p.Priority {
- case piecePriorityNone:
- return 'z'
- case piecePriorityNow:
- return '!'
- case piecePriorityReadahead:
- return 'R'
- case piecePriorityNext:
- return 'N'
- default:
- return '.'
- }
+ ret.Priority = p.Priority
+ if t.pieceComplete(index) {
+ ret.Complete = true
+ }
+ if p.QueuedForHash || p.Hashing {
+ ret.Checking = true
+ }
+ if t.piecePartiallyDownloaded(index) {
+ ret.Partial = true
}
+ return
}
func (t *torrent) metadataPieceSize(piece int) int {
}
}
-type PieceStatusCharSequence struct {
- Char byte // The state of this sequence of pieces.
- Count int // How many consecutive pieces have this state.
-}
-
-// Returns the state of pieces of the torrent. They are grouped into runs of
-// same state. The sum of the Counts of the sequences is the number of pieces
-// in the torrent. See the function torrent.pieceStatusChar for the possible
-// states.
-func (t *torrent) PieceStatusCharSequences() []PieceStatusCharSequence {
- t.stateMu.Lock()
- defer t.stateMu.Unlock()
- return t.pieceStatusCharSequences()
+func (t *torrent) pieceStateRuns() (ret []PieceStateRun) {
+ rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
+ ret = append(ret, PieceStateRun{
+ PieceState: el.(PieceState),
+ Length: int(count),
+ })
+ })
+ for index := range t.Pieces {
+ rle.Append(t.pieceState(index), 1)
+ }
+ rle.Flush()
+ return
}
-// Returns the length of sequences of identical piece status chars.
-func (t *torrent) pieceStatusCharSequences() (ret []PieceStatusCharSequence) {
- var (
- char byte
- count int
- )
- writeSequence := func() {
- ret = append(ret, PieceStatusCharSequence{char, count})
- }
- if len(t.Pieces) != 0 {
- char = t.pieceStatusChar(0)
- }
- for index := range t.Pieces {
- char1 := t.pieceStatusChar(index)
- if char1 == char {
- count++
- } else {
- writeSequence()
- char = char1
- count = 1
+// Produces a small string representing a PieceStateRun.
+func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
+ ret = fmt.Sprintf("%d", psr.Length)
+ ret += func() string {
+ switch psr.Priority {
+ case PiecePriorityNext:
+ return "N"
+ case PiecePriorityNormal:
+ return "."
+ case PiecePriorityReadahead:
+ return "R"
+ case PiecePriorityNow:
+ return "!"
+ default:
+ return ""
}
+ }()
+ if psr.Checking {
+ ret += "H"
+ }
+ if psr.Partial {
+ ret += "P"
}
- if count != 0 {
- writeSequence()
+ if psr.Complete {
+ ret += "C"
}
return
}
}
}())
if t.haveInfo() {
- fmt.Fprint(w, "Pieces: ")
- for _, seq := range t.pieceStatusCharSequences() {
- fmt.Fprintf(w, "%d%c ", seq.Count, seq.Char)
+ fmt.Fprint(w, "Pieces:")
+ for _, psr := range t.pieceStateRuns() {
+ w.Write([]byte(" "))
+ w.Write([]byte(pieceStateRunStatusChars(psr)))
}
fmt.Fprintln(w)
}
}
func (t *torrent) piecePartiallyDownloaded(index int) bool {
- return t.pieceNumPendingBytes(index) != t.pieceLength(index)
+ pendingBytes := t.pieceNumPendingBytes(index)
+ return pendingBytes != 0 && pendingBytes != t.pieceLength(index)
}
func numChunksForPiece(chunkSize int, pieceSize int) int {
}
func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
- _, err = t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
+ n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
+ if err == nil && n != len(data) {
+ err = io.ErrShortWrite
+ }
return
}
if p.Hashing {
return false
}
- if p.Priority == piecePriorityNone {
+ if p.Priority == PiecePriorityNone {
if !t.urgentChunkInPiece(index) {
return false
}