+
+func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
+ p := t.requestingPeer(r)
+ if p != nil {
+ p.cancel(r)
+ }
+ // TODO: This is a check that an old invariant holds. It can be removed after some testing.
+ //delete(t.pendingRequests, r)
+ if _, ok := t.requestState[r]; ok {
+ panic("expected request state to be gone")
+ }
+ return p
+}
+
+func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
+ return t.requestState[r].peer
+}
+
+func (t *Torrent) addConnWithAllPieces(p *Peer) {
+ if t.connsWithAllPieces == nil {
+ t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
+ }
+ t.connsWithAllPieces[p] = struct{}{}
+}
+
+func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
+ _, ok := t.connsWithAllPieces[p]
+ delete(t.connsWithAllPieces, p)
+ return ok
+}
+
+func (t *Torrent) numActivePeers() int {
+ return len(t.conns) + len(t.webSeeds)
+}
+
+func (t *Torrent) hasStorageCap() bool {
+ f := t.storage.Capacity
+ if f == nil {
+ return false
+ }
+ _, ok := (*f)()
+ return ok
+}
+
+func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
+ return pieceIndex(ri / t.chunksPerRegularPiece())
+}
+
+func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
+ reuseIter *typedRoaring.Iterator[RequestIndex],
+ piece pieceIndex,
+ f func(RequestIndex),
+) {
+ reuseIter.Initialize(&t.dirtyChunks)
+ pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
+ iterBitmapUnsetInRange(
+ reuseIter,
+ pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
+ f,
+ )
+}
+
+type requestState struct {
+ peer *Peer
+ when time.Time
+}
+
+// Returns an error if a received chunk is out of bounds in someway.
+func (t *Torrent) checkValidReceiveChunk(r Request) error {
+ if !t.haveInfo() {
+ return errors.New("torrent missing info")
+ }
+ if int(r.Index) >= t.numPieces() {
+ return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces())
+ }
+ pieceLength := t.pieceLength(pieceIndex(r.Index))
+ if r.Begin >= pieceLength {
+ return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength)
+ }
+ // We could check chunk lengths here, but chunk request size is not changed often, and tricky
+ // for peers to manipulate as they need to send potentially large buffers to begin with. There
+ // should be considerable checks elsewhere for this case due to the network overhead. We should
+ // catch most of the overflow manipulation stuff by checking index and begin above.
+ return nil
+}
+
+func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) {
+ for pc := range t.conns {
+ dialAddr, err := pc.remoteDialAddrPort()
+ if err != nil {
+ continue
+ }
+ if dialAddr != target {
+ continue
+ }
+ ret = append(ret, pc)
+ }
+ return
+}
+
+func wrapUtHolepunchMsgForPeerConn(
+ recipient *PeerConn,
+ msg utHolepunch.Msg,
+) pp.Message {
+ extendedPayload, err := msg.MarshalBinary()
+ if err != nil {
+ panic(err)
+ }
+ return pp.Message{
+ Type: pp.Extended,
+ ExtendedID: MapMustGet(recipient.PeerExtensionIDs, utHolepunch.ExtensionName),
+ ExtendedPayload: extendedPayload,
+ }
+}
+
+func sendUtHolepunchMsg(
+ pc *PeerConn,
+ msgType utHolepunch.MsgType,
+ addrPort netip.AddrPort,
+ errCode utHolepunch.ErrCode,
+) {
+ holepunchMsg := utHolepunch.Msg{
+ MsgType: msgType,
+ AddrPort: addrPort,
+ ErrCode: errCode,
+ }
+ incHolepunchMessagesSent(holepunchMsg)
+ ppMsg := wrapUtHolepunchMsgForPeerConn(pc, holepunchMsg)
+ pc.write(ppMsg)
+}
+
+func incHolepunchMessages(msg utHolepunch.Msg, verb string) {
+ torrent.Add(
+ fmt.Sprintf(
+ "holepunch %v %v messages %v",
+ msg.MsgType,
+ addrPortProtocolStr(msg.AddrPort),
+ verb,
+ ),
+ 1,
+ )
+}
+
+func incHolepunchMessagesReceived(msg utHolepunch.Msg) {
+ incHolepunchMessages(msg, "received")
+}
+
+func incHolepunchMessagesSent(msg utHolepunch.Msg) {
+ incHolepunchMessages(msg, "sent")
+}
+
+func (t *Torrent) handleReceivedUtHolepunchMsg(msg utHolepunch.Msg, sender *PeerConn) error {
+ incHolepunchMessagesReceived(msg)
+ switch msg.MsgType {
+ case utHolepunch.Rendezvous:
+ t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender)
+ sendMsg := sendUtHolepunchMsg
+ senderAddrPort, err := sender.remoteDialAddrPort()
+ if err != nil {
+ sender.logger.Levelf(
+ log.Warning,
+ "error getting ut_holepunch rendezvous sender's dial address: %v",
+ err,
+ )
+ // There's no better error code. The sender's address itself is invalid. I don't see
+ // this error message being appropriate anywhere else anyway.
+ sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer)
+ }
+ targets := t.peerConnsWithDialAddrPort(msg.AddrPort)
+ if len(targets) == 0 {
+ sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected)
+ return nil
+ }
+ for _, pc := range targets {
+ if !pc.supportsExtension(utHolepunch.ExtensionName) {
+ sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSupport)
+ continue
+ }
+ sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0)
+ sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0)
+ }
+ return nil
+ case utHolepunch.Connect:
+ holepunchAddr := msg.AddrPort
+ t.logger.Printf("got holepunch connect request for %v from %p", holepunchAddr, sender)
+ if g.MapContains(t.cl.undialableWithoutHolepunch, holepunchAddr) {
+ setAdd(&t.cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, holepunchAddr)
+ if g.MapContains(t.cl.accepted, holepunchAddr) {
+ setAdd(&t.cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr)
+ }
+ }
+ opts := outgoingConnOpts{
+ peerInfo: PeerInfo{
+ Addr: msg.AddrPort,
+ Source: PeerSourceUtHolepunch,
+ PexPeerFlags: sender.pex.remoteLiveConns[msg.AddrPort].UnwrapOrZeroValue(),
+ },
+ t: t,
+ // Don't attempt to start our own rendezvous if we fail to connect.
+ skipHolepunchRendezvous: true,
+ receivedHolepunchConnect: true,
+ // Assume that the other end initiated the rendezvous, and will use our preferred
+ // encryption. So we will act normally.
+ HeaderObfuscationPolicy: t.cl.config.HeaderObfuscationPolicy,
+ }
+ initiateConn(opts, true)
+ return nil
+ case utHolepunch.Error:
+ torrent.Add("holepunch error messages received", 1)
+ t.logger.Levelf(log.Debug, "received ut_holepunch error message from %v: %v", sender, msg.ErrCode)
+ return nil
+ default:
+ return fmt.Errorf("unhandled msg type %v", msg.MsgType)
+ }
+}
+
+func addrPortProtocolStr(addrPort netip.AddrPort) string {
+ addr := addrPort.Addr()
+ switch {
+ case addr.Is4():
+ return "ipv4"
+ case addr.Is6():
+ return "ipv6"
+ default:
+ panic(addrPort)
+ }
+}
+
+func (t *Torrent) trySendHolepunchRendezvous(addrPort netip.AddrPort) error {
+ rzsSent := 0
+ for pc := range t.conns {
+ if !pc.supportsExtension(utHolepunch.ExtensionName) {
+ continue
+ }
+ if pc.supportsExtension(pp.ExtensionNamePex) {
+ if !g.MapContains(pc.pex.remoteLiveConns, addrPort) {
+ continue
+ }
+ }
+ t.logger.Levelf(log.Debug, "sent ut_holepunch rendezvous message to %v for %v", pc, addrPort)
+ sendUtHolepunchMsg(pc, utHolepunch.Rendezvous, addrPort, 0)
+ rzsSent++
+ }
+ if rzsSent == 0 {
+ return errors.New("no eligible relays")
+ }
+ return nil
+}
+
+func (t *Torrent) numHalfOpenAttempts() (num int) {
+ for _, attempts := range t.halfOpen {
+ num += len(attempts)
+ }
+ return
+}
+
+func (t *Torrent) getDialTimeoutUnlocked() time.Duration {
+ cl := t.cl
+ cl.rLock()
+ defer cl.rUnlock()
+ return t.dialTimeout()
+}