"syscall"
"time"
+ filePkg "bitbucket.org/anacrolix/go.torrent/data/file"
+
"bitbucket.org/anacrolix/go.torrent/dht"
"bitbucket.org/anacrolix/go.torrent/internal/pieceordering"
"bitbucket.org/anacrolix/go.torrent/iplist"
// impact of a few bad apples. 4s loses 1% of successful handshakes that
// are obtained with 60s timeout, and 5% of unsuccessful handshakes.
handshakeTimeout = 4 * time.Second
+
+ pruneInterval = 10 * time.Second
)
// Currently doesn't really queue, but should in the future.
disableTCP bool
ipBlockList *iplist.IPList
bannedTorrents map[InfoHash]struct{}
+ _configDir string
+ config Config
+ pruneTimer *time.Timer
+
+ torrentDataOpener TorrentDataOpener
mu sync.RWMutex
event sync.Cond
}
}
-// Read torrent data at the given offset. Returns ErrDataNotReady if the data
-// isn't available.
+// Read torrent data at the given offset. Will block until it is available.
func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
cl.mu.Lock()
defer cl.mu.Unlock()
if len(p) == 0 {
panic(len(p))
}
- for !piece.Complete() {
+ for !piece.Complete() && !t.isClosed() {
piece.Event.Wait()
}
- return t.Data.ReadAt(p, off)
+ return t.data.ReadAt(p, off)
}
func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) {
}
func (cl *Client) configDir() string {
- return filepath.Join(os.Getenv("HOME"), ".config/torrent")
+ if cl._configDir == "" {
+ return filepath.Join(os.Getenv("HOME"), ".config/torrent")
+ }
+ return cl._configDir
}
func (cl *Client) ConfigDir() string {
dataDir: cfg.DataDir,
disableUTP: cfg.DisableUTP,
disableTCP: cfg.DisableTCP,
+ _configDir: cfg.ConfigDir,
+ config: *cfg,
+ torrentDataOpener: func(md *metainfo.Info) (TorrentData, error) {
+ return filePkg.TorrentData(md, cfg.DataDir), nil
+ },
quit: make(chan struct{}),
torrents: make(map[InfoHash]*torrent),
// routine.
// c.PeerRequests[request] = struct{}{}
p := make([]byte, msg.Length)
- n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
+ n, err := t.data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
if err != nil {
return fmt.Errorf("reading t data to serve request %q: %s", request, err)
}
return nil
}
-func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
- err = t.setMetadata(md, cl.dataDir, bytes, &cl.mu)
- if err != nil {
- return
- }
-
- if err := cl.saveTorrentFile(t); err != nil {
- log.Printf("error saving torrent file for %s: %s", t, err)
+func (cl *Client) startTorrent(t *torrent) {
+ if t.Info == nil || t.data == nil {
+ panic("nope")
}
-
- if strings.Contains(strings.ToLower(md.Name), "porn") {
- cl.dropTorrent(t.InfoHash)
- err = errors.New("no porn plx")
- return
- }
-
// If the client intends to upload, it needs to know what state pieces are
// in.
if !cl.noUpload {
}
}()
}
-
cl.downloadStrategy.TorrentStarted(t)
+}
+
+// Storage cannot be changed once it's set.
+func (cl *Client) setStorage(t *torrent, td TorrentData) (err error) {
+ err = t.setStorage(td)
+ cl.event.Broadcast()
+ if err != nil {
+ return
+ }
+ cl.startTorrent(t)
+ return
+}
+
+type TorrentDataOpener func(*metainfo.Info) (TorrentData, error)
+
+func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
+ err = t.setMetadata(md, bytes, &cl.mu)
+ if err != nil {
+ return
+ }
+ if !cl.config.DisableMetainfoCache {
+ if err := cl.saveTorrentFile(t); err != nil {
+ log.Printf("error saving torrent file for %s: %s", t, err)
+ }
+ }
+ if strings.Contains(strings.ToLower(md.Name), "porn") {
+ cl.dropTorrent(t.InfoHash)
+ err = errors.New("no porn plx")
+ return
+ }
close(t.gotMetainfo)
+ td, err := cl.torrentDataOpener(&md)
+ if err != nil {
+ return
+ }
+ err = cl.setStorage(t, td)
return
}
// Returns nil metainfo if it isn't in the cache.
func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
+ if cl.config.DisableMetainfoCache {
+ return
+ }
f, err := os.Open(cl.torrentFileCachePath(ih))
if err != nil {
if os.IsNotExist(err) {
return
}
-// Actively prunes unused connections. This is required to make space to dial
-// for replacements.
-func (cl *Client) connectionPruner(t *torrent) {
- for {
- select {
- case <-t.ceasingNetworking:
- return
- case <-t.closing:
- return
- case <-time.After(15 * time.Second):
+// Prunes unused connections. This is required to make space to dial for
+// replacements.
+func (cl *Client) pruneConnectionsUnlocked(t *torrent) {
+ select {
+ case <-t.ceasingNetworking:
+ return
+ case <-t.closing:
+ return
+ default:
+ }
+ cl.mu.Lock()
+ license := len(t.Conns) - (socketsPerTorrent+1)/2
+ for _, c := range t.Conns {
+ if license <= 0 {
+ break
}
- cl.mu.Lock()
- license := len(t.Conns) - (socketsPerTorrent+1)/2
- for _, c := range t.Conns {
- if license <= 0 {
- break
- }
- if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute {
- continue
- }
- if time.Now().Sub(c.completedHandshake) < time.Minute {
- continue
- }
- c.Close()
- license--
+ if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute {
+ continue
}
- cl.mu.Unlock()
+ if time.Now().Sub(c.completedHandshake) < time.Minute {
+ continue
+ }
+ c.Close()
+ license--
}
+ cl.mu.Unlock()
+ t.pruneTimer.Reset(pruneInterval)
}
func (me *Client) dropTorrent(infoHash InfoHash) (err error) {
if me.dHT != nil {
go me.announceTorrentDHT(T.torrent, true)
}
- go me.connectionPruner(T.torrent)
+ T.torrent.pruneTimer = time.AfterFunc(0, func() {
+ me.pruneConnectionsUnlocked(T.torrent)
+ })
}
return
}
cl.mu.Lock()
defer cl.mu.Unlock()
p := t.Pieces[index]
- for p.Hashing {
+ for p.Hashing || t.data == nil {
cl.event.Wait()
}
if t.isClosed() {
loc string
}
-func TorrentData(md *metainfo.Info, location string) (ret *data, err error) {
- ret = &data{md, location}
- return
+func TorrentData(md *metainfo.Info, location string) data {
+ return data{md, location}
}
-func (me *data) ReadAt(p []byte, off int64) (n int, err error) {
+func (me data) ReadAt(p []byte, off int64) (n int, err error) {
for _, fi := range me.info.UpvertedFiles() {
if off >= fi.Length {
off -= fi.Length
return
}
-func (me *data) Close() {}
+func (me data) Close() {}
-func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
+func (me data) WriteAt(p []byte, off int64) (n int, err error) {
for _, fi := range me.info.UpvertedFiles() {
if off >= fi.Length {
off -= fi.Length
return
}
-func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) {
+func (me data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) {
for _, fi := range me.info.UpvertedFiles() {
if off >= fi.Length {
off -= fi.Length
return
}
-func (me *data) fileInfoName(fi metainfo.FileInfo) string {
+func (me data) fileInfoName(fi metainfo.FileInfo) string {
return filepath.Join(append([]string{me.loc, me.info.Name}, fi.Path...)...)
}
"sync"
"time"
- "bitbucket.org/anacrolix/go.torrent/data/file"
-
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
"bitbucket.org/anacrolix/go.torrent/tracker"
"bitbucket.org/anacrolix/go.torrent/util"
Port int
}
-type torrentData interface {
+type TorrentData interface {
ReadAt(p []byte, off int64) (n int, err error)
Close()
WriteAt(p []byte, off int64) (n int, err error)
Pieces []*piece
length int64
- // Prevent mutations to Data memory maps while in use as they're not safe.
- dataLock sync.RWMutex
- Data torrentData
+ data TorrentData
Info *MetaInfo
// Active peer connections.
gotMetainfo chan struct{}
GotMetainfo <-chan struct{}
+
+ pruneTimer *time.Timer
}
func (t *torrent) numConnsUnchoked() (num int) {
for _, c := range t.Conns {
c.Close()
}
+ t.pruneTimer.Stop()
}
func (t *torrent) AddPeers(pp []Peer) {
}
// Called when metadata for a torrent becomes available.
-func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte, eventLocker sync.Locker) (err error) {
+func (t *torrent) setMetadata(md metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
t.Info = newMetaInfo(&md)
t.length = 0
for _, f := range t.Info.UpvertedFiles() {
conn.Close()
}
}
- t.Data, err = file.TorrentData(&md, dataDir)
- if err != nil {
- err = fmt.Errorf("error mmap'ing torrent data: %s", err)
- return
+ return
+}
+
+func (t *torrent) setStorage(td TorrentData) (err error) {
+ if t.data != nil {
+ t.data.Close()
}
+ t.data = td
return
}
}
t.ceaseNetworking()
close(t.closing)
- t.dataLock.Lock()
- if t.Data != nil {
- t.Data.Close()
- t.Data = nil
+ if t.data != nil {
+ t.data.Close()
}
- t.dataLock.Unlock()
for _, conn := range t.Conns {
conn.Close()
}
}
func (t *torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
- _, err = t.Data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
+ _, err = t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
return
}
func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) {
hash := pieceHash.New()
- t.dataLock.RLock()
- t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
- t.dataLock.RUnlock()
+ t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
util.CopyExact(ps[:], hash.Sum(nil))
return
}