]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Merge branch 'master' of github.com:anacrolix/torrent
authorMatt Joiner <anacrolix@gmail.com>
Tue, 2 Jun 2015 14:18:24 +0000 (00:18 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 2 Jun 2015 14:18:24 +0000 (00:18 +1000)
17 files changed:
client.go
client_test.go
connection.go
connection_test.go
data/blob/atime_darwin.go [deleted file]
data/blob/atime_linux.go [deleted file]
data/blob/store.go
dht/dht.go
file.go
fs/torrentfs.go
fs/torrentfs_test.go
internal/pieceordering/pieceordering.go
misc.go
piece.go
piecestate.go [new file with mode: 0644]
t.go
torrent.go

index 27af8fc705d8e1b60b8e95c08e990985bfe97411..9f7051a5c29963e0980a5c707ac64b91094d31e1 100644 (file)
--- a/client.go
+++ b/client.go
@@ -279,18 +279,18 @@ func readaheadPieces(readahead, pieceLength int64) int {
 
 func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
        index := int(off / int64(t.usualPieceSize()))
-       cl.raisePiecePriority(t, index, piecePriorityNow)
+       cl.raisePiecePriority(t, index, PiecePriorityNow)
        index++
        if index >= t.numPieces() {
                return
        }
-       cl.raisePiecePriority(t, index, piecePriorityNext)
+       cl.raisePiecePriority(t, index, PiecePriorityNext)
        for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
                index++
                if index >= t.numPieces() {
                        break
                }
-               cl.raisePiecePriority(t, index, piecePriorityReadahead)
+               cl.raisePiecePriority(t, index, PiecePriorityReadahead)
        }
 }
 
@@ -344,9 +344,14 @@ func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority)
        if t.havePiece(piece) {
                return
        }
