import (
"bytes"
"container/heap"
- "context"
"crypto/sha1"
"errors"
"fmt"
"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
"github.com/davecgh/go-spew/spew"
- "github.com/pion/datachannel"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/tracker"
typedRoaring "github.com/anacrolix/torrent/typed-roaring"
"github.com/anacrolix/torrent/webseed"
- "github.com/anacrolix/torrent/webtorrent"
)
// Maintains state of torrent within a Client. Many methods should not be called before the info is
}
return worseConn(i, j)
})
+ var buf bytes.Buffer
for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1)
- c.writeStatus(w, t)
+ buf.Reset()
+ c.writeStatus(&buf, t)
+ w.Write(bytes.TrimRight(
+ bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")),
+ " "))
}
}
if t.storage != nil {
t.deletePieceRequestOrder()
}
+ t.assertAllPiecesRelativeAvailabilityZero()
+ t.pex.Reset()
+ t.cl.event.Broadcast()
+ t.pieceStateChanges.Close()
+ t.updateWantPeersEvent()
+ return
+}
+
+func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
for i := range t.pieces {
p := t.piece(i)
if p.relativeAvailability != 0 {
panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
}
}
- t.pex.Reset()
- t.cl.event.Broadcast()
- t.pieceStateChanges.Close()
- t.updateWantPeersEvent()
- return
}
func (t *Torrent) requestOffset(r Request) int64 {
return true
}
-func (t *Torrent) onWebRtcConn(
- c datachannel.ReadWriteCloser,
- dcc webtorrent.DataChannelContext,
-) {
- defer c.Close()
- netConn := webrtcNetConn{
- ReadWriteCloser: c,
- DataChannelContext: dcc,
- }
- peerRemoteAddr := netConn.RemoteAddr()
- //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
- if t.cl.badPeerAddr(peerRemoteAddr) {
- return
- }
- localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
- pc, err := t.cl.initiateProtocolHandshakes(
- context.Background(),
- netConn,
- t,
- false,
- newConnectionOpts{
- outgoing: dcc.LocalOffered,
- remoteAddr: peerRemoteAddr,
- localPublicAddr: localAddrIpPort,
- network: webrtcNetwork,
- connString: fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
- },
- )
- if err != nil {
- t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
- return
- }
- if dcc.LocalOffered {
- pc.Discovery = PeerSourceTracker
- } else {
- pc.Discovery = PeerSourceIncoming
- }
- pc.conn.SetWriteDeadline(time.Time{})
- t.cl.lock()
- defer t.cl.unlock()
- err = t.cl.runHandshookConn(pc, t)
- if err != nil {
- t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err)
- }
-}
-
func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
err := t.cl.runHandshookConn(pc, t)
if err != nil || logAll {
t.logRunHandshookConn(pc, false, log.Debug)
}
-func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
- wtc, release := t.cl.websocketTrackers.Get(u.String(), t.infoHash)
- // This needs to run before the Torrent is dropped from the Client, to prevent a new webtorrent.TrackerClient for
- // the same info hash before the old one is cleaned up.
- t.onClose = append(t.onClose, release)
- wst := websocketTrackerStatus{u, wtc}
- go func() {
- err := wtc.Announce(tracker.Started, t.infoHash)
- if err != nil {
- t.logger.WithDefaultLevel(log.Warning).Printf(
- "error in initial announce to %q: %v",
- u.String(), err,
- )
- }
- }()
- return wst
-}
-
func (t *Torrent) startScrapingTracker(_url string) {
if _url == "" {
return
}
sl := func() torrentTrackerAnnouncer {
switch u.Scheme {
- case "ws", "wss":
- if t.cl.config.DisableWebtorrent {
- return nil
- }
- return t.startWebsocketAnnouncer(*u)
case "udp4":
if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
return nil
},
},
activeRequests: make(map[Request]webseed.Request, maxRequests),
- maxRequests: maxRequests,
}
ws.peer.initRequestState()
for _, opt := range opts {