]> Sergey Matveev's repositories - btrtrc.git/blobdiff - peerconn.go
Ability to override fifos/
[btrtrc.git] / peerconn.go
index 0b2d85f650c11a31f3370c80ae24039d85df7edd..322e7625ba37b59884dbc7051a23b897893dc1e4 100644 (file)
@@ -20,6 +20,8 @@ import (
        "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/multiless"
+       "golang.org/x/time/rate"
+
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/mse"
@@ -63,10 +65,13 @@ type Peer struct {
        peerImpl
        callbacks *Callbacks
 
-       outgoing     bool
-       Network      string
-       RemoteAddr   PeerRemoteAddr
-       bannableAddr Option[bannableAddr]
+       outgoing   bool
+       Network    string
+       RemoteAddr PeerRemoteAddr
+       // The local address as observed by the remote peer. WebRTC seems to get this right without needing hints from the
+       // config.
+       localPublicAddr peerLocalPublicAddr
+       bannableAddr    Option[bannableAddr]
        // True if the connection is operating over MSE obfuscation.
        headerEncrypted bool
        cryptoMethod    mse.CryptoMethod
@@ -166,8 +171,8 @@ type PeerConn struct {
        peerSentHaveAll bool
 }
 
-func (cn *PeerConn) connStatusString() string {
-       return fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)
+func (cn *PeerConn) peerImplStatusLines() []string {
+       return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)}
 }
 
 func (cn *Peer) updateExpectingChunks() {
@@ -278,6 +283,10 @@ func (cn *Peer) completedString() string {
        return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
 }
 
+func (cn *Peer) CompletedString() string {
+       return cn.completedString()
+}
+
 func (cn *PeerConn) onGotInfo(info *metainfo.Info) {
        cn.setNumPieces(info.NumPieces())
 }
@@ -343,6 +352,10 @@ func (cn *Peer) statusFlags() (ret string) {
        return
 }
 
+func (cn *Peer) StatusFlags() string {
+       return cn.statusFlags()
+}
+
 func (cn *Peer) downloadRate() float64 {
        num := cn._stats.BytesReadUsefulData.Int64()
        if num == 0 {
@@ -351,6 +364,24 @@ func (cn *Peer) downloadRate() float64 {
        return float64(num) / cn.totalExpectingTime().Seconds()
 }
 
+func (cn *Peer) DownloadRate() float64 {
+       cn.locker().RLock()
+       defer cn.locker().RUnlock()
+
+       return cn.downloadRate()
+}
+
+func (cn *Peer) UploadRate() float64 {
+       cn.locker().RLock()
+       defer cn.locker().RUnlock()
+       num := cn._stats.BytesWrittenData.Int64()
+       if num == 0 {
+               return 0
+       }
+       return float64(num) / time.Now().Sub(cn.completedHandshake).Seconds()
+}
+
+
 func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
        var last Option[pieceIndex]
        var count int
@@ -359,7 +390,7 @@ func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int))
                        count++
                } else {
                        if count != 0 {
-                               f(last.Value(), count)
+                               f(last.Value, count)
                        }
                        last = item
                        count = 1
@@ -377,14 +408,14 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
        if cn.closed.IsSet() {
                fmt.Fprint(w, "CLOSED: ")
        }
-       fmt.Fprintln(w, cn.connStatusString())
+       fmt.Fprintln(w, strings.Join(cn.peerImplStatusLines(), "\n"))
        prio, err := cn.peerPriority()
        prioStr := fmt.Sprintf("%08x", prio)
        if err != nil {
                prioStr += ": " + err.Error()
        }
-       fmt.Fprintf(w, "    bep40-prio: %v\n", prioStr)
-       fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
+       fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
+       fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
                eventAgeString(cn.lastHelpful()),
@@ -392,7 +423,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                cn.totalExpectingTime(),
        )
        fmt.Fprintf(w,
-               "    %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
+               "%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                &cn._stats.ChunksReadUseful,
@@ -407,7 +438,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                cn.statusFlags(),
                cn.downloadRate()/(1<<10),
        )
-       fmt.Fprintf(w, "    requested pieces:")
+       fmt.Fprintf(w, "requested pieces:")
        cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
                fmt.Fprintf(w, " %v(%v)", piece, count)
        })
