From: Matt Joiner Date: Sun, 8 Mar 2015 06:28:14 +0000 (+1100) Subject: More cleaning of public interface X-Git-Tag: v1.0.0~1296 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=ad6ac3f2cd4328fa1888d1343ff6d404b1114b56;p=btrtrc.git More cleaning of public interface --- diff --git a/client.go b/client.go index ce8d4e3c..4a70e7c5 100644 --- a/client.go +++ b/client.go @@ -114,7 +114,7 @@ type Client struct { listeners []net.Listener utpSock *utp.Socket disableTrackers bool - downloadStrategy DownloadStrategy + downloadStrategy downloadStrategy dHT *dht.Server disableUTP bool disableTCP bool @@ -192,6 +192,8 @@ func (cl *Client) sortedTorrents() (ret []*torrent) { return } +// Writes out a human readable status of the client, such as for writing to a +// HTTP status page. func (cl *Client) WriteStatus(_w io.Writer) { cl.mu.RLock() defer cl.mu.RUnlock() @@ -290,7 +292,7 @@ type SectionOpener interface { OpenSection(off, n int64) (io.ReadCloser, error) } -func dataReadAt(d Data, b []byte, off int64) (n int, err error) { +func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) { if ra, ok := d.(io.ReaderAt); ok { return ra.ReadAt(b, off) } @@ -447,15 +449,14 @@ func NewClient(cfg *Config) (cl *Client, err error) { } cl = &Client{ - noUpload: cfg.NoUpload, - disableTrackers: cfg.DisableTrackers, - downloadStrategy: cfg.DownloadStrategy, - halfOpenLimit: socketsPerTorrent, - dataDir: cfg.DataDir, - disableUTP: cfg.DisableUTP, - disableTCP: cfg.DisableTCP, - _configDir: cfg.ConfigDir, - config: *cfg, + noUpload: cfg.NoUpload, + disableTrackers: cfg.DisableTrackers, + halfOpenLimit: socketsPerTorrent, + dataDir: cfg.DataDir, + disableUTP: cfg.DisableUTP, + disableTCP: cfg.DisableTCP, + _configDir: cfg.ConfigDir, + config: *cfg, torrentDataOpener: func(md *metainfo.Info) data.Data { return filePkg.TorrentData(md, cfg.DataDir) }, @@ -483,7 +484,7 @@ func NewClient(cfg *Config) (cl *Client, err error) { if cfg.PeerID != "" { CopyExact(&cl.peerID, cfg.PeerID) } else { - o := copy(cl.peerID[:], BEP20) + o := copy(cl.peerID[:], bep20) _, err = rand.Read(cl.peerID[o:]) if err != nil { panic("error generating peer id") @@ -491,7 +492,7 @@ func NewClient(cfg *Config) (cl *Client, err error) { } if cl.downloadStrategy == nil { - cl.downloadStrategy = &DefaultDownloadStrategy{} + cl.downloadStrategy = &defaultDownloadStrategy{} } // Returns the laddr string to listen on for the next Listen call. @@ -557,7 +558,7 @@ func (cl *Client) stopped() bool { // Stops the client. All connections to peers are closed and all activity will // come to a halt. -func (me *Client) Stop() { +func (me *Client) Close() { me.mu.Lock() defer me.mu.Unlock() close(me.quit) @@ -622,12 +623,7 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) { } func (me *Client) torrent(ih InfoHash) *torrent { - for _, t := range me.torrents { - if t.InfoHash == ih { - return t - } - } - return nil + return me.torrents[ih] } type dialResult struct { @@ -1342,7 +1338,8 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { break } go func() { - err := me.AddPeers(t.InfoHash, func() (ret []Peer) { + me.mu.Lock() + me.addPeers(t, func() (ret []Peer) { for _, cp := range pexMsg.Added { p := Peer{ IP: make([]byte, 4), @@ -1356,10 +1353,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { } return }()) - if err != nil { - log.Printf("error adding PEX peers: %s", err) - return - } + me.mu.Unlock() peersFoundByPEX.Add(int64(len(pexMsg.Added))) }() default: @@ -1513,34 +1507,19 @@ func (me *Client) openNewConns(t *torrent) { func (me *Client) addPeers(t *torrent, peers []Peer) { blocked := 0 - for i, p := range peers { - if me.ipBlockRange(p.IP) == nil { + for _, p := range peers { + if me.ipBlockRange(p.IP) != nil { + blocked++ continue } - peers[i] = peers[len(peers)-1] - peers = peers[:len(peers)-1] - i-- - blocked++ + t.addPeer(p) } if blocked != 0 { log.Printf("IP blocklist screened %d peers from being added", blocked) } - t.AddPeers(peers) me.openNewConns(t) } -// Adds peers to the swarm for the torrent corresponding to infoHash. -func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error { - me.mu.Lock() - defer me.mu.Unlock() - t := me.torrent(infoHash) - if t == nil { - return errors.New("no such torrent") - } - me.addPeers(t, peers) - return nil -} - func (cl *Client) torrentFileCachePath(ih InfoHash) string { return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent") } @@ -1592,7 +1571,7 @@ func (cl *Client) startTorrent(t *torrent) { } // Storage cannot be changed once it's set. -func (cl *Client) setStorage(t *torrent, td Data) (err error) { +func (cl *Client) setStorage(t *torrent, td data.Data) (err error) { err = t.setStorage(td) cl.event.Broadcast() if err != nil { @@ -1881,7 +1860,11 @@ func (t Torrent) MetainfoFilepath() string { } func (t Torrent) AddPeers(pp []Peer) error { - return t.cl.AddPeers(t.torrent.InfoHash, pp) + cl := t.cl + cl.mu.Lock() + defer cl.mu.Unlock() + cl.addPeers(t.torrent, pp) + return nil } func (t Torrent) DownloadAll() { @@ -2142,12 +2125,10 @@ func (cl *Client) announceTorrentSingleTracker(tr tracker.Client, req *tracker.A Port: peer.Port, }) } - err = cl.AddPeers(t.InfoHash, peers) - if err != nil { - log.Printf("error adding peers to torrent %s: %s", t, err) - } else { - log.Printf("%s: %d new peers from %s", t, len(peers), tr) - } + cl.mu.Lock() + cl.addPeers(t, peers) + cl.mu.Unlock() + log.Printf("%s: %d new peers from %s", t, len(peers), tr) time.Sleep(time.Second * time.Duration(resp.Interval)) return nil @@ -2393,14 +2374,7 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) { cl.pieceHashed(t, index, sum == p.Hash) } -func (cl *Client) Torrent(ih InfoHash) (t Torrent, ok bool) { - cl.mu.Lock() - defer cl.mu.Unlock() - t.torrent, ok = cl.torrents[ih] - t.cl = cl - return -} - +// Returns handles to all the torrents loaded in the Client. func (me *Client) Torrents() (ret []Torrent) { me.mu.Lock() for _, t := range me.torrents { diff --git a/client_test.go b/client_test.go index 01d923b8..55827d26 100644 --- a/client_test.go +++ b/client_test.go @@ -37,7 +37,7 @@ func TestClientDefault(t *testing.T) { if err != nil { t.Fatal(err) } - cl.Stop() + cl.Close() } func TestAddDropTorrent(t *testing.T) { @@ -47,7 +47,7 @@ func TestAddDropTorrent(t *testing.T) { if err != nil { t.Fatal(err) } - defer cl.Stop() + defer cl.Close() dir, mi := testutil.GreetingTestTorrent() defer os.RemoveAll(dir) tt, err := cl.AddTorrent(mi) @@ -223,13 +223,13 @@ func TestTwoClientsArbitraryPorts(t *testing.T) { if err != nil { t.Fatal(err) } - defer cl.Stop() + defer cl.Close() } } func TestAddDropManyTorrents(t *testing.T) { cl, _ := NewClient(&TestingConfig) - defer cl.Stop() + defer cl.Close() var m Magnet for i := range iter.N(1000) { binary.PutVarint(m.InfoHash[:], int64(i)) @@ -246,7 +246,7 @@ func TestClientTransfer(t *testing.T) { if err != nil { t.Fatal(err) } - defer seeder.Stop() + defer seeder.Close() seeder.AddTorrent(mi) leecherDataDir, err := ioutil.TempDir("", "") if err != nil { @@ -258,7 +258,7 @@ func TestClientTransfer(t *testing.T) { // } cfg.TorrentDataOpener = blob.NewStore(leecherDataDir).OpenTorrent leecher, _ := NewClient(&cfg) - defer leecher.Stop() + defer leecher.Close() leecherGreeting, _ := leecher.AddTorrent(mi) leecherGreeting.AddPeers([]Peer{ Peer{ diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index 54a541ce..46e103a1 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -42,7 +42,7 @@ func main() { http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { client.WriteStatus(w) }) - defer client.Stop() + defer client.Close() if flag.NArg() == 0 { fmt.Fprintln(os.Stderr, "no torrents specified") return diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index 48330b9c..a93b94d0 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -69,7 +69,7 @@ func exitSignalHandlers(fs *torrentfs.TorrentFS) { func addTestPeer(client *torrent.Client) { for _, t := range client.Torrents() { if testPeerAddr != nil { - if err := client.AddPeers(t.InfoHash, []torrent.Peer{{ + if err := t.AddPeers([]torrent.Peer{{ IP: testPeerAddr.IP, Port: testPeerAddr.Port, }}); err != nil { diff --git a/config.go b/config.go index 29d35c6a..99d625fc 100644 --- a/config.go +++ b/config.go @@ -4,20 +4,38 @@ import ( "bitbucket.org/anacrolix/go.torrent/dht" ) +// Override Client defaults. type Config struct { - DataDir string - ListenAddr string - DisableTrackers bool - DownloadStrategy DownloadStrategy - NoDHT bool - DHTConfig *dht.ServerConfig - NoUpload bool - PeerID string - DisableUTP bool - DisableTCP bool + // Store torrent file data in this directory unless TorrentDataOpener is + // specified. + DataDir string + // The address to listen for new uTP and TCP bittorrent protocol + // connections. DHT shares a UDP socket with uTP unless configured + // otherwise. + ListenAddr string + // Don't announce to trackers. This only leaves DHT to discover peers. + DisableTrackers bool + // Don't create a DHT. + NoDHT bool + // Overrides the default DHT configuration. + DHTConfig *dht.ServerConfig + // Don't chunks to peers. + NoUpload bool + // User-provided Client peer ID. If not present, one is generated automatically. + PeerID string + // For the bittorrent protocol. + DisableUTP bool + // For the bittorrent protocol. + DisableTCP bool + // Don't automatically load "$ConfigDir/blocklist". NoDefaultBlocklist bool - // Defaults to "$HOME/.config/torrent" - ConfigDir string + // Defaults to "$HOME/.config/torrent". This is where "blocklist", + // "torrents" and other operational files are stored. + ConfigDir string + // Don't save or load to a cache of torrent files stored in + // "$ConfigDir/torrents". DisableMetainfoCache bool + // Called to instantiate storage for each added torrent. Provided backends + // are in $REPO/data. If not set, the "file" implementation is used. TorrentDataOpener } diff --git a/data/data.go b/data/data.go index 2364eee3..bc61d25b 100644 --- a/data/data.go +++ b/data/data.go @@ -10,6 +10,8 @@ type Store interface { OpenTorrent(*metainfo.Info) Data } +// Represents data storage for a Torrent. Additional optional interfaces to +// implement are io.Closer, io.ReaderAt, StatefulData, and SectionOpener. type Data interface { // OpenSection(off, n int64) (io.ReadCloser, error) // ReadAt(p []byte, off int64) (n int, err error) diff --git a/download_strategies.go b/download_strategies.go index f3ef027b..fdf9e245 100644 --- a/download_strategies.go +++ b/download_strategies.go @@ -6,7 +6,7 @@ import ( pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" ) -type DownloadStrategy interface { +type downloadStrategy interface { // Tops up the outgoing pending requests. FillRequests(*torrent, *connection) TorrentStarted(*torrent) @@ -20,17 +20,17 @@ type DownloadStrategy interface { PendingData(*torrent) bool } -type DefaultDownloadStrategy struct{} +type defaultDownloadStrategy struct{} -func (me *DefaultDownloadStrategy) PendingData(t *torrent) bool { +func (me *defaultDownloadStrategy) PendingData(t *torrent) bool { return !t.haveAllPieces() } -func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {} +func (me *defaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {} -func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {} +func (me *defaultDownloadStrategy) WriteStatus(w io.Writer) {} -func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { +func (s *defaultDownloadStrategy) FillRequests(t *torrent, c *connection) { if c.Interested { if c.PeerChoked { return @@ -64,14 +64,14 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { return } -func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {} +func (s *defaultDownloadStrategy) TorrentStarted(t *torrent) {} -func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) { +func (s *defaultDownloadStrategy) TorrentStopped(t *torrent) { } -func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) { +func (s *defaultDownloadStrategy) DeleteRequest(t *torrent, r request) { } -func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {} -func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {} -func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {} +func (me *defaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {} +func (me *defaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {} +func (*defaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {} diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index ab8bc0a0..573982ed 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -98,7 +98,7 @@ func TestUnmountWedged(t *testing.T) { NoDefaultBlocklist: true, }) - defer client.Stop() + defer client.Close() client.AddTorrent(layout.Metainfo) fs := New(client) fuseConn, err := fuse.Mount(layout.MountDir) @@ -177,7 +177,7 @@ func TestDownloadOnDemand(t *testing.T) { t.Fatalf("error creating seeder client: %s", err) } seeder.SetIPBlockList(nil) - defer seeder.Stop() + defer seeder.Close() http.HandleFunc("/seeder", func(w http.ResponseWriter, req *http.Request) { seeder.WriteStatus(w) }) @@ -207,11 +207,9 @@ func TestDownloadOnDemand(t *testing.T) { http.HandleFunc("/leecher", func(w http.ResponseWriter, req *http.Request) { leecher.WriteStatus(w) }) - defer leecher.Stop() - leecher.AddTorrent(layout.Metainfo) - var ih torrent.InfoHash - util.CopyExact(ih[:], layout.Metainfo.Info.Hash) - leecher.AddPeers(ih, []torrent.Peer{func() torrent.Peer { + defer leecher.Close() + leecherTorrent, _ := leecher.AddTorrent(layout.Metainfo) + leecherTorrent.AddPeers([]torrent.Peer{func() torrent.Peer { _, port, err := net.SplitHostPort(seeder.ListenAddr().String()) if err != nil { panic(err) diff --git a/iplist/iplist.go b/iplist/iplist.go index c102733d..d04b0905 100644 --- a/iplist/iplist.go +++ b/iplist/iplist.go @@ -21,8 +21,9 @@ func (r *Range) String() string { return fmt.Sprintf("%s-%s (%s)", r.First, r.Last, r.Description) } -// Create a new IP list. The given range must already sorted by the lower IP -// in the range. Behaviour is undefined for lists of overlapping ranges. +// Create a new IP list. The given ranges must already sorted by the lower +// bound IP in each range. Behaviour is undefined for lists of overlapping +// ranges. func New(initSorted []Range) *IPList { return &IPList{ ranges: initSorted, diff --git a/misc.go b/misc.go index c3da1048..b47198b2 100644 --- a/misc.go +++ b/misc.go @@ -15,7 +15,7 @@ const ( pieceHash = crypto.SHA1 maxRequests = 250 // Maximum pending requests we allow peers to send us. chunkSize = 0x4000 // 16KiB - BEP20 = "-GT0000-" // Peer ID client identifier prefix + bep20 = "-GT0000-" // Peer ID client identifier prefix nominalDialTimeout = time.Second * 30 minDialTimeout = 5 * time.Second ) @@ -97,7 +97,7 @@ func newRequest(index, begin, length peer_protocol.Integer) request { var ( // Requested data not yet available. - ErrDataNotReady = errors.New("data not ready") + errDataNotReady = errors.New("data not ready") ) // The size in bytes of a metadata extension piece. diff --git a/ordered.go b/ordered.go index 9c546054..2d63c539 100644 --- a/ordered.go +++ b/ordered.go @@ -4,23 +4,26 @@ import ( "container/list" ) -type OrderedList struct { +// This was used to maintain pieces in order of bytes left to download. I +// don't think it's currently in use. + +type orderedList struct { list *list.List lessFunc func(a, b interface{}) bool } -func (me *OrderedList) Len() int { +func (me *orderedList) Len() int { return me.list.Len() } -func NewList(lessFunc func(a, b interface{}) bool) *OrderedList { - return &OrderedList{ +func newOrderedList(lessFunc func(a, b interface{}) bool) *orderedList { + return &orderedList{ list: list.New(), lessFunc: lessFunc, } } -func (me *OrderedList) ValueChanged(e *list.Element) { +func (me *orderedList) ValueChanged(e *list.Element) { for prev := e.Prev(); prev != nil && me.lessFunc(e.Value, prev.Value); prev = e.Prev() { me.list.MoveBefore(e, prev) } @@ -29,16 +32,16 @@ func (me *OrderedList) ValueChanged(e *list.Element) { } } -func (me *OrderedList) Insert(value interface{}) (ret *list.Element) { +func (me *orderedList) Insert(value interface{}) (ret *list.Element) { ret = me.list.PushFront(value) me.ValueChanged(ret) return } -func (me *OrderedList) Front() *list.Element { +func (me *orderedList) Front() *list.Element { return me.list.Front() } -func (me *OrderedList) Remove(e *list.Element) interface{} { +func (me *orderedList) Remove(e *list.Element) interface{} { return me.list.Remove(e) } diff --git a/ordered_test.go b/ordered_test.go index 7c4c4b08..0d27b84d 100644 --- a/ordered_test.go +++ b/ordered_test.go @@ -5,7 +5,7 @@ import ( ) func TestOrderedList(t *testing.T) { - ol := NewList(func(a, b interface{}) bool { + ol := newOrderedList(func(a, b interface{}) bool { return a.(int) < b.(int) }) if ol.Len() != 0 { diff --git a/torrent.go b/torrent.go index 13ca8aa2..f0dd4c07 100644 --- a/torrent.go +++ b/torrent.go @@ -44,15 +44,9 @@ type peersKey struct { Port int } -// Represents data storage for a Torrent. Additional optional interfaces to -// implement are io.Closer, io.ReaderAt, StatefulData, and SectionOpener. -type Data interface { - data.Data -} - // Data maintains per-piece persistent state. type StatefulData interface { - Data + data.Data // We believe the piece data will pass a hash check. PieceCompleted(index int) error // Returns true if the piece is complete. @@ -72,7 +66,7 @@ type torrent struct { Pieces []*piece length int64 - data Data + data data.Data Info *metainfo.Info // Active peer connections. @@ -229,10 +223,8 @@ func (t *torrent) ceaseNetworking() { t.pruneTimer.Stop() } -func (t *torrent) AddPeers(pp []Peer) { - for _, p := range pp { - t.Peers[peersKey{string(p.IP), p.Port}] = p - } +func (t *torrent) addPeer(p Peer) { + t.Peers[peersKey{string(p.IP), p.Port}] = p } func (t *torrent) invalidateMetadata() { @@ -305,7 +297,7 @@ func (t *torrent) setMetadata(md metainfo.Info, infoBytes []byte, eventLocker sy return } -func (t *torrent) setStorage(td Data) (err error) { +func (t *torrent) setStorage(td data.Data) (err error) { if c, ok := t.data.(io.Closer); ok { c.Close() } @@ -539,7 +531,7 @@ func (t *torrent) piecePartiallyDownloaded(index int) bool { return t.PieceNumPendingBytes(pp.Integer(index)) != t.PieceLength(pp.Integer(index)) } -func NumChunksForPiece(chunkSize int, pieceSize int) int { +func numChunksForPiece(chunkSize int, pieceSize int) int { return (pieceSize + chunkSize - 1) / chunkSize }