From: Matt Joiner Date: Tue, 10 Mar 2015 15:41:21 +0000 (+1100) Subject: Rework piece completion X-Git-Tag: v1.0.0~1289 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=e6fbde0dc6bf69af5ed3d685a66eff21e30cb401;p=btrtrc.git Rework piece completion --- diff --git a/client.go b/client.go index ce3289f0..5701834e 100644 --- a/client.go +++ b/client.go @@ -100,29 +100,28 @@ func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) { // been checked before. func (cl *Client) queueFirstHash(t *torrent, piece int) { p := t.Pieces[piece] - if p.EverHashed || p.Hashing || p.QueuedForHash || p.Complete() { + if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) { return } cl.queuePieceCheck(t, pp.Integer(piece)) } type Client struct { - noUpload bool - dataDir string - halfOpenLimit int - peerID [20]byte - listeners []net.Listener - utpSock *utp.Socket - disableTrackers bool - downloadStrategy downloadStrategy - dHT *dht.Server - disableUTP bool - disableTCP bool - ipBlockList *iplist.IPList - bannedTorrents map[InfoHash]struct{} - _configDir string - config Config - pruneTimer *time.Timer + noUpload bool + dataDir string + halfOpenLimit int + peerID [20]byte + listeners []net.Listener + utpSock *utp.Socket + disableTrackers bool + dHT *dht.Server + disableUTP bool + disableTCP bool + ipBlockList *iplist.IPList + bannedTorrents map[InfoHash]struct{} + _configDir string + config Config + pruneTimer *time.Timer torrentDataOpener TorrentDataOpener @@ -274,7 +273,10 @@ func (cl *Client) prepareRead(t *torrent, off int64) (n int64) { } piece := t.Pieces[index] cl.readRaisePiecePriorities(t, off) - for !piece.Complete() && !t.isClosed() { + for !t.pieceComplete(index) && !t.isClosed() { + // This is to prevent being starved if a piece is dropped before we + // can read it. + cl.readRaisePiecePriorities(t, off) piece.Event.Wait() } return t.Info.Piece(index).Length() - off%t.Info.PieceLength @@ -292,6 +294,7 @@ type SectionOpener interface { } func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) { +again: if ra, ok := d.(io.ReaderAt); ok { return ra.ReadAt(b, off) } @@ -304,6 +307,10 @@ func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) { defer rc.Close() return io.ReadFull(rc, b) } + if dp, ok := super(d); ok { + d = dp.(data.Data) + goto again + } panic(fmt.Sprintf("can't read from %T", d)) } @@ -355,14 +362,7 @@ func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) } cl.queueFirstHash(t, piece) t.Pieces[piece].Priority = priority - if t.wantPiece(piece) { - for _, c := range t.Conns { - if c.PeerHasPiece(pp.Integer(piece)) { - t.connPendPiece(c, piece) - cl.replenishConnRequests(t, c) - } - } - } + cl.pieceChanged(t, piece) } func (cl *Client) setEnvBlocklist() (err error) { @@ -2224,11 +2224,45 @@ func (me *Client) WaitAll() bool { return true } +func (me *Client) fillRequests(t *torrent, c *connection) { + if c.Interested { + if c.PeerChoked { + return + } + if len(c.Requests) > c.requestsLowWater { + return + } + } + addRequest := func(req request) (again bool) { + if len(c.Requests) >= 32 { + return false + } + return c.Request(req) + } + for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() { + pieceIndex := e.Piece() + if !c.PeerHasPiece(pp.Integer(pieceIndex)) { + panic("piece in request order but peer doesn't have it") + } + if !t.wantPiece(pieceIndex) { + panic("unwanted piece in connection request order") + } + piece := t.Pieces[pieceIndex] + for _, cs := range piece.shuffledPendingChunkSpecs() { + r := request{pp.Integer(pieceIndex), cs} + if !addRequest(r) { + return + } + } + } + return +} + func (me *Client) replenishConnRequests(t *torrent, c *connection) { if !t.haveInfo() { return } - me.downloadStrategy.FillRequests(t, c) + me.fillRequests(t, c) if len(c.Requests) == 0 && !c.PeerChoked { c.SetInterested(false) } @@ -2248,7 +2282,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er piece := t.Pieces[req.Index] // Do we actually want this chunk? - if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok { + if _, ok := piece.PendingChunkSpecs[req.chunkSpec]; !ok || piece.Priority == piecePriorityNone { unusedDownloadedChunksCount.Add(1) c.UnwantedChunksReceived++ return nil @@ -2298,10 +2332,15 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { } } } + me.pieceChanged(t, int(piece)) +} + +func (me *Client) pieceChanged(t *torrent, piece int) { + correct := t.pieceComplete(piece) + p := t.Pieces[piece] if correct { p.Priority = piecePriorityNone p.PendingChunkSpecs = nil - p.complete = true p.Event.Broadcast() } else { if len(p.PendingChunkSpecs) == 0 { @@ -2344,7 +2383,7 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) { cl.event.Wait() } p.QueuedForHash = false - if t.isClosed() || p.complete { + if t.isClosed() || t.pieceComplete(int(index)) { return } p.Hashing = true diff --git a/data/blob/blob.go b/data/blob/blob.go index 5802c18a..a3ade8e6 100644 --- a/data/blob/blob.go +++ b/data/blob/blob.go @@ -1,39 +1,15 @@ package blob import ( - "bytes" - "crypto/sha1" "encoding/hex" - "errors" "io" - "os" - "path/filepath" - - dataPkg "bitbucket.org/anacrolix/go.torrent/data" "github.com/anacrolix/libtorgo/metainfo" ) -const ( - filePerm = 0640 - dirPerm = 0750 -) - type data struct { - info *metainfo.Info - baseDir string -} - -type store struct { - baseDir string -} - -func (me store) OpenTorrent(info *metainfo.Info) dataPkg.Data { - return &data{info, me.baseDir} -} - -func NewStore(baseDir string) dataPkg.Store { - return store{baseDir} + info *metainfo.Info + store *store } func (me *data) pieceHashHex(i int) string { @@ -42,45 +18,26 @@ func (me *data) pieceHashHex(i int) string { func (me *data) Close() {} -func (me *data) ReadAt(p []byte, off int64) (n int, err error) { - hash := me.pieceHashHex(int(off / me.info.PieceLength)) - f, err := os.Open(me.baseDir + "/complete/" + hash) - if os.IsNotExist(err) { - f, err = os.Open(me.baseDir + "/incomplete/" + hash) - if os.IsNotExist(err) { - err = io.EOF - return - } - if err != nil { - return - } - } else if err != nil { +func (me *data) ReadAt(b []byte, off int64) (n int, err error) { + p := me.info.Piece(int(off / me.info.PieceLength)) + f := me.store.pieceRead(p) + if f == nil { + err = io.ErrUnexpectedEOF return } defer f.Close() - off %= me.info.PieceLength - return f.ReadAt(p, off) -} - -func (me *data) openComplete(piece int) (f *os.File, err error) { - return os.OpenFile(me.baseDir+"/complete/"+me.pieceHashHex(piece), os.O_RDWR, 0660) + n, err = f.ReadAt(b, off%me.info.PieceLength) + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + return } func (me *data) WriteAt(p []byte, off int64) (n int, err error) { i := int(off / me.info.PieceLength) off %= me.info.PieceLength for len(p) != 0 { - _, err = os.Stat(me.baseDir + "/complete/" + me.pieceHashHex(i)) - if err == nil { - err = errors.New("can't write to completed piece") - return - } - os.MkdirAll(me.baseDir+"/incomplete", 0750) - var f *os.File - f, err = os.OpenFile(me.baseDir+"/incomplete/"+me.pieceHashHex(i), os.O_WRONLY|os.O_CREATE, 0640) - if err != nil { - return - } + f := me.store.pieceWrite(me.info.Piece(i)) p1 := p maxN := me.info.Piece(i).Length() - off if int64(len(p1)) > maxN { @@ -95,28 +52,24 @@ func (me *data) WriteAt(p []byte, off int64) (n int, err error) { } p = p[n1:] off = 0 + i++ } return } func (me *data) pieceReader(piece int, off int64) (ret io.ReadCloser, err error) { - f, err := os.Open(me.completedPiecePath(piece)) - if os.IsNotExist(err) { - f, err = os.Open(me.incompletePiecePath(piece)) - if os.IsNotExist(err) { - err = io.EOF - return - } - if err != nil { - return - } - } else if err != nil { + f := me.store.pieceRead(me.info.Piece(piece)) + if f == nil { + err = io.ErrUnexpectedEOF return } return struct { io.Reader io.Closer - }{io.NewSectionReader(f, off, me.info.Piece(piece).Length()-off), f}, nil + }{ + Reader: io.NewSectionReader(f, off, me.info.Piece(piece).Length()-off), + Closer: f, + }, nil } func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) { @@ -130,59 +83,26 @@ func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err er err = nil } return + pr.Close() } var n1 int64 n1, err = io.CopyN(w, pr, n) + pr.Close() written += n1 n -= n1 if err != nil { return } off = 0 + i++ } return } -func (me *data) incompletePiecePath(piece int) string { - return filepath.Join(me.baseDir, "incomplete", me.pieceHashHex(piece)) -} - -func (me *data) completedPiecePath(piece int) string { - return filepath.Join(me.baseDir, "complete", me.pieceHashHex(piece)) -} - func (me *data) PieceCompleted(index int) (err error) { - var ( - incompletePiecePath = me.incompletePiecePath(index) - completedPiecePath = me.completedPiecePath(index) - ) - fSrc, err := os.Open(incompletePiecePath) - if err != nil { - return - } - defer fSrc.Close() - os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm) - fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm) - if err != nil { - return - } - defer fDst.Close() - hasher := sha1.New() - r := io.TeeReader(io.LimitReader(fSrc, me.info.Piece(index).Length()), hasher) - _, err = io.Copy(fDst, r) - if err != nil { - return - } - if !bytes.Equal(hasher.Sum(nil), me.info.Piece(index).Hash()) { - err = errors.New("piece incomplete") - os.Remove(completedPiecePath) - return - } - os.Remove(incompletePiecePath) - return + return me.store.PieceCompleted(me.info.Piece(index)) } func (me *data) PieceComplete(piece int) bool { - _, err := os.Stat(me.completedPiecePath(piece)) - return err == nil + return me.store.pieceComplete(me.info.Piece(piece)) } diff --git a/data/blob/store.go b/data/blob/store.go new file mode 100644 index 00000000..766129f3 --- /dev/null +++ b/data/blob/store.go @@ -0,0 +1,195 @@ +package blob + +import ( + "bytes" + "crypto/sha1" + "encoding/hex" + "errors" + "fmt" + "io" + "math/rand" + "os" + "path/filepath" + + dataPkg "bitbucket.org/anacrolix/go.torrent/data" + "github.com/anacrolix/libtorgo/metainfo" +) + +const ( + filePerm = 0640 + dirPerm = 0750 +) + +type store struct { + baseDir string + capacity int64 + completed map[string]struct{} +} + +func (me *store) OpenTorrent(info *metainfo.Info) dataPkg.Data { + return &data{info, me} +} + +type StoreOption func(*store) + +func Capacity(bytes int64) StoreOption { + return func(s *store) { + s.capacity = bytes + } +} + +func NewStore(baseDir string, opt ...StoreOption) dataPkg.Store { + s := &store{baseDir, -1, nil} + for _, o := range opt { + o(s) + } + s.initCompleted() + return s +} + +func (me *store) initCompleted() { + fis, err := me.readCompletedDir() + if err != nil { + panic(err) + } + me.completed = make(map[string]struct{}, len(fis)) + for _, fi := range fis { + me.completed[fi.Name()] = struct{}{} + } +} + +func (me *store) completePieceDirPath() string { + return filepath.Join(me.baseDir, "complete") +} + +func (me *store) path(p metainfo.Piece, completed bool) string { + return filepath.Join(me.baseDir, func() string { + if completed { + return "complete" + } else { + return "incomplete" + } + }(), fmt.Sprintf("%x", p.Hash())) +} + +func (me *store) pieceComplete(p metainfo.Piece) bool { + _, ok := me.completed[hex.EncodeToString(p.Hash())] + return ok +} + +func (me *store) pieceWrite(p metainfo.Piece) (f *os.File) { + if me.pieceComplete(p) { + return + } + name := me.path(p, false) + os.MkdirAll(filepath.Dir(name), dirPerm) + f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY, filePerm) + if err != nil { + panic(err) + } + return +} + +func (me *store) pieceRead(p metainfo.Piece) (f *os.File) { + f, err := os.Open(me.path(p, true)) + if err == nil { + return + } + if !os.IsNotExist(err) { + panic(err) + } + f, err = os.Open(me.path(p, false)) + if err == nil { + return + } + if !os.IsNotExist(err) { + panic(err) + } + return +} + +func (me *store) readCompletedDir() (fis []os.FileInfo, err error) { + f, err := os.Open(me.completePieceDirPath()) + if err != nil { + if os.IsNotExist(err) { + err = nil + } + return + } + fis, err = f.Readdir(-1) + f.Close() + return +} + +func (me *store) removeCompleted(name string) (err error) { + err = os.Remove(filepath.Join(me.completePieceDirPath(), name)) + if os.IsNotExist(err) { + err = nil + } + if err != nil { + return err + } + delete(me.completed, name) + return +} + +func (me *store) makeSpace(space int64) error { + if me.capacity < 0 { + return nil + } + if space > me.capacity { + return errors.New("space requested exceeds capacity") + } + fis, err := me.readCompletedDir() + if err != nil { + return err + } + var size int64 + for _, fi := range fis { + size += fi.Size() + } + for size > me.capacity-space { + i := rand.Intn(len(fis)) + me.removeCompleted(fis[i].Name()) + size -= fis[i].Size() + fis[i] = fis[len(fis)-1] + fis = fis[:len(fis)-1] + } + return nil +} + +func (me *store) PieceCompleted(p metainfo.Piece) (err error) { + err = me.makeSpace(p.Length()) + if err != nil { + return + } + var ( + incompletePiecePath = me.path(p, false) + completedPiecePath = me.path(p, true) + ) + fSrc, err := os.Open(incompletePiecePath) + if err != nil { + return + } + defer fSrc.Close() + os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm) + fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm) + if err != nil { + return + } + defer fDst.Close() + hasher := sha1.New() + r := io.TeeReader(io.LimitReader(fSrc, p.Length()), hasher) + _, err = io.Copy(fDst, r) + if err != nil { + return + } + if !bytes.Equal(hasher.Sum(nil), p.Hash()) { + err = errors.New("piece incomplete") + os.Remove(completedPiecePath) + return + } + os.Remove(incompletePiecePath) + me.completed[hex.EncodeToString(p.Hash())] = struct{}{} + return +} diff --git a/download_strategies.go b/download_strategies.go deleted file mode 100644 index fdf9e245..00000000 --- a/download_strategies.go +++ /dev/null @@ -1,77 +0,0 @@ -package torrent - -import ( - "io" - - pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" -) - -type downloadStrategy interface { - // Tops up the outgoing pending requests. - FillRequests(*torrent, *connection) - TorrentStarted(*torrent) - TorrentStopped(*torrent) - DeleteRequest(*torrent, request) - TorrentPrioritize(t *torrent, off, _len int64) - TorrentGotChunk(*torrent, request) - TorrentGotPiece(t *torrent, piece int) - WriteStatus(w io.Writer) - AssertNotRequested(*torrent, request) - PendingData(*torrent) bool -} - -type defaultDownloadStrategy struct{} - -func (me *defaultDownloadStrategy) PendingData(t *torrent) bool { - return !t.haveAllPieces() -} - -func (me *defaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {} - -func (me *defaultDownloadStrategy) WriteStatus(w io.Writer) {} - -func (s *defaultDownloadStrategy) FillRequests(t *torrent, c *connection) { - if c.Interested { - if c.PeerChoked { - return - } - if len(c.Requests) > c.requestsLowWater { - return - } - } - addRequest := func(req request) (again bool) { - if len(c.Requests) >= 32 { - return false - } - return c.Request(req) - } - for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() { - pieceIndex := e.Piece() - if !c.PeerHasPiece(pp.Integer(pieceIndex)) { - panic("piece in request order but peer doesn't have it") - } - if !t.wantPiece(pieceIndex) { - panic("unwanted piece in connection request order") - } - piece := t.Pieces[pieceIndex] - for _, cs := range piece.shuffledPendingChunkSpecs() { - r := request{pp.Integer(pieceIndex), cs} - if !addRequest(r) { - return - } - } - } - return -} - -func (s *defaultDownloadStrategy) TorrentStarted(t *torrent) {} - -func (s *defaultDownloadStrategy) TorrentStopped(t *torrent) { -} - -func (s *defaultDownloadStrategy) DeleteRequest(t *torrent, r request) { -} - -func (me *defaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {} -func (me *defaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {} -func (*defaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {} diff --git a/misc.go b/misc.go index b47198b2..c63fa6eb 100644 --- a/misc.go +++ b/misc.go @@ -45,7 +45,6 @@ const ( type piece struct { Hash pieceSum - complete bool PendingChunkSpecs map[chunkSpec]struct{} Hashing bool QueuedForHash bool @@ -72,10 +71,6 @@ func (p *piece) shuffledPendingChunkSpecs() (css []chunkSpec) { return } -func (p *piece) Complete() bool { - return p.complete -} - func lastChunkSpec(pieceLength peer_protocol.Integer) (cs chunkSpec) { cs.Begin = (pieceLength - 1) / chunkSize * chunkSize cs.Length = pieceLength - cs.Begin @@ -108,3 +103,18 @@ func metadataPieceSize(totalSize int, piece int) int { } return ret } + +type Super interface { + Super() interface{} +} + +// Returns ok if there's a parent, and it's not nil. +func super(child interface{}) (parent interface{}, ok bool) { + s, ok := child.(Super) + if !ok { + return + } + parent = s.Super() + ok = parent != nil + return +} diff --git a/stateless.go b/stateless.go new file mode 100644 index 00000000..3caa4957 --- /dev/null +++ b/stateless.go @@ -0,0 +1,21 @@ +package torrent + +import "bitbucket.org/anacrolix/go.torrent/data" + +type statelessDataWrapper struct { + data.Data + complete []bool +} + +func (me *statelessDataWrapper) PieceComplete(piece int) bool { + return me.complete[piece] +} + +func (me *statelessDataWrapper) PieceCompleted(piece int) error { + me.complete[piece] = true + return nil +} + +func (me *statelessDataWrapper) Super() interface{} { + return me.Data +} diff --git a/torrent.go b/torrent.go index 751de60b..4217efa1 100644 --- a/torrent.go +++ b/torrent.go @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/bradfitz/iter" + "bitbucket.org/anacrolix/go.torrent/data" pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" "bitbucket.org/anacrolix/go.torrent/tracker" @@ -20,11 +22,11 @@ import ( "github.com/anacrolix/libtorgo/metainfo" ) -func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) { - piece := t.Pieces[index] - if piece.complete { +func (t *torrent) PieceNumPendingBytes(index int) (count pp.Integer) { + if t.pieceComplete(index) { return 0 } + piece := t.Pieces[index] if !piece.EverHashed { return t.PieceLength(index) } @@ -66,7 +68,7 @@ type torrent struct { Pieces []*piece length int64 - data data.Data + data StatefulData Info *metainfo.Info // Active peer connections. @@ -93,6 +95,12 @@ type torrent struct { pruneTimer *time.Timer } +func (t *torrent) pieceComplete(piece int) bool { + // TODO: This is called when setting metadata, and before storage is + // assigned, which doesn't seem right. + return t.data != nil && t.data.PieceComplete(piece) +} + // A file-like handle to torrent data that implements SectionOpener. Opened // sections will be reused so long as Reads and ReadAt's are contiguous. type handle struct { @@ -301,11 +309,10 @@ func (t *torrent) setStorage(td data.Data) (err error) { if c, ok := t.data.(io.Closer); ok { c.Close() } - t.data = td - if sd, ok := t.data.(StatefulData); ok { - for i, p := range t.Pieces { - p.complete = sd.PieceComplete(i) - } + if sd, ok := td.(StatefulData); ok { + t.data = sd + } else { + t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())} } return } @@ -351,7 +358,7 @@ func (t *torrent) Name() string { func (t *torrent) pieceStatusChar(index int) byte { p := t.Pieces[index] switch { - case p.Complete(): + case t.pieceComplete(index): return 'C' case p.QueuedForHash: return 'Q' @@ -550,8 +557,8 @@ func (t *torrent) numPieces() int { } func (t *torrent) numPiecesCompleted() (num int) { - for _, p := range t.Pieces { - if p.Complete() { + for i := range iter.N(t.Info.NumPieces()) { + if t.pieceComplete(i) { num++ } } @@ -689,8 +696,8 @@ func (t *torrent) haveAllPieces() bool { if !t.haveInfo() { return false } - for _, piece := range t.Pieces { - if !piece.Complete() { + for i := range t.Pieces { + if !t.pieceComplete(i) { return false } } @@ -698,8 +705,8 @@ func (t *torrent) haveAllPieces() bool { } func (me *torrent) haveAnyPieces() bool { - for _, piece := range me.Pieces { - if piece.Complete() { + for i := range me.Pieces { + if me.pieceComplete(i) { return true } } @@ -707,7 +714,7 @@ func (me *torrent) haveAnyPieces() bool { } func (t *torrent) havePiece(index int) bool { - return t.haveInfo() && t.Pieces[index].Complete() + return t.haveInfo() && t.pieceComplete(index) } func (t *torrent) haveChunk(r request) bool { @@ -732,7 +739,7 @@ func (t *torrent) wantPiece(index int) bool { return false } p := t.Pieces[index] - return p.EverHashed && len(p.PendingChunkSpecs) != 0 && p.Priority != piecePriorityNone + return !t.pieceComplete(index) && p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing } func (t *torrent) connHasWantedPieces(c *connection) bool {