// been checked before.
func (cl *Client) queueFirstHash(t *torrent, piece int) {
p := t.Pieces[piece]
- if p.EverHashed || p.Hashing || p.QueuedForHash || p.Complete() {
+ if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
return
}
cl.queuePieceCheck(t, pp.Integer(piece))
}
type Client struct {
- noUpload bool
- dataDir string
- halfOpenLimit int
- peerID [20]byte
- listeners []net.Listener
- utpSock *utp.Socket
- disableTrackers bool
- downloadStrategy downloadStrategy
- dHT *dht.Server
- disableUTP bool
- disableTCP bool
- ipBlockList *iplist.IPList
- bannedTorrents map[InfoHash]struct{}
- _configDir string
- config Config
- pruneTimer *time.Timer
+ noUpload bool
+ dataDir string
+ halfOpenLimit int
+ peerID [20]byte
+ listeners []net.Listener
+ utpSock *utp.Socket
+ disableTrackers bool
+ dHT *dht.Server
+ disableUTP bool
+ disableTCP bool
+ ipBlockList *iplist.IPList
+ bannedTorrents map[InfoHash]struct{}
+ _configDir string
+ config Config
+ pruneTimer *time.Timer
torrentDataOpener TorrentDataOpener
}
piece := t.Pieces[index]
cl.readRaisePiecePriorities(t, off)
- for !piece.Complete() && !t.isClosed() {
+ for !t.pieceComplete(index) && !t.isClosed() {
+ // This is to prevent being starved if a piece is dropped before we
+ // can read it.
+ cl.readRaisePiecePriorities(t, off)
piece.Event.Wait()
}
return t.Info.Piece(index).Length() - off%t.Info.PieceLength
}
func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
+again:
if ra, ok := d.(io.ReaderAt); ok {
return ra.ReadAt(b, off)
}
defer rc.Close()
return io.ReadFull(rc, b)
}
+ if dp, ok := super(d); ok {
+ d = dp.(data.Data)
+ goto again
+ }
panic(fmt.Sprintf("can't read from %T", d))
}
}
cl.queueFirstHash(t, piece)
t.Pieces[piece].Priority = priority
- if t.wantPiece(piece) {
- for _, c := range t.Conns {
- if c.PeerHasPiece(pp.Integer(piece)) {
- t.connPendPiece(c, piece)
- cl.replenishConnRequests(t, c)
- }
- }
- }
+ cl.pieceChanged(t, piece)
}
func (cl *Client) setEnvBlocklist() (err error) {
return true
}
+func (me *Client) fillRequests(t *torrent, c *connection) {
+ if c.Interested {
+ if c.PeerChoked {
+ return
+ }
+ if len(c.Requests) > c.requestsLowWater {
+ return
+ }
+ }
+ addRequest := func(req request) (again bool) {
+ if len(c.Requests) >= 32 {
+ return false
+ }
+ return c.Request(req)
+ }
+ for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
+ pieceIndex := e.Piece()
+ if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
+ panic("piece in request order but peer doesn't have it")
+ }
+ if !t.wantPiece(pieceIndex) {
+ panic("unwanted piece in connection request order")
+ }
+ piece := t.Pieces[pieceIndex]
+ for _, cs := range piece.shuffledPendingChunkSpecs() {
+ r := request{pp.Integer(pieceIndex), cs}
+ if !addRequest(r) {
+ return
+ }
+ }
+ }
+ return
+}
+
func (me *Client) replenishConnRequests(t *torrent, c *connection) {
if !t.haveInfo() {
return
}
- me.downloadStrategy.FillRequests(t, c)
+ me.fillRequests(t, c)
if len(c.Requests) == 0 && !c.PeerChoked {
c.SetInterested(false)
}
piece := t.Pieces[req.Index]
// Do we actually want this chunk?
- if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
+ if _, ok := piece.PendingChunkSpecs[req.chunkSpec]; !ok || piece.Priority == piecePriorityNone {
unusedDownloadedChunksCount.Add(1)
c.UnwantedChunksReceived++
return nil
}
}
}
+ me.pieceChanged(t, int(piece))
+}
+
+func (me *Client) pieceChanged(t *torrent, piece int) {
+ correct := t.pieceComplete(piece)
+ p := t.Pieces[piece]
if correct {
p.Priority = piecePriorityNone
p.PendingChunkSpecs = nil
- p.complete = true
p.Event.Broadcast()
} else {
if len(p.PendingChunkSpecs) == 0 {
cl.event.Wait()
}
p.QueuedForHash = false
- if t.isClosed() || p.complete {
+ if t.isClosed() || t.pieceComplete(int(index)) {
return
}
p.Hashing = true
package blob
import (
- "bytes"
- "crypto/sha1"
"encoding/hex"
- "errors"
"io"
- "os"
- "path/filepath"
-
- dataPkg "bitbucket.org/anacrolix/go.torrent/data"
"github.com/anacrolix/libtorgo/metainfo"
)
-const (
- filePerm = 0640
- dirPerm = 0750
-)
-
type data struct {
- info *metainfo.Info
- baseDir string
-}
-
-type store struct {
- baseDir string
-}
-
-func (me store) OpenTorrent(info *metainfo.Info) dataPkg.Data {
- return &data{info, me.baseDir}
-}
-
-func NewStore(baseDir string) dataPkg.Store {
- return store{baseDir}
+ info *metainfo.Info
+ store *store
}
func (me *data) pieceHashHex(i int) string {
func (me *data) Close() {}
-func (me *data) ReadAt(p []byte, off int64) (n int, err error) {
- hash := me.pieceHashHex(int(off / me.info.PieceLength))
- f, err := os.Open(me.baseDir + "/complete/" + hash)
- if os.IsNotExist(err) {
- f, err = os.Open(me.baseDir + "/incomplete/" + hash)
- if os.IsNotExist(err) {
- err = io.EOF
- return
- }
- if err != nil {
- return
- }
- } else if err != nil {
+func (me *data) ReadAt(b []byte, off int64) (n int, err error) {
+ p := me.info.Piece(int(off / me.info.PieceLength))
+ f := me.store.pieceRead(p)
+ if f == nil {
+ err = io.ErrUnexpectedEOF
return
}
defer f.Close()
- off %= me.info.PieceLength
- return f.ReadAt(p, off)
-}
-
-func (me *data) openComplete(piece int) (f *os.File, err error) {
- return os.OpenFile(me.baseDir+"/complete/"+me.pieceHashHex(piece), os.O_RDWR, 0660)
+ n, err = f.ReadAt(b, off%me.info.PieceLength)
+ if err == io.EOF {
+ err = io.ErrUnexpectedEOF
+ }
+ return
}
func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
i := int(off / me.info.PieceLength)
off %= me.info.PieceLength
for len(p) != 0 {
- _, err = os.Stat(me.baseDir + "/complete/" + me.pieceHashHex(i))
- if err == nil {
- err = errors.New("can't write to completed piece")
- return
- }
- os.MkdirAll(me.baseDir+"/incomplete", 0750)
- var f *os.File
- f, err = os.OpenFile(me.baseDir+"/incomplete/"+me.pieceHashHex(i), os.O_WRONLY|os.O_CREATE, 0640)
- if err != nil {
- return
- }
+ f := me.store.pieceWrite(me.info.Piece(i))
p1 := p
maxN := me.info.Piece(i).Length() - off
if int64(len(p1)) > maxN {
}
p = p[n1:]
off = 0
+ i++
}
return
}
func (me *data) pieceReader(piece int, off int64) (ret io.ReadCloser, err error) {
- f, err := os.Open(me.completedPiecePath(piece))
- if os.IsNotExist(err) {
- f, err = os.Open(me.incompletePiecePath(piece))
- if os.IsNotExist(err) {
- err = io.EOF
- return
- }
- if err != nil {
- return
- }
- } else if err != nil {
+ f := me.store.pieceRead(me.info.Piece(piece))
+ if f == nil {
+ err = io.ErrUnexpectedEOF
return
}
return struct {
io.Reader
io.Closer
- }{io.NewSectionReader(f, off, me.info.Piece(piece).Length()-off), f}, nil
+ }{
+ Reader: io.NewSectionReader(f, off, me.info.Piece(piece).Length()-off),
+ Closer: f,
+ }, nil
}
func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) {
err = nil
}
return
+ pr.Close()
}
var n1 int64
n1, err = io.CopyN(w, pr, n)
+ pr.Close()
written += n1
n -= n1
if err != nil {
return
}
off = 0
+ i++
}
return
}
-func (me *data) incompletePiecePath(piece int) string {
- return filepath.Join(me.baseDir, "incomplete", me.pieceHashHex(piece))
-}
-
-func (me *data) completedPiecePath(piece int) string {
- return filepath.Join(me.baseDir, "complete", me.pieceHashHex(piece))
-}
-
func (me *data) PieceCompleted(index int) (err error) {
- var (
- incompletePiecePath = me.incompletePiecePath(index)
- completedPiecePath = me.completedPiecePath(index)
- )
- fSrc, err := os.Open(incompletePiecePath)
- if err != nil {
- return
- }
- defer fSrc.Close()
- os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm)
- fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm)
- if err != nil {
- return
- }
- defer fDst.Close()
- hasher := sha1.New()
- r := io.TeeReader(io.LimitReader(fSrc, me.info.Piece(index).Length()), hasher)
- _, err = io.Copy(fDst, r)
- if err != nil {
- return
- }
- if !bytes.Equal(hasher.Sum(nil), me.info.Piece(index).Hash()) {
- err = errors.New("piece incomplete")
- os.Remove(completedPiecePath)
- return
- }
- os.Remove(incompletePiecePath)
- return
+ return me.store.PieceCompleted(me.info.Piece(index))
}
func (me *data) PieceComplete(piece int) bool {
- _, err := os.Stat(me.completedPiecePath(piece))
- return err == nil
+ return me.store.pieceComplete(me.info.Piece(piece))
}
--- /dev/null
+package blob
+
+import (
+ "bytes"
+ "crypto/sha1"
+ "encoding/hex"
+ "errors"
+ "fmt"
+ "io"
+ "math/rand"
+ "os"
+ "path/filepath"
+
+ dataPkg "bitbucket.org/anacrolix/go.torrent/data"
+ "github.com/anacrolix/libtorgo/metainfo"
+)
+
+const (
+ filePerm = 0640
+ dirPerm = 0750
+)
+
+type store struct {
+ baseDir string
+ capacity int64
+ completed map[string]struct{}
+}
+
+func (me *store) OpenTorrent(info *metainfo.Info) dataPkg.Data {
+ return &data{info, me}
+}
+
+type StoreOption func(*store)
+
+func Capacity(bytes int64) StoreOption {
+ return func(s *store) {
+ s.capacity = bytes
+ }
+}
+
+func NewStore(baseDir string, opt ...StoreOption) dataPkg.Store {
+ s := &store{baseDir, -1, nil}
+ for _, o := range opt {
+ o(s)
+ }
+ s.initCompleted()
+ return s
+}
+
+func (me *store) initCompleted() {
+ fis, err := me.readCompletedDir()
+ if err != nil {
+ panic(err)
+ }
+ me.completed = make(map[string]struct{}, len(fis))
+ for _, fi := range fis {
+ me.completed[fi.Name()] = struct{}{}
+ }
+}
+
+func (me *store) completePieceDirPath() string {
+ return filepath.Join(me.baseDir, "complete")
+}
+
+func (me *store) path(p metainfo.Piece, completed bool) string {
+ return filepath.Join(me.baseDir, func() string {
+ if completed {
+ return "complete"
+ } else {
+ return "incomplete"
+ }
+ }(), fmt.Sprintf("%x", p.Hash()))
+}
+
+func (me *store) pieceComplete(p metainfo.Piece) bool {
+ _, ok := me.completed[hex.EncodeToString(p.Hash())]
+ return ok
+}
+
+func (me *store) pieceWrite(p metainfo.Piece) (f *os.File) {
+ if me.pieceComplete(p) {
+ return
+ }
+ name := me.path(p, false)
+ os.MkdirAll(filepath.Dir(name), dirPerm)
+ f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY, filePerm)
+ if err != nil {
+ panic(err)
+ }
+ return
+}
+
+func (me *store) pieceRead(p metainfo.Piece) (f *os.File) {
+ f, err := os.Open(me.path(p, true))
+ if err == nil {
+ return
+ }
+ if !os.IsNotExist(err) {
+ panic(err)
+ }
+ f, err = os.Open(me.path(p, false))
+ if err == nil {
+ return
+ }
+ if !os.IsNotExist(err) {
+ panic(err)
+ }
+ return
+}
+
+func (me *store) readCompletedDir() (fis []os.FileInfo, err error) {
+ f, err := os.Open(me.completePieceDirPath())
+ if err != nil {
+ if os.IsNotExist(err) {
+ err = nil
+ }
+ return
+ }
+ fis, err = f.Readdir(-1)
+ f.Close()
+ return
+}
+
+func (me *store) removeCompleted(name string) (err error) {
+ err = os.Remove(filepath.Join(me.completePieceDirPath(), name))
+ if os.IsNotExist(err) {
+ err = nil
+ }
+ if err != nil {
+ return err
+ }
+ delete(me.completed, name)
+ return
+}
+
+func (me *store) makeSpace(space int64) error {
+ if me.capacity < 0 {
+ return nil
+ }
+ if space > me.capacity {
+ return errors.New("space requested exceeds capacity")
+ }
+ fis, err := me.readCompletedDir()
+ if err != nil {
+ return err
+ }
+ var size int64
+ for _, fi := range fis {
+ size += fi.Size()
+ }
+ for size > me.capacity-space {
+ i := rand.Intn(len(fis))
+ me.removeCompleted(fis[i].Name())
+ size -= fis[i].Size()
+ fis[i] = fis[len(fis)-1]
+ fis = fis[:len(fis)-1]
+ }
+ return nil
+}
+
+func (me *store) PieceCompleted(p metainfo.Piece) (err error) {
+ err = me.makeSpace(p.Length())
+ if err != nil {
+ return
+ }
+ var (
+ incompletePiecePath = me.path(p, false)
+ completedPiecePath = me.path(p, true)
+ )
+ fSrc, err := os.Open(incompletePiecePath)
+ if err != nil {
+ return
+ }
+ defer fSrc.Close()
+ os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm)
+ fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm)
+ if err != nil {
+ return
+ }
+ defer fDst.Close()
+ hasher := sha1.New()
+ r := io.TeeReader(io.LimitReader(fSrc, p.Length()), hasher)
+ _, err = io.Copy(fDst, r)
+ if err != nil {
+ return
+ }
+ if !bytes.Equal(hasher.Sum(nil), p.Hash()) {
+ err = errors.New("piece incomplete")
+ os.Remove(completedPiecePath)
+ return
+ }
+ os.Remove(incompletePiecePath)
+ me.completed[hex.EncodeToString(p.Hash())] = struct{}{}
+ return
+}
+++ /dev/null
-package torrent
-
-import (
- "io"
-
- pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
-)
-
-type downloadStrategy interface {
- // Tops up the outgoing pending requests.
- FillRequests(*torrent, *connection)
- TorrentStarted(*torrent)
- TorrentStopped(*torrent)
- DeleteRequest(*torrent, request)
- TorrentPrioritize(t *torrent, off, _len int64)
- TorrentGotChunk(*torrent, request)
- TorrentGotPiece(t *torrent, piece int)
- WriteStatus(w io.Writer)
- AssertNotRequested(*torrent, request)
- PendingData(*torrent) bool
-}
-
-type defaultDownloadStrategy struct{}
-
-func (me *defaultDownloadStrategy) PendingData(t *torrent) bool {
- return !t.haveAllPieces()
-}
-
-func (me *defaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {}
-
-func (me *defaultDownloadStrategy) WriteStatus(w io.Writer) {}
-
-func (s *defaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
- if c.Interested {
- if c.PeerChoked {
- return
- }
- if len(c.Requests) > c.requestsLowWater {
- return
- }
- }
- addRequest := func(req request) (again bool) {
- if len(c.Requests) >= 32 {
- return false
- }
- return c.Request(req)
- }
- for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
- pieceIndex := e.Piece()
- if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
- panic("piece in request order but peer doesn't have it")
- }
- if !t.wantPiece(pieceIndex) {
- panic("unwanted piece in connection request order")
- }
- piece := t.Pieces[pieceIndex]
- for _, cs := range piece.shuffledPendingChunkSpecs() {
- r := request{pp.Integer(pieceIndex), cs}
- if !addRequest(r) {
- return
- }
- }
- }
- return
-}
-
-func (s *defaultDownloadStrategy) TorrentStarted(t *torrent) {}
-
-func (s *defaultDownloadStrategy) TorrentStopped(t *torrent) {
-}
-
-func (s *defaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
-}
-
-func (me *defaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {}
-func (me *defaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {}
-func (*defaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
type piece struct {
Hash pieceSum
- complete bool
PendingChunkSpecs map[chunkSpec]struct{}
Hashing bool
QueuedForHash bool
return
}
-func (p *piece) Complete() bool {
- return p.complete
-}
-
func lastChunkSpec(pieceLength peer_protocol.Integer) (cs chunkSpec) {
cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
cs.Length = pieceLength - cs.Begin
}
return ret
}
+
+type Super interface {
+ Super() interface{}
+}
+
+// Returns ok if there's a parent, and it's not nil.
+func super(child interface{}) (parent interface{}, ok bool) {
+ s, ok := child.(Super)
+ if !ok {
+ return
+ }
+ parent = s.Super()
+ ok = parent != nil
+ return
+}
--- /dev/null
+package torrent
+
+import "bitbucket.org/anacrolix/go.torrent/data"
+
+type statelessDataWrapper struct {
+ data.Data
+ complete []bool
+}
+
+func (me *statelessDataWrapper) PieceComplete(piece int) bool {
+ return me.complete[piece]
+}
+
+func (me *statelessDataWrapper) PieceCompleted(piece int) error {
+ me.complete[piece] = true
+ return nil
+}
+
+func (me *statelessDataWrapper) Super() interface{} {
+ return me.Data
+}
"sync"
"time"
+ "github.com/bradfitz/iter"
+
"bitbucket.org/anacrolix/go.torrent/data"
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
"bitbucket.org/anacrolix/go.torrent/tracker"
"github.com/anacrolix/libtorgo/metainfo"
)
-func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
- piece := t.Pieces[index]
- if piece.complete {
+func (t *torrent) PieceNumPendingBytes(index int) (count pp.Integer) {
+ if t.pieceComplete(index) {
return 0
}
+ piece := t.Pieces[index]
if !piece.EverHashed {
return t.PieceLength(index)
}
Pieces []*piece
length int64
- data data.Data
+ data StatefulData
Info *metainfo.Info
// Active peer connections.
pruneTimer *time.Timer
}
+func (t *torrent) pieceComplete(piece int) bool {
+ // TODO: This is called when setting metadata, and before storage is
+ // assigned, which doesn't seem right.
+ return t.data != nil && t.data.PieceComplete(piece)
+}
+
// A file-like handle to torrent data that implements SectionOpener. Opened
// sections will be reused so long as Reads and ReadAt's are contiguous.
type handle struct {
if c, ok := t.data.(io.Closer); ok {
c.Close()
}
- t.data = td
- if sd, ok := t.data.(StatefulData); ok {
- for i, p := range t.Pieces {
- p.complete = sd.PieceComplete(i)
- }
+ if sd, ok := td.(StatefulData); ok {
+ t.data = sd
+ } else {
+ t.data = &statelessDataWrapper{td, make([]bool, t.Info.NumPieces())}
}
return
}
func (t *torrent) pieceStatusChar(index int) byte {
p := t.Pieces[index]
switch {
- case p.Complete():
+ case t.pieceComplete(index):
return 'C'
case p.QueuedForHash:
return 'Q'
}
func (t *torrent) numPiecesCompleted() (num int) {
- for _, p := range t.Pieces {
- if p.Complete() {
+ for i := range iter.N(t.Info.NumPieces()) {
+ if t.pieceComplete(i) {
num++
}
}
if !t.haveInfo() {
return false
}
- for _, piece := range t.Pieces {
- if !piece.Complete() {
+ for i := range t.Pieces {
+ if !t.pieceComplete(i) {
return false
}
}
}
func (me *torrent) haveAnyPieces() bool {
- for _, piece := range me.Pieces {
- if piece.Complete() {
+ for i := range me.Pieces {
+ if me.pieceComplete(i) {
return true
}
}
}
func (t *torrent) havePiece(index int) bool {
- return t.haveInfo() && t.Pieces[index].Complete()
+ return t.haveInfo() && t.pieceComplete(index)
}
func (t *torrent) haveChunk(r request) bool {
return false
}
p := t.Pieces[index]
- return p.EverHashed && len(p.PendingChunkSpecs) != 0 && p.Priority != piecePriorityNone
+ return !t.pieceComplete(index) && p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing
}
func (t *torrent) connHasWantedPieces(c *connection) bool {