]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
No Web*
[btrtrc.git] / torrent.go
index 157b44d4fa3a54d6c210df1eacb0a1b494693b41..b15b980326584a6bf0289835960cf635c2cae7ac 100644 (file)
@@ -3,7 +3,6 @@ package torrent
 import (
        "bytes"
        "container/heap"
-       "context"
        "crypto/sha1"
        "errors"
        "fmt"
@@ -30,20 +29,18 @@ import (
        "github.com/anacrolix/missinggo/v2/pubsub"
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/sync"
-       request_strategy "github.com/anacrolix/torrent/request-strategy"
-       typedRoaring "github.com/anacrolix/torrent/typed-roaring"
        "github.com/davecgh/go-spew/spew"
-       "github.com/pion/datachannel"
 
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/common"
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
        "github.com/anacrolix/torrent/segments"
        "github.com/anacrolix/torrent/storage"
        "github.com/anacrolix/torrent/tracker"
+       typedRoaring "github.com/anacrolix/torrent/typed-roaring"
        "github.com/anacrolix/torrent/webseed"
-       "github.com/anacrolix/torrent/webtorrent"
 )
 
 // Maintains state of torrent within a Client. Many methods should not be called before the info is
@@ -535,7 +532,7 @@ func (t *Torrent) setMetadataSize(size int) (err error) {
                return
        }
        if uint32(size) > maxMetadataSize {
-               return errors.New("bad size")
+               return log.WithLevel(log.Warning, errors.New("bad size"))
        }
        if len(t.metadataBytes) == size {
                return
@@ -770,9 +767,14 @@ func (t *Torrent) writeStatus(w io.Writer) {
                }
                return worseConn(i, j)
        })
+       var buf bytes.Buffer
        for i, c := range peers {
                fmt.Fprintf(w, "%2d. ", i+1)
-               c.writeStatus(w, t)
+               buf.Reset()
+               c.writeStatus(&buf, t)
+               w.Write(bytes.TrimRight(
+                       bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n    ")),
+                       " "))
        }
 }
 
@@ -889,17 +891,21 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
        if t.storage != nil {
                t.deletePieceRequestOrder()
        }
+       t.assertAllPiecesRelativeAvailabilityZero()
+       t.pex.Reset()
+       t.cl.event.Broadcast()
+       t.pieceStateChanges.Close()
+       t.updateWantPeersEvent()
+       return
+}
+
+func (t *Torrent) assertAllPiecesRelativeAvailabilityZero() {
        for i := range t.pieces {
                p := t.piece(i)
                if p.relativeAvailability != 0 {
                        panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
                }
        }
-       t.pex.Reset()
-       t.cl.event.Broadcast()
-       t.pieceStateChanges.Close()
-       t.updateWantPeersEvent()
-       return
 }
 
 func (t *Torrent) requestOffset(r Request) int64 {
@@ -1565,56 +1571,10 @@ func (t *Torrent) seeding() bool {
        return true
 }
 
-func (t *Torrent) onWebRtcConn(
-       c datachannel.ReadWriteCloser,
-       dcc webtorrent.DataChannelContext,
-) {
-       defer c.Close()
-       netConn := webrtcNetConn{
-               ReadWriteCloser:    c,
-               DataChannelContext: dcc,
-       }
-       peerRemoteAddr := netConn.RemoteAddr()
-       //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
-       if t.cl.badPeerAddr(peerRemoteAddr) {
-               return
-       }
-       localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
-       pc, err := t.cl.initiateProtocolHandshakes(
-               context.Background(),
-               netConn,
-               t,
-               false,
-               newConnectionOpts{
-                       outgoing:        dcc.LocalOffered,
-                       remoteAddr:      peerRemoteAddr,
-                       localPublicAddr: localAddrIpPort,
-                       network:         webrtcNetwork,
-                       connString:      fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
-               },
-       )
-       if err != nil {
-               t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
-               return
-       }
-       if dcc.LocalOffered {
-               pc.Discovery = PeerSourceTracker
-       } else {
-               pc.Discovery = PeerSourceIncoming
-       }
-       pc.conn.SetWriteDeadline(time.Time{})
-       t.cl.lock()
-       defer t.cl.unlock()
-       err = t.cl.runHandshookConn(pc, t)
-       if err != nil {
-               t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err)
-       }
-}
-
 func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
        err := t.cl.runHandshookConn(pc, t)
        if err != nil || logAll {
-               t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
+               t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err)
        }
 }
 
@@ -1622,24 +1582,6 @@ func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
        t.logRunHandshookConn(pc, false, log.Debug)
 }
 
-func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
-       wtc, release := t.cl.websocketTrackers.Get(u.String(), t.infoHash)
-       // This needs to run before the Torrent is dropped from the Client, to prevent a new webtorrent.TrackerClient for
-       // the same info hash before the old one is cleaned up.
-       t.onClose = append(t.onClose, release)
-       wst := websocketTrackerStatus{u, wtc}
-       go func() {
-               err := wtc.Announce(tracker.Started, t.infoHash)
-               if err != nil {
-                       t.logger.WithDefaultLevel(log.Warning).Printf(
-                               "error in initial announce to %q: %v",
-                               u.String(), err,
-                       )
-               }
-       }()
-       return wst
-}
-
 func (t *Torrent) startScrapingTracker(_url string) {
        if _url == "" {
                return
@@ -1665,11 +1607,6 @@ func (t *Torrent) startScrapingTracker(_url string) {
        }
        sl := func() torrentTrackerAnnouncer {
                switch u.Scheme {
-               case "ws", "wss":
-                       if t.cl.config.DisableWebtorrent {
-                               return nil
-                       }
-                       return t.startWebsocketAnnouncer(*u)
                case "udp4":
                        if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
                                return nil
@@ -1719,7 +1656,7 @@ func (t *Torrent) announceRequest(event tracker.AnnounceEvent) tracker.AnnounceR
                Event: event,
                NumWant: func() int32 {
                        if t.wantPeers() && len(t.cl.dialers) > 0 {
-                               return -1
+                               return 200 // Win has UDP packet limit. See: https://github.com/anacrolix/torrent/issues/764
                        } else {
                                return 0
                        }
@@ -2425,7 +2362,6 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
                        },
                },
                activeRequests: make(map[Request]webseed.Request, maxRequests),
-               maxRequests:    maxRequests,
        }
        ws.peer.initRequestState()
        for _, opt := range opts {
@@ -2541,3 +2477,22 @@ type requestState struct {
        peer *Peer
        when time.Time
 }
+
+// Returns an error if a received chunk is out of bounds in someway.
+func (t *Torrent) checkValidReceiveChunk(r Request) error {
+       if !t.haveInfo() {
+               return errors.New("torrent missing info")
+       }
+       if int(r.Index) >= t.numPieces() {
+               return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces())
+       }
+       pieceLength := t.pieceLength(pieceIndex(r.Index))
+       if r.Begin >= pieceLength {
+               return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength)
+       }
+       // We could check chunk lengths here, but chunk request size is not changed often, and tricky
+       // for peers to manipulate as they need to send potentially large buffers to begin with. There
+       // should be considerable checks elsewhere for this case due to the network overhead. We should
+       // catch most of the overflow manipulation stuff by checking index and begin above.
+       return nil
+}