X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=torrent.go;h=5f6ddf394e5684282bf4a520f7e9c5583aba6256;hb=050e5cbab8e26a9b28e57a7bc2a964b6d2ceb899;hp=285f5cc804cb07e159aed181c42aee2eff97c4e6;hpb=e86e624415db323121e8b4c0824622cdc03bc874;p=btrtrc.git diff --git a/torrent.go b/torrent.go index 285f5cc8..5f6ddf39 100644 --- a/torrent.go +++ b/torrent.go @@ -17,8 +17,6 @@ import ( "time" "unsafe" - utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" - "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" @@ -33,14 +31,16 @@ import ( "github.com/anacrolix/missinggo/v2/pubsub" "github.com/anacrolix/multiless" "github.com/anacrolix/sync" - "github.com/davecgh/go-spew/spew" "github.com/pion/datachannel" "golang.org/x/exp/maps" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/common" + "github.com/anacrolix/torrent/internal/check" + "github.com/anacrolix/torrent/internal/nestedmaps" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" + utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/segments" "github.com/anacrolix/torrent/storage" @@ -105,7 +105,7 @@ type Torrent struct { maxEstablishedConns int // Set of addrs to which we're attempting to connect. Connections are // half-open until all handshakes are completed. - halfOpen map[string]PeerInfo + halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo // Reserve of peers to connect to. A peer can be both here and in the // active connections if were told about the peer after connecting with @@ -172,6 +172,8 @@ type Torrent struct { requestIndexes []RequestIndex } +type outgoingConnAttemptKey = *PeerInfo + func (t *Torrent) length() int64 { return t._length.Value } @@ -236,8 +238,10 @@ func (t *Torrent) KnownSwarm() (ks []PeerInfo) { }) // Add half-open peers to the list - for _, peer := range t.halfOpen { - ks = append(ks, peer) + for _, attempts := range t.halfOpen { + for _, peer := range attempts { + ks = append(ks, *peer) + } } // Add active peers to the list @@ -760,8 +764,7 @@ func (t *Torrent) writeStatus(w io.Writer) { fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces) - spew.NewDefaultConfig() - spew.Fdump(w, t.statsLocked()) + dumpStats(w, t.statsLocked()) fmt.Fprintf(w, "webseeds:\n") t.writePeerStatuses(w, maps.Values(t.webSeeds)) @@ -780,7 +783,7 @@ func (t *Torrent) writeStatus(w io.Writer) { return ml.Less() }) - fmt.Fprintf(w, "peer conns:\n") + fmt.Fprintf(w, "%v peer conns:\n", len(peerConns)) t.writePeerStatuses(w, g.SliceMap(peerConns, func(pc *PeerConn) *Peer { return &pc.Peer })) @@ -827,7 +830,10 @@ func (t *Torrent) newMetaInfo() metainfo.MetaInfo { } } -// Get bytes left +// Returns a count of bytes that are not complete in storage, and not pending being written to +// storage. This value is from the perspective of the download manager, and may not agree with the +// actual state in storage. If you want read data synchronously you should use a Reader. See +// https://github.com/anacrolix/torrent/issues/828. func (t *Torrent) BytesMissing() (n int64) { t.cl.rLock() n = t.bytesMissingLocked() @@ -1054,7 +1060,7 @@ func (t *Torrent) havePiece(index pieceIndex) bool { func (t *Torrent) maybeDropMutuallyCompletePeer( // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's // okay? - p *Peer, + p *PeerConn, ) { if !t.cl.config.DropMutuallyCompletePeers { return @@ -1068,7 +1074,7 @@ func (t *Torrent) maybeDropMutuallyCompletePeer( if p.useful() { return } - t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p) + p.logger.Levelf(log.Debug, "is mutually complete; dropping") p.drop() } @@ -1378,7 +1384,15 @@ func (t *Torrent) openNewConns() (initiated int) { return } p := t.peers.PopMax() - t.initiateConn(p) + opts := outgoingConnOpts{ + peerInfo: p, + t: t, + requireRendezvous: false, + skipHolepunchRendezvous: false, + receivedHolepunchConnect: false, + HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy, + } + initiateConn(opts, false) initiated++ } return @@ -1556,7 +1570,7 @@ func (t *Torrent) decPeerPieceAvailability(p *Peer) { } func (t *Torrent) assertPendingRequests() { - if !check { + if !check.Enabled { return } // var actual pendingRequests @@ -1704,10 +1718,9 @@ func (t *Torrent) startScrapingTracker(_url string) { } u, err := url.Parse(_url) if err != nil { - // URLs with a leading '*' appear to be a uTorrent convention to - // disable trackers. + // URLs with a leading '*' appear to be a uTorrent convention to disable trackers. if _url[0] != '*' { - log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger) + t.logger.Levelf(log.Warning, "error parsing tracker url: %v", err) } return } @@ -2001,8 +2014,9 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) { } t.conns[c] = struct{}{} t.cl.event.Broadcast() + // We'll never receive the "p" extended handshake parameter. if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() { - t.pex.Add(c) // as no further extended handshake expected + t.pex.Add(c) } return nil } @@ -2174,7 +2188,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { if len(bannableTouchers) >= 1 { c := bannableTouchers[0] if len(bannableTouchers) != 1 { - t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece) + t.logger.Levelf(log.Debug, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece) } else { // Turns out it's still useful to ban peers like this because if there's only a // single peer for a piece, and we never progress that piece to completion, we @@ -2205,7 +2219,7 @@ func (t *Torrent) onPieceCompleted(piece pieceIndex) { t.piece(piece).readerCond.Broadcast() for conn := range t.conns { conn.have(piece) - t.maybeDropMutuallyCompletePeer(&conn.Peer) + t.maybeDropMutuallyCompletePeer(conn) } } @@ -2359,9 +2373,45 @@ func (t *Torrent) VerifyData() { } } +func (t *Torrent) connectingToPeerAddr(addrStr string) bool { + return len(t.halfOpen[addrStr]) != 0 +} + +func (t *Torrent) hasPeerConnForAddr(x PeerRemoteAddr) bool { + addrStr := x.String() + for c := range t.conns { + ra := c.RemoteAddr + if ra.String() == addrStr { + return true + } + } + return false +} + +func (t *Torrent) getHalfOpenPath( + addrStr string, + attemptKey outgoingConnAttemptKey, +) nestedmaps.Path[*PeerInfo] { + return nestedmaps.Next(nestedmaps.Next(nestedmaps.Begin(&t.halfOpen), addrStr), attemptKey) +} + +func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) { + path := t.getHalfOpenPath(addrStr, attemptKey) + if path.Exists() { + panic("should be unique") + } + path.Set(attemptKey) + t.cl.numHalfOpen++ +} + // Start the process of connecting to the given peer for the given torrent if appropriate. I'm not // sure all the PeerInfo fields are being used. -func (t *Torrent) initiateConn(peer PeerInfo) { +func initiateConn( + opts outgoingConnOpts, + ignoreLimits bool, +) { + t := opts.t + peer := opts.peerInfo if peer.Id == t.cl.peerID { return } @@ -2369,12 +2419,21 @@ func (t *Torrent) initiateConn(peer PeerInfo) { return } addr := peer.Addr - if t.addrActive(addr.String()) { + addrStr := addr.String() + if !ignoreLimits { + if t.connectingToPeerAddr(addrStr) { + return + } + } + if t.hasPeerConnForAddr(addr) { return } - t.cl.numHalfOpen++ - t.halfOpen[addr.String()] = peer - go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted) + attemptKey := &peer + t.addHalfOpen(addrStr, attemptKey) + go t.cl.outgoingConnection( + opts, + attemptKey, + ) } // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to @@ -2395,7 +2454,7 @@ func (t *Torrent) AddClientPeer(cl *Client) int { // connection. func (t *Torrent) allStats(f func(*ConnStats)) { f(&t.stats) - f(&t.cl.stats) + f(&t.cl.connStats) } func (t *Torrent) hashingPiece(i pieceIndex) bool { @@ -2668,10 +2727,13 @@ func (t *Torrent) checkValidReceiveChunk(r Request) error { return nil } -func (t *Torrent) peerConnsWithRemoteAddrPort(addrPort netip.AddrPort) (ret []*PeerConn) { +func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) { for pc := range t.conns { - addr := pc.remoteAddrPort() - if !(addr.Ok && addr.Value == addrPort) { + dialAddr, err := pc.remoteDialAddrPort() + if err != nil { + continue + } + if dialAddr != target { continue } ret = append(ret, pc) @@ -2679,18 +2741,11 @@ func (t *Torrent) peerConnsWithRemoteAddrPort(addrPort netip.AddrPort) (ret []*P return } -func makeUtHolepunchMsgForPeerConn( +func wrapUtHolepunchMsgForPeerConn( recipient *PeerConn, - msgType utHolepunch.MsgType, - addrPort netip.AddrPort, - errCode utHolepunch.ErrCode, + msg utHolepunch.Msg, ) pp.Message { - utHolepunchMsg := utHolepunch.Msg{ - MsgType: msgType, - AddrPort: addrPort, - ErrCode: errCode, - } - extendedPayload, err := utHolepunchMsg.MarshalBinary() + extendedPayload, err := msg.MarshalBinary() if err != nil { panic(err) } @@ -2701,21 +2756,63 @@ func makeUtHolepunchMsgForPeerConn( } } +func sendUtHolepunchMsg( + pc *PeerConn, + msgType utHolepunch.MsgType, + addrPort netip.AddrPort, + errCode utHolepunch.ErrCode, +) { + holepunchMsg := utHolepunch.Msg{ + MsgType: msgType, + AddrPort: addrPort, + ErrCode: errCode, + } + incHolepunchMessagesSent(holepunchMsg) + ppMsg := wrapUtHolepunchMsgForPeerConn(pc, holepunchMsg) + pc.write(ppMsg) +} + +func incHolepunchMessages(msg utHolepunch.Msg, verb string) { + torrent.Add( + fmt.Sprintf( + "holepunch %v %v messages %v", + msg.MsgType, + addrPortProtocolStr(msg.AddrPort), + verb, + ), + 1, + ) +} + +func incHolepunchMessagesReceived(msg utHolepunch.Msg) { + incHolepunchMessages(msg, "received") +} + +func incHolepunchMessagesSent(msg utHolepunch.Msg) { + incHolepunchMessages(msg, "sent") +} + func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error { + incHolepunchMessagesReceived(msg) switch msg.MsgType { case utHolepunch.Rendezvous: - sendMsg := func( - pc *PeerConn, - msgType utHolepunch.MsgType, - addrPort netip.AddrPort, - errCode utHolepunch.ErrCode, - ) { - pc.write(makeUtHolepunchMsgForPeerConn(pc, msgType, addrPort, errCode)) - } - targets := t.peerConnsWithRemoteAddrPort(msg.AddrPort) + t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender) + sendMsg := sendUtHolepunchMsg + senderAddrPort, err := sender.remoteDialAddrPort() + if err != nil { + sender.logger.Levelf( + log.Warning, + "error getting ut_holepunch rendezvous sender's dial address: %v", + err, + ) + // There's no better error code. The sender's address itself is invalid. I don't see + // this error message being appropriate anywhere else anyway. + sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer) + } + targets := t.peerConnsWithDialAddrPort(msg.AddrPort) if len(targets) == 0 { sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected) - break + return nil } for _, pc := range targets { if !pc.supportsExtension(utHolepunch.ExtensionName) { @@ -2723,16 +2820,86 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer continue } sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0) - sendMsg(pc, utHolepunch.Connect, sender.remoteAddrPort().Unwrap(), 0) + sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0) } + return nil case utHolepunch.Connect: - t.initiateConn(PeerInfo{ - Addr: msg.AddrPort, - Source: PeerSourceUtHolepunch, - }) + holepunchAddr := msg.AddrPort + t.logger.Printf("got holepunch connect request for %v from %p", holepunchAddr, sender) + if g.MapContains(t.cl.undialableWithoutHolepunch, holepunchAddr) { + setAdd(&t.cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, holepunchAddr) + if g.MapContains(t.cl.accepted, holepunchAddr) { + setAdd(&t.cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr) + } + } + opts := outgoingConnOpts{ + peerInfo: PeerInfo{ + Addr: msg.AddrPort, + Source: PeerSourceUtHolepunch, + PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(), + }, + t: t, + // Don't attempt to start our own rendezvous if we fail to connect. + skipHolepunchRendezvous: true, + receivedHolepunchConnect: true, + // Assume that the other end initiated the rendezvous, and will use our preferred + // encryption. So we will act normally. + HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy, + } + initiateConn(opts, true) + return nil case utHolepunch.Error: - + torrent.Add("holepunch error messages received", 1) + t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode) + return nil default: return fmt.Errorf("unhandled msg type %v", msg.MsgType) } } + +func addrPortProtocolStr(addrPort netip.AddrPort) string { + addr := addrPort.Addr() + switch { + case addr.Is4(): + return "ipv4" + case addr.Is6(): + return "ipv6" + default: + panic(addrPort) + } +} + +func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error { + rzsSent := 0 + for pc := range t.conns { + if !pc.supportsExtension(utHolepunch.ExtensionName) { + continue + } + if pc.supportsExtension(pp.ExtensionNamePex) { + if !g.MapContains(pc.pex.remoteLiveConns, addrPort) { + continue + } + } + t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort) + sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0) + rzsSent++ + } + if rzsSent == 0 { + return errors.New("no eligible relays") + } + return nil +} + +func (t *Torrent) numHalfOpenAttempts() (num int) { + for _, attempts := range t.halfOpen { + num += len(attempts) + } + return +} + +func (t *Torrent) getDialTimeoutUnlocked() time.Duration { + cl := t.cl + cl.rLock() + defer cl.rUnlock() + return t.dialTimeout() +}