]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Merge pull request #410 from anacrolix/webseeds
authorMatt Joiner <anacrolix@gmail.com>
Wed, 3 Jun 2020 01:25:08 +0000 (11:25 +1000)
committerGitHub <noreply@github.com>
Wed, 3 Jun 2020 01:25:08 +0000 (11:25 +1000)
* Rename Peer to PeerInfo, and unexport PeerInfos

* Break peer out from PeerConn

* Abstract out segments mapping and use it in mmap storage

* Got file storage working with segment index

* Fix race in webtorrent.TrackerClient.Run

* storage file implementation: Error on short writes

* Remove debug logging from storage file implementation

* cmd/torrent-verify: Fix piece hash output

* Support disabling webtorrent

* Further progress on webseeding

* Handle webseed Client events

* Rename fastestConn->fastestPeer

* Add webseeds from magnet links

* Remove events from webseed

Manage this stuff inside the webseed peer instead.

* Make use of magnet source fields and expose Torrent.MergeSpec

* Add option to disable webseeds

* Fix webseeds when info isn't available immediately

* Handle webseed request errors

* Tidy up the interface changes

38 files changed:
client.go
client_test.go
cmd/torrent-pick/main.go
cmd/torrent-verify/main.go
cmd/torrent/main.go
cmd/torrentfs/main.go
common/upverted_files.go [new file with mode: 0644]
config.go
file.go
misc.go
mmap_span/mmap_span.go
mmap_span/span.go [deleted file]
peer-impl.go [new file with mode: 0644]
peer_info.go [moved from Peer.go with 81% similarity]
peer_infos.go [moved from Peers.go with 63% similarity]
peerconn.go
peerconn_test.go
peerid.go
pex_test.go
pexconn.go
piece.go
prioritized_peers.go
prioritized_peers_test.go
segments/index.go [new file with mode: 0644]
segments/segments.go [new file with mode: 0644]
segments/segments_test.go [new file with mode: 0644]
spec.go
storage/file.go
storage/mmap.go
t.go
torrent.go
torrent_test.go
tracker_scraper.go
web_seed.go [new file with mode: 0644]
webseed/client.go [new file with mode: 0644]
webseed/misc.go [new file with mode: 0644]
webtorrent/tracker_client.go
worst_conns.go

index ebbf21b38e7b4ebe161e91ea6a154fd2b538b4cd..dd1c77d1ba80d40d83933562db6ba2352f1bf1fe 100644 (file)
--- a/client.go
+++ b/client.go
@@ -10,6 +10,7 @@ import (
        "fmt"
        "io"
        "net"
+       "net/http"
        "strconv"
        "strings"
        "time"
@@ -1075,13 +1076,13 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
                infoHash: ih,
                peers: prioritizedPeers{
                        om: btree.New(32),
-                       getPrio: func(p Peer) peerPriority {
+                       getPrio: func(p PeerInfo) peerPriority {
                                return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
                        },
                },
                conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
 
-               halfOpen:          make(map[string]Peer),
+               halfOpen:          make(map[string]PeerInfo),
                pieceStateChanges: pubsub.NewPubSub(),
 
                storageOpener:       storageClient,
@@ -1091,6 +1092,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
                metadataChanged: sync.Cond{
                        L: cl.locker(),
                },
+               webSeeds: make(map[string]*peer),
        }
        t._pendingPieces.NewSet = priorityBitmapStableNewSet
        t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
@@ -1137,31 +1139,86 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor
        return
 }
 
-// Add or merge a torrent spec. If the torrent is already present, the
-// trackers will be merged with the existing ones. If the Info isn't yet
-// known, it will be set. The display name is replaced if the new spec
-// provides one. Returns new if the torrent wasn't already in the client.
-// Note that any `Storage` defined on the spec will be ignored if the
-// torrent is already present (i.e. `new` return value is `true`)
+// Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
+// Torrent.MergeSpec.
 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
        t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
+       err = t.MergeSpec(spec)
+       return
+}
+
+// The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
+// The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
+func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
        if spec.DisplayName != "" {
                t.SetDisplayName(spec.DisplayName)
        }
        if spec.InfoBytes != nil {
-               err = t.SetInfoBytes(spec.InfoBytes)
+               err := t.SetInfoBytes(spec.InfoBytes)
                if err != nil {
-                       return
+                       return err
                }
        }
+       cl := t.cl
+       cl.AddDHTNodes(spec.DhtNodes)
        cl.lock()
        defer cl.unlock()
+       useTorrentSources(spec.Sources, t)
+       for _, url := range spec.Webseeds {
+               t.addWebSeed(url)
+       }
        if spec.ChunkSize != 0 {
                t.setChunkSize(pp.Integer(spec.ChunkSize))
        }
        t.addTrackers(spec.Trackers)
        t.maybeNewConns()
-       return
+       return nil
+}
+
+func useTorrentSources(sources []string, t *Torrent) {
+       for _, s := range sources {
+               go func(s string) {
+                       err := useTorrentSource(s, t)
+                       if err != nil {
+                               t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
+                       } else {
+                               t.logger.Printf("successfully used source %q", s)
+                       }
+               }(s)
+       }
+}
+
+func useTorrentSource(source string, t *Torrent) error {
+       req, err := http.NewRequest(http.MethodGet, source, nil)
+       if err != nil {
+               panic(err)
+       }
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       go func() {
+               select {
+               case <-t.GotInfo():
+               case <-t.Closed():
+               case <-ctx.Done():
+               }
+               cancel()
+       }()
+       req = req.WithContext(ctx)
+       resp, err := http.DefaultClient.Do(req)
+       if err != nil {
+               if ctx.Err() != nil {
+                       return nil
+               }
+               return err
+       }
+       mi, err := metainfo.Load(resp.Body)
+       if err != nil {
+               if ctx.Err() != nil {
+                       return nil
+               }
+               return err
+       }
+       return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
 }
 
 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
@@ -1229,9 +1286,6 @@ func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
 
 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
        T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
-       var ss []string
-       slices.MakeInto(&ss, mi.Nodes)
-       cl.AddDHTNodes(ss)
        return
 }
 
@@ -1277,16 +1331,20 @@ func (cl *Client) banPeerIP(ip net.IP) {
 
 func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
        c = &PeerConn{
-               conn:            nc,
-               outgoing:        outgoing,
-               choking:         true,
-               peerChoking:     true,
-               PeerMaxRequests: 250,
-               writeBuffer:     new(bytes.Buffer),
-               remoteAddr:      remoteAddr,
-               network:         network,
-               connString:      connString,
+               peer: peer{
+                       outgoing:        outgoing,
+                       choking:         true,
+                       peerChoking:     true,
+                       PeerMaxRequests: 250,
+
+                       remoteAddr: remoteAddr,
+                       network:    network,
+                       connString: connString,
+               },
+               conn:        nc,
+               writeBuffer: new(bytes.Buffer),
        }
+       c.peerImpl = c
        c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string {
                return fmt.Sprintf("%v: %s", c, m.Text())
        })
@@ -1307,7 +1365,7 @@ func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, ip net.IP, port int, portO
        if t == nil {
                return
        }
