X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=torrent.go;h=6385a3fc026f9a2474ddc029908ef236ab30713a;hb=HEAD;hp=2f6832e9703d943f15b7525b4052e81805007713;hpb=1e13625c7398094de923257f5855ef9af8d20681;p=btrtrc.git diff --git a/torrent.go b/torrent.go index 2f6832e9..6385a3fc 100644 --- a/torrent.go +++ b/torrent.go @@ -22,6 +22,7 @@ import ( "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" . "github.com/anacrolix/generics" + g "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/slices" @@ -30,13 +31,16 @@ import ( "github.com/anacrolix/missinggo/v2/pubsub" "github.com/anacrolix/multiless" "github.com/anacrolix/sync" - "github.com/davecgh/go-spew/spew" "github.com/pion/datachannel" + "golang.org/x/exp/maps" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/common" + "github.com/anacrolix/torrent/internal/check" + "github.com/anacrolix/torrent/internal/nestedmaps" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" + utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/segments" "github.com/anacrolix/torrent/storage" @@ -101,7 +105,7 @@ type Torrent struct { maxEstablishedConns int // Set of addrs to which we're attempting to connect. Connections are // half-open until all handshakes are completed. - halfOpen map[string]PeerInfo + halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo // Reserve of peers to connect to. A peer can be both here and in the // active connections if were told about the peer after connecting with @@ -168,6 +172,8 @@ type Torrent struct { requestIndexes []RequestIndex } +type outgoingConnAttemptKey = *PeerInfo + func (t *Torrent) length() int64 { return t._length.Value } @@ -232,8 +238,10 @@ func (t *Torrent) KnownSwarm() (ks []PeerInfo) { }) // Add half-open peers to the list - for _, peer := range t.halfOpen { - ks = append(ks, peer) + for _, attempts := range t.halfOpen { + for _, peer := range attempts { + ks = append(ks, *peer) + } } // Add active peers to the list @@ -756,27 +764,39 @@ func (t *Torrent) writeStatus(w io.Writer) { fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces) - spew.NewDefaultConfig() - spew.Fdump(w, t.statsLocked()) - - peers := t.peersAsSlice() - sort.Slice(peers, func(_i, _j int) bool { - i := peers[_i] - j := peers[_j] - if less, ok := multiless.New().EagerSameLess( - i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(), - ).LessOk(); ok { - return less - } - return worseConn(i, j) + dumpStats(w, t.statsLocked()) + + fmt.Fprintf(w, "webseeds:\n") + t.writePeerStatuses(w, maps.Values(t.webSeeds)) + + peerConns := maps.Keys(t.conns) + // Peers without priorities first, then those with. I'm undecided about how to order peers + // without priorities. + sort.Slice(peerConns, func(li, ri int) bool { + l := peerConns[li] + r := peerConns[ri] + ml := multiless.New() + lpp := g.ResultFromTuple(l.peerPriority()).ToOption() + rpp := g.ResultFromTuple(r.peerPriority()).ToOption() + ml = ml.Bool(lpp.Ok, rpp.Ok) + ml = ml.Uint32(rpp.Value, lpp.Value) + return ml.Less() }) + + fmt.Fprintf(w, "%v peer conns:\n", len(peerConns)) + t.writePeerStatuses(w, g.SliceMap(peerConns, func(pc *PeerConn) *Peer { + return &pc.Peer + })) +} + +func (t *Torrent) writePeerStatuses(w io.Writer, peers []*Peer) { var buf bytes.Buffer - for i, c := range peers { - fmt.Fprintf(w, "%2d. ", i+1) + for _, c := range peers { + fmt.Fprintf(w, "- ") buf.Reset() c.writeStatus(&buf) w.Write(bytes.TrimRight( - bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")), + bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")), " ")) } } @@ -810,7 +830,10 @@ func (t *Torrent) newMetaInfo() metainfo.MetaInfo { } } -// Get bytes left +// Returns a count of bytes that are not complete in storage, and not pending being written to +// storage. This value is from the perspective of the download manager, and may not agree with the +// actual state in storage. If you want read data synchronously you should use a Reader. See +// https://github.com/anacrolix/torrent/issues/828. func (t *Torrent) BytesMissing() (n int64) { t.cl.rLock() n = t.bytesMissingLocked() @@ -1037,7 +1060,7 @@ func (t *Torrent) havePiece(index pieceIndex) bool { func (t *Torrent) maybeDropMutuallyCompletePeer( // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's // okay? - p *Peer, + p *PeerConn, ) { if !t.cl.config.DropMutuallyCompletePeers { return @@ -1051,7 +1074,7 @@ func (t *Torrent) maybeDropMutuallyCompletePeer( if p.useful() { return } - t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p) + p.logger.Levelf(log.Debug, "is mutually complete; dropping") p.drop() } @@ -1361,7 +1384,15 @@ func (t *Torrent) openNewConns() (initiated int) { return } p := t.peers.PopMax() - t.initiateConn(p) + opts := outgoingConnOpts{ + peerInfo: p, + t: t, + requireRendezvous: false, + skipHolepunchRendezvous: false, + receivedHolepunchConnect: false, + HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy, + } + initiateConn(opts, false) initiated++ } return @@ -1539,7 +1570,7 @@ func (t *Torrent) decPeerPieceAvailability(p *Peer) { } func (t *Torrent) assertPendingRequests() { - if !check { + if !check.Enabled { return } // var actual pendingRequests @@ -1687,10 +1718,9 @@ func (t *Torrent) startScrapingTracker(_url string) { } u, err := url.Parse(_url) if err != nil { - // URLs with a leading '*' appear to be a uTorrent convention to - // disable trackers. + // URLs with a leading '*' appear to be a uTorrent convention to disable trackers. if _url[0] != '*' { - log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger) + t.logger.Levelf(log.Warning, "error parsing tracker url: %v", err) } return } @@ -1984,8 +2014,9 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) { } t.conns[c] = struct{}{} t.cl.event.Broadcast() + // We'll never receive the "p" extended handshake parameter. if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() { - t.pex.Add(c) // as no further extended handshake expected + t.pex.Add(c) } return nil } @@ -2112,7 +2143,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { } err := p.Storage().MarkComplete() if err != nil { - t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err) + t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err) } t.cl.lock() @@ -2157,7 +2188,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { if len(bannableTouchers) >= 1 { c := bannableTouchers[0] if len(bannableTouchers) != 1 { - t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece) + t.logger.Levelf(log.Debug, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece) } else { // Turns out it's still useful to ban peers like this because if there's only a // single peer for a piece, and we never progress that piece to completion, we @@ -2188,7 +2219,7 @@ func (t *Torrent) onPieceCompleted(piece pieceIndex) { t.piece(piece).readerCond.Broadcast() for conn := range t.conns { conn.have(piece) - t.maybeDropMutuallyCompletePeer(&conn.Peer) + t.maybeDropMutuallyCompletePeer(conn) } } @@ -2220,7 +2251,7 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { } func (t *Torrent) tryCreateMorePieceHashers() { - for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() { + for !t.closed.IsSet() && t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() { } } @@ -2342,8 +2373,45 @@ func (t *Torrent) VerifyData() { } } -// Start the process of connecting to the given peer for the given torrent if appropriate. -func (t *Torrent) initiateConn(peer PeerInfo) { +func (t *Torrent) connectingToPeerAddr(addrStr string) bool { + return len(t.halfOpen[addrStr]) != 0 +} + +func (t *Torrent) hasPeerConnForAddr(x PeerRemoteAddr) bool { + addrStr := x.String() + for c := range t.conns { + ra := c.RemoteAddr + if ra.String() == addrStr { + return true + } + } + return false +} + +func (t *Torrent) getHalfOpenPath( + addrStr string, + attemptKey outgoingConnAttemptKey, +) nestedmaps.Path[*PeerInfo] { + return nestedmaps.Next(nestedmaps.Next(nestedmaps.Begin(&t.halfOpen), addrStr), attemptKey) +} + +func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) { + path := t.getHalfOpenPath(addrStr, attemptKey) + if path.Exists() { + panic("should be unique") + } + path.Set(attemptKey) + t.cl.numHalfOpen++ +} + +// Start the process of connecting to the given peer for the given torrent if appropriate. I'm not +// sure all the PeerInfo fields are being used. +func initiateConn( + opts outgoingConnOpts, + ignoreLimits bool, +) { + t := opts.t + peer := opts.peerInfo if peer.Id == t.cl.peerID { return } @@ -2351,12 +2419,21 @@ func (t *Torrent) initiateConn(peer PeerInfo) { return } addr := peer.Addr - if t.addrActive(addr.String()) { + addrStr := addr.String() + if !ignoreLimits { + if t.connectingToPeerAddr(addrStr) { + return + } + } + if t.hasPeerConnForAddr(addr) { return } - t.cl.numHalfOpen++ - t.halfOpen[addr.String()] = peer - go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted) + attemptKey := &peer + t.addHalfOpen(addrStr, attemptKey) + go t.cl.outgoingConnection( + opts, + attemptKey, + ) } // Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to @@ -2377,7 +2454,7 @@ func (t *Torrent) AddClientPeer(cl *Client) int { // connection. func (t *Torrent) allStats(f func(*ConnStats)) { f(&t.stats) - f(&t.cl.stats) + f(&t.cl.connStats) } func (t *Torrent) hashingPiece(i pieceIndex) bool { @@ -2649,3 +2726,180 @@ func (t *Torrent) checkValidReceiveChunk(r Request) error { // catch most of the overflow manipulation stuff by checking index and begin above. return nil } + +func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) { + for pc := range t.conns { + dialAddr, err := pc.remoteDialAddrPort() + if err != nil { + continue + } + if dialAddr != target { + continue + } + ret = append(ret, pc) + } + return +} + +func wrapUtHolepunchMsgForPeerConn( + recipient *PeerConn, + msg utHolepunch.Msg, +) pp.Message { + extendedPayload, err := msg.MarshalBinary() + if err != nil { + panic(err) + } + return pp.Message{ + Type: pp.Extended, + ExtendedID: MapMustGet(recipient.PeerExtensionIDs, utHolepunch.ExtensionName), + ExtendedPayload: extendedPayload, + } +} + +func sendUtHolepunchMsg( + pc *PeerConn, + msgType utHolepunch.MsgType, + addrPort netip.AddrPort, + errCode utHolepunch.ErrCode, +) { + holepunchMsg := utHolepunch.Msg{ + MsgType: msgType, + AddrPort: addrPort, + ErrCode: errCode, + } + incHolepunchMessagesSent(holepunchMsg) + ppMsg := wrapUtHolepunchMsgForPeerConn(pc, holepunchMsg) + pc.write(ppMsg) +} + +func incHolepunchMessages(msg utHolepunch.Msg, verb string) { + torrent.Add( + fmt.Sprintf( + "holepunch %v %v messages %v", + msg.MsgType, + addrPortProtocolStr(msg.AddrPort), + verb, + ), + 1, + ) +} + +func incHolepunchMessagesReceived(msg utHolepunch.Msg) { + incHolepunchMessages(msg, "received") +} + +func incHolepunchMessagesSent(msg utHolepunch.Msg) { + incHolepunchMessages(msg, "sent") +} + +func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error { + incHolepunchMessagesReceived(msg) + switch msg.MsgType { + case utHolepunch.Rendezvous: + t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender) + sendMsg := sendUtHolepunchMsg + senderAddrPort, err := sender.remoteDialAddrPort() + if err != nil { + sender.logger.Levelf( + log.Warning, + "error getting ut_holepunch rendezvous sender's dial address: %v", + err, + ) + // There's no better error code. The sender's address itself is invalid. I don't see + // this error message being appropriate anywhere else anyway. + sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer) + } + targets := t.peerConnsWithDialAddrPort(msg.AddrPort) + if len(targets) == 0 { + sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected) + return nil + } + for _, pc := range targets { + if !pc.supportsExtension(utHolepunch.ExtensionName) { + sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSupport) + continue + } + sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0) + sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0) + } + return nil + case utHolepunch.Connect: + holepunchAddr := msg.AddrPort + t.logger.Printf("got holepunch connect request for %v from %p", holepunchAddr, sender) + if g.MapContains(t.cl.undialableWithoutHolepunch, holepunchAddr) { + setAdd(&t.cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, holepunchAddr) + if g.MapContains(t.cl.accepted, holepunchAddr) { + setAdd(&t.cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr) + } + } + opts := outgoingConnOpts{ + peerInfo: PeerInfo{ + Addr: msg.AddrPort, + Source: PeerSourceUtHolepunch, + PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(), + }, + t: t, + // Don't attempt to start our own rendezvous if we fail to connect. + skipHolepunchRendezvous: true, + receivedHolepunchConnect: true, + // Assume that the other end initiated the rendezvous, and will use our preferred + // encryption. So we will act normally. + HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy, + } + initiateConn(opts, true) + return nil + case utHolepunch.Error: + torrent.Add("holepunch error messages received", 1) + t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode) + return nil + default: + return fmt.Errorf("unhandled msg type %v", msg.MsgType) + } +} + +func addrPortProtocolStr(addrPort netip.AddrPort) string { + addr := addrPort.Addr() + switch { + case addr.Is4(): + return "ipv4" + case addr.Is6(): + return "ipv6" + default: + panic(addrPort) + } +} + +func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error { + rzsSent := 0 + for pc := range t.conns { + if !pc.supportsExtension(utHolepunch.ExtensionName) { + continue + } + if pc.supportsExtension(pp.ExtensionNamePex) { + if !g.MapContains(pc.pex.remoteLiveConns, addrPort) { + continue + } + } + t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort) + sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0) + rzsSent++ + } + if rzsSent == 0 { + return errors.New("no eligible relays") + } + return nil +} + +func (t *Torrent) numHalfOpenAttempts() (num int) { + for _, attempts := range t.halfOpen { + num += len(attempts) + } + return +} + +func (t *Torrent) getDialTimeoutUnlocked() time.Duration { + cl := t.cl + cl.rLock() + defer cl.rUnlock() + return t.dialTimeout() +}