listeners []net.Listener
utpSock *utp.Socket
disableTrackers bool
- downloadStrategy DownloadStrategy
+ downloadStrategy downloadStrategy
dHT *dht.Server
disableUTP bool
disableTCP bool
return
}
+// Writes out a human readable status of the client, such as for writing to a
+// HTTP status page.
func (cl *Client) WriteStatus(_w io.Writer) {
cl.mu.RLock()
defer cl.mu.RUnlock()
OpenSection(off, n int64) (io.ReadCloser, error)
}
-func dataReadAt(d Data, b []byte, off int64) (n int, err error) {
+func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
if ra, ok := d.(io.ReaderAt); ok {
return ra.ReadAt(b, off)
}
}
cl = &Client{
- noUpload: cfg.NoUpload,
- disableTrackers: cfg.DisableTrackers,
- downloadStrategy: cfg.DownloadStrategy,
- halfOpenLimit: socketsPerTorrent,
- dataDir: cfg.DataDir,
- disableUTP: cfg.DisableUTP,
- disableTCP: cfg.DisableTCP,
- _configDir: cfg.ConfigDir,
- config: *cfg,
+ noUpload: cfg.NoUpload,
+ disableTrackers: cfg.DisableTrackers,
+ halfOpenLimit: socketsPerTorrent,
+ dataDir: cfg.DataDir,
+ disableUTP: cfg.DisableUTP,
+ disableTCP: cfg.DisableTCP,
+ _configDir: cfg.ConfigDir,
+ config: *cfg,
torrentDataOpener: func(md *metainfo.Info) data.Data {
return filePkg.TorrentData(md, cfg.DataDir)
},
if cfg.PeerID != "" {
CopyExact(&cl.peerID, cfg.PeerID)
} else {
- o := copy(cl.peerID[:], BEP20)
+ o := copy(cl.peerID[:], bep20)
_, err = rand.Read(cl.peerID[o:])
if err != nil {
panic("error generating peer id")
}
if cl.downloadStrategy == nil {
- cl.downloadStrategy = &DefaultDownloadStrategy{}
+ cl.downloadStrategy = &defaultDownloadStrategy{}
}
// Returns the laddr string to listen on for the next Listen call.
// Stops the client. All connections to peers are closed and all activity will
// come to a halt.
-func (me *Client) Stop() {
+func (me *Client) Close() {
me.mu.Lock()
defer me.mu.Unlock()
close(me.quit)
}
func (me *Client) torrent(ih InfoHash) *torrent {
- for _, t := range me.torrents {
- if t.InfoHash == ih {
- return t
- }
- }
- return nil
+ return me.torrents[ih]
}
type dialResult struct {
break
}
go func() {
- err := me.AddPeers(t.InfoHash, func() (ret []Peer) {
+ me.mu.Lock()
+ me.addPeers(t, func() (ret []Peer) {
for _, cp := range pexMsg.Added {
p := Peer{
IP: make([]byte, 4),
}
return
}())
- if err != nil {
- log.Printf("error adding PEX peers: %s", err)
- return
- }
+ me.mu.Unlock()
peersFoundByPEX.Add(int64(len(pexMsg.Added)))
}()
default:
func (me *Client) addPeers(t *torrent, peers []Peer) {
blocked := 0
- for i, p := range peers {
- if me.ipBlockRange(p.IP) == nil {
+ for _, p := range peers {
+ if me.ipBlockRange(p.IP) != nil {
+ blocked++
continue
}
- peers[i] = peers[len(peers)-1]
- peers = peers[:len(peers)-1]
- i--
- blocked++
+ t.addPeer(p)
}
if blocked != 0 {
log.Printf("IP blocklist screened %d peers from being added", blocked)
}
- t.AddPeers(peers)
me.openNewConns(t)
}
-// Adds peers to the swarm for the torrent corresponding to infoHash.
-func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
- me.mu.Lock()
- defer me.mu.Unlock()
- t := me.torrent(infoHash)
- if t == nil {
- return errors.New("no such torrent")
- }
- me.addPeers(t, peers)
- return nil
-}
-
func (cl *Client) torrentFileCachePath(ih InfoHash) string {
return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
}
}
// Storage cannot be changed once it's set.
-func (cl *Client) setStorage(t *torrent, td Data) (err error) {
+func (cl *Client) setStorage(t *torrent, td data.Data) (err error) {
err = t.setStorage(td)
cl.event.Broadcast()
if err != nil {
}
func (t Torrent) AddPeers(pp []Peer) error {
- return t.cl.AddPeers(t.torrent.InfoHash, pp)
+ cl := t.cl
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ cl.addPeers(t.torrent, pp)
+ return nil
}
func (t Torrent) DownloadAll() {
Port: peer.Port,
})
}
- err = cl.AddPeers(t.InfoHash, peers)
- if err != nil {
- log.Printf("error adding peers to torrent %s: %s", t, err)
- } else {
- log.Printf("%s: %d new peers from %s", t, len(peers), tr)
- }
+ cl.mu.Lock()
+ cl.addPeers(t, peers)
+ cl.mu.Unlock()
+ log.Printf("%s: %d new peers from %s", t, len(peers), tr)
time.Sleep(time.Second * time.Duration(resp.Interval))
return nil
cl.pieceHashed(t, index, sum == p.Hash)
}
-func (cl *Client) Torrent(ih InfoHash) (t Torrent, ok bool) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
- t.torrent, ok = cl.torrents[ih]
- t.cl = cl
- return
-}
-
+// Returns handles to all the torrents loaded in the Client.
func (me *Client) Torrents() (ret []Torrent) {
me.mu.Lock()
for _, t := range me.torrents {
if err != nil {
t.Fatal(err)
}
- cl.Stop()
+ cl.Close()
}
func TestAddDropTorrent(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- defer cl.Stop()
+ defer cl.Close()
dir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(dir)
tt, err := cl.AddTorrent(mi)
if err != nil {
t.Fatal(err)
}
- defer cl.Stop()
+ defer cl.Close()
}
}
func TestAddDropManyTorrents(t *testing.T) {
cl, _ := NewClient(&TestingConfig)
- defer cl.Stop()
+ defer cl.Close()
var m Magnet
for i := range iter.N(1000) {
binary.PutVarint(m.InfoHash[:], int64(i))
if err != nil {
t.Fatal(err)
}
- defer seeder.Stop()
+ defer seeder.Close()
seeder.AddTorrent(mi)
leecherDataDir, err := ioutil.TempDir("", "")
if err != nil {
// }
cfg.TorrentDataOpener = blob.NewStore(leecherDataDir).OpenTorrent
leecher, _ := NewClient(&cfg)
- defer leecher.Stop()
+ defer leecher.Close()
leecherGreeting, _ := leecher.AddTorrent(mi)
leecherGreeting.AddPeers([]Peer{
Peer{
http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
client.WriteStatus(w)
})
- defer client.Stop()
+ defer client.Close()
if flag.NArg() == 0 {
fmt.Fprintln(os.Stderr, "no torrents specified")
return
func addTestPeer(client *torrent.Client) {
for _, t := range client.Torrents() {
if testPeerAddr != nil {
- if err := client.AddPeers(t.InfoHash, []torrent.Peer{{
+ if err := t.AddPeers([]torrent.Peer{{
IP: testPeerAddr.IP,
Port: testPeerAddr.Port,
}}); err != nil {
"bitbucket.org/anacrolix/go.torrent/dht"
)
+// Override Client defaults.
type Config struct {
- DataDir string
- ListenAddr string
- DisableTrackers bool
- DownloadStrategy DownloadStrategy
- NoDHT bool
- DHTConfig *dht.ServerConfig
- NoUpload bool
- PeerID string
- DisableUTP bool
- DisableTCP bool
+ // Store torrent file data in this directory unless TorrentDataOpener is
+ // specified.
+ DataDir string
+ // The address to listen for new uTP and TCP bittorrent protocol
+ // connections. DHT shares a UDP socket with uTP unless configured
+ // otherwise.
+ ListenAddr string
+ // Don't announce to trackers. This only leaves DHT to discover peers.
+ DisableTrackers bool
+ // Don't create a DHT.
+ NoDHT bool
+ // Overrides the default DHT configuration.
+ DHTConfig *dht.ServerConfig
+ // Don't chunks to peers.
+ NoUpload bool
+ // User-provided Client peer ID. If not present, one is generated automatically.
+ PeerID string
+ // For the bittorrent protocol.
+ DisableUTP bool
+ // For the bittorrent protocol.
+ DisableTCP bool
+ // Don't automatically load "$ConfigDir/blocklist".
NoDefaultBlocklist bool
- // Defaults to "$HOME/.config/torrent"
- ConfigDir string
+ // Defaults to "$HOME/.config/torrent". This is where "blocklist",
+ // "torrents" and other operational files are stored.
+ ConfigDir string
+ // Don't save or load to a cache of torrent files stored in
+ // "$ConfigDir/torrents".
DisableMetainfoCache bool
+ // Called to instantiate storage for each added torrent. Provided backends
+ // are in $REPO/data. If not set, the "file" implementation is used.
TorrentDataOpener
}
OpenTorrent(*metainfo.Info) Data
}
+// Represents data storage for a Torrent. Additional optional interfaces to
+// implement are io.Closer, io.ReaderAt, StatefulData, and SectionOpener.
type Data interface {
// OpenSection(off, n int64) (io.ReadCloser, error)
// ReadAt(p []byte, off int64) (n int, err error)
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
)
-type DownloadStrategy interface {
+type downloadStrategy interface {
// Tops up the outgoing pending requests.
FillRequests(*torrent, *connection)
TorrentStarted(*torrent)
PendingData(*torrent) bool
}
-type DefaultDownloadStrategy struct{}
+type defaultDownloadStrategy struct{}
-func (me *DefaultDownloadStrategy) PendingData(t *torrent) bool {
+func (me *defaultDownloadStrategy) PendingData(t *torrent) bool {
return !t.haveAllPieces()
}
-func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {}
+func (me *defaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {}
-func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {}
+func (me *defaultDownloadStrategy) WriteStatus(w io.Writer) {}
-func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
+func (s *defaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
if c.Interested {
if c.PeerChoked {
return
return
}
-func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {}
+func (s *defaultDownloadStrategy) TorrentStarted(t *torrent) {}
-func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
+func (s *defaultDownloadStrategy) TorrentStopped(t *torrent) {
}
-func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
+func (s *defaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
}
-func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {}
-func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {}
-func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
+func (me *defaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {}
+func (me *defaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {}
+func (*defaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
NoDefaultBlocklist: true,
})
- defer client.Stop()
+ defer client.Close()
client.AddTorrent(layout.Metainfo)
fs := New(client)
fuseConn, err := fuse.Mount(layout.MountDir)
t.Fatalf("error creating seeder client: %s", err)
}
seeder.SetIPBlockList(nil)
- defer seeder.Stop()
+ defer seeder.Close()
http.HandleFunc("/seeder", func(w http.ResponseWriter, req *http.Request) {
seeder.WriteStatus(w)
})
http.HandleFunc("/leecher", func(w http.ResponseWriter, req *http.Request) {
leecher.WriteStatus(w)
})
- defer leecher.Stop()
- leecher.AddTorrent(layout.Metainfo)
- var ih torrent.InfoHash
- util.CopyExact(ih[:], layout.Metainfo.Info.Hash)
- leecher.AddPeers(ih, []torrent.Peer{func() torrent.Peer {
+ defer leecher.Close()
+ leecherTorrent, _ := leecher.AddTorrent(layout.Metainfo)
+ leecherTorrent.AddPeers([]torrent.Peer{func() torrent.Peer {
_, port, err := net.SplitHostPort(seeder.ListenAddr().String())
if err != nil {
panic(err)
return fmt.Sprintf("%s-%s (%s)", r.First, r.Last, r.Description)
}
-// Create a new IP list. The given range must already sorted by the lower IP
-// in the range. Behaviour is undefined for lists of overlapping ranges.
+// Create a new IP list. The given ranges must already sorted by the lower
+// bound IP in each range. Behaviour is undefined for lists of overlapping
+// ranges.
func New(initSorted []Range) *IPList {
return &IPList{
ranges: initSorted,
pieceHash = crypto.SHA1
maxRequests = 250 // Maximum pending requests we allow peers to send us.
chunkSize = 0x4000 // 16KiB
- BEP20 = "-GT0000-" // Peer ID client identifier prefix
+ bep20 = "-GT0000-" // Peer ID client identifier prefix
nominalDialTimeout = time.Second * 30
minDialTimeout = 5 * time.Second
)
var (
// Requested data not yet available.
- ErrDataNotReady = errors.New("data not ready")
+ errDataNotReady = errors.New("data not ready")
)
// The size in bytes of a metadata extension piece.
"container/list"
)
-type OrderedList struct {
+// This was used to maintain pieces in order of bytes left to download. I
+// don't think it's currently in use.
+
+type orderedList struct {
list *list.List
lessFunc func(a, b interface{}) bool
}
-func (me *OrderedList) Len() int {
+func (me *orderedList) Len() int {
return me.list.Len()
}
-func NewList(lessFunc func(a, b interface{}) bool) *OrderedList {
- return &OrderedList{
+func newOrderedList(lessFunc func(a, b interface{}) bool) *orderedList {
+ return &orderedList{
list: list.New(),
lessFunc: lessFunc,
}
}
-func (me *OrderedList) ValueChanged(e *list.Element) {
+func (me *orderedList) ValueChanged(e *list.Element) {
for prev := e.Prev(); prev != nil && me.lessFunc(e.Value, prev.Value); prev = e.Prev() {
me.list.MoveBefore(e, prev)
}
}
}
-func (me *OrderedList) Insert(value interface{}) (ret *list.Element) {
+func (me *orderedList) Insert(value interface{}) (ret *list.Element) {
ret = me.list.PushFront(value)
me.ValueChanged(ret)
return
}
-func (me *OrderedList) Front() *list.Element {
+func (me *orderedList) Front() *list.Element {
return me.list.Front()
}
-func (me *OrderedList) Remove(e *list.Element) interface{} {
+func (me *orderedList) Remove(e *list.Element) interface{} {
return me.list.Remove(e)
}
)
func TestOrderedList(t *testing.T) {
- ol := NewList(func(a, b interface{}) bool {
+ ol := newOrderedList(func(a, b interface{}) bool {
return a.(int) < b.(int)
})
if ol.Len() != 0 {
Port int
}
-// Represents data storage for a Torrent. Additional optional interfaces to
-// implement are io.Closer, io.ReaderAt, StatefulData, and SectionOpener.
-type Data interface {
- data.Data
-}
-
// Data maintains per-piece persistent state.
type StatefulData interface {
- Data
+ data.Data
// We believe the piece data will pass a hash check.
PieceCompleted(index int) error
// Returns true if the piece is complete.
Pieces []*piece
length int64
- data Data
+ data data.Data
Info *metainfo.Info
// Active peer connections.
t.pruneTimer.Stop()
}
-func (t *torrent) AddPeers(pp []Peer) {
- for _, p := range pp {
- t.Peers[peersKey{string(p.IP), p.Port}] = p
- }
+func (t *torrent) addPeer(p Peer) {
+ t.Peers[peersKey{string(p.IP), p.Port}] = p
}
func (t *torrent) invalidateMetadata() {
return
}
-func (t *torrent) setStorage(td Data) (err error) {
+func (t *torrent) setStorage(td data.Data) (err error) {
if c, ok := t.data.(io.Closer); ok {
c.Close()
}
return t.PieceNumPendingBytes(pp.Integer(index)) != t.PieceLength(pp.Integer(index))
}
-func NumChunksForPiece(chunkSize int, pieceSize int) int {
+func numChunksForPiece(chunkSize int, pieceSize int) int {
return (pieceSize + chunkSize - 1) / chunkSize
}