X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=torrent.go;h=6385a3fc026f9a2474ddc029908ef236ab30713a;hb=HEAD;hp=41848e4a2a39662cd619e0387182d392ce85410f;hpb=0f2604e3e98aae3241d917f02af729a2e5be581b;p=btrtrc.git diff --git a/torrent.go b/torrent.go index 41848e4a..6385a3fc 100644 --- a/torrent.go +++ b/torrent.go @@ -31,7 +31,6 @@ import ( "github.com/anacrolix/missinggo/v2/pubsub" "github.com/anacrolix/multiless" "github.com/anacrolix/sync" - "github.com/davecgh/go-spew/spew" "github.com/pion/datachannel" "golang.org/x/exp/maps" @@ -107,8 +106,6 @@ type Torrent struct { // Set of addrs to which we're attempting to connect. Connections are // half-open until all handshakes are completed. halfOpen map[string]map[outgoingConnAttemptKey]*PeerInfo - // The final ess is not silent here as it's in the plural. - utHolepunchRendezvous map[netip.AddrPort]*utHolepunchRendezvous // Reserve of peers to connect to. A peer can be both here and in the // active connections if were told about the peer after connecting with @@ -767,8 +764,7 @@ func (t *Torrent) writeStatus(w io.Writer) { fmt.Fprintf(w, "DHT Announces: %d\n", t.numDHTAnnounces) - spew.NewDefaultConfig() - spew.Fdump(w, t.statsLocked()) + dumpStats(w, t.statsLocked()) fmt.Fprintf(w, "webseeds:\n") t.writePeerStatuses(w, maps.Values(t.webSeeds)) @@ -787,7 +783,7 @@ func (t *Torrent) writeStatus(w io.Writer) { return ml.Less() }) - fmt.Fprintf(w, "peer conns:\n") + fmt.Fprintf(w, "%v peer conns:\n", len(peerConns)) t.writePeerStatuses(w, g.SliceMap(peerConns, func(pc *PeerConn) *Peer { return &pc.Peer })) @@ -834,7 +830,10 @@ func (t *Torrent) newMetaInfo() metainfo.MetaInfo { } } -// Get bytes left +// Returns a count of bytes that are not complete in storage, and not pending being written to +// storage. This value is from the perspective of the download manager, and may not agree with the +// actual state in storage. If you want read data synchronously you should use a Reader. See +// https://github.com/anacrolix/torrent/issues/828. func (t *Torrent) BytesMissing() (n int64) { t.cl.rLock() n = t.bytesMissingLocked() @@ -1061,7 +1060,7 @@ func (t *Torrent) havePiece(index pieceIndex) bool { func (t *Torrent) maybeDropMutuallyCompletePeer( // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's // okay? - p *Peer, + p *PeerConn, ) { if !t.cl.config.DropMutuallyCompletePeers { return @@ -1075,7 +1074,7 @@ func (t *Torrent) maybeDropMutuallyCompletePeer( if p.useful() { return } - t.logger.WithDefaultLevel(log.Debug).Printf("dropping %v, which is mutually complete", p) + p.logger.Levelf(log.Debug, "is mutually complete; dropping") p.drop() } @@ -1385,7 +1384,15 @@ func (t *Torrent) openNewConns() (initiated int) { return } p := t.peers.PopMax() - t.initiateConn(p, false, false, false) + opts := outgoingConnOpts{ + peerInfo: p, + t: t, + requireRendezvous: false, + skipHolepunchRendezvous: false, + receivedHolepunchConnect: false, + HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy, + } + initiateConn(opts, false) initiated++ } return @@ -1711,10 +1718,9 @@ func (t *Torrent) startScrapingTracker(_url string) { } u, err := url.Parse(_url) if err != nil { - // URLs with a leading '*' appear to be a uTorrent convention to - // disable trackers. + // URLs with a leading '*' appear to be a uTorrent convention to disable trackers. if _url[0] != '*' { - log.Str("error parsing tracker url").AddValues("url", _url).Log(t.logger) + t.logger.Levelf(log.Warning, "error parsing tracker url: %v", err) } return } @@ -2137,7 +2143,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { } err := p.Storage().MarkComplete() if err != nil { - t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err) + t.logger.Levelf(log.Warning, "%T: error marking piece complete %d: %s", t.storage, piece, err) } t.cl.lock() @@ -2182,7 +2188,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { if len(bannableTouchers) >= 1 { c := bannableTouchers[0] if len(bannableTouchers) != 1 { - t.logger.Levelf(log.Warning, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece) + t.logger.Levelf(log.Debug, "would have banned %v for touching piece %v after failed piece check", c.remoteIp(), piece) } else { // Turns out it's still useful to ban peers like this because if there's only a // single peer for a piece, and we never progress that piece to completion, we @@ -2213,7 +2219,7 @@ func (t *Torrent) onPieceCompleted(piece pieceIndex) { t.piece(piece).readerCond.Broadcast() for conn := range t.conns { conn.have(piece) - t.maybeDropMutuallyCompletePeer(&conn.Peer) + t.maybeDropMutuallyCompletePeer(conn) } } @@ -2245,7 +2251,7 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { } func (t *Torrent) tryCreateMorePieceHashers() { - for !t.closed.IsSet() && t.activePieceHashes < 2 && t.tryCreatePieceHasher() { + for !t.closed.IsSet() && t.activePieceHashes < t.cl.config.PieceHashersPerTorrent && t.tryCreatePieceHasher() { } } @@ -2400,12 +2406,12 @@ func (t *Torrent) addHalfOpen(addrStr string, attemptKey *PeerInfo) { // Start the process of connecting to the given peer for the given torrent if appropriate. I'm not // sure all the PeerInfo fields are being used. -func (t *Torrent) initiateConn( - peer PeerInfo, - requireRendezvous bool, - skipHolepunchRendezvous bool, +func initiateConn( + opts outgoingConnOpts, ignoreLimits bool, ) { + t := opts.t + peer := opts.peerInfo if peer.Id == t.cl.peerID { return } @@ -2425,14 +2431,7 @@ func (t *Torrent) initiateConn( attemptKey := &peer t.addHalfOpen(addrStr, attemptKey) go t.cl.outgoingConnection( - outgoingConnOpts{ - t: t, - addr: peer.Addr, - requireRendezvous: requireRendezvous, - skipHolepunchRendezvous: skipHolepunchRendezvous, - }, - peer.Source, - peer.Trusted, + opts, attemptKey, ) } @@ -2455,7 +2454,7 @@ func (t *Torrent) AddClientPeer(cl *Client) int { // connection. func (t *Torrent) allStats(f func(*ConnStats)) { f(&t.stats) - f(&t.cl.stats) + f(&t.cl.connStats) } func (t *Torrent) hashingPiece(i pieceIndex) bool { @@ -2742,18 +2741,11 @@ func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerC return } -func makeUtHolepunchMsgForPeerConn( +func wrapUtHolepunchMsgForPeerConn( recipient *PeerConn, - msgType utHolepunch.MsgType, - addrPort netip.AddrPort, - errCode utHolepunch.ErrCode, + msg utHolepunch.Msg, ) pp.Message { - utHolepunchMsg := utHolepunch.Msg{ - MsgType: msgType, - AddrPort: addrPort, - ErrCode: errCode, - } - extendedPayload, err := utHolepunchMsg.MarshalBinary() + extendedPayload, err := msg.MarshalBinary() if err != nil { panic(err) } @@ -2770,10 +2762,38 @@ func sendUtHolepunchMsg( addrPort netip.AddrPort, errCode utHolepunch.ErrCode, ) { - pc.write(makeUtHolepunchMsgForPeerConn(pc, msgType, addrPort, errCode)) + holepunchMsg := utHolepunch.Msg{ + MsgType: msgType, + AddrPort: addrPort, + ErrCode: errCode, + } + incHolepunchMessagesSent(holepunchMsg) + ppMsg := wrapUtHolepunchMsgForPeerConn(pc, holepunchMsg) + pc.write(ppMsg) +} + +func incHolepunchMessages(msg utHolepunch.Msg, verb string) { + torrent.Add( + fmt.Sprintf( + "holepunch %v %v messages %v", + msg.MsgType, + addrPortProtocolStr(msg.AddrPort), + verb, + ), + 1, + ) +} + +func incHolepunchMessagesReceived(msg utHolepunch.Msg) { + incHolepunchMessages(msg, "received") +} + +func incHolepunchMessagesSent(msg utHolepunch.Msg) { + incHolepunchMessages(msg, "sent") } func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error { + incHolepunchMessagesReceived(msg) switch msg.MsgType { case utHolepunch.Rendezvous: t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender) @@ -2804,40 +2824,53 @@ func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *Peer } return nil case utHolepunch.Connect: - t.logger.Printf("got holepunch connect from %v for %v", sender, msg.AddrPort) - rz, ok := t.utHolepunchRendezvous[msg.AddrPort] - if ok { - delete(rz.relays, sender) - rz.gotConnect.Set() - rz.relayCond.Broadcast() - } else { - // If the rendezvous was removed because we timed out or already got a connect signal, - // it doesn't hurt to try again. - t.initiateConn(PeerInfo{ - Addr: msg.AddrPort, - Source: PeerSourceUtHolepunch, - }, false, true, true) + holepunchAddr := msg.AddrPort + t.logger.Printf("got holepunch connect request for %v from %p", holepunchAddr, sender) + if g.MapContains(t.cl.undialableWithoutHolepunch, holepunchAddr) { + setAdd(&t.cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, holepunchAddr) + if g.MapContains(t.cl.accepted, holepunchAddr) { + setAdd(&t.cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr) + } } + opts := outgoingConnOpts{ + peerInfo: PeerInfo{ + Addr: msg.AddrPort, + Source: PeerSourceUtHolepunch, + PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(), + }, + t: t, + // Don't attempt to start our own rendezvous if we fail to connect. + skipHolepunchRendezvous: true, + receivedHolepunchConnect: true, + // Assume that the other end initiated the rendezvous, and will use our preferred + // encryption. So we will act normally. + HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy, + } + initiateConn(opts, true) return nil case utHolepunch.Error: - rz, ok := t.utHolepunchRendezvous[msg.AddrPort] - if ok { - delete(rz.relays, sender) - rz.relayCond.Broadcast() - } - t.logger.Printf("received ut_holepunch error message from %v: %v", sender, msg.ErrCode) + torrent.Add("holepunch error messages received", 1) + t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode) return nil default: return fmt.Errorf("unhandled msg type %v", msg.MsgType) } } -func (t *Torrent) startHolepunchRendezvous(addrPort netip.AddrPort) (rz *utHolepunchRendezvous, err error) { - if MapContains(t.utHolepunchRendezvous, addrPort) { - err = errors.New("rendezvous already exists") - return +func addrPortProtocolStr(addrPort netip.AddrPort) string { + addr := addrPort.Addr() + switch { + case addr.Is4(): + return "ipv4" + case addr.Is6(): + return "ipv6" + default: + panic(addrPort) } - g.InitNew(&rz) +} + +func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error { + rzsSent := 0 for pc := range t.conns { if !pc.supportsExtension(utHolepunch.ExtensionName) { continue @@ -2847,15 +2880,26 @@ func (t *Torrent) startHolepunchRendezvous(addrPort netip.AddrPort) (rz *utHolep continue } } + t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort) sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0) - MakeMapIfNilAndSet(&rz.relays, pc, struct{}{}) + rzsSent++ } - if len(rz.relays) == 0 { - err = fmt.Errorf("no eligible relays") - return + if rzsSent == 0 { + return errors.New("no eligible relays") } - if !MakeMapIfNilAndSet(&t.utHolepunchRendezvous, addrPort, rz) { - panic("expected to fail earlier if rendezvous already exists") + return nil +} + +func (t *Torrent) numHalfOpenAttempts() (num int) { + for _, attempts := range t.halfOpen { + num += len(attempts) } return } + +func (t *Torrent) getDialTimeoutUnlocked() time.Duration { + cl := t.cl + cl.rLock() + defer cl.rUnlock() + return t.dialTimeout() +}