]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
Rename connection->PeerConn and fix exports
[btrtrc.git] / torrent.go
index 4cd6cd4b9e3821323624e6544a84e90c047fa34e..2c18b73df67e8c43c091058c76edbcdce8192ab2 100644 (file)
@@ -74,12 +74,12 @@ type Torrent struct {
 
        // Active peer connections, running message stream loops. TODO: Make this
        // open (not-closed) connections only.
-       conns               map[*connection]struct{}
+       conns               map[*PeerConn]struct{}
        maxEstablishedConns int
        // Set of addrs to which we're attempting to connect. Connections are
        // half-open until all handshakes are completed.
        halfOpen    map[string]Peer
-       fastestConn *connection
+       fastestConn *PeerConn
 
        // Reserve of peers to connect to. A peer can be both here and in the
        // active connections if were told about the peer after connecting with
@@ -236,8 +236,8 @@ func (t *Torrent) addrActive(addr string) bool {
        return false
 }
 
-func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
-       ret = make([]*connection, 0, len(t.conns))
+func (t *Torrent) unclosedConnsAsSlice() (ret []*PeerConn) {
+       ret = make([]*PeerConn, 0, len(t.conns))
        for c := range t.conns {
                if !c.closed.IsSet() {
                        ret = append(ret, c)
@@ -385,7 +385,7 @@ func (t *Torrent) onSetInfo() {
        for conn := range t.conns {
                if err := conn.setNumPieces(t.numPieces()); err != nil {
                        t.logger.Printf("closing connection: %s", err)
-                       conn.Close()
+                       conn.close()
                }
        }
        for i := range t.pieces {
@@ -485,7 +485,7 @@ func (t *Torrent) metadataPieceSize(piece int) int {
        return metadataPieceSize(len(t.metadataBytes), piece)
 }
 
-func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
+func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType int, piece int, data []byte) pp.Message {
        d := map[string]int{
                "msg_type": msgType,
                "piece":    piece,
@@ -617,7 +617,7 @@ func (t *Torrent) writeStatus(w io.Writer) {
        slices.Sort(conns, worseConn)
        for i, c := range conns {
                fmt.Fprintf(w, "%2d. ", i+1)
-               c.WriteStatus(w, t)
+               c.writeStatus(w, t)
        }
 }
 
@@ -702,7 +702,7 @@ func (t *Torrent) close() (err error) {
                t.storageLock.Unlock()
        }
        for conn := range t.conns {
-               conn.Close()
+               conn.close()
        }
        t.cl.event.Broadcast()
        t.pieceStateChanges.Close()
@@ -835,11 +835,11 @@ func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
 // for the longest. A bad connection is one that usually sends us unwanted
 // pieces, or has been in worser half of the established connections for more
 // than a minute.
-func (t *Torrent) worstBadConn() *connection {
+func (t *Torrent) worstBadConn() *PeerConn {
        wcs := worseConnSlice{t.unclosedConnsAsSlice()}
        heap.Init(&wcs)
        for wcs.Len() != 0 {
-               c := heap.Pop(&wcs).(*connection)
+               c := heap.Pop(&wcs).(*PeerConn)
                if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
                        return c
                }
@@ -1033,7 +1033,7 @@ func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
 
 func (t *Torrent) numReceivedConns() (ret int) {
        for c := range t.conns {
-               if c.Discovery == peerSourceIncoming {
+               if c.Discovery == PeerSourceIncoming {
                        ret++
                }
        }
@@ -1194,7 +1194,7 @@ func (t *Torrent) SetInfoBytes(b []byte) (err error) {
 }
 
 // Returns true if connection is removed from torrent.Conns.
-func (t *Torrent) deleteConnection(c *connection) (ret bool) {
+func (t *Torrent) deleteConnection(c *PeerConn) (ret bool) {
        if !c.closed.IsSet() {
                panic("connection is not closed")
                // There are behaviours prevented by the closed state that will fail
@@ -1219,9 +1219,9 @@ func (t *Torrent) assertNoPendingRequests() {
        //}
 }
 
-func (t *Torrent) dropConnection(c *connection) {
+func (t *Torrent) dropConnection(c *PeerConn) {
        t.cl.event.Broadcast()
-       c.Close()
+       c.close()
        if t.deleteConnection(c) {
                t.openNewConns()
        }
@@ -1352,7 +1352,7 @@ func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
                        }
                        t.addPeer(Peer{
                                Addr:   ipPortAddr{cp.IP, cp.Port},
-                               Source: peerSourceDhtGetPeers,
+                               Source: PeerSourceDhtGetPeers,
                        })
                }
                cl.unlock()
@@ -1440,7 +1440,7 @@ func (t *Torrent) numTotalPeers() int {
 
 // Reconcile bytes transferred before connection was associated with a
 // torrent.
-func (t *Torrent) reconcileHandshakeStats(c *connection) {
+func (t *Torrent) reconcileHandshakeStats(c *PeerConn) {
        if c._stats != (ConnStats{
                // Handshakes should only increment these fields:
                BytesWritten: c._stats.BytesWritten,
@@ -1456,7 +1456,7 @@ func (t *Torrent) reconcileHandshakeStats(c *connection) {
 }
 
 // Returns true if the connection is added.
-func (t *Torrent) addConnection(c *connection) (err error) {
+func (t *Torrent) addConnection(c *PeerConn) (err error) {
        defer func() {
                if err == nil {
                        torrent.Add("added connections", 1)
@@ -1473,7 +1473,7 @@ func (t *Torrent) addConnection(c *connection) (err error) {
                        continue
                }
                if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
-                       c0.Close()
+                       c0.close()
                        t.deleteConnection(c0)
                } else {
                        return errors.New("existing connection preferred")
@@ -1484,7 +1484,7 @@ func (t *Torrent) addConnection(c *connection) (err error) {
                if c == nil {
                        return errors.New("don't want conns")
                }
-               c.Close()
+               c.close()
                t.deleteConnection(c)
        }
        if len(t.conns) >= t.maxEstablishedConns {
@@ -1517,7 +1517,7 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
        t.maxEstablishedConns = max
        wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
        for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
-               t.dropConnection(wcs.Pop().(*connection))
+               t.dropConnection(wcs.Pop().(*PeerConn))
        }
        t.openNewConns()
        return oldMax
@@ -1568,7 +1568,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                                c.stats().incrementPiecesDirtiedBad()
                        }
 
-                       bannableTouchers := make([]*connection, 0, len(p.dirtiers))
+                       bannableTouchers := make([]*PeerConn, 0, len(p.dirtiers))
                        for c := range p.dirtiers {
                                if !c.trusted {
                                        bannableTouchers = append(bannableTouchers, c)
@@ -1593,7 +1593,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
                        if len(bannableTouchers) >= 1 {
                                c := bannableTouchers[0]
                                t.cl.banPeerIP(c.remoteIp())
-                               c.Drop()
+                               c.drop()
                        }
                }
                t.onIncompletePiece(piece)
@@ -1613,7 +1613,7 @@ func (t *Torrent) onPieceCompleted(piece pieceIndex) {
        t.pendAllChunkSpecs(piece)
        t.cancelRequestsForPiece(piece)
        for conn := range t.conns {
-               conn.Have(piece)
+               conn.have(piece)
        }
 }
 
@@ -1634,11 +1634,11 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {
 
        // for c := range t.conns {
        //      if c.sentHave(piece) {
-       //              c.Drop()
+       //              c.drop()
        //      }
        // }
        for conn := range t.conns {
-               if conn.PeerHasPiece(piece) {
+               if conn.peerHasPiece(piece) {
                        conn.updateRequests()
                }
        }
@@ -1707,7 +1707,7 @@ func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
        }
 }
 
-func (t *Torrent) connsAsSlice() (ret []*connection) {
+func (t *Torrent) connsAsSlice() (ret []*PeerConn) {
        for c := range t.conns {
                ret = append(ret, c)
        }
@@ -1750,7 +1750,8 @@ func (t *Torrent) initiateConn(peer Peer) {
        go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
 }
 
-// Adds each a trusted, pending peer for each of the Client's addresses.
+// Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
+// quickly make one Client visible to the Torrent of another Client.
 func (t *Torrent) AddClientPeer(cl *Client) {
        t.AddPeers(func() (ps []Peer) {
                for _, la := range cl.ListenAddrs() {
@@ -1799,7 +1800,7 @@ func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) {
        cb.t.cl.lock()
        defer cb.t.cl.unlock()
        for cn := range cb.t.conns {
-               if cn.PeerHasPiece(pieceIndex(r.Index)) {
+               if cn.peerHasPiece(pieceIndex(r.Index)) {
                        cn.updateRequests()
                }
        }