-       cl.queueFirstHash(t, piece)
-       t.Pieces[piece].Priority = priority
-       cl.pieceChanged(t, piece)
+       if priority != PiecePriorityNone {
+               cl.queueFirstHash(t, piece)
+       }
+       p := t.Pieces[piece]
+       if p.Priority != priority {
+               p.Priority = priority
+               cl.pieceChanged(t, piece)
+       }
 }
 
 func (cl *Client) setEnvBlocklist() (err error) {
@@ -509,7 +514,9 @@ func NewClient(cfg *Config) (cl *Client, err error) {
        if !cfg.NoDHT {
                dhtCfg := cfg.DHTConfig
                if dhtCfg == nil {
-                       dhtCfg = &dht.ServerConfig{}
+                       dhtCfg = &dht.ServerConfig{
+                               IPBlocklist: cl.ipBlockList,
+                       }
                }
                if dhtCfg.Addr == "" {
                        dhtCfg.Addr = listenAddr()
@@ -521,9 +528,6 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                if err != nil {
                        return
                }
-               if cl.ipBlockList != nil {
-                       cl.dHT.SetIPBlockList(cl.ipBlockList)
-               }
        }
 
        return
@@ -1440,7 +1444,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                case pp.Have:
                        me.peerGotPiece(t, c, int(msg.Index))
                case pp.Request:
-                       if me.config.NoUpload {
+                       if c.Choked {
                                break
                        }
                        request := newRequest(msg.Index, msg.Begin, msg.Length)
@@ -1452,6 +1456,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                        // }
                        p := make([]byte, msg.Length)
                        n, err := dataReadAt(t.data, p, int64(t.pieceLength(0))*int64(msg.Index)+int64(msg.Begin))
+                       // TODO: Failing to read for a request should not be fatal to the connection.
                        if err != nil {
                                return fmt.Errorf("reading t data to serve request %q: %s", request, err)
                        }
@@ -1767,6 +1772,10 @@ func (me *Client) addPeers(t *torrent, peers []Peer) {
                if me.ipBlockRange(p.IP) != nil {
                        continue
                }
+               if p.Port == 0 {
+                       log.Printf("got bad peer: %v", p)
+                       continue
+               }
                t.addPeer(p)
        }
        me.openNewConns(t)
@@ -1975,12 +1984,13 @@ func (t Torrent) Files() (ret []File) {
        return
 }
 
+// Marks the pieces in the given region for download.
 func (t Torrent) SetRegionPriority(off, len int64) {
        t.cl.mu.Lock()
        defer t.cl.mu.Unlock()
        pieceSize := int64(t.usualPieceSize())
        for i := off / pieceSize; i*pieceSize < off+len; i++ {
-               t.cl.prioritizePiece(t.torrent, int(i), piecePriorityNormal)
+               t.cl.raisePiecePriority(t.torrent, int(i), PiecePriorityNormal)
        }
 }
 
@@ -1998,12 +2008,12 @@ func (t Torrent) DownloadAll() {
        t.cl.mu.Lock()
        defer t.cl.mu.Unlock()
        for i := range iter.N(t.numPieces()) {
-               t.cl.raisePiecePriority(t.torrent, i, piecePriorityNormal)
+               t.cl.raisePiecePriority(t.torrent, i, PiecePriorityNormal)
        }
        // Nice to have the first and last pieces sooner for various interactive
        // purposes.
-       t.cl.raisePiecePriority(t.torrent, 0, piecePriorityReadahead)
-       t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead)
+       t.cl.raisePiecePriority(t.torrent, 0, PiecePriorityReadahead)
+       t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, PiecePriorityReadahead)
 }
 
 // Returns nil metainfo if it isn't in the cache. Checks that the retrieved
@@ -2493,7 +2503,8 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        // Write the chunk out.
        err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
        if err != nil {
-               return fmt.Errorf("error writing chunk: %s", err)
+               log.Printf("error writing chunk: %s", err)
+               return nil
        }
 
        // log.Println("got chunk", req)
@@ -2526,12 +2537,10 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
        }
        p.EverHashed = true
        if correct {
-               if sd, ok := t.data.(StatefulData); ok {
-                       err := sd.PieceCompleted(int(piece))
-                       if err != nil {
-                               log.Printf("error completing piece: %s", err)
-                               correct = false
-                       }
+               err := t.data.PieceCompleted(int(piece))
+               if err != nil {
+                       log.Printf("error completing piece: %s", err)
+                       correct = false
                }
        }
        me.pieceChanged(t, int(piece))
@@ -2542,7 +2551,7 @@ func (me *Client) pieceChanged(t *torrent, piece int) {
        correct := t.pieceComplete(piece)
        p := t.Pieces[piece]
        if correct {
-               p.Priority = piecePriorityNone
+               p.Priority = PiecePriorityNone
                p.PendingChunkSpecs = nil
                for req := range t.urgent {
                        if int(req.Index) == piece {
index 45cc5e01dfb9a7697f46cdcb3718be727298418c..0e924bb26869756c1ff2313482bfc6c56ee47a0c 100644 (file)
@@ -3,10 +3,12 @@ package torrent
 import (
        "encoding/binary"
        "fmt"
+       "io"
        "io/ioutil"
        "log"
        "net"
        "os"
+       "path/filepath"
        "testing"
        "time"
 
@@ -17,7 +19,9 @@ import (
 
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/data/blob"
+       "github.com/anacrolix/torrent/data"
        "github.com/anacrolix/torrent/internal/testutil"
+       "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/util"
 )
 
@@ -31,6 +35,7 @@ var TestingConfig = Config{
        DisableTrackers:      true,
        NoDefaultBlocklist:   true,
        DisableMetainfoCache: true,
+       DataDir:              filepath.Join(os.TempDir(), "anacrolix"),
 }
 
 func TestClientDefault(t *testing.T) {
@@ -317,3 +322,68 @@ func (suite) TestMergingTrackersByAddingSpecs(c *check.C) {
        c.Assert(T.Trackers[0][0].URL(), check.Equals, "http://a")
        c.Assert(T.Trackers[1][0].URL(), check.Equals, "udp://b")
 }
+
+type badData struct {
+}
+
+func (me badData) WriteAt(b []byte, off int64) (int, error) {
+       return 0, nil
+}
+
+func (me badData) WriteSectionTo(w io.Writer, off, n int64) (int64, error) {
+       return 0, nil
+}
+
+func (me badData) PieceComplete(piece int) bool {
+       return true
+}
+
+func (me badData) PieceCompleted(piece int) error {
+       return nil
+}
+
+func (me badData) ReadAt(b []byte, off int64) (n int, err error) {
+       if off >= 5 {
+               err = io.EOF
+               return
+       }
+       n = copy(b, []byte("hello")[off:])
+       return
+}
+
+var _ StatefulData = badData{}
+
+// We read from a piece which is marked completed, but is missing data.
+func TestCompletedPieceWrongSize(t *testing.T) {
+       cfg := TestingConfig
+       cfg.TorrentDataOpener = func(*metainfo.Info) data.Data {
+               return badData{}
+       }
+       cl, _ := NewClient(&cfg)
+       defer cl.Close()
+       tt, new, err := cl.AddTorrentSpec(&TorrentSpec{
+               Info: &metainfo.InfoEx{
+                       Info: metainfo.Info{
+                               PieceLength: 15,
+                               Pieces:      make([]byte, 20),
+                               Files: []metainfo.FileInfo{
+                                       metainfo.FileInfo{Path: []string{"greeting"}, Length: 13},
+                               },
+                       },
+               },
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       if !new {
+               t.Fatal("expected new")
+       }
+       r := tt.NewReader()
+       defer r.Close()
+       b := make([]byte, 20)
+       n, err := io.ReadFull(r, b)
+       if n != 5 || err != io.ErrUnexpectedEOF {
+               t.Fatal(n, err)
+       }
+       defer tt.Drop()
+}
index ed94626ff3a890b1b0d1242045374fa0708f548c..943e1a7cfcc29271dd99c499739678bedeedee54 100644 (file)
@@ -39,7 +39,8 @@ type connection struct {
        post      chan pp.Message
        writeCh   chan []byte
 
-       // The connections preferred order to download pieces.
+       // The connection's preferred order to download pieces. The index is the
+       // piece, the value is its priority.
        piecePriorities []int
        // The piece request order based on piece priorities.
        pieceRequestOrder *pieceordering.Instance
@@ -101,7 +102,7 @@ func (cn *connection) localAddr() net.Addr {
 // Adjust piece position in the request order for this connection based on the
 // given piece priority.
 func (cn *connection) pendPiece(piece int, priority piecePriority) {
-       if priority == piecePriorityNone {
+       if priority == PiecePriorityNone {
                cn.pieceRequestOrder.DeletePiece(piece)
                return
        }
@@ -109,19 +110,20 @@ func (cn *connection) pendPiece(piece int, priority piecePriority) {
        // Priority regions not to scale. Within each region, piece is randomized
        // according to connection.
 
+       // <-request first -- last->
        // [ Now         ]
        //  [ Next       ]
        //   [ Readahead ]
        //                [ Normal ]
        key := func() int {
                switch priority {
-               case piecePriorityNow:
+               case PiecePriorityNow:
                        return -3*len(cn.piecePriorities) + 3*pp
-               case piecePriorityNext:
+               case PiecePriorityNext:
                        return -2*len(cn.piecePriorities) + 2*pp
-               case piecePriorityReadahead:
+               case PiecePriorityReadahead:
                        return -len(cn.piecePriorities) + pp
-               case piecePriorityNormal:
+               case PiecePriorityNormal:
                        return pp
                default:
                        panic(priority)
@@ -383,6 +385,7 @@ func (c *connection) Choke() {
        c.Post(pp.Message{
                Type: pp.Choke,
        })
+       c.PeerRequests = nil
        c.Choked = true
 }
 
index d93b7d05e2a627ed188ebdf18efdaf357e2df062..1ffb9aef3fb65005d1482f136e40f43867a161e4 100644 (file)
@@ -77,22 +77,22 @@ func (suite) TestPieceRequestOrder(t *C) {
                piecePriorities:   []int{1, 4, 0, 3, 2},
        }
        testRequestOrder(nil, c.pieceRequestOrder, t)
-       c.pendPiece(2, piecePriorityNone)
+       c.pendPiece(2, PiecePriorityNone)
        testRequestOrder(nil, c.pieceRequestOrder, t)
-       c.pendPiece(1, piecePriorityNormal)
-       c.pendPiece(2, piecePriorityNormal)
+       c.pendPiece(1, PiecePriorityNormal)
+       c.pendPiece(2, PiecePriorityNormal)
        testRequestOrder([]int{2, 1}, c.pieceRequestOrder, t)
-       c.pendPiece(0, piecePriorityNormal)
+       c.pendPiece(0, PiecePriorityNormal)
        testRequestOrder([]int{2, 0, 1}, c.pieceRequestOrder, t)
-       c.pendPiece(1, piecePriorityReadahead)
+       c.pendPiece(1, PiecePriorityReadahead)
        testRequestOrder([]int{1, 2, 0}, c.pieceRequestOrder, t)
-       c.pendPiece(4, piecePriorityNow)
+       c.pendPiece(4, PiecePriorityNow)
        // now(4), r(1), normal(0, 2)
        testRequestOrder([]int{4, 1, 2, 0}, c.pieceRequestOrder, t)
-       c.pendPiece(2, piecePriorityReadahead)
+       c.pendPiece(2, PiecePriorityReadahead)
        // N(4), R(1, 2), N(0)
        testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
-       c.pendPiece(1, piecePriorityNow)
+       c.pendPiece(1, PiecePriorityNow)
        // now(4, 1), readahead(2), normal(0)
        // in the same order, the keys will be: -15+6, -15+12, -5, 1
        // so we test that a very low priority (for this connection), "now"
@@ -100,7 +100,7 @@ func (suite) TestPieceRequestOrder(t *C) {
        testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
        // Note this intentially sets to None a piece that's not in the order.
        for i := range iter.N(5) {
-               c.pendPiece(i, piecePriorityNone)
+               c.pendPiece(i, PiecePriorityNone)
        }
        testRequestOrder(nil, c.pieceRequestOrder, t)
 }
diff --git a/data/blob/atime_darwin.go b/data/blob/atime_darwin.go
deleted file mode 100644 (file)
index 33091d5..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-package blob
-
-import (
-       "os"
-       "syscall"
-       "time"
-)
-
-func accessTime(fi os.FileInfo) time.Time {
-       ts := fi.Sys().(*syscall.Stat_t).Atimespec
-       return time.Unix(ts.Sec, ts.Nano())
-}
diff --git a/data/blob/atime_linux.go b/data/blob/atime_linux.go
deleted file mode 100644 (file)
index 3bcc054..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-package blob
-
-import (
-       "os"
-       "syscall"
-       "time"
-)
-
-func accessTime(fi os.FileInfo) time.Time {
-       ts := fi.Sys().(*syscall.Stat_t).Atim
-       return time.Unix(ts.Sec, ts.Nano())
-}
index 06ff2f7a15a9e97ac86231d62298f6595f127a42..7331ce6e9d9ebf8c0f09a11261227ba7faf6ab2e 100644 (file)
@@ -13,6 +13,8 @@ import (
        "sync"
        "time"
 
+       "github.com/anacrolix/missinggo"
+
        dataPkg "github.com/anacrolix/torrent/data"
        "github.com/anacrolix/torrent/metainfo"
 )
@@ -186,7 +188,7 @@ func (me fileInfoSorter) Len() int {
 
 func lastTime(fi os.FileInfo) (ret time.Time) {
        ret = fi.ModTime()
-       atime := accessTime(fi)
+       atime := missinggo.FileInfoAccessTime(fi)
        if atime.After(ret) {
                ret = atime
        }
index de96fe9ecb4e1e7e3c218752511e608b9e42423a..3f4377307f99dd17fa7d35c45039b1c64229b21d 100644 (file)
@@ -65,6 +65,9 @@ type ServerConfig struct {
        // Disable the DHT security extension:
        // http://www.libtorrent.org/dht_sec.html.
        NoSecurity bool
+       // Initial IP blocklist to use. Applied before serving and bootstrapping
+       // begins.
+       IPBlocklist *iplist.IPList
 }
 
 type ServerStats struct {
@@ -115,7 +118,8 @@ func NewServer(c *ServerConfig) (s *Server, err error) {
                c = &ServerConfig{}
        }
        s = &Server{
-               config: *c,
+               config:      *c,
+               ipBlockList: c.IPBlocklist,
        }
        if c.Conn != nil {
                s.socket = c.Conn
@@ -625,7 +629,10 @@ func (s *Server) serve() error {
                        logonce.Stderr.Printf("received dht packet exceeds buffer size")
                        continue
                }
-               if s.ipBlocked(util.AddrIP(addr)) {
+               s.mu.Lock()
+               blocked := s.ipBlocked(util.AddrIP(addr))
+               s.mu.Unlock()
+               if blocked {
                        continue
                }
                s.processPacket(b[:n], newDHTAddr(addr))
diff --git a/file.go b/file.go
index d663a2f64d659fe7a186e02d4e09d8c011e84905..688fc2faa92ae6af221d706a94fe4d27461f5eb2 100644 (file)
--- a/file.go
+++ b/file.go
@@ -29,11 +29,14 @@ func (f *File) Length() int64 {
 }
 
 type FilePieceState struct {
-       Length int64
-       State  byte
+       Bytes int64 // Bytes within the piece that are part of this File.
+       PieceState
 }
 
-func (f *File) Progress() (ret []FilePieceState) {
+// Returns the state of pieces in this file.
+func (f *File) State() (ret []FilePieceState) {
+       f.t.cl.mu.Lock()
+       defer f.t.cl.mu.Unlock()
        pieceSize := int64(f.t.usualPieceSize())
        off := f.offset % pieceSize
        remaining := f.length
@@ -45,7 +48,7 @@ func (f *File) Progress() (ret []FilePieceState) {
                if len1 > remaining {
                        len1 = remaining
                }
-               ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)})
+               ret = append(ret, FilePieceState{len1, f.t.pieceState(i)})
                off = 0
                remaining -= len1
        }
index 5fcd8eea1ab79024c37a76d6f094c0c2896d257f..261dbc3870998f5a8bb44ee0b71dbc816b3c74c2 100644 (file)
@@ -70,10 +70,10 @@ type fileNode struct {
        TorrentOffset int64
 }
 
-func (fn fileNode) Attr(attr *fuse.Attr) {
+func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
        attr.Size = fn.size
        attr.Mode = defaultMode
-       return
+       return nil
 }
 
 func (n *node) fsPath() string {
@@ -230,9 +230,9 @@ func (dn dirNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, e
        return
 }
 
-func (dn dirNode) Attr(attr *fuse.Attr) {
+func (dn dirNode) Attr(ctx context.Context, attr *fuse.Attr) error {
        attr.Mode = os.ModeDir | defaultMode
-       return
+       return nil
 }
 
 func (me rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
@@ -279,8 +279,9 @@ func (me rootNode) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err e
        return
 }
 
-func (rootNode) Attr(attr *fuse.Attr) {
+func (rootNode) Attr(ctx context.Context, attr *fuse.Attr) error {
        attr.Mode = os.ModeDir
+       return nil
 }
 
 // TODO(anacrolix): Why should rootNode implement this?
index 5f77d206aeb4fa1bd95eb626124698ad11576b53..b77dab2fefd56b0819cbeee11d7a811cc80adf7a 100644 (file)
@@ -17,7 +17,7 @@ import (
 
        "bazil.org/fuse"
        fusefs "bazil.org/fuse/fs"
-       "golang.org/x/net/context"
+       netContext "golang.org/x/net/context"
 
        "github.com/anacrolix/torrent"
        "github.com/anacrolix/torrent/data"
@@ -232,14 +232,14 @@ func TestDownloadOnDemand(t *testing.T) {
        fs := New(leecher)
        defer fs.Destroy()
        root, _ := fs.Root()
-       node, _ := root.(fusefs.NodeStringLookuper).Lookup(context.Background(), "greeting")
+       node, _ := root.(fusefs.NodeStringLookuper).Lookup(netContext.Background(), "greeting")
        var attr fuse.Attr
-       node.Attr(&attr)
+       node.Attr(netContext.Background(), &attr)
        size := attr.Size
        resp := &fuse.ReadResponse{
                Data: make([]byte, size),
        }
-       node.(fusefs.HandleReader).Read(context.Background(), &fuse.ReadRequest{
+       node.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{
                Size: int(size),
        }, resp)
        content := resp.Data
index d26cbdcd2d1b55ae09254be760e74fc1d08e48a8..169ebbc7ee2ef50356c7d679ff14ffdb5fa0690f 100644 (file)
@@ -10,7 +10,11 @@ import (
 
 // Maintains piece integers by their ascending assigned keys.
 type Instance struct {
-       sl        *skiplist.SkipList
+       // Contains the ascending priority keys. The keys contain a slice of piece
+       // indices.
+       sl *skiplist.SkipList
+       // Maps from piece index back to its key, so that it can be remove
+       // efficiently from the skip list.
        pieceKeys map[int]int
 }
 
@@ -41,6 +45,7 @@ func (me *Instance) SetPiece(piece, key int) {
        me.shuffleItem(key)
 }
 
+// Shuffle the piece indices that share a given key.
 func (me *Instance) shuffleItem(key int) {
        _item, ok := me.sl.Get(key)
        if !ok {
diff --git a/misc.go b/misc.go
index c567f68d5631d320732873890a7c8372d4d9c486..3cf1b8ee75b1dba2d78dca0de4d9a43c0d9d07d3 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -6,6 +6,7 @@ import (
        "fmt"
        "time"
 
+       "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
@@ -107,3 +108,13 @@ func torrentRequestOffset(torrentLength, pieceSize int64, r request) (off int64)
        }
        return
 }
+
+func validateInfo(info *metainfo.Info) error {
+       if len(info.Pieces)%20 != 0 {
+               return errors.New("pieces has invalid length")
+       }
+       if int((info.TotalLength()+info.PieceLength-1)/info.PieceLength) != info.NumPieces() {
+               return errors.New("piece count and file lengths are at odds")
+       }
+       return nil
+}
index 6bb96a3e69b428cf9b2535ec881b0e4ceef43d76..bae74094da2d1f618ab095507750450dd9683f23 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -7,19 +7,21 @@ import (
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
 
+// Piece priority describes the importance of obtaining a particular piece.
+
 type piecePriority byte
 
 const (
-       piecePriorityNone piecePriority = iota
-       piecePriorityNormal
-       piecePriorityReadahead
-       piecePriorityNext
-       piecePriorityNow
+       PiecePriorityNone      piecePriority = iota // Not wanted.
+       PiecePriorityNormal                         // Wanted.
+       PiecePriorityReadahead                      // May be required soon.
+       PiecePriorityNext                           // Succeeds a piece where a read occurred.
+       PiecePriorityNow                            // A read occurred in this piece.
 )
 
 type piece struct {
-       Hash pieceSum
-       // Chunks we don't have. The offset and length can be determined by our
+       Hash pieceSum // The completed piece SHA1 hash, from the metainfo "pieces" field.
+       // Chunks we don't have. The offset and length can be determined by the
        // request chunkSize in use.
        PendingChunkSpecs []bool
        Hashing           bool
diff --git a/piecestate.go b/piecestate.go
new file mode 100644 (file)
index 0000000..358ffcf
--- /dev/null
@@ -0,0 +1,18 @@
+package torrent
+
+// The current state of a piece.
+type PieceState struct {
+       Priority piecePriority
+       // The piece is available in its entirety.
+       Complete bool
+       // The piece is being hashed, or is queued for hash.
+       Checking bool
+       // Some of the piece has been obtained.
+       Partial bool
+}
+
+// Represents a series of consecutive pieces with the same state.
+type PieceStateRun struct {
+       PieceState
+       Length int // How many consecutive pieces have this state.
+}
diff --git a/t.go b/t.go
index 70e6f8dd42affed6990582404a9b24693eaf8e5a..2f0ddbee230c11da7bc2fd3efe42d475272bdbdf 100644 (file)
--- a/t.go
+++ b/t.go
@@ -30,3 +30,12 @@ func (t *Torrent) NewReader() (ret *Reader) {
        }
        return
 }
+
+// Returns the state of pieces of the torrent. They are grouped into runs of
+// same state. The sum of the state run lengths is the number of pieces
+// in the torrent.
+func (t *Torrent) PieceStateRuns() []PieceStateRun {
+       t.stateMu.Lock()
+       defer t.stateMu.Unlock()
+       return t.torrent.pieceStateRuns()
+}
index a764a662f83e5b1532b4f2dc509ae7842c6f3f86..e32175e9429a4ab74159334aa95e326cc92da89c 100644 (file)
@@ -10,6 +10,7 @@ import (
        "sync"
        "time"
 
+       "github.com/anacrolix/missinggo"
        "github.com/bradfitz/iter"
 
        "github.com/anacrolix/torrent/bencode"
@@ -207,6 +208,11 @@ func infoPieceHashes(info *metainfo.Info) (ret []string) {
 
 // Called when metadata for a torrent becomes available.
 func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
+       err = validateInfo(md)
+       if err != nil {
+               err = fmt.Errorf("bad info: %s", err)
+               return
+       }
        t.Info = md
        t.length = 0
        for _, f := range t.Info.UpvertedFiles() {
@@ -289,38 +295,19 @@ func (t *torrent) Name() string {
        return ""
 }
 
-func (t *torrent) pieceStatusChar(index int) byte {
+func (t *torrent) pieceState(index int) (ret PieceState) {
        p := t.Pieces[index]
-       switch {
-       case t.pieceComplete(index):
-               return 'C'
-       case p.QueuedForHash:
-               return 'Q'
-       case p.Hashing:
-               return 'H'
-       case !p.EverHashed:
-               return '?'
-       case t.piecePartiallyDownloaded(index):
-               switch p.Priority {
-               case piecePriorityNone:
-                       return 'F' // Forgotten
-               default:
-                       return 'P'
-               }
-       default:
-               switch p.Priority {
-               case piecePriorityNone:
-                       return 'z'
-               case piecePriorityNow:
-                       return '!'
-               case piecePriorityReadahead:
-                       return 'R'
-               case piecePriorityNext:
-                       return 'N'
-               default:
-                       return '.'
-               }
+       ret.Priority = p.Priority
+       if t.pieceComplete(index) {
+               ret.Complete = true
+       }
+       if p.QueuedForHash || p.Hashing {
+               ret.Checking = true
+       }
+       if t.piecePartiallyDownloaded(index) {
+               ret.Partial = true
        }
+       return
 }
 
 func (t *torrent) metadataPieceSize(piece int) int {
@@ -346,45 +333,45 @@ func (t *torrent) newMetadataExtensionMessage(c *connection, msgType int, piece
        }
 }
 
-type PieceStatusCharSequence struct {
-       Char  byte // The state of this sequence of pieces.
-       Count int  // How many consecutive pieces have this state.
-}
-
-// Returns the state of pieces of the torrent. They are grouped into runs of
-// same state. The sum of the Counts of the sequences is the number of pieces
-// in the torrent. See the function torrent.pieceStatusChar for the possible
-// states.
-func (t *torrent) PieceStatusCharSequences() []PieceStatusCharSequence {
-       t.stateMu.Lock()
-       defer t.stateMu.Unlock()
-       return t.pieceStatusCharSequences()
+func (t *torrent) pieceStateRuns() (ret []PieceStateRun) {
+       rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
+               ret = append(ret, PieceStateRun{
+                       PieceState: el.(PieceState),
+                       Length:     int(count),
+               })
+       })
+       for index := range t.Pieces {
+               rle.Append(t.pieceState(index), 1)
+       }
+       rle.Flush()
+       return
 }
 
-// Returns the length of sequences of identical piece status chars.
-func (t *torrent) pieceStatusCharSequences() (ret []PieceStatusCharSequence) {
-       var (
-               char  byte
-               count int
-       )
-       writeSequence := func() {
-               ret = append(ret, PieceStatusCharSequence{char, count})
-       }
-       if len(t.Pieces) != 0 {
-               char = t.pieceStatusChar(0)
-       }
-       for index := range t.Pieces {
-               char1 := t.pieceStatusChar(index)
-               if char1 == char {
-                       count++
-               } else {
-                       writeSequence()
-                       char = char1
-                       count = 1
+// Produces a small string representing a PieceStateRun.
+func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
+       ret = fmt.Sprintf("%d", psr.Length)
+       ret += func() string {
+               switch psr.Priority {
+               case PiecePriorityNext:
+                       return "N"
+               case PiecePriorityNormal:
+                       return "."
+               case PiecePriorityReadahead:
+                       return "R"
+               case PiecePriorityNow:
+                       return "!"
+               default:
+                       return ""
                }
+       }()
+       if psr.Checking {
+               ret += "H"
+       }
+       if psr.Partial {
+               ret += "P"
        }
-       if count != 0 {
-               writeSequence()
+       if psr.Complete {
+               ret += "C"
        }
        return
 }
@@ -411,9 +398,10 @@ func (t *torrent) writeStatus(w io.Writer) {
                }
        }())
        if t.haveInfo() {
-               fmt.Fprint(w, "Pieces: ")
-               for _, seq := range t.pieceStatusCharSequences() {
-                       fmt.Fprintf(w, "%d%c ", seq.Count, seq.Char)
+               fmt.Fprint(w, "Pieces:")
+               for _, psr := range t.pieceStateRuns() {
+                       w.Write([]byte(" "))
+                       w.Write([]byte(pieceStateRunStatusChars(psr)))
                }
                fmt.Fprintln(w)
        }
@@ -494,7 +482,8 @@ func (t *torrent) bytesLeft() (left int64) {
 }
 
 func (t *torrent) piecePartiallyDownloaded(index int) bool {
-       return t.pieceNumPendingBytes(index) != t.pieceLength(index)
+       pendingBytes := t.pieceNumPendingBytes(index)
+       return pendingBytes != 0 && pendingBytes != t.pieceLength(index)
 }
 
 func numChunksForPiece(chunkSize int, pieceSize int) int {
@@ -561,7 +550,10 @@ func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
 }
 
 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
-       _, err = t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
+       n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
+       if err == nil && n != len(data) {
+               err = io.ErrShortWrite
+       }
        return
 }
 
@@ -713,7 +705,7 @@ func (t *torrent) wantPiece(index int) bool {
        if p.Hashing {
                return false
        }
-       if p.Priority == piecePriorityNone {
+       if p.Priority == PiecePriorityNone {
                if !t.urgentChunkInPiece(index) {
                        return false
                }