"fmt"
"io"
"math/rand"
+ "net/http"
"net/url"
+ "sort"
"sync"
"text/tabwriter"
"time"
"unsafe"
+ "github.com/anacrolix/torrent/common"
+ "github.com/anacrolix/torrent/segments"
+ "github.com/anacrolix/torrent/webseed"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
metainfo metainfo.MetaInfo
// The info dict. nil if we don't have it (yet).
- info *metainfo.Info
- files *[]*File
+ info *metainfo.Info
+ fileIndex segments.Index
+ files *[]*File
+
+ webSeeds map[string]*peer
// Active peer connections, running message stream loops. TODO: Make this
// open (not-closed) connections only.
maxEstablishedConns int
// Set of addrs to which we're attempting to connect. Connections are
// half-open until all handshakes are completed.
- halfOpen map[string]Peer
- fastestConn *PeerConn
+ halfOpen map[string]PeerInfo
+ fastestPeer *peer
// Reserve of peers to connect to. A peer can be both here and in the
// active connections if were told about the peer after connecting with
// KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
// pending, and half-open peers.
-func (t *Torrent) KnownSwarm() (ks []Peer) {
+func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
// Add pending peers to the list
- t.peers.Each(func(peer Peer) {
+ t.peers.Each(func(peer PeerInfo) {
ks = append(ks, peer)
})
// Add active peers to the list
for conn := range t.conns {
- ks = append(ks, Peer{
+ ks = append(ks, PeerInfo{
Id: conn.PeerID,
Addr: conn.remoteAddr,
Source: conn.Discovery,
return
}
-func (t *Torrent) addPeer(p Peer) (added bool) {
+func (t *Torrent) addPeer(p PeerInfo) (added bool) {
cl := t.cl
torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
if t.closed.IsSet() {
}
if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
torrent.Add("peers replaced", 1)
- if !replaced.Equal(p) {
+ if !replaced.equal(p) {
t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
added = true
}
t.nameMu.Lock()
t.info = info
t.nameMu.Unlock()
+ t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
t.displayName = "" // Save a few bytes lol.
t.initFiles()
t.cacheLength()
return nil
}
+// This seems to be all the follow-up tasks after info is set, that can't fail.
func (t *Torrent) onSetInfo() {
- for conn := range t.conns {
- if err := conn.setNumPieces(t.numPieces()); err != nil {
- t.logger.Printf("closing connection: %s", err)
- conn.close()
- }
- }
+ t.iterPeers(func(p *peer) {
+ p.onGotInfo(t.info)
+ })
for i := range t.pieces {
t.updatePieceCompletion(pieceIndex(i))
p := &t.pieces[i]
spew.NewDefaultConfig()
spew.Fdump(w, t.statsLocked())
- conns := t.connsAsSlice()
- slices.Sort(conns, worseConn)
- for i, c := range conns {
+ peers := t.peersAsSlice()
+ sort.Slice(peers, func(i, j int) bool {
+ return worseConn(peers[i], peers[j])
+ })
+ for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1)
c.writeStatus(w, t)
}
return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
}
-// Return the request that would include the given offset into the torrent
-// data. Returns !ok if there is no such request.
+// Return the request that would include the given offset into the torrent data. Returns !ok if
+// there is no such request.
func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
}
})
}
-// The worst connection is one that hasn't been sent, or sent anything useful
-// for the longest. A bad connection is one that usually sends us unwanted
-// pieces, or has been in worser half of the established connections for more
-// than a minute.
+// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
+// connection is one that usually sends us unwanted pieces, or has been in worser half of the
+// established connections for more than a minute.
func (t *Torrent) worstBadConn() *PeerConn {
wcs := worseConnSlice{t.unclosedConnsAsSlice()}
heap.Init(&wcs)
func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
// t.logger.Printf("piece %d priority changed", piece)
- for c := range t.conns {
+ t.iterPeers(func(c *peer) {
if c.updatePiecePriority(piece) {
// log.Print("conn piece priority changed")
c.updateRequests()
}
- }
+ })
t.maybeNewConns()
t.publishPieceChange(piece)
}
}
torrent.Add("deleted connections", 1)
c.deleteAllRequests()
- if len(t.conns) == 0 {
+ if t.numActivePeers() == 0 {
t.assertNoPendingRequests()
}
return
}
+func (t *Torrent) numActivePeers() (num int) {
+ t.iterPeers(func(*peer) {
+ num++
+ })
+ return
+}
+
func (t *Torrent) assertNoPendingRequests() {
if len(t.pendingRequests) != 0 {
panic(t.pendingRequests)
sl := func() torrentTrackerAnnouncer {
switch u.Scheme {
case "ws", "wss":
+ if t.cl.config.DisableWebtorrent {
+ return nil
+ }
return t.startWebsocketAnnouncer(*u)
}
if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
// Can't do anything with this.
continue
}
- t.addPeer(Peer{
+ t.addPeer(PeerInfo{
Addr: ipPortAddr{cp.IP, cp.Port},
Source: PeerSourceDhtGetPeers,
})
}
}
-func (t *Torrent) addPeers(peers []Peer) (added int) {
+func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
for _, p := range peers {
if t.addPeer(p) {
added++
for addr := range t.halfOpen {
peers[addr] = struct{}{}
}
- t.peers.Each(func(peer Peer) {
+ t.peers.Each(func(peer PeerInfo) {
peers[peer.Addr.String()] = struct{}{}
})
return len(peers)
defer t.cl.unlock()
oldMax = t.maxEstablishedConns
t.maxEstablishedConns = max
- wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
+ wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool {
+ return worseConn(&l.peer, &r.peer)
+ })
for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
t.dropConnection(wcs.Pop().(*PeerConn))
}
c.stats().incrementPiecesDirtiedBad()
}
- bannableTouchers := make([]*PeerConn, 0, len(p.dirtiers))
+ bannableTouchers := make([]*peer, 0, len(p.dirtiers))
for c := range p.dirtiers {
if !c.trusted {
bannableTouchers = append(bannableTouchers, c)
// c.drop()
// }
// }
- for conn := range t.conns {
+ t.iterPeers(func(conn *peer) {
if conn.peerHasPiece(piece) {
conn.updateRequests()
}
- }
+ })
}
func (t *Torrent) tryCreateMorePieceHashers() {
}
}
-func (t *Torrent) connsAsSlice() (ret []*PeerConn) {
- for c := range t.conns {
- ret = append(ret, c)
- }
+func (t *Torrent) peersAsSlice() (ret []*peer) {
+ t.iterPeers(func(p *peer) {
+ ret = append(ret, p)
+ })
return
}
}
// Start the process of connecting to the given peer for the given torrent if appropriate.
-func (t *Torrent) initiateConn(peer Peer) {
+func (t *Torrent) initiateConn(peer PeerInfo) {
if peer.Id == t.cl.peerID {
return
}
// Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
// quickly make one Client visible to the Torrent of another Client.
func (t *Torrent) AddClientPeer(cl *Client) int {
- return t.AddPeers(func() (ps []Peer) {
+ return t.AddPeers(func() (ps []PeerInfo) {
for _, la := range cl.ListenAddrs() {
- ps = append(ps, Peer{
+ ps = append(ps, PeerInfo{
Addr: la,
Trusted: true,
})
torrent.Add("request timeouts", 1)
cb.t.cl.lock()
defer cb.t.cl.unlock()
- for cn := range cb.t.conns {
+ cb.t.iterPeers(func(cn *peer) {
if cn.peerHasPiece(pieceIndex(r.Index)) {
cn.updateRequests()
}
- }
+ })
}
func (t *Torrent) disallowDataDownloadLocked() {
log.Printf("disallowing data download")
t.dataDownloadDisallowed = true
- for c := range t.conns {
+ t.iterPeers(func(c *peer) {
c.updateRequests()
- }
+ })
}
func (t *Torrent) AllowDataDownload() {
defer t.cl.unlock()
log.Printf("AllowDataDownload")
t.dataDownloadDisallowed = false
- for c := range t.conns {
+ t.iterPeers(func(c *peer) {
c.updateRequests()
- }
-
+ })
}
func (t *Torrent) AllowDataUpload() {
defer t.cl.unlock()
t.userOnWriteChunkErr = f
}
+
+func (t *Torrent) iterPeers(f func(*peer)) {
+ for pc := range t.conns {
+ f(&pc.peer)
+ }
+ for _, ws := range t.webSeeds {
+ f(ws)
+ }
+}
+
+func (t *Torrent) addWebSeed(url string) {
+ if t.cl.config.DisableWebseeds {
+ return
+ }
+ if _, ok := t.webSeeds[url]; ok {
+ return
+ }
+ const maxRequests = 10
+ ws := webSeed{
+ peer: peer{
+ t: t,
+ connString: url,
+ outgoing: true,
+ network: "http",
+ reconciledHandshakeStats: true,
+ peerSentHaveAll: true,
+ PeerMaxRequests: maxRequests,
+ },
+ client: webseed.Client{
+ HttpClient: http.DefaultClient,
+ Url: url,
+ },
+ requests: make(map[request]webseed.Request, maxRequests),
+ }
+ ws.peer.peerImpl = &ws
+ if t.haveInfo() {
+ ws.onGotInfo(t.info)
+ }
+ t.webSeeds[url] = &ws.peer
+}
+
+func (t *Torrent) peerIsActive(p *peer) (active bool) {
+ t.iterPeers(func(p1 *peer) {
+ if p1 == p {
+ active = true
+ }
+ })
+ return
+}