-       t.addPeers([]Peer{{
+       t.addPeers([]PeerInfo{{
                Addr:   ipPortAddr{ip, port},
                Source: PeerSourceDhtAnnouncePeer,
        }})
index cf6aaf21ffd7fe084421793000b5265c83793cb3..6822c328e6ad9d278134c3085e8d48954d4b51da 100644 (file)
@@ -546,9 +546,9 @@ func TestPeerInvalidHave(t *testing.T) {
        require.NoError(t, err)
        assert.True(t, _new)
        defer tt.Drop()
-       cn := &PeerConn{
+       cn := &PeerConn{peer: peer{
                t: tt,
-       }
+       }}
        assert.NoError(t, cn.peerSentHave(0))
        assert.Error(t, cn.peerSentHave(1))
 }
index 70f82f7cea73fa86239e0448fdf52b1f589c027f..e68121afc8166eb86df802e3cba287eb42f5c5c3 100644 (file)
@@ -24,14 +24,14 @@ import (
 
 // fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0])
 
-func resolvedPeerAddrs(ss []string) (ret []torrent.Peer, err error) {
+func resolvedPeerAddrs(ss []string) (ret []torrent.PeerInfo, err error) {
        for _, s := range ss {
                var addr *net.TCPAddr
                addr, err = net.ResolveTCPAddr("tcp", s)
                if err != nil {
                        return
                }
-               ret = append(ret, torrent.Peer{
+               ret = append(ret, torrent.PeerInfo{
                        Addr: addr,
                })
        }
index dfd14a7400aee1a629dc155bf1293bad81a79910..0b7f21543bcd3c4bef5bcf2764c6329b10b0d417 100644 (file)
@@ -46,6 +46,7 @@ func verifyTorrent(info *metainfo.Info, root string) error {
                }
                span.Append(mm)
        }
+       span.InitIndex()
        for i := range iter.N(info.NumPieces()) {
                p := info.Piece(i)
                hash := sha1.New()
@@ -57,7 +58,7 @@ func verifyTorrent(info *metainfo.Info, root string) error {
                if !good {
                        return fmt.Errorf("hash mismatch at piece %d", i)
                }
-               fmt.Printf("%d: %x: %v\n", i, p.Hash(), good)
+               fmt.Printf("%d: %v: %v\n", i, p.Hash(), good)
        }
        return nil
 }
index adc4f869bcf19bfda36d330139699ec228e722ef..6b14691d55a79f0d10840741161c66cd0b26a871 100644 (file)
@@ -113,9 +113,9 @@ func addTorrents(client *torrent.Client) error {
                if flags.Progress {
                        torrentBar(t, flags.PieceStates)
                }
-               t.AddPeers(func() (ret []torrent.Peer) {
+               t.AddPeers(func() (ret []torrent.PeerInfo) {
                        for _, ta := range flags.TestPeer {
-                               ret = append(ret, torrent.Peer{
+                               ret = append(ret, torrent.PeerInfo{
                                        Addr: ta,
                                })
                        }
@@ -144,23 +144,32 @@ var flags = struct {
        PieceStates     bool
        Quiet           bool `help:"discard client logging"`
        Dht             bool
+
        TcpPeers        bool
        UtpPeers        bool
-       Ipv4            bool
-       Ipv6            bool
-       Pex             bool
+       Webtorrent      bool
+       DisableWebseeds bool
+
+       Ipv4 bool
+       Ipv6 bool
+       Pex  bool
+
        tagflag.StartPos
+
        Torrent []string `arity:"+" help:"torrent file path or magnet uri"`
 }{
        UploadRate:   -1,
        DownloadRate: -1,
        Progress:     true,
        Dht:          true,
-       TcpPeers:     true,
-       UtpPeers:     true,
-       Ipv4:         true,
-       Ipv6:         true,
-       Pex:          true,
+
+       TcpPeers:   true,
+       UtpPeers:   true,
+       Webtorrent: true,
+
+       Ipv4: true,
+       Ipv6: true,
+       Pex:  true,
 }
 
 func stdoutAndStderrAreSameFile() bool {
@@ -213,6 +222,7 @@ func downloadErr(args []string, parent *tagflag.Parser) error {
        tagflag.ParseArgs(&flags, args, tagflag.Parent(parent))
        defer envpprof.Stop()
        clientConfig := torrent.NewDefaultClientConfig()
+       clientConfig.DisableWebseeds = flags.DisableWebseeds
        clientConfig.DisableTCP = !flags.TcpPeers
        clientConfig.DisableUTP = !flags.UtpPeers
        clientConfig.DisableIPv4 = !flags.Ipv4
@@ -224,6 +234,7 @@ func downloadErr(args []string, parent *tagflag.Parser) error {
        clientConfig.PublicIp4 = flags.PublicIP
        clientConfig.PublicIp6 = flags.PublicIP
        clientConfig.DisablePEX = !flags.Pex
+       clientConfig.DisableWebtorrent = !flags.Webtorrent
        if flags.PackedBlocklist != "" {
                blocklist, err := iplist.MMapPackedFile(flags.PackedBlocklist)
                if err != nil {
index 172f61029259e78e977d5893381f31d2639f4a5f..117457c116698817bc9afb6df50aa204e2a32ec8 100644 (file)
@@ -61,7 +61,7 @@ func exitSignalHandlers(fs *torrentfs.TorrentFS) {
 
 func addTestPeer(client *torrent.Client) {
        for _, t := range client.Torrents() {
-               t.AddPeers([]torrent.Peer{{
+               t.AddPeers([]torrent.PeerInfo{{
                        Addr: args.TestPeer,
                }})
        }
diff --git a/common/upverted_files.go b/common/upverted_files.go
new file mode 100644 (file)
index 0000000..1933e16
--- /dev/null
@@ -0,0 +1,18 @@
+package common
+
+import (
+       "github.com/anacrolix/torrent/metainfo"
+       "github.com/anacrolix/torrent/segments"
+)
+
+func LengthIterFromUpvertedFiles(fis []metainfo.FileInfo) segments.LengthIter {
+       i := 0
+       return func() (segments.Length, bool) {
+               if i == len(fis) {
+                       return -1, false
+               }
+               l := fis[i].Length
+               i++
+               return l, true
+       }
+}
index 105e8674bfca7775a3e039ddf0165f81e0d45e20..ae9b4ea09b1ef6a62f558b88a824244f143f7229 100644 (file)
--- a/config.go
+++ b/config.go
@@ -130,6 +130,9 @@ type ClientConfig struct {
        DefaultRequestStrategy RequestStrategyMaker
 
        Extensions PeerExtensionBits
+
+       DisableWebtorrent bool
+       DisableWebseeds   bool
 }
 
 func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig {
diff --git a/file.go b/file.go
index f02424000971f46f65583864c7fd1809aac8b841..e2ebcf4d5c6cf5a2a8c90b1e8a109f8caa4f3282 100644 (file)
--- a/file.go
+++ b/file.go
@@ -96,7 +96,6 @@ func (f *File) DisplayPath() string {
                return f.t.info.Name
        }
        return strings.Join(fip, "/")
-
 }
 
 // The download status of a piece that comprises part of a File.
diff --git a/misc.go b/misc.go
index b49ff7f431a8b7d48a8f5f123d652bdc45d9f92d..318ae9945a2da83829869e52eb7df1f29225826a 100644 (file)
--- a/misc.go
+++ b/misc.go
@@ -106,7 +106,7 @@ func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) chunkSp
        return ret
 }
 
-func connLessTrusted(l, r *PeerConn) bool {
+func connLessTrusted(l, r *peer) bool {
        return l.trust().Less(r.trust())
 }
 
index 21f6bd26b76a93b5229a895f69de404b041054b9..c698a89c1f2595ea1c2897404de85b73c4376731 100644 (file)
@@ -1,81 +1,89 @@
 package mmap_span
 
 import (
+       "fmt"
        "io"
        "log"
        "sync"
 
+       "github.com/anacrolix/torrent/segments"
        "github.com/edsrzf/mmap-go"
 )
 
-type segment struct {
-       *mmap.MMap
-}
-
-func (s segment) Size() int64 {
-       return int64(len(*s.MMap))
-}
-
 type MMapSpan struct {
-       mu sync.RWMutex
-       span
+       mu             sync.RWMutex
+       mMaps          []mmap.MMap
+       segmentLocater segments.Index
 }
 
-func (ms *MMapSpan) Append(mmap mmap.MMap) {
-       ms.span = append(ms.span, segment{&mmap})
+func (ms *MMapSpan) Append(mMap mmap.MMap) {
+       ms.mMaps = append(ms.mMaps, mMap)
 }
 
-func (ms *MMapSpan) Close() error {
+func (ms *MMapSpan) Close() (errs []error) {
        ms.mu.Lock()
        defer ms.mu.Unlock()
-       for _, mMap := range ms.span {
-               err := mMap.(segment).Unmap()
+       for _, mMap := range ms.mMaps {
+               err := mMap.Unmap()
                if err != nil {
-                       log.Print(err)
+                       errs = append(errs, err)
                }
        }
-       return nil
+       // This is for issue 211.
+       ms.mMaps = nil
+       ms.InitIndex()
+       return
 }
 
-func (ms *MMapSpan) Size() (ret int64) {
+func (me *MMapSpan) InitIndex() {
+       i := 0
+       me.segmentLocater = segments.NewIndex(func() (segments.Length, bool) {
+               if i == len(me.mMaps) {
+                       return -1, false
+               }
+               l := int64(len(me.mMaps[i]))
+               i++
+               return l, true
+       })
+       //log.Printf("made mmapspan index: %v", me.segmentLocater)
+}
+
+func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
+       //log.Printf("reading %v bytes at %v", len(p), off)
        ms.mu.RLock()
        defer ms.mu.RUnlock()
-       for _, seg := range ms.span {
-               ret += seg.Size()
+       n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return a, b }, p, off)
+       if n != len(p) {
+               err = io.EOF
        }
        return
 }
 
-func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
-       ms.mu.RLock()
-       defer ms.mu.RUnlock()
-       ms.ApplyTo(off, func(intervalOffset int64, interval sizer) (stop bool) {
-               _n := copy(p, (*interval.(segment).MMap)[intervalOffset:])
+func copyBytes(dst, src []byte) int {
+       return copy(dst, src)
+}
+
+func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), p []byte, off int64) (n int) {
+       ms.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
+               mMapBytes := ms.mMaps[i][e.Start:]
+               //log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes))
+               _n := copyBytes(copyArgs(p, mMapBytes))
                p = p[_n:]
                n += _n
-               return len(p) == 0
+               if segments.Int(_n) != e.Length {
+                       panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length))
+               }
+               return true
        })
-       if len(p) != 0 {
-               err = io.EOF
-       }
        return
 }
 
 func (ms *MMapSpan) WriteAt(p []byte, off int64) (n int, err error) {
+       log.Printf("writing %v bytes at %v", len(p), off)
        ms.mu.RLock()
        defer ms.mu.RUnlock()
-       ms.ApplyTo(off, func(iOff int64, i sizer) (stop bool) {
-               mMap := i.(segment)
-               _n := copy((*mMap.MMap)[iOff:], p)
-               // err = mMap.Sync(gommap.MS_ASYNC)
-               // if err != nil {
-               //      return true
-               // }
-               p = p[_n:]
-               n += _n
-               return len(p) == 0
-       })
-       if err != nil && len(p) != 0 {
+       n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return b, a }, p, off)
+       if n != len(p) {
                err = io.ErrShortWrite
        }
        return
diff --git a/mmap_span/span.go b/mmap_span/span.go
deleted file mode 100644 (file)
index 141ea99..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-package mmap_span
-
-type sizer interface {
-       Size() int64
-}
-
-type span []sizer
-
-func (s span) ApplyTo(off int64, f func(int64, sizer) (stop bool)) {
-       for _, interval := range s {
-               iSize := interval.Size()
-               if off >= iSize {
-                       off -= iSize
-               } else {
-                       if f(off, interval) {
-                               return
-                       }
-                       off = 0
-               }
-       }
-}
diff --git a/peer-impl.go b/peer-impl.go
new file mode 100644 (file)
index 0000000..41540c4
--- /dev/null
@@ -0,0 +1,21 @@
+package torrent
+
+import (
+       "github.com/anacrolix/torrent/metainfo"
+)
+
+// Contains implementation details that differ between peer types, like Webseeds and regular
+// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
+// legacy PeerConn methods.
+type peerImpl interface {
+       updateRequests()
+       writeInterested(interested bool) bool
+       cancel(request) bool
+       // Return true if there's room for more activity.
+       request(request) bool
+       connectionFlags() string
+       _close()
+       _postCancel(request)
+       onGotInfo(*metainfo.Info)
+       drop()
+}
similarity index 81%
rename from Peer.go
rename to peer_info.go
index d62b028fd8568e33a5bda3e629f94c0a4e986c2d..fdd74c52b284e30ad6a9bd8074840dc84204339c 100644 (file)
--- a/Peer.go
@@ -9,7 +9,7 @@ import (
 )
 
 // Peer connection info, handed about publicly.
-type Peer struct {
+type PeerInfo struct {
        Id     [20]byte
        Addr   net.Addr
        Source PeerSource
@@ -20,7 +20,7 @@ type Peer struct {
        Trusted bool
 }
 
-func (me Peer) Equal(other Peer) bool {
+func (me PeerInfo) equal(other PeerInfo) bool {
        return me.Id == other.Id &&
                me.Addr.String() == other.Addr.String() &&
                me.Source == other.Source &&
@@ -29,8 +29,8 @@ func (me Peer) Equal(other Peer) bool {
                me.Trusted == other.Trusted
 }
 
-// FromPex generate Peer from peer exchange
-func (me *Peer) FromPex(na krpc.NodeAddr, fs peer_protocol.PexPeerFlags) {
+// Generate PeerInfo from peer exchange
+func (me *PeerInfo) FromPex(na krpc.NodeAddr, fs peer_protocol.PexPeerFlags) {
        me.Addr = ipPortAddr{append([]byte(nil), na.IP...), na.Port}
        me.Source = PeerSourcePex
        // If they prefer encryption, they must support it.
@@ -40,6 +40,6 @@ func (me *Peer) FromPex(na krpc.NodeAddr, fs peer_protocol.PexPeerFlags) {
        me.PexPeerFlags = fs
 }
 
-func (me Peer) addr() IpPort {
+func (me PeerInfo) addr() IpPort {
        return IpPort{IP: addrIpOrNil(me.Addr), Port: uint16(addrPortOrZero(me.Addr))}
 }
similarity index 63%
rename from Peers.go
rename to peer_infos.go
index a49247e2c61b8b04efeb55a63cbadf483f582e69..f3da64e872c95328ec057b7fe891cbcea13c9e11 100644 (file)
--- a/Peers.go
@@ -7,11 +7,12 @@ import (
        "github.com/anacrolix/torrent/tracker"
 )
 
-type Peers []Peer
+// Helper-type used to bulk-manage PeerInfos.
+type peerInfos []PeerInfo
 
-func (me *Peers) AppendFromPex(nas []krpc.NodeAddr, fs []peer_protocol.PexPeerFlags) {
+func (me *peerInfos) AppendFromPex(nas []krpc.NodeAddr, fs []peer_protocol.PexPeerFlags) {
        for i, na := range nas {
-               var p Peer
+               var p PeerInfo
                var f peer_protocol.PexPeerFlags
                if i < len(fs) {
                        f = fs[i]
@@ -21,9 +22,9 @@ func (me *Peers) AppendFromPex(nas []krpc.NodeAddr, fs []peer_protocol.PexPeerFl
        }
 }
 
-func (ret Peers) AppendFromTracker(ps []tracker.Peer) Peers {
+func (ret peerInfos) AppendFromTracker(ps []tracker.Peer) peerInfos {
        for _, p := range ps {
-               _p := Peer{
+               _p := PeerInfo{
                        Addr:   ipPortAddr{p.IP, p.Port},
                        Source: PeerSourceTracker,
                }
index 779e9a7364def13855707e832adcb2a918349d41..1c2879ee30940daf598c03717102548c4d24f713 100644 (file)
@@ -18,6 +18,7 @@ import (
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/prioritybitmap"
        "github.com/anacrolix/multiless"
+       "github.com/anacrolix/torrent/metainfo"
        "github.com/pkg/errors"
 
        "github.com/anacrolix/torrent/bencode"
@@ -35,22 +36,18 @@ const (
        PeerSourcePex             = "X"
 )
 
-// Maintains the state of a connection with a peer.
-type PeerConn struct {
+type peer struct {
        // First to ensure 64-bit alignment for atomics. See #262.
        _stats ConnStats
 
        t *Torrent
-       // The actual Conn, used for closing, and setting socket options.
-       conn       net.Conn
+
+       peerImpl
+
        connString string
        outgoing   bool
        network    string
        remoteAddr net.Addr
-       // The Reader and Writer for this Conn, with hooks installed for stats,
-       // limiting, deadlines etc.
-       w io.Writer
-       r io.Reader
        // True if the connection is operating over MSE obfuscation.
        headerEncrypted bool
        cryptoMethod    mse.CryptoMethod
@@ -116,14 +113,26 @@ type PeerConn struct {
        pieceInclination   []int
        _pieceRequestOrder prioritybitmap.PriorityBitmap
 
+       logger log.Logger
+}
+
+// Maintains the state of a connection with a peer.
+type PeerConn struct {
+       peer
+
+       // The actual Conn, used for closing, and setting socket options.
+       conn net.Conn
+       // The Reader and Writer for this Conn, with hooks installed for stats,
+       // limiting, deadlines etc.
+       w io.Writer
+       r io.Reader
+
        writeBuffer *bytes.Buffer
        uploadTimer *time.Timer
        writerCond  sync.Cond
-
-       logger log.Logger
 }
 
-func (cn *PeerConn) updateExpectingChunks() {
+func (cn *peer) updateExpectingChunks() {
        if cn.expectingChunks() {
                if cn.lastStartedExpectingToReceiveChunks.IsZero() {
                        cn.lastStartedExpectingToReceiveChunks = time.Now()
@@ -136,7 +145,7 @@ func (cn *PeerConn) updateExpectingChunks() {
        }
 }
 
-func (cn *PeerConn) expectingChunks() bool {
+func (cn *peer) expectingChunks() bool {
        return cn.interested && !cn.peerChoking
 }
 
@@ -166,7 +175,7 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) {
        return ml.FinalOk()
 }
 
-func (cn *PeerConn) cumInterest() time.Duration {
+func (cn *peer) cumInterest() time.Duration {
        ret := cn.priorInterest
        if cn.interested {
                ret += time.Since(cn.lastBecameInterested)
@@ -192,20 +201,20 @@ func (cn *PeerConn) localAddr() net.Addr {
        return cn.conn.LocalAddr()
 }
 
-func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
+func (cn *peer) supportsExtension(ext pp.ExtensionName) bool {
        _, ok := cn.PeerExtensionIDs[ext]
        return ok
 }
 
 // The best guess at number of pieces in the torrent for this peer.
-func (cn *PeerConn) bestPeerNumPieces() pieceIndex {
+func (cn *peer) bestPeerNumPieces() pieceIndex {
        if cn.t.haveInfo() {
                return cn.t.numPieces()
        }
        return cn.peerMinPieces
 }
 
-func (cn *PeerConn) completedString() string {
+func (cn *peer) completedString() string {
        have := pieceIndex(cn._peerPieces.Len())
        if cn.peerSentHaveAll {
                have = cn.bestPeerNumPieces()
@@ -213,13 +222,15 @@ func (cn *PeerConn) completedString() string {
        return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
 }
 
-// Correct the PeerPieces slice length. Return false if the existing slice is
-// invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
-// messages.
-func (cn *PeerConn) setNumPieces(num pieceIndex) error {
+func (cn *PeerConn) onGotInfo(info *metainfo.Info) {
+       cn.setNumPieces(info.NumPieces())
+}
+
+// Correct the PeerPieces slice length. Return false if the existing slice is invalid, such as by
+// receiving badly sized BITFIELD, or invalid HAVE messages.
+func (cn *PeerConn) setNumPieces(num pieceIndex) {
        cn._peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd)
        cn.peerPiecesChanged()
-       return nil
 }
 
 func eventAgeString(t time.Time) string {
@@ -250,7 +261,7 @@ func (cn *PeerConn) utp() bool {
 }
 
 // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
-func (cn *PeerConn) statusFlags() (ret string) {
+func (cn *peer) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
@@ -278,11 +289,11 @@ func (cn *PeerConn) statusFlags() (ret string) {
 //     return buf.String()
 // }
 
-func (cn *PeerConn) downloadRate() float64 {
+func (cn *peer) downloadRate() float64 {
        return float64(cn._stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
 }
 
-func (cn *PeerConn) writeStatus(w io.Writer, t *Torrent) {
+func (cn *peer) writeStatus(w io.Writer, t *Torrent) {
        // \t isn't preserved in <pre> blocks?
        fmt.Fprintf(w, "%+-55q %s %s\n", cn.PeerID, cn.PeerExtensionBytes, cn.connString)
        fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
@@ -309,7 +320,7 @@ func (cn *PeerConn) writeStatus(w io.Writer, t *Torrent) {
        fmt.Fprintf(w, "    next pieces: %v%s\n",
                iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
                func() string {
-                       if cn == t.fastestConn {
+                       if cn == t.fastestPeer {
                                return " (fastest)"
                        } else {
                                return ""
@@ -318,22 +329,26 @@ func (cn *PeerConn) writeStatus(w io.Writer, t *Torrent) {
        )
 }
 
-func (cn *PeerConn) close() {
+func (cn *peer) close() {
        if !cn.closed.Set() {
                return
        }
+       cn.discardPieceInclination()
+       cn._pieceRequestOrder.Clear()
+       cn.peerImpl._close()
+}
+
+func (cn *PeerConn) _close() {
        if cn.pex.IsEnabled() {
                cn.pex.Close()
        }
        cn.tickleWriter()
-       cn.discardPieceInclination()
-       cn._pieceRequestOrder.Clear()
        if cn.conn != nil {
                cn.conn.Close()
        }
 }
 
-func (cn *PeerConn) peerHasPiece(piece pieceIndex) bool {
+func (cn *peer) peerHasPiece(piece pieceIndex) bool {
        return cn.peerSentHaveAll || cn._peerPieces.Contains(bitmap.BitIndex(piece))
 }
 
@@ -350,6 +365,16 @@ func (cn *PeerConn) post(msg pp.Message) {
        cn.tickleWriter()
 }
 
+// Returns true if there's room to write more.
+func (cn *PeerConn) write(msg pp.Message) bool {
+       cn.wroteMsg(&msg)
+       cn.writeBuffer.Write(msg.MustMarshalBinary())
+       torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
+       // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update
+       // when https://github.com/pion/datachannel/issues/59 is fixed.
+       return cn.writeBuffer.Len() < 1<<15
+}
+
 func (cn *PeerConn) requestMetadataPiece(index int) {
        eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
        if eID == 0 {
@@ -384,7 +409,7 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool {
 }
 
 // The actual value to use as the maximum outbound requests.
-func (cn *PeerConn) nominalMaxRequests() (ret int) {
+func (cn *peer) nominalMaxRequests() (ret int) {
        return int(clamp(
                1,
                int64(cn.PeerMaxRequests),
@@ -392,7 +417,7 @@ func (cn *PeerConn) nominalMaxRequests() (ret int) {
        ))
 }
 
-func (cn *PeerConn) totalExpectingTime() (ret time.Duration) {
+func (cn *peer) totalExpectingTime() (ret time.Duration) {
        ret = cn.cumulativeExpectedToReceiveChunks
        if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
                ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
@@ -442,7 +467,7 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
        })
 }
 
-func (cn *PeerConn) setInterested(interested bool, msg func(pp.Message) bool) bool {
+func (cn *peer) setInterested(interested bool) bool {
        if cn.interested == interested {
                return true
        }
@@ -454,7 +479,11 @@ func (cn *PeerConn) setInterested(interested bool, msg func(pp.Message) bool) bo
        }
        cn.updateExpectingChunks()
        // log.Printf("%p: setting interest: %v", cn, interested)
-       return msg(pp.Message{
+       return cn.writeInterested(interested)
+}
+
+func (pc *PeerConn) writeInterested(interested bool) bool {
+       return pc.write(pp.Message{
                Type: func() pp.MessageType {
                        if interested {
                                return pp.Interested
@@ -469,15 +498,14 @@ func (cn *PeerConn) setInterested(interested bool, msg func(pp.Message) bool) bo
 // are okay.
 type messageWriter func(pp.Message) bool
 
-// Proxies the messageWriter's response.
-func (cn *PeerConn) request(r request, mw messageWriter) bool {
+func (cn *peer) request(r request) bool {
        if _, ok := cn.requests[r]; ok {
                panic("chunk already requested")
        }
        if !cn.peerHasPiece(pieceIndex(r.Index)) {
                panic("requesting piece peer doesn't have")
        }
-       if _, ok := cn.t.conns[cn]; !ok {
+       if !cn.t.peerIsActive(cn) {
                panic("requesting but not in active conns")
        }
        if cn.closed.IsSet() {
@@ -507,7 +535,11 @@ func (cn *PeerConn) request(r request, mw messageWriter) bool {
        cn.t.pendingRequests[r]++
        cn.t.requestStrategy.hooks().sentRequest(r)
        cn.updateExpectingChunks()
-       return mw(pp.Message{
+       return cn.peerImpl.request(r)
+}
+
+func (me *PeerConn) request(r request) bool {
+       return me.write(pp.Message{
                Type:   pp.Request,
                Index:  r.Index,
                Begin:  r.Begin,
@@ -515,17 +547,21 @@ func (cn *PeerConn) request(r request, mw messageWriter) bool {
        })
 }
 
-func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) {
+func (me *PeerConn) cancel(r request) bool {
+       return me.write(makeCancelMessage(r))
+}
+
+func (cn *peer) doRequestState() bool {
        if !cn.t.networkingEnabled || cn.t.dataDownloadDisallowed {
-               if !cn.setInterested(false, msg) {
-                       return
+               if !cn.setInterested(false) {
+                       return false
                }
                if len(cn.requests) != 0 {
                        for r := range cn.requests {
                                cn.deleteRequest(r)
                                // log.Printf("%p: cancelling request: %v", cn, r)
-                               if !msg(makeCancelMessage(r)) {
-                                       return
+                               if !cn.peerImpl.cancel(r) {
+                                       return false
                                }
                        }
                }
@@ -533,7 +569,7 @@ func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) {
                filledBuffer := false
                cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
                        cn.iterPendingRequests(pieceIndex, func(r request) bool {
-                               if !cn.setInterested(true, msg) {
+                               if !cn.setInterested(true) {
                                        filledBuffer = true
                                        return false
                                }
@@ -550,7 +586,7 @@ func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) {
                                if _, ok := cn.requests[r]; ok {
                                        return true
                                }
-                               filledBuffer = !cn.request(r, msg)
+                               filledBuffer = !cn.request(r)
                                return !filledBuffer
                        })
                        return !filledBuffer
@@ -559,16 +595,23 @@ func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) {
                        // If we didn't completely top up the requests, we shouldn't mark
                        // the low water, since we'll want to top up the requests as soon
                        // as we have more write buffer space.
-                       return
+                       return false
                }
                cn.requestsLowWater = len(cn.requests) / 2
        }
+       return true
+}
+
+func (cn *PeerConn) fillWriteBuffer() {
+       if !cn.doRequestState() {
+               return
+       }
        if cn.pex.IsEnabled() {
-               if flow := cn.pex.Share(msg); !flow {
+               if flow := cn.pex.Share(cn.write); !flow {
                        return
                }
        }
-       cn.upload(msg)
+       cn.upload(cn.write)
 }
 
 // Routine that writes to the peer. Some of what to write is buffered by
@@ -597,14 +640,7 @@ func (cn *PeerConn) writer(keepAliveTimeout time.Duration) {
                        return
                }
                if cn.writeBuffer.Len() == 0 {
-                       cn.fillWriteBuffer(func(msg pp.Message) bool {
-                               cn.wroteMsg(&msg)
-                               cn.writeBuffer.Write(msg.MustMarshalBinary())
-                               torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
-                               // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update
-                               // when https://github.com/pion/datachannel/issues/59 is fixed.
-                               return cn.writeBuffer.Len() < 1<<15
-                       })
+                       cn.fillWriteBuffer()
                }
                if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
                        cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
@@ -714,21 +750,21 @@ func iterUnbiasedPieceRequestOrder(cn requestStrategyConnection, f func(piece pi
 // conceivable that the best connection should do this, since it's least likely to waste our time if
 // assigned to the highest priority pieces, and assigning more than one this role would cause
 // significant wasted bandwidth.
-func (cn *PeerConn) shouldRequestWithoutBias() bool {
+func (cn *peer) shouldRequestWithoutBias() bool {
        return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection())
 }
 
-func (cn *PeerConn) iterPendingPieces(f func(pieceIndex) bool) bool {
+func (cn *peer) iterPendingPieces(f func(pieceIndex) bool) bool {
        if !cn.t.haveInfo() {
                return false
        }
        return cn.t.requestStrategy.iterPendingPieces(cn, f)
 }
-func (cn *PeerConn) iterPendingPiecesUntyped(f iter.Callback) {
+func (cn *peer) iterPendingPiecesUntyped(f iter.Callback) {
        cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
 }
 
-func (cn *PeerConn) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
+func (cn *peer) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
        return cn.t.requestStrategy.iterUndirtiedChunks(
                cn.t.piece(piece).requestStrategyPiece(),
                func(cs chunkSpec) bool {
@@ -738,7 +774,7 @@ func (cn *PeerConn) iterPendingRequests(piece pieceIndex, f func(request) bool)
 }
 
 // check callers updaterequests
-func (cn *PeerConn) stopRequestingPiece(piece pieceIndex) bool {
+func (cn *peer) stopRequestingPiece(piece pieceIndex) bool {
        return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece))
 }
 
@@ -746,7 +782,7 @@ func (cn *PeerConn) stopRequestingPiece(piece pieceIndex) bool {
 // preference. Connection piece priority is specific to a connection and is
 // used to pseudorandomly avoid connections always requesting the same pieces
 // and thus wasting effort.
-func (cn *PeerConn) updatePiecePriority(piece pieceIndex) bool {
+func (cn *peer) updatePiecePriority(piece pieceIndex) bool {
        tpp := cn.t.piecePriority(piece)
        if !cn.peerHasPiece(piece) {
                tpp = PiecePriorityNone
@@ -759,14 +795,14 @@ func (cn *PeerConn) updatePiecePriority(piece pieceIndex) bool {
        return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
 }
 
-func (cn *PeerConn) getPieceInclination() []int {
+func (cn *peer) getPieceInclination() []int {
        if cn.pieceInclination == nil {
                cn.pieceInclination = cn.t.getConnPieceInclination()
        }
        return cn.pieceInclination
 }
 
-func (cn *PeerConn) discardPieceInclination() {
+func (cn *peer) discardPieceInclination() {
        if cn.pieceInclination == nil {
                return
        }
@@ -885,7 +921,7 @@ func (cn *PeerConn) readMsg(msg *pp.Message) {
 
 // After handshake, we know what Torrent and Client stats to include for a
 // connection.
-func (cn *PeerConn) postHandshakeStats(f func(*ConnStats)) {
+func (cn *peer) postHandshakeStats(f func(*ConnStats)) {
        t := cn.t
        f(&t.stats)
        f(&t.cl.stats)
@@ -894,7 +930,7 @@ func (cn *PeerConn) postHandshakeStats(f func(*ConnStats)) {
 // All ConnStats that include this connection. Some objects are not known
 // until the handshake is complete, after which it's expected to reconcile the
 // differences.
-func (cn *PeerConn) allStats(f func(*ConnStats)) {
+func (cn *peer) allStats(f func(*ConnStats)) {
        f(&cn._stats)
        if cn.reconciledHandshakeStats {
                cn.postHandshakeStats(f)
@@ -911,7 +947,7 @@ func (cn *PeerConn) readBytes(n int64) {
 
 // Returns whether the connection could be useful to us. We're seeding and
 // they want data, we don't have metainfo and they can provide it, etc.
-func (c *PeerConn) useful() bool {
+func (c *peer) useful() bool {
        t := c.t
        if c.closed.IsSet() {
                return false
@@ -928,7 +964,7 @@ func (c *PeerConn) useful() bool {
        return false
 }
 
-func (c *PeerConn) lastHelpful() (ret time.Time) {
+func (c *peer) lastHelpful() (ret time.Time) {
        ret = c.lastUsefulChunkReceived
        if c.t.seeding() && c.lastChunkSent.After(ret) {
                ret = c.lastChunkSent
@@ -1103,8 +1139,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                case pp.HaveNone:
                        err = c.peerSentHaveNone()
                case pp.Reject:
-                       c.deleteRequest(newRequestFromMessage(&msg))
-                       c.decExpectedChunkReceive(newRequestFromMessage(&msg))
+                       c.remoteRejectedRequest(newRequestFromMessage(&msg))
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
@@ -1121,7 +1156,12 @@ func (c *PeerConn) mainReadLoop() (err error) {
        }
 }
 
-func (c *PeerConn) decExpectedChunkReceive(r request) {
+func (c *peer) remoteRejectedRequest(r request) {
+       c.deleteRequest(r)
+       c.decExpectedChunkReceive(r)
+}
+
+func (c *peer) decExpectedChunkReceive(r request) {
        count := c.validReceiveChunks[r]
        if count == 1 {
                delete(c.validReceiveChunks, r)
@@ -1210,7 +1250,7 @@ func (cn *PeerConn) rw() io.ReadWriter {
 }
 
 // Handle a received chunk from a peer.
-func (c *PeerConn) receiveChunk(msg *pp.Message) error {
+func (c *peer) receiveChunk(msg *pp.Message) error {
        t := c.t
        cl := t.cl
        torrent.Add("chunks received", 1)
@@ -1252,10 +1292,10 @@ func (c *PeerConn) receiveChunk(msg *pp.Message) error {
        c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
        c.lastUsefulChunkReceived = time.Now()
-       // if t.fastestConn != c {
+       // if t.fastestPeer != c {
        // log.Printf("setting fastest connection %p", c)
        // }
-       t.fastestConn = c
+       t.fastestPeer = c
 
        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
@@ -1266,7 +1306,7 @@ func (c *PeerConn) receiveChunk(msg *pp.Message) error {
 
        // Cancel pending requests for this chunk.
        for c := range t.conns {
-               c.postCancel(req)
+               c._postCancel(req)
        }
 
        err := func() error {
@@ -1309,14 +1349,14 @@ func (c *PeerConn) receiveChunk(msg *pp.Message) error {
        return nil
 }
 
-func (c *PeerConn) onDirtiedPiece(piece pieceIndex) {
+func (c *peer) onDirtiedPiece(piece pieceIndex) {
        if c.peerTouchedPieces == nil {
                c.peerTouchedPieces = make(map[pieceIndex]struct{})
        }
        c.peerTouchedPieces[piece] = struct{}{}
        ds := &c.t.pieces[piece].dirtiers
        if *ds == nil {
-               *ds = make(map[*PeerConn]struct{})
+               *ds = make(map[*peer]struct{})
        }
        (*ds)[c] = struct{}{}
 }
@@ -1404,19 +1444,19 @@ func (cn *PeerConn) drop() {
        cn.t.dropConnection(cn)
 }
 
-func (cn *PeerConn) netGoodPiecesDirtied() int64 {
+func (cn *peer) netGoodPiecesDirtied() int64 {
        return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64()
 }
 
-func (c *PeerConn) peerHasWantedPieces() bool {
+func (c *peer) peerHasWantedPieces() bool {
        return !c._pieceRequestOrder.IsEmpty()
 }
 
-func (c *PeerConn) numLocalRequests() int {
+func (c *peer) numLocalRequests() int {
        return len(c.requests)
 }
 
-func (c *PeerConn) deleteRequest(r request) bool {
+func (c *peer) deleteRequest(r request) bool {
        if _, ok := c.requests[r]; !ok {
                return false
        }
@@ -1433,15 +1473,15 @@ func (c *PeerConn) deleteRequest(r request) bool {
                panic(n)
        }
        c.updateRequests()
-       for _c := range c.t.conns {
+       c.t.iterPeers(func(_c *peer) {
                if !_c.interested && _c != c && c.peerHasPiece(pieceIndex(r.Index)) {
                        _c.updateRequests()
                }
-       }
+       })
        return true
 }
 
-func (c *PeerConn) deleteAllRequests() {
+func (c *peer) deleteAllRequests() {
        for r := range c.requests {
                c.deleteRequest(r)
        }
@@ -1457,14 +1497,18 @@ func (c *PeerConn) tickleWriter() {
        c.writerCond.Broadcast()
 }
 
-func (c *PeerConn) postCancel(r request) bool {
+func (c *peer) postCancel(r request) bool {
        if !c.deleteRequest(r) {
                return false
        }
-       c.post(makeCancelMessage(r))
+       c.peerImpl._postCancel(r)
        return true
 }
 
+func (c *PeerConn) _postCancel(r request) {
+       c.post(makeCancelMessage(r))
+}
+
 func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
        // Count the chunk being sent, even if it isn't.
        b := make([]byte, r.Length)
@@ -1497,15 +1541,15 @@ func (c *PeerConn) setTorrent(t *Torrent) {
        t.reconcileHandshakeStats(c)
 }
 
-func (c *PeerConn) peerPriority() (peerPriority, error) {
+func (c *peer) peerPriority() (peerPriority, error) {
        return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
 }
 
-func (c *PeerConn) remoteIp() net.IP {
+func (c *peer) remoteIp() net.IP {
        return addrIpOrNil(c.remoteAddr)
 }
 
-func (c *PeerConn) remoteIpPort() IpPort {
+func (c *peer) remoteIpPort() IpPort {
        ipa, _ := tryIpPortFromNetAddr(c.remoteAddr)
        return IpPort{ipa.IP, uint16(ipa.Port)}
 }
@@ -1550,7 +1594,7 @@ func (c *PeerConn) String() string {
        return fmt.Sprintf("connection %p", c)
 }
 
-func (c *PeerConn) trust() connectionTrust {
+func (c *peer) trust() connectionTrust {
        return connectionTrust{c.trusted, c.netGoodPiecesDirtied()}
 }
 
@@ -1563,19 +1607,19 @@ func (l connectionTrust) Less(r connectionTrust) bool {
        return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less()
 }
 
-func (cn *PeerConn) requestStrategyConnection() requestStrategyConnection {
+func (cn *peer) requestStrategyConnection() requestStrategyConnection {
        return cn
 }
 
-func (cn *PeerConn) chunksReceivedWhileExpecting() int64 {
+func (cn *peer) chunksReceivedWhileExpecting() int64 {
        return cn._chunksReceivedWhileExpecting
 }
 
-func (cn *PeerConn) fastest() bool {
-       return cn == cn.t.fastestConn
+func (cn *peer) fastest() bool {
+       return cn == cn.t.fastestPeer
 }
 
-func (cn *PeerConn) peerMaxRequests() int {
+func (cn *peer) peerMaxRequests() int {
        return cn.PeerMaxRequests
 }
 
@@ -1586,7 +1630,7 @@ func (cn *PeerConn) PeerPieces() bitmap.Bitmap {
        return cn.peerPieces()
 }
 
-func (cn *PeerConn) peerPieces() bitmap.Bitmap {
+func (cn *peer) peerPieces() bitmap.Bitmap {
        ret := cn._peerPieces.Copy()
        if cn.peerSentHaveAll {
                ret.AddRange(0, cn.t.numPieces())
@@ -1594,14 +1638,14 @@ func (cn *PeerConn) peerPieces() bitmap.Bitmap {
        return ret
 }
 
-func (cn *PeerConn) pieceRequestOrder() *prioritybitmap.PriorityBitmap {
+func (cn *peer) pieceRequestOrder() *prioritybitmap.PriorityBitmap {
        return &cn._pieceRequestOrder
 }
 
-func (cn *PeerConn) stats() *ConnStats {
+func (cn *peer) stats() *ConnStats {
        return &cn._stats
 }
 
-func (cn *PeerConn) torrent() requestStrategyTorrent {
+func (cn *peer) torrent() requestStrategyTorrent {
        return cn.t.requestStrategyTorrent()
 }
index 80950d5ff807dec513cb7b9c9f1501a0fc2f32e8..7ad4c41a646168761c87aca8ed24f40505cc5340 100644 (file)
@@ -155,14 +155,14 @@ func TestConnPexPeerFlags(t *testing.T) {
                conn *PeerConn
                f    pp.PexPeerFlags
        }{
-               {&PeerConn{outgoing: false, PeerPrefersEncryption: false}, 0},
-               {&PeerConn{outgoing: false, PeerPrefersEncryption: true}, pp.PexPrefersEncryption},
-               {&PeerConn{outgoing: true, PeerPrefersEncryption: false}, pp.PexOutgoingConn},
-               {&PeerConn{outgoing: true, PeerPrefersEncryption: true}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
-               {&PeerConn{remoteAddr: udpAddr}, pp.PexSupportsUtp},
-               {&PeerConn{remoteAddr: udpAddr, outgoing: true}, pp.PexOutgoingConn | pp.PexSupportsUtp},
-               {&PeerConn{remoteAddr: tcpAddr, outgoing: true}, pp.PexOutgoingConn},
-               {&PeerConn{remoteAddr: tcpAddr}, 0},
+               {&PeerConn{peer: peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
+               {&PeerConn{peer: peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
+               {&PeerConn{peer: peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
+               {&PeerConn{peer: peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
+               {&PeerConn{peer: peer{remoteAddr: udpAddr}}, pp.PexSupportsUtp},
+               {&PeerConn{peer: peer{remoteAddr: udpAddr, outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
+               {&PeerConn{peer: peer{remoteAddr: tcpAddr, outgoing: true}}, pp.PexOutgoingConn},
+               {&PeerConn{peer: peer{remoteAddr: tcpAddr}}, 0},
        }
        for i, tc := range testcases {
                f := tc.conn.pexPeerFlags()
@@ -184,22 +184,22 @@ func TestConnPexEvent(t *testing.T) {
        }{
                {
                        pexAdd,
-                       &PeerConn{remoteAddr: udpAddr},
+                       &PeerConn{peer: peer{remoteAddr: udpAddr}},
                        pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp},
                },
                {
                        pexDrop,
-                       &PeerConn{remoteAddr: tcpAddr, outgoing: true, PeerListenPort: dialTcpAddr.Port},
+                       &PeerConn{peer: peer{remoteAddr: tcpAddr, outgoing: true, PeerListenPort: dialTcpAddr.Port}},
                        pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn},
                },
                {
                        pexAdd,
-                       &PeerConn{remoteAddr: tcpAddr, PeerListenPort: dialTcpAddr.Port},
+                       &PeerConn{peer: peer{remoteAddr: tcpAddr, PeerListenPort: dialTcpAddr.Port}},
                        pexEvent{pexAdd, dialTcpAddr, 0},
                },
                {
                        pexDrop,
-                       &PeerConn{remoteAddr: udpAddr, PeerListenPort: dialUdpAddr.Port},
+                       &PeerConn{peer: peer{remoteAddr: udpAddr, PeerListenPort: dialUdpAddr.Port}},
                        pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp},
                },
        }
index 9bc947544bb8d8a399c2800e58b2fbe9b8360ce3..b689d3049f4fd2ddff36513d28b4d68e22380b1f 100644 (file)
--- a/peerid.go
+++ b/peerid.go
@@ -3,7 +3,7 @@ package torrent
 // Peer client ID.
 type PeerID [20]byte
 
-// // Pretty prints the ID as hex, except parts that adher to the Peer ID
+// // Pretty prints the ID as hex, except parts that adher to the PeerInfo ID
 // // Conventions of BEP 20.
 // func (me PeerID) String() string {
 //     // if me[0] == '-' && me[7] == '-' {
index 4967b5d92dd17810008edbe0ff9f0e02112d6f0c..828883c444bf9ebd89b42a4c88ebd4749ae9523e 100644 (file)
@@ -23,7 +23,7 @@ var (
 func TestPexAdded(t *testing.T) {
        t.Run("noHold", func(t *testing.T) {
                s := new(pexState)
-               s.Add(&PeerConn{remoteAddr: addrs[0], outgoing: true})
+               s.Add(&PeerConn{peer: peer{remoteAddr: addrs[0], outgoing: true}})
                targ := &pexState{
                        ev: []pexEvent{
                                pexEvent{pexAdd, addrs[0], pp.PexOutgoingConn},
@@ -39,7 +39,7 @@ func TestPexAdded(t *testing.T) {
                        },
                        nc: 0,
                }
-               s.Add(&PeerConn{remoteAddr: addrs[0]})
+               s.Add(&PeerConn{peer: peer{remoteAddr: addrs[0]}})
                targ := &pexState{
                        hold: []pexEvent{
                                pexEvent{pexDrop, addrs[1], 0},
@@ -59,7 +59,7 @@ func TestPexAdded(t *testing.T) {
                        },
                        nc: pexTargAdded,
                }
-               s.Add(&PeerConn{remoteAddr: addrs[0]})
+               s.Add(&PeerConn{peer: peer{remoteAddr: addrs[0]}})
                targ := &pexState{
                        hold: []pexEvent{},
                        ev: []pexEvent{
@@ -75,7 +75,7 @@ func TestPexAdded(t *testing.T) {
 func TestPexDropped(t *testing.T) {
        t.Run("belowTarg", func(t *testing.T) {
                s := &pexState{nc: 1}
-               s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}})
+               s.Drop(&PeerConn{peer: peer{remoteAddr: addrs[0], pex: pexConnState{Listed: true}}})
                targ := &pexState{
                        hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
                        nc:   0,
@@ -84,7 +84,7 @@ func TestPexDropped(t *testing.T) {
        })
        t.Run("aboveTarg", func(t *testing.T) {
                s := &pexState{nc: pexTargAdded + 1}
-               s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}})
+               s.Drop(&PeerConn{peer: peer{remoteAddr: addrs[0], pex: pexConnState{Listed: true}}})
                targ := &pexState{
                        ev: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
                        nc: pexTargAdded,
@@ -93,7 +93,7 @@ func TestPexDropped(t *testing.T) {
        })
        t.Run("aboveTargNotListed", func(t *testing.T) {
                s := &pexState{nc: pexTargAdded + 1}
-               s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: false}})
+               s.Drop(&PeerConn{peer: peer{remoteAddr: addrs[0], pex: pexConnState{Listed: false}}})
                targ := &pexState{nc: pexTargAdded + 1}
                require.EqualValues(t, targ, s)
        })
index e70ed105de502f3aec81a4dd8776dec280d3e588..dcb3136c3ecfb6190cdb4d377fd6a2b5298f54b0 100644 (file)
@@ -96,7 +96,7 @@ func (s *pexConnState) Recv(payload []byte) error {
        torrent.Add("pex added peers received", int64(len(rx.Added)))
        torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
 
-       var peers Peers
+       var peers peerInfos
        peers.AppendFromPex(rx.Added6, rx.Added6Flags)
        peers.AppendFromPex(rx.Added, rx.AddedFlags)
        s.dbg.Printf("adding %d peers from PEX", len(peers))
index f112e0bd262c4f194628a54f94af8aa528258090..fd0eb68a56e04148b64ce4af8bb8028bd0042c3a 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -61,7 +61,7 @@ type Piece struct {
 
        // Connections that have written data to this piece since its last check.
        // This can include connections that have closed.
-       dirtiers map[*PeerConn]struct{}
+       dirtiers map[*peer]struct{}
 }
 
 func (p *Piece) String() string {
index 0eddf161dd5ac0ea475b0688c3cfe2e081df5705..d0ef43edbb6d9bed2c89ea8f9bfbaa2152fc3a74 100644 (file)
@@ -11,7 +11,7 @@ import (
 // change if our apparent IP changes, we don't currently handle that.
 type prioritizedPeersItem struct {
        prio peerPriority
-       p    Peer
+       p    PeerInfo
 }
 
 var hashSeed = maphash.MakeSeed()
@@ -34,10 +34,10 @@ func (me prioritizedPeersItem) Less(than btree.Item) bool {
 
 type prioritizedPeers struct {
        om      *btree.BTree
-       getPrio func(Peer) peerPriority
+       getPrio func(PeerInfo) peerPriority
 }
 
-func (me *prioritizedPeers) Each(f func(Peer)) {
+func (me *prioritizedPeers) Each(f func(PeerInfo)) {
        me.om.Ascend(func(i btree.Item) bool {
                f(i.(prioritizedPeersItem).p)
                return true
@@ -49,12 +49,12 @@ func (me *prioritizedPeers) Len() int {
 }
 
 // Returns true if a peer is replaced.
-func (me *prioritizedPeers) Add(p Peer) bool {
+func (me *prioritizedPeers) Add(p PeerInfo) bool {
        return me.om.ReplaceOrInsert(prioritizedPeersItem{me.getPrio(p), p}) != nil
 }
 
 // Returns true if a peer is replaced.
-func (me *prioritizedPeers) AddReturningReplacedPeer(p Peer) (ret Peer, ok bool) {
+func (me *prioritizedPeers) AddReturningReplacedPeer(p PeerInfo) (ret PeerInfo, ok bool) {
        item := me.om.ReplaceOrInsert(prioritizedPeersItem{me.getPrio(p), p})
        if item == nil {
                return
@@ -74,6 +74,6 @@ func (me *prioritizedPeers) DeleteMin() (ret prioritizedPeersItem, ok bool) {
        return
 }
 
-func (me *prioritizedPeers) PopMax() Peer {
+func (me *prioritizedPeers) PopMax() PeerInfo {
        return me.om.DeleteMax().(prioritizedPeersItem).p
 }
index c08c5a33e83d8cebe7a367dcc3cb26b2703398d4..5e61c25f1f56c4d8c0df6f92f3114a83a3d494cc 100644 (file)
@@ -11,14 +11,14 @@ import (
 func TestPrioritizedPeers(t *testing.T) {
        pp := prioritizedPeers{
                om: btree.New(3),
-               getPrio: func(p Peer) peerPriority {
+               getPrio: func(p PeerInfo) peerPriority {
                        return bep40PriorityIgnoreError(p.addr(), IpPort{IP: net.ParseIP("0.0.0.0")})
                },
        }
        _, ok := pp.DeleteMin()
        assert.Panics(t, func() { pp.PopMax() })
        assert.False(t, ok)
-       ps := []Peer{
+       ps := []PeerInfo{
                {Addr: ipPortAddr{IP: net.ParseIP("1.2.3.4")}},
                {Addr: ipPortAddr{IP: net.ParseIP("1::2")}},
                {Addr: ipPortAddr{IP: net.ParseIP("")}},
@@ -30,14 +30,14 @@ func TestPrioritizedPeers(t *testing.T) {
                assert.True(t, pp.Add(p))
                assert.Equal(t, i+1, pp.Len())
        }
-       pop := func(expected *Peer) {
+       pop := func(expected *PeerInfo) {
                if expected == nil {
                        assert.Panics(t, func() { pp.PopMax() })
                } else {
                        assert.Equal(t, *expected, pp.PopMax())
                }
        }
-       min := func(expected *Peer) {
+       min := func(expected *PeerInfo) {
                i, ok := pp.DeleteMin()
                if expected == nil {
                        assert.False(t, ok)
diff --git a/segments/index.go b/segments/index.go
new file mode 100644 (file)
index 0000000..6717dcb
--- /dev/null
@@ -0,0 +1,45 @@
+package segments
+
+import (
+       "sort"
+)
+
+func NewIndex(segments LengthIter) (ret Index) {
+       var start Length
+       for l, ok := segments(); ok; l, ok = segments() {
+               ret.segments = append(ret.segments, Extent{start, l})
+               start += l
+       }
+       return
+}
+
+type Index struct {
+       segments []Extent
+}
+
+func (me Index) iterSegments() func() (Length, bool) {
+       return func() (Length, bool) {
+               if len(me.segments) == 0 {
+                       return 0, false
+               } else {
+                       l := me.segments[0].Length
+                       me.segments = me.segments[1:]
+                       return l, true
+               }
+       }
+}
+
+func (me Index) Locate(e Extent, output Callback) bool {
+       first := sort.Search(len(me.segments), func(i int) bool {
+               _e := me.segments[i]
+               return _e.End() > e.Start
+       })
+       if first == len(me.segments) {
+               return false
+       }
+       e.Start -= me.segments[first].Start
+       me.segments = me.segments[first:]
+       return Scan(me.iterSegments(), e, func(i int, e Extent) bool {
+               return output(i+first, e)
+       })
+}
diff --git a/segments/segments.go b/segments/segments.go
new file mode 100644 (file)
index 0000000..90e77ce
--- /dev/null
@@ -0,0 +1,63 @@
+package segments
+
+type Int = int64
+
+type Length = Int
+
+func min(i Int, rest ...Int) Int {
+       ret := i
+       for _, i := range rest {
+               if i < ret {
+                       ret = i
+               }
+       }
+       return ret
+}
+
+type Extent struct {
+       Start, Length Int
+}
+
+func (e Extent) End() Int {
+       return e.Start + e.Length
+}
+
+type (
+       Callback   = func(int, Extent) bool
+       LengthIter = func() (Length, bool)
+)
+
+func Scan(haystack LengthIter, needle Extent, callback Callback) bool {
+       i := 0
+       for needle.Length != 0 {
+               l, ok := haystack()
+               if !ok {
+                       return false
+               }
+               if needle.Start < l || needle.Start == l && l == 0 {
+                       e1 := Extent{
+                               Start:  needle.Start,
+                               Length: min(l, needle.End()) - needle.Start,
+                       }
+                       if e1.Length >= 0 {
+                               if !callback(i, e1) {
+                                       return true
+                               }
+                               needle.Start = 0
+                               needle.Length -= e1.Length
+                       }
+               } else {
+                       needle.Start -= l
+               }
+               i++
+       }
+       return true
+}
+
+func LocaterFromLengthIter(li LengthIter) Locater {
+       return func(e Extent, c Callback) bool {
+               return Scan(li, e, c)
+       }
+}
+
+type Locater func(Extent, Callback) bool
diff --git a/segments/segments_test.go b/segments/segments_test.go
new file mode 100644 (file)
index 0000000..8dcff6c
--- /dev/null
@@ -0,0 +1,90 @@
+package segments
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func LengthIterFromSlice(ls []Length) LengthIter {
+       return func() (Length, bool) {
+               switch len(ls) {
+               case 0:
+                       return -1, false
+               default:
+                       l := ls[0]
+                       ls = ls[1:]
+                       return l, true
+               }
+       }
+}
+
+type ScanCallbackValue struct {
+       Index int
+       Extent
+}
+
+type collectExtents []ScanCallbackValue
+
+func (me *collectExtents) scanCallback(i int, e Extent) bool {
+       *me = append(*me, ScanCallbackValue{
+               Index:  i,
+               Extent: e,
+       })
+       return true
+}
+
+type newLocater func(LengthIter) Locater
+
+func assertLocate(t *testing.T, nl newLocater, ls []Length, needle Extent, firstExpectedIndex int, expectedExtents []Extent) {
+       var actual collectExtents
+       var expected collectExtents
+       for i, e := range expectedExtents {
+               expected.scanCallback(firstExpectedIndex+i, e)
+       }
+       nl(LengthIterFromSlice(ls))(needle, actual.scanCallback)
+       assert.EqualValues(t, expected, actual)
+}
+
+func testLocater(t *testing.T, newLocater newLocater) {
+       assertLocate(t, newLocater,
+               []Length{1, 0, 2, 0, 3},
+               Extent{2, 2},
+               2,
+               []Extent{{1, 1}, {0, 0}, {0, 1}})
+       assertLocate(t, newLocater,
+               []Length{1, 0, 2, 0, 3},
+               Extent{6, 2},
+               2,
+               []Extent{})
+       assertLocate(t, newLocater,
+               []Length{1652, 1514, 1554, 1618, 1546, 129241752, 1537}, // 128737588
+               Extent{0, 16384},
+               0,
+               []Extent{
+                       {0, 1652},
+                       {0, 1514},
+                       {0, 1554},
+                       {0, 1618},
+                       {0, 1546},
+                       {0, 8500}})
+       assertLocate(t, newLocater,
+               []Length{1652, 1514, 1554, 1618, 1546, 129241752, 1537, 1536, 1551}, // 128737588
+               Extent{129236992, 16384},
+               5,
+               []Extent{
+                       {129229108, 12644},
+                       {0, 1537},
+                       {0, 1536},
+                       {0, 667}})
+}
+
+func TestScan(t *testing.T) {
+       testLocater(t, LocaterFromLengthIter)
+}
+
+func TestIndex(t *testing.T) {
+       testLocater(t, func(li LengthIter) Locater {
+               return NewIndex(li).Locate
+       })
+}
diff --git a/spec.go b/spec.go
index 5dbd472bda965b5dbffed780ecefa3a456d13a3a..e0c0bc24232bfdc6837379c7d3aa7cb3565b4bc2 100644 (file)
--- a/spec.go
+++ b/spec.go
@@ -5,8 +5,8 @@ import (
        "github.com/anacrolix/torrent/storage"
 )
 
-// Specifies a new torrent for adding to a client. There are helpers for
-// magnet URIs and torrent metainfo files.
+// Specifies a new torrent for adding to a client. There are helpers for magnet URIs and torrent
+// metainfo files.
 type TorrentSpec struct {
        // The tiered tracker URIs.
        Trackers  [][]string
@@ -14,8 +14,12 @@ type TorrentSpec struct {
        InfoBytes []byte
        // The name to use if the Name field from the Info isn't available.
        DisplayName string
-       // The chunk size to use for outbound requests. Defaults to 16KiB if not
-       // set.
+       Webseeds    []string
+       DhtNodes    []string
+       // The combination of the "xs" and "as" fields in magnet links, for now.
+       Sources []string
+
+       // The chunk size to use for outbound requests. Defaults to 16KiB if not set.
        ChunkSize int
        Storage   storage.ClientImpl
 }
@@ -29,20 +33,30 @@ func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
                Trackers:    [][]string{m.Trackers},
                DisplayName: m.DisplayName,
                InfoHash:    m.InfoHash,
+               Webseeds:    m.Params["ws"],
+               Sources:     append(m.Params["xs"], m.Params["as"]...),
+               // TODO: What's the parameter for DHT nodes or bootstrap peers in a magnet link?
        }
        return
 }
 
-func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
-       info, _ := mi.UnmarshalInfo()
-       spec = &TorrentSpec{
-               Trackers:    mi.AnnounceList,
+func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) *TorrentSpec {
+       info, err := mi.UnmarshalInfo()
+       if err != nil {
+               panic(err)
+       }
+       return &TorrentSpec{
+               Trackers:    mi.UpvertedAnnounceList(),
+               InfoHash:    mi.HashInfoBytes(),
                InfoBytes:   mi.InfoBytes,
                DisplayName: info.Name,
-               InfoHash:    mi.HashInfoBytes(),
+               Webseeds:    mi.UrlList,
+               DhtNodes: func() (ret []string) {
+                       ret = make([]string, len(mi.Nodes))
+                       for _, node := range mi.Nodes {
+                               ret = append(ret, string(node))
+                       }
+                       return
+               }(),
        }
-       if spec.Trackers == nil && mi.Announce != "" {
-               spec.Trackers = [][]string{{mi.Announce}}
-       }
-       return
 }
index 013351d77f9be42066cdd1ad7728ad5cf50543bf..6dc3aeebb781be357d50ab80c99ecbba8173a970 100644 (file)
@@ -6,6 +6,8 @@ import (
        "path/filepath"
 
        "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/torrent/common"
+       "github.com/anacrolix/torrent/segments"
 
        "github.com/anacrolix/torrent/metainfo"
 )
@@ -67,19 +69,24 @@ func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Has
        if err != nil {
                return nil, err
        }
+       upvertedFiles := info.UpvertedFiles()
        return &fileTorrentImpl{
                dir,
-               info,
+               info.Name,
+               upvertedFiles,
+               segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
                infoHash,
                fs.pc,
        }, nil
 }
 
 type fileTorrentImpl struct {
-       dir        string
-       info       *metainfo.Info
-       infoHash   metainfo.Hash
-       completion PieceCompletion
+       dir            string
+       infoName       string
+       upvertedFiles  []metainfo.FileInfo
+       segmentLocater segments.Index
+       infoHash       metainfo.Hash
+       completion     PieceCompletion
 }
 
 func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
@@ -154,62 +161,46 @@ func (fst *fileTorrentImplIO) readFileAt(fi metainfo.FileInfo, b []byte, off int
 
 // Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF.
 func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
-       for _, fi := range fst.fts.info.UpvertedFiles() {
-               for off < fi.Length {
-                       n1, err1 := fst.readFileAt(fi, b, off)
-                       n += n1
-                       off += int64(n1)
-                       b = b[n1:]
-                       if len(b) == 0 {
-                               // Got what we need.
-                               return
-                       }
-                       if n1 != 0 {
-                               // Made progress.
-                               continue
-                       }
-                       err = err1
-                       return
-               }
-               off -= fi.Length
+       fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(b))}, func(i int, e segments.Extent) bool {
+               n1, err1 := fst.readFileAt(fst.fts.upvertedFiles[i], b[:e.Length], e.Start)
+               n += n1
+               b = b[n1:]
+               err = err1
+               return err == nil // && int64(n1) == e.Length
+       })
+       if len(b) != 0 && err == nil {
+               err = io.EOF
        }
-       err = io.EOF
        return
 }
 
 func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
-       for _, fi := range fst.fts.info.UpvertedFiles() {
-               if off >= fi.Length {
-                       off -= fi.Length
-                       continue
-               }
-               n1 := len(p)
-               if int64(n1) > fi.Length-off {
-                       n1 = int(fi.Length - off)
-               }
-               name := fst.fts.fileInfoName(fi)
+       //log.Printf("write at %v: %v bytes", off, len(p))
+       fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
+               name := fst.fts.fileInfoName(fst.fts.upvertedFiles[i])
                os.MkdirAll(filepath.Dir(name), 0777)
                var f *os.File
                f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0666)
                if err != nil {
-                       return
-               }
-               n1, err = f.WriteAt(p[:n1], off)
-               // TODO: On some systems, write errors can be delayed until the Close.
-               f.Close()
-               if err != nil {
-                       return
+                       return false
                }
+               var n1 int
+               n1, err = f.WriteAt(p[:e.Length], e.Start)
+               //log.Printf("%v %v wrote %v: %v", i, e, n1, err)
+               closeErr := f.Close()
                n += n1
-               off = 0
                p = p[n1:]
-               if len(p) == 0 {
-                       break
+               if err == nil {
+                       err = closeErr
                }
-       }
+               if err == nil && int64(n1) != e.Length {
+                       err = io.ErrShortWrite
+               }
+               return err == nil
+       })
        return
 }
 
 func (fts *fileTorrentImpl) fileInfoName(fi metainfo.FileInfo) string {
-       return filepath.Join(append([]string{fts.dir, fts.info.Name}, fi.Path...)...)
+       return filepath.Join(append([]string{fts.dir, fts.infoName}, fi.Path...)...)
 }
index aebf1094ce7e78c39300502c35c2e9f09b3bbaae..f811e24caa8a21c517e29882daf8714e88862c89 100644 (file)
@@ -61,7 +61,11 @@ func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
 }
 
 func (ts *mmapTorrentStorage) Close() error {
-       return ts.span.Close()
+       errs := ts.span.Close()
+       if len(errs) > 0 {
+               return errs[0]
+       }
+       return nil
 }
 
 type mmapStoragePiece struct {
@@ -113,6 +117,7 @@ func mMapTorrent(md *metainfo.Info, location string) (mms *mmap_span.MMapSpan, e
                        mms.Append(mm)
                }
        }
+       mms.InitIndex()
        return
 }
 
diff --git a/t.go b/t.go
index a808cbf47d29d505cf1f8eb0eab0abdf0b6a46a6..9d7ce2855e9d26c6e586b911ff46433c66279ee3 100644 (file)
--- a/t.go
+++ b/t.go
@@ -221,7 +221,7 @@ func (t *Torrent) Files() []*File {
        return *t.files
 }
 
-func (t *Torrent) AddPeers(pp []Peer) int {
+func (t *Torrent) AddPeers(pp []PeerInfo) int {
        cl := t.cl
        cl.lock()
        defer cl.unlock()
index d231a8e54c6c77ceb1212ac38f3c82073a0b2f59..c2f936e2e2a41a08f4e9d357844d675ea9b3a2a2 100644 (file)
@@ -8,12 +8,17 @@ import (
        "fmt"
        "io"
        "math/rand"
+       "net/http"
        "net/url"
+       "sort"
        "sync"
        "text/tabwriter"
        "time"
        "unsafe"
 
+       "github.com/anacrolix/torrent/common"
+       "github.com/anacrolix/torrent/segments"
+       "github.com/anacrolix/torrent/webseed"
        "github.com/davecgh/go-spew/spew"
        "github.com/pion/datachannel"
 
@@ -75,8 +80,11 @@ type Torrent struct {
        metainfo metainfo.MetaInfo
 
        // The info dict. nil if we don't have it (yet).
-       info  *metainfo.Info
-       files *[]*File
+       info      *metainfo.Info
+       fileIndex segments.Index
+       files     *[]*File
+
+       webSeeds map[string]*peer
 
        // Active peer connections, running message stream loops. TODO: Make this
        // open (not-closed) connections only.
@@ -84,8 +92,8 @@ type Torrent struct {
        maxEstablishedConns int
        // Set of addrs to which we're attempting to connect. Connections are
        // half-open until all handshakes are completed.
-       halfOpen    map[string]Peer
-       fastestConn *PeerConn
+       halfOpen    map[string]PeerInfo
+       fastestPeer *peer
 
        // Reserve of peers to connect to. A peer can be both here and in the
        // active connections if were told about the peer after connecting with
@@ -182,9 +190,9 @@ func (t *Torrent) Closed() <-chan struct{} {
 
 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
 // pending, and half-open peers.
-func (t *Torrent) KnownSwarm() (ks []Peer) {
+func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
        // Add pending peers to the list
-       t.peers.Each(func(peer Peer) {
+       t.peers.Each(func(peer PeerInfo) {
                ks = append(ks, peer)
        })
 
@@ -196,7 +204,7 @@ func (t *Torrent) KnownSwarm() (ks []Peer) {
        // Add active peers to the list
        for conn := range t.conns {
 
-               ks = append(ks, Peer{
+               ks = append(ks, PeerInfo{
                        Id:     conn.PeerID,
                        Addr:   conn.remoteAddr,
                        Source: conn.Discovery,
@@ -255,7 +263,7 @@ func (t *Torrent) unclosedConnsAsSlice() (ret []*PeerConn) {
        return
 }
 
-func (t *Torrent) addPeer(p Peer) (added bool) {
+func (t *Torrent) addPeer(p PeerInfo) (added bool) {
        cl := t.cl
        torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
        if t.closed.IsSet() {
@@ -270,7 +278,7 @@ func (t *Torrent) addPeer(p Peer) (added bool) {
        }
        if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
                torrent.Add("peers replaced", 1)
-               if !replaced.Equal(p) {
+               if !replaced.equal(p) {
                        t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
                        added = true
                }
@@ -391,6 +399,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        t.nameMu.Lock()
        t.info = info
        t.nameMu.Unlock()
+       t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
        t.displayName = "" // Save a few bytes lol.
        t.initFiles()
        t.cacheLength()
@@ -398,13 +407,11 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        return nil
 }
 
+// This seems to be all the follow-up tasks after info is set, that can't fail.
 func (t *Torrent) onSetInfo() {
-       for conn := range t.conns {
-               if err := conn.setNumPieces(t.numPieces()); err != nil {
-                       t.logger.Printf("closing connection: %s", err)
-                       conn.close()
-               }
-       }
+       t.iterPeers(func(p *peer) {
+               p.onGotInfo(t.info)
+       })
        for i := range t.pieces {
                t.updatePieceCompletion(pieceIndex(i))
                p := &t.pieces[i]
@@ -629,9 +636,11 @@ func (t *Torrent) writeStatus(w io.Writer) {
        spew.NewDefaultConfig()
        spew.Fdump(w, t.statsLocked())
 
-       conns := t.connsAsSlice()
-       slices.Sort(conns, worseConn)
-       for i, c := range conns {
+       peers := t.peersAsSlice()
+       sort.Slice(peers, func(i, j int) bool {
+               return worseConn(peers[i], peers[j])
+       })
+       for i, c := range peers {
                fmt.Fprintf(w, "%2d. ", i+1)
                c.writeStatus(w, t)
        }
@@ -731,8 +740,8 @@ func (t *Torrent) requestOffset(r request) int64 {
        return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
 }
 
-// Return the request that would include the given offset into the torrent
-// data. Returns !ok if there is no such request.
+// Return the request that would include the given offset into the torrent data. Returns !ok if
+// there is no such request.
 func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
        return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
 }
@@ -848,10 +857,9 @@ func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
        })
 }
 
-// The worst connection is one that hasn't been sent, or sent anything useful
-// for the longest. A bad connection is one that usually sends us unwanted
-// pieces, or has been in worser half of the established connections for more
-// than a minute.
+// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
+// connection is one that usually sends us unwanted pieces, or has been in worser half of the
+// established connections for more than a minute.
 func (t *Torrent) worstBadConn() *PeerConn {
        wcs := worseConnSlice{t.unclosedConnsAsSlice()}
        heap.Init(&wcs)
@@ -943,12 +951,12 @@ func (t *Torrent) maybeNewConns() {
 
 func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
        // t.logger.Printf("piece %d priority changed", piece)
-       for c := range t.conns {
+       t.iterPeers(func(c *peer) {
                if c.updatePiecePriority(piece) {
                        // log.Print("conn piece priority changed")
                        c.updateRequests()
                }
-       }
+       })
        t.maybeNewConns()
        t.publishPieceChange(piece)
 }
@@ -1227,12 +1235,19 @@ func (t *Torrent) deleteConnection(c *PeerConn) (ret bool) {
        }
        torrent.Add("deleted connections", 1)
        c.deleteAllRequests()
-       if len(t.conns) == 0 {
+       if t.numActivePeers() == 0 {
                t.assertNoPendingRequests()
        }
        return
 }
 
+func (t *Torrent) numActivePeers() (num int) {
+       t.iterPeers(func(*peer) {
+               num++
+       })
+       return
+}
+
 func (t *Torrent) assertNoPendingRequests() {
        if len(t.pendingRequests) != 0 {
                panic(t.pendingRequests)
@@ -1378,6 +1393,9 @@ func (t *Torrent) startScrapingTracker(_url string) {
        sl := func() torrentTrackerAnnouncer {
                switch u.Scheme {
                case "ws", "wss":
+                       if t.cl.config.DisableWebtorrent {
+                               return nil
+                       }
                        return t.startWebsocketAnnouncer(*u)
                }
                if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
@@ -1455,7 +1473,7 @@ func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
                                // Can't do anything with this.
                                continue
                        }
-                       t.addPeer(Peer{
+                       t.addPeer(PeerInfo{
                                Addr:   ipPortAddr{cp.IP, cp.Port},
                                Source: PeerSourceDhtGetPeers,
                        })
@@ -1510,7 +1528,7 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) {
        }
 }
 
-func (t *Torrent) addPeers(peers []Peer) (added int) {
+func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
        for _, p := range peers {
                if t.addPeer(p) {
                        added++
@@ -1556,7 +1574,7 @@ func (t *Torrent) numTotalPeers() int {
        for addr := range t.halfOpen {
                peers[addr] = struct{}{}
        }
-       t.peers.Each(func(peer Peer) {
+       t.peers.Each(func(peer PeerInfo) {
                peers[peer.Addr.String()] = struct{}{}
        })
        return len(peers)
@@ -1642,7 +1660,9 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
        defer t.cl.unlock()
        oldMax = t.maxEstablishedConns
        t.maxEstablishedConns = max
-       wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
+       wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool {
+               return worseConn(&l.peer, &r.peer)
+       })
        for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
                t.dropConnection(wcs.Pop().(*PeerConn))
        }
@@ -1695,7 +1715,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                                c.stats().incrementPiecesDirtiedBad()
                        }
 
-                       bannableTouchers := make([]*PeerConn, 0, len(p.dirtiers))
+                       bannableTouchers := make([]*peer, 0, len(p.dirtiers))
                        for c := range p.dirtiers {
                                if !c.trusted {
                                        bannableTouchers = append(bannableTouchers, c)
@@ -1764,11 +1784,11 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
        //              c.drop()
        //      }
        // }
-       for conn := range t.conns {
+       t.iterPeers(func(conn *peer) {
                if conn.peerHasPiece(piece) {
                        conn.updateRequests()
                }
-       }
+       })
 }
 
 func (t *Torrent) tryCreateMorePieceHashers() {
@@ -1836,10 +1856,10 @@ func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
        }
 }
 
-func (t *Torrent) connsAsSlice() (ret []*PeerConn) {
-       for c := range t.conns {
-               ret = append(ret, c)
-       }
+func (t *Torrent) peersAsSlice() (ret []*peer) {
+       t.iterPeers(func(p *peer) {
+               ret = append(ret, p)
+       })
        return
 }
 
@@ -1863,7 +1883,7 @@ func (t *Torrent) VerifyData() {
 }
 
 // Start the process of connecting to the given peer for the given torrent if appropriate.
-func (t *Torrent) initiateConn(peer Peer) {
+func (t *Torrent) initiateConn(peer PeerInfo) {
        if peer.Id == t.cl.peerID {
                return
        }
@@ -1882,9 +1902,9 @@ func (t *Torrent) initiateConn(peer Peer) {
 // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
 // quickly make one Client visible to the Torrent of another Client.
 func (t *Torrent) AddClientPeer(cl *Client) int {
-       return t.AddPeers(func() (ps []Peer) {
+       return t.AddPeers(func() (ps []PeerInfo) {
                for _, la := range cl.ListenAddrs() {
-                       ps = append(ps, Peer{
+                       ps = append(ps, PeerInfo{
                                Addr:    la,
                                Trusted: true,
                        })
@@ -1928,11 +1948,11 @@ func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) {
        torrent.Add("request timeouts", 1)
        cb.t.cl.lock()
        defer cb.t.cl.unlock()
-       for cn := range cb.t.conns {
+       cb.t.iterPeers(func(cn *peer) {
                if cn.peerHasPiece(pieceIndex(r.Index)) {
                        cn.updateRequests()
                }
-       }
+       })
 
 }
 
@@ -1957,9 +1977,9 @@ func (t *Torrent) DisallowDataDownload() {
 func (t *Torrent) disallowDataDownloadLocked() {
        log.Printf("disallowing data download")
        t.dataDownloadDisallowed = true
-       for c := range t.conns {
+       t.iterPeers(func(c *peer) {
                c.updateRequests()
-       }
+       })
 }
 
 func (t *Torrent) AllowDataDownload() {
@@ -1967,10 +1987,9 @@ func (t *Torrent) AllowDataDownload() {
        defer t.cl.unlock()
        log.Printf("AllowDataDownload")
        t.dataDownloadDisallowed = false
-       for c := range t.conns {
+       t.iterPeers(func(c *peer) {
                c.updateRequests()
-       }
-
+       })
 }
 
 func (t *Torrent) AllowDataUpload() {
@@ -1998,3 +2017,52 @@ func (t *Torrent) SetOnWriteChunkError(f func(error)) {
        defer t.cl.unlock()
        t.userOnWriteChunkErr = f
 }
+
+func (t *Torrent) iterPeers(f func(*peer)) {
+       for pc := range t.conns {
+               f(&pc.peer)
+       }
+       for _, ws := range t.webSeeds {
+               f(ws)
+       }
+}
+
+func (t *Torrent) addWebSeed(url string) {
+       if t.cl.config.DisableWebseeds {
+               return
+       }
+       if _, ok := t.webSeeds[url]; ok {
+               return
+       }
+       const maxRequests = 10
+       ws := webSeed{
+               peer: peer{
+                       t:                        t,
+                       connString:               url,
+                       outgoing:                 true,
+                       network:                  "http",
+                       reconciledHandshakeStats: true,
+                       peerSentHaveAll:          true,
+                       PeerMaxRequests:          maxRequests,
+               },
+               client: webseed.Client{
+                       HttpClient: http.DefaultClient,
+                       Url:        url,
+               },
+               requests: make(map[request]webseed.Request, maxRequests),
+       }
+       ws.peer.peerImpl = &ws
+       if t.haveInfo() {
+               ws.onGotInfo(t.info)
+       }
+       t.webSeeds[url] = &ws.peer
+}
+
+func (t *Torrent) peerIsActive(p *peer) (active bool) {
+       t.iterPeers(func(p1 *peer) {
+               if p1 == p {
+                       active = true
+               }
+       })
+       return
+}
index 6c47a80cfbd7ffffef5bdc4b35d4aa3bef570fb4..2812a0671fc8b1124f50101d54122b1c0e248fd6 100644 (file)
@@ -23,12 +23,12 @@ func r(i, b, l pp.Integer) request {
        return request{i, chunkSpec{b, l}}
 }
 
-// Check the given Request is correct for various torrent offsets.
+// Check the given request is correct for various torrent offsets.
 func TestTorrentRequest(t *testing.T) {
        const s = 472183431 // Length of torrent.
        for _, _case := range []struct {
                off int64   // An offset into the torrent.
-               req request // The expected Request. The zero value means !ok.
+               req request // The expected request. The zero value means !ok.
        }{
                // Invalid offset.
                {-1, request{}},
index a7d3041cfe7c37b9fafaa4537afc07cf1926ae63..1323795398496a0893b51c9b2e32d6de6f0cfd31 100644 (file)
@@ -132,7 +132,7 @@ func (me *trackerScraper) announce(event tracker.AnnounceEvent) (ret trackerAnno
                ret.Err = fmt.Errorf("error announcing: %s", err)
                return
        }
-       me.t.AddPeers(Peers(nil).AppendFromTracker(res.Peers))
+       me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers))
        ret.NumPeers = len(res.Peers)
        ret.Interval = time.Duration(res.Interval) * time.Second
        return
diff --git a/web_seed.go b/web_seed.go
new file mode 100644 (file)
index 0000000..72bda5f
--- /dev/null
@@ -0,0 +1,97 @@
+package torrent
+
+import (
+       "net/http"
+
+       "github.com/anacrolix/log"
+       "github.com/anacrolix/torrent/common"
+       "github.com/anacrolix/torrent/metainfo"
+       pp "github.com/anacrolix/torrent/peer_protocol"
+       "github.com/anacrolix/torrent/segments"
+       "github.com/anacrolix/torrent/webseed"
+)
+
+type httpRequestResult struct {
+       resp *http.Response
+       err  error
+}
+
+type requestPart struct {
+       req    *http.Request
+       e      segments.Extent
+       result chan httpRequestResult
+}
+
+type webseedRequest struct {
+       cancel func()
+}
+
+type webSeed struct {
+       client   webseed.Client
+       requests map[request]webseed.Request
+       peer     peer
+}
+
+var _ peerImpl = (*webSeed)(nil)
+
+func (ws *webSeed) onGotInfo(info *metainfo.Info) {
+       ws.client.FileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
+       ws.client.Info = info
+}
+
+func (ws *webSeed) _postCancel(r request) {
+       ws.cancel(r)
+}
+
+func (ws *webSeed) writeInterested(interested bool) bool {
+       return true
+}
+
+func (ws *webSeed) cancel(r request) bool {
+       ws.requests[r].Cancel()
+       return true
+}
+
+func (ws *webSeed) intoSpec(r request) webseed.RequestSpec {
+       return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
+}
+
+func (ws *webSeed) request(r request) bool {
+       webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
+       ws.requests[r] = webseedRequest
+       go ws.requestResultHandler(r, webseedRequest)
+       return true
+}
+
+func (ws *webSeed) connectionFlags() string {
+       return "WS"
+}
+
+func (ws *webSeed) drop() {
+}
+
+func (ws *webSeed) updateRequests() {
+       ws.peer.doRequestState()
+}
+
+func (ws *webSeed) _close() {}
+
+func (ws *webSeed) requestResultHandler(r request, webseedRequest webseed.Request) {
+       result := <-webseedRequest.Result
+       ws.peer.t.cl.lock()
+       defer ws.peer.t.cl.unlock()
+       if result.Err != nil {
+               log.Printf("webseed request rejected: %v", result.Err)
+               ws.peer.remoteRejectedRequest(r)
+       } else {
+               err := ws.peer.receiveChunk(&pp.Message{
+                       Type:  pp.Piece,
+                       Index: r.Index,
+                       Begin: r.Begin,
+                       Piece: result.Bytes,
+               })
+               if err != nil {
+                       panic(err)
+               }
+       }
+}
diff --git a/webseed/client.go b/webseed/client.go
new file mode 100644 (file)
index 0000000..0b98b93
--- /dev/null
@@ -0,0 +1,116 @@
+package webseed
+
+import (
+       "bytes"
+       "context"
+       "fmt"
+       "io"
+       "net/http"
+
+       "github.com/anacrolix/torrent/metainfo"
+       "github.com/anacrolix/torrent/segments"
+)
+
+type RequestSpec = segments.Extent
+
+type requestPartResult struct {
+       resp *http.Response
+       err  error
+}
+
+type requestPart struct {
+       req    *http.Request
+       e      segments.Extent
+       result chan requestPartResult
+}
+
+type Request struct {
+       cancel func()
+       Result chan RequestResult
+}
+
+func (r Request) Cancel() {
+       r.cancel()
+}
+
+type Client struct {
+       HttpClient *http.Client
+       Url        string
+       FileIndex  segments.Index
+       Info       *metainfo.Info
+}
+
+type RequestResult struct {
+       Bytes []byte
+       Err   error
+}
+
+func (ws *Client) NewRequest(r RequestSpec) Request {
+       ctx, cancel := context.WithCancel(context.Background())
+       var requestParts []requestPart
+       if !ws.FileIndex.Locate(r, func(i int, e segments.Extent) bool {
+               req, err := NewRequest(ws.Url, i, ws.Info, e.Start, e.Length)
+               if err != nil {
+                       panic(err)
+               }
+               req = req.WithContext(ctx)
+               part := requestPart{
+                       req:    req,
+                       result: make(chan requestPartResult, 1),
+                       e:      e,
+               }
+               go func() {
+                       resp, err := ws.HttpClient.Do(req)
+                       part.result <- requestPartResult{
+                               resp: resp,
+                               err:  err,
+                       }
+               }()
+               requestParts = append(requestParts, part)
+               return true
+       }) {
+               panic("request out of file bounds")
+       }
+       req := Request{
+               cancel: cancel,
+               Result: make(chan RequestResult, 1),
+       }
+       go func() {
+               b, err := readRequestPartResponses(requestParts)
+               req.Result <- RequestResult{
+                       Bytes: b,
+                       Err:   err,
+               }
+       }()
+       return req
+}
+
+func recvPartResult(buf io.Writer, part requestPart) error {
+       result := <-part.result
+       if result.err != nil {
+               return result.err
+       }
+       defer result.resp.Body.Close()
+       if part.e.Start != 0 && result.resp.StatusCode != http.StatusPartialContent {
+               return fmt.Errorf("expected partial content response got %v", result.resp.StatusCode)
+       }
+       copied, err := io.Copy(buf, result.resp.Body)
+       if err != nil {
+               return err
+       }
+       if copied != part.e.Length {
+               return fmt.Errorf("got %v bytes, expected %v", copied, part.e.Length)
+       }
+       return nil
+}
+
+func readRequestPartResponses(parts []requestPart) ([]byte, error) {
+       var buf bytes.Buffer
+       for _, part := range parts {
+               err := recvPartResult(&buf, part)
+               if err != nil {
+                       return buf.Bytes(), err
+               }
+       }
+       return buf.Bytes(), nil
+}
diff --git a/webseed/misc.go b/webseed/misc.go
new file mode 100644 (file)
index 0000000..140bc23
--- /dev/null
@@ -0,0 +1,26 @@
+package webseed
+
+import (
+       "fmt"
+       "net/http"
+       "path"
+       "strings"
+
+       "github.com/anacrolix/torrent/metainfo"
+)
+
+// Creates a request per BEP 19.
+func NewRequest(url string, fileIndex int, info *metainfo.Info, offset, length int64) (*http.Request, error) {
+       fileInfo := info.UpvertedFiles()[fileIndex]
+       if strings.HasSuffix(url, "/") {
+               url += path.Join(append([]string{info.Name}, fileInfo.Path...)...)
+       }
+       req, err := http.NewRequest(http.MethodGet, url, nil)
+       if err != nil {
+               return nil, err
+       }
+       if offset != 0 || length != fileInfo.Length {
+               req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
+       }
+       return req, nil
+}
index 39a7e347b00e715f49c4b2986bf4cffb758efddb..5af6c7c0df74a522614f47b949d629a749c707c3 100644 (file)
@@ -92,9 +92,11 @@ func (tc *TrackerClient) Run() error {
                tc.mu.Unlock()
                err := tc.doWebsocket()
                level := log.Info
+               tc.mu.Lock()
                if tc.closed {
                        level = log.Debug
                }
+               tc.mu.Unlock()
                tc.Logger.WithDefaultLevel(level).Printf("websocket instance ended: %v", err)
                time.Sleep(time.Minute)
                tc.mu.Lock()
index 171885054de7af12baa3c80ce0e80a0f9fcb693d..36ac6bc5e6584fcdbcf98427a18ce498e2cf1899 100644 (file)
@@ -8,7 +8,7 @@ import (
        "github.com/anacrolix/multiless"
 )
 
-func worseConn(l, r *PeerConn) bool {
+func worseConn(l, r *peer) bool {
        less, ok := multiless.New().Bool(
                l.useful(), r.useful()).CmpInt64(
                l.lastHelpful().Sub(r.lastHelpful()).Nanoseconds()).CmpInt64(
@@ -45,7 +45,7 @@ func (me worseConnSlice) Len() int {
 }
 
 func (me worseConnSlice) Less(i, j int) bool {
-       return worseConn(me.conns[i], me.conns[j])
+       return worseConn(&me.conns[i].peer, &me.conns[j].peer)
 }
 
 func (me *worseConnSlice) Pop() interface{} {