@@ -503,7 +534,7 @@ var (
 
 // The actual value to use as the maximum outbound requests.
 func (cn *Peer) nominalMaxRequests() maxRequests {
-       return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)))
+       return maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests))
 }
 
 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@@ -584,6 +615,10 @@ type messageWriter func(pp.Message) bool
 // This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it
 // when we want to go fast.
 func (cn *Peer) shouldRequest(r RequestIndex) error {
+       err := cn.t.checkValidReceiveChunk(cn.t.requestIndexToRequest(r))
+       if err != nil {
+               return err
+       }
        pi := cn.t.pieceIndexOfRequestIndex(r)
        if cn.requestState.Cancelled.Contains(r) {
                return errors.New("request is cancelled and waiting acknowledgement")
@@ -979,10 +1014,22 @@ func (c *PeerConn) reject(r Request) {
        delete(c.peerRequests, r)
 }
 
-func (c *PeerConn) onReadRequest(r Request) error {
+func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {
+       uploadRateLimiter := c.t.cl.config.UploadRateLimiter
+       if uploadRateLimiter.Limit() == rate.Inf {
+               return
+       }
+       return Some(uploadRateLimiter.Burst())
+}
+
+// startFetch is for testing purposes currently.
+func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
        requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
        if _, ok := c.peerRequests[r]; ok {
                torrent.Add("duplicate requests received", 1)
+               if c.fastEnabled() {
+                       return errors.New("received duplicate request with fast enabled")
+               }
                return nil
        }
        if c.choking {
@@ -1002,10 +1049,18 @@ func (c *PeerConn) onReadRequest(r Request) error {
                // BEP 6 says we may close here if we choose.
                return nil
        }
+       if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value {
+               err := fmt.Errorf("peer requested chunk too long (%v)", r.Length)
+               c.logger.Levelf(log.Warning, err.Error())
+               if c.fastEnabled() {
+                       c.reject(r)
+                       return nil
+               } else {
+                       return err
+               }
+       }
        if !c.t.havePiece(pieceIndex(r.Index)) {
-               // This isn't necessarily them screwing up. We can drop pieces
-               // from our storage, and can't communicate this to peers
-               // except by reconnecting.
+               // TODO: Tell the peer we don't have the piece, and reject this request.
                requestsReceivedForMissingPieces.Add(1)
                return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
        }
@@ -1019,7 +1074,10 @@ func (c *PeerConn) onReadRequest(r Request) error {
        }
        value := &peerRequestState{}
        c.peerRequests[r] = value
-       go c.peerRequestDataReader(r, value)
+       if startFetch {
+               // TODO: Limit peer request data read concurrency.
+               go c.peerRequestDataReader(r, value)
+       }
        return nil
 }
 
@@ -1035,6 +1093,7 @@ func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
                }
                torrent.Add("peer request data read successes", 1)
                prs.data = b
+               // This might be required for the error case too (#752 and #753).
                c.tickleWriter()
        }
 }
@@ -1214,7 +1273,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        err = c.peerSentBitfield(msg.Bitfield)
                case pp.Request:
                        r := newRequestFromMessage(&msg)
-                       err = c.onReadRequest(r)
+                       err = c.onReadRequest(r, true)
                case pp.Piece:
                        c.doChunkReadStats(int64(len(msg.Piece)))
                        err = c.receiveChunk(&msg)
@@ -1389,11 +1448,16 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        chunksReceived.Add("total", 1)
 
        ppReq := newRequestFromMessage(msg)
-       req := c.t.requestIndexFromRequest(ppReq)
        t := c.t
+       err := t.checkValidReceiveChunk(ppReq)
+       if err != nil {
+               err = log.WithLevel(log.Warning, err)
+               return err
+       }
+       req := c.t.requestIndexFromRequest(ppReq)
 
-       if c.bannableAddr.Ok() {
-               t.smartBanCache.RecordBlock(c.bannableAddr.Value(), req, msg.Piece)
+       if c.bannableAddr.Ok {
+               t.smartBanCache.RecordBlock(c.bannableAddr.Value, req, msg.Piece)
        }
 
        if c.peerChoking {
@@ -1472,7 +1536,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                p.cancel(req)
        }
 
-       err := func() error {
+       err = func() error {
                cl.unlock()
                defer cl.lock()
                concurrentChunkWrites.Add(1)
@@ -1628,7 +1692,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
        if c.t.requestingPeer(r) != c {
                panic("only one peer should have a given request at a time")
        }
-       c.t.requestState[r] = requestState{}
+       delete(c.t.requestState, r)
        // c.t.iterPeers(func(p *Peer) {
        //      if p.isLowOnRequests() {
        //              p.updateRequests("Peer.deleteRequest")
@@ -1697,7 +1761,7 @@ func (c *PeerConn) setTorrent(t *Torrent) {
 }
 
 func (c *Peer) peerPriority() (peerPriority, error) {
-       return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
+       return bep40Priority(c.remoteIpPort(), c.localPublicAddr)
 }
 
 func (c *Peer) remoteIp() net.IP {
@@ -1791,6 +1855,10 @@ func (cn *Peer) stats() *ConnStats {
        return &cn._stats
 }
 
+func (cn *Peer) Stats() *ConnStats {
+       return cn.stats()
+}
+
 func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
        pc, ok := p.peerImpl.(*PeerConn)
        return pc, ok