]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Make Torrent.conns a map
authorMatt Joiner <anacrolix@gmail.com>
Wed, 23 Nov 2016 00:48:44 +0000 (11:48 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 23 Nov 2016 00:48:44 +0000 (11:48 +1100)
Can't remember if I've tried this before. But dropping arbitrary connections while iterating established conns will become much simpler.

client.go
connection.go
torrent.go

index 8c356eea324b2dc1981eb442f30d4d079d739511..c457bef6c85d4db12afef1f0cb5df88a4296a769 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1147,6 +1147,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
                cl:       cl,
                infoHash: ih,
                peers:    make(map[peersKey]Peer),
+               conns:    make(map[*connection]struct{}, 2*defaultEstablishedConnsPerTorrent),
 
                halfOpen:          make(map[string]struct{}),
                pieceStateChanges: pubsub.NewPubSub(),
index 06cb7ca8fb3ba312fc1ddc828ab0603daead393a..3ba41076fbe3ba895adc0a754e08dd6b84c04a4b 100644 (file)
@@ -964,7 +964,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
 
        // Cancel pending requests for this chunk.
-       for _, c := range t.conns {
+       for c := range t.conns {
                if cl.connCancel(t, c, req) {
                        c.updateRequests()
                }
index c2baf9c10dd5a0eeaf2421d8488776f53122592c..f6f723bf42b74a5eb553f442ec4161a77830ff49 100644 (file)
@@ -66,7 +66,7 @@ type Torrent struct {
        // The info dict. nil if we don't have it (yet).
        info *metainfo.Info
        // Active peer connections, running message stream loops.
-       conns               []*connection
+       conns               map[*connection]struct{}
        maxEstablishedConns int
        // Set of addrs to which we're attempting to connect. Connections are
        // half-open until all handshakes are completed.
@@ -140,7 +140,7 @@ func (t *Torrent) pieceCompleteUncached(piece int) bool {
 }
 
 func (t *Torrent) numConnsUnchoked() (num int) {
-       for _, c := range t.conns {
+       for c := range t.conns {
                if !c.PeerChoked {
                        num++
                }
@@ -153,7 +153,7 @@ func (t *Torrent) addrActive(addr string) bool {
        if _, ok := t.halfOpen[addr]; ok {
                return true
        }
-       for _, c := range t.conns {
+       for c := range t.conns {
                if c.remoteAddr().String() == addr {
                        return true
                }
@@ -163,7 +163,7 @@ func (t *Torrent) addrActive(addr string) bool {
 
 func (t *Torrent) worstUnclosedConns() (ret []*connection) {
        ret = make([]*connection, 0, len(t.conns))
-       for _, c := range t.conns {
+       for c := range t.conns {
                if !c.closed.IsSet() {
                        ret = append(ret, c)
                }
@@ -278,7 +278,7 @@ func (t *Torrent) setInfoBytes(b []byte) error {
        t.metadataBytes = b
        t.metadataCompletedChunks = nil
        t.makePieces()
-       for _, conn := range t.conns {
+       for conn := range t.conns {
                if err := conn.setNumPieces(t.numPieces()); err != nil {
                        log.Printf("closing connection: %s", err)
                        conn.Close()
@@ -325,7 +325,7 @@ func (t *Torrent) setMetadataSize(bytes int64) (err error) {
        }
        t.metadataBytes = make([]byte, bytes)
        t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14))
-       for _, c := range t.conns {
+       for c := range t.conns {
                c.requestPendingMetadata()
        }
        return
@@ -477,8 +477,9 @@ func (t *Torrent) writeStatus(w io.Writer) {
        fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
        fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
        fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
-       slices.Sort(t.conns, worseConn)
-       for i, c := range t.conns {
+       conns := t.connsAsSlice()
+       slices.Sort(conns, worseConn)
+       for i, c := range conns {
                fmt.Fprintf(w, "%2d. ", i+1)
                c.WriteStatus(w, t)
        }
@@ -558,7 +559,7 @@ func (t *Torrent) close() (err error) {
        if t.storage != nil {
                t.storage.Close()
        }
-       for _, conn := range t.conns {
+       for conn := range t.conns {
                conn.Close()
        }
        t.pieceStateChanges.Close()
@@ -873,7 +874,7 @@ func (t *Torrent) maybeNewConns() {
 }
 
 func (t *Torrent) piecePriorityChanged(piece int) {
-       for _, c := range t.conns {
+       for c := range t.conns {
                c.updatePiecePriority(piece)
        }
        t.maybeNewConns()
@@ -1004,13 +1005,12 @@ func (t *Torrent) pendRequest(req request) {
        t.pieces[req.Index].pendChunkIndex(ci)
 }
 
-func (t *Torrent) pieceChanged(piece int) {
-       correct := t.pieceComplete(piece)
-       defer t.cl.event.Broadcast()
-       if correct {
-               t.onCompletedPiece(piece)
+func (t *Torrent) pieceCompletionChanged(piece int) {
+       t.cl.event.Broadcast()
+       if t.pieceComplete(piece) {
+               t.onPieceCompleted(piece)
        } else {
-               t.onFailedPiece(piece)
+               t.onIncompletePiece(piece)
        }
        t.updatePiecePriority(piece)
 }
@@ -1039,7 +1039,7 @@ func (t *Torrent) updatePieceCompletion(piece int) {
        changed := t.completedPieces.Get(piece) != pcu
        t.completedPieces.Set(piece, pcu)
        if changed {
-               t.pieceChanged(piece)
+               t.pieceCompletionChanged(piece)
        }
 }
 
@@ -1160,19 +1160,10 @@ func (t *Torrent) SetInfoBytes(b []byte) (err error) {
 }
 
 // Returns true if connection is removed from torrent.Conns.
-func (t *Torrent) deleteConnection(c *connection) bool {
-       for i0, _c := range t.conns {
-               if _c != c {
-                       continue
-               }
-               i1 := len(t.conns) - 1
-               if i0 != i1 {
-                       t.conns[i0] = t.conns[i1]
-               }
-               t.conns = t.conns[:i1]
-               return true
-       }
-       return false
+func (t *Torrent) deleteConnection(c *connection) (ret bool) {
+       _, ret = t.conns[c]
+       delete(t.conns, c)
+       return
 }
 
 func (t *Torrent) dropConnection(c *connection) {
@@ -1356,7 +1347,7 @@ func (t *Torrent) addConnection(c *connection) bool {
        if !t.wantConns() {
                return false
        }
-       for _, c0 := range t.conns {
+       for c0 := range t.conns {
                if c.PeerID == c0.PeerID {
                        // Already connected to a client with that ID.
                        duplicateClientConns.Add(1)
@@ -1377,7 +1368,6 @@ func (t *Torrent) addConnection(c *connection) bool {
        if len(t.conns) >= t.maxEstablishedConns {
                panic(len(t.conns))
        }
-       t.conns = append(t.conns, c)
        if c.t != nil {
                panic("connection already associated with a torrent")
        }
@@ -1386,6 +1376,7 @@ func (t *Torrent) addConnection(c *connection) bool {
        t.stats.wroteBytes(c.stats.BytesWritten)
        t.stats.readBytes(c.stats.BytesRead)
        c.t = t
+       t.conns[c] = struct{}{}
        return true
 }
 
@@ -1407,7 +1398,7 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
        defer t.cl.mu.Unlock()
        oldMax = t.maxEstablishedConns
        t.maxEstablishedConns = max
-       wcs := slices.HeapInterface(append([]*connection(nil), t.conns...), worseConn)
+       wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
        for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
                t.dropConnection(wcs.Pop().(*connection))
        }
@@ -1435,7 +1426,7 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
                }
        }
        p.EverHashed = true
-       touchers := t.reapPieceTouches(piece)
+       touchers := t.reapPieceTouchers(piece)
        if correct {
                for _, c := range touchers {
                        c.goodPiecesDirtied++
@@ -1453,13 +1444,12 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
                        t.dropConnection(c)
                }
        }
-       t.pieceChanged(piece)
 }
 
-func (t *Torrent) onCompletedPiece(piece int) {
+func (t *Torrent) onPieceCompleted(piece int) {
        t.pendingPieces.Remove(piece)
        t.pendAllChunkSpecs(piece)
-       for _, conn := range t.conns {
+       for conn := range t.conns {
                conn.Have(piece)
                for r := range conn.Requests {
                        if int(r.Index) == piece {
@@ -1472,16 +1462,14 @@ func (t *Torrent) onCompletedPiece(piece int) {
        }
 }
 
-func (t *Torrent) onFailedPiece(piece int) {
-       cl := t.cl
+func (t *Torrent) onIncompletePiece(piece int) {
        if t.pieceAllDirty(piece) {
                t.pendAllChunkSpecs(piece)
        }
        if !t.wantPieceIndex(piece) {
                return
        }
-       cl.openNewConns(t)
-       for _, conn := range t.conns {
+       for conn := range t.conns {
                if conn.PeerHasPiece(piece) {
                        conn.updateRequests()
                }
@@ -1512,8 +1500,8 @@ func (t *Torrent) verifyPiece(piece int) {
 
 // Return the connections that touched a piece, and clear the entry while
 // doing it.
-func (t *Torrent) reapPieceTouches(piece int) (ret []*connection) {
-       for _, c := range t.conns {
+func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) {
+       for c := range t.conns {
                if _, ok := c.peerTouchedPieces[piece]; ok {
                        ret = append(ret, c)
                        delete(c.peerTouchedPieces, piece)
@@ -1521,3 +1509,10 @@ func (t *Torrent) reapPieceTouches(piece int) (ret []*connection) {
        }
        return
 }
+
+func (t *Torrent) connsAsSlice() (ret []*connection) {
+       for c := range t.conns {
+               ret = append(ret, c)
+       }
+       return
+}