From: Matt Joiner Date: Wed, 23 Nov 2016 00:48:44 +0000 (+1100) Subject: Make Torrent.conns a map X-Git-Tag: v1.0.0~530 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=4807c9e29a9c6ccf346bcdb06f2640cea8a5353c;p=btrtrc.git Make Torrent.conns a map Can't remember if I've tried this before. But dropping arbitrary connections while iterating established conns will become much simpler. --- diff --git a/client.go b/client.go index 8c356eea..c457bef6 100644 --- a/client.go +++ b/client.go @@ -1147,6 +1147,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) { cl: cl, infoHash: ih, peers: make(map[peersKey]Peer), + conns: make(map[*connection]struct{}, 2*defaultEstablishedConnsPerTorrent), halfOpen: make(map[string]struct{}), pieceStateChanges: pubsub.NewPubSub(), diff --git a/connection.go b/connection.go index 06cb7ca8..3ba41076 100644 --- a/connection.go +++ b/connection.go @@ -964,7 +964,7 @@ func (c *connection) receiveChunk(msg *pp.Message) { piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize)) // Cancel pending requests for this chunk. - for _, c := range t.conns { + for c := range t.conns { if cl.connCancel(t, c, req) { c.updateRequests() } diff --git a/torrent.go b/torrent.go index c2baf9c1..f6f723bf 100644 --- a/torrent.go +++ b/torrent.go @@ -66,7 +66,7 @@ type Torrent struct { // The info dict. nil if we don't have it (yet). info *metainfo.Info // Active peer connections, running message stream loops. - conns []*connection + conns map[*connection]struct{} maxEstablishedConns int // Set of addrs to which we're attempting to connect. Connections are // half-open until all handshakes are completed. @@ -140,7 +140,7 @@ func (t *Torrent) pieceCompleteUncached(piece int) bool { } func (t *Torrent) numConnsUnchoked() (num int) { - for _, c := range t.conns { + for c := range t.conns { if !c.PeerChoked { num++ } @@ -153,7 +153,7 @@ func (t *Torrent) addrActive(addr string) bool { if _, ok := t.halfOpen[addr]; ok { return true } - for _, c := range t.conns { + for c := range t.conns { if c.remoteAddr().String() == addr { return true } @@ -163,7 +163,7 @@ func (t *Torrent) addrActive(addr string) bool { func (t *Torrent) worstUnclosedConns() (ret []*connection) { ret = make([]*connection, 0, len(t.conns)) - for _, c := range t.conns { + for c := range t.conns { if !c.closed.IsSet() { ret = append(ret, c) } @@ -278,7 +278,7 @@ func (t *Torrent) setInfoBytes(b []byte) error { t.metadataBytes = b t.metadataCompletedChunks = nil t.makePieces() - for _, conn := range t.conns { + for conn := range t.conns { if err := conn.setNumPieces(t.numPieces()); err != nil { log.Printf("closing connection: %s", err) conn.Close() @@ -325,7 +325,7 @@ func (t *Torrent) setMetadataSize(bytes int64) (err error) { } t.metadataBytes = make([]byte, bytes) t.metadataCompletedChunks = make([]bool, (bytes+(1<<14)-1)/(1<<14)) - for _, c := range t.conns { + for c := range t.conns { c.requestPendingMetadata() } return @@ -477,8 +477,9 @@ func (t *Torrent) writeStatus(w io.Writer) { fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers)) fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen)) fmt.Fprintf(w, "Active peers: %d\n", len(t.conns)) - slices.Sort(t.conns, worseConn) - for i, c := range t.conns { + conns := t.connsAsSlice() + slices.Sort(conns, worseConn) + for i, c := range conns { fmt.Fprintf(w, "%2d. ", i+1) c.WriteStatus(w, t) } @@ -558,7 +559,7 @@ func (t *Torrent) close() (err error) { if t.storage != nil { t.storage.Close() } - for _, conn := range t.conns { + for conn := range t.conns { conn.Close() } t.pieceStateChanges.Close() @@ -873,7 +874,7 @@ func (t *Torrent) maybeNewConns() { } func (t *Torrent) piecePriorityChanged(piece int) { - for _, c := range t.conns { + for c := range t.conns { c.updatePiecePriority(piece) } t.maybeNewConns() @@ -1004,13 +1005,12 @@ func (t *Torrent) pendRequest(req request) { t.pieces[req.Index].pendChunkIndex(ci) } -func (t *Torrent) pieceChanged(piece int) { - correct := t.pieceComplete(piece) - defer t.cl.event.Broadcast() - if correct { - t.onCompletedPiece(piece) +func (t *Torrent) pieceCompletionChanged(piece int) { + t.cl.event.Broadcast() + if t.pieceComplete(piece) { + t.onPieceCompleted(piece) } else { - t.onFailedPiece(piece) + t.onIncompletePiece(piece) } t.updatePiecePriority(piece) } @@ -1039,7 +1039,7 @@ func (t *Torrent) updatePieceCompletion(piece int) { changed := t.completedPieces.Get(piece) != pcu t.completedPieces.Set(piece, pcu) if changed { - t.pieceChanged(piece) + t.pieceCompletionChanged(piece) } } @@ -1160,19 +1160,10 @@ func (t *Torrent) SetInfoBytes(b []byte) (err error) { } // Returns true if connection is removed from torrent.Conns. -func (t *Torrent) deleteConnection(c *connection) bool { - for i0, _c := range t.conns { - if _c != c { - continue - } - i1 := len(t.conns) - 1 - if i0 != i1 { - t.conns[i0] = t.conns[i1] - } - t.conns = t.conns[:i1] - return true - } - return false +func (t *Torrent) deleteConnection(c *connection) (ret bool) { + _, ret = t.conns[c] + delete(t.conns, c) + return } func (t *Torrent) dropConnection(c *connection) { @@ -1356,7 +1347,7 @@ func (t *Torrent) addConnection(c *connection) bool { if !t.wantConns() { return false } - for _, c0 := range t.conns { + for c0 := range t.conns { if c.PeerID == c0.PeerID { // Already connected to a client with that ID. duplicateClientConns.Add(1) @@ -1377,7 +1368,6 @@ func (t *Torrent) addConnection(c *connection) bool { if len(t.conns) >= t.maxEstablishedConns { panic(len(t.conns)) } - t.conns = append(t.conns, c) if c.t != nil { panic("connection already associated with a torrent") } @@ -1386,6 +1376,7 @@ func (t *Torrent) addConnection(c *connection) bool { t.stats.wroteBytes(c.stats.BytesWritten) t.stats.readBytes(c.stats.BytesRead) c.t = t + t.conns[c] = struct{}{} return true } @@ -1407,7 +1398,7 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { defer t.cl.mu.Unlock() oldMax = t.maxEstablishedConns t.maxEstablishedConns = max - wcs := slices.HeapInterface(append([]*connection(nil), t.conns...), worseConn) + wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn) for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 { t.dropConnection(wcs.Pop().(*connection)) } @@ -1435,7 +1426,7 @@ func (t *Torrent) pieceHashed(piece int, correct bool) { } } p.EverHashed = true - touchers := t.reapPieceTouches(piece) + touchers := t.reapPieceTouchers(piece) if correct { for _, c := range touchers { c.goodPiecesDirtied++ @@ -1453,13 +1444,12 @@ func (t *Torrent) pieceHashed(piece int, correct bool) { t.dropConnection(c) } } - t.pieceChanged(piece) } -func (t *Torrent) onCompletedPiece(piece int) { +func (t *Torrent) onPieceCompleted(piece int) { t.pendingPieces.Remove(piece) t.pendAllChunkSpecs(piece) - for _, conn := range t.conns { + for conn := range t.conns { conn.Have(piece) for r := range conn.Requests { if int(r.Index) == piece { @@ -1472,16 +1462,14 @@ func (t *Torrent) onCompletedPiece(piece int) { } } -func (t *Torrent) onFailedPiece(piece int) { - cl := t.cl +func (t *Torrent) onIncompletePiece(piece int) { if t.pieceAllDirty(piece) { t.pendAllChunkSpecs(piece) } if !t.wantPieceIndex(piece) { return } - cl.openNewConns(t) - for _, conn := range t.conns { + for conn := range t.conns { if conn.PeerHasPiece(piece) { conn.updateRequests() } @@ -1512,8 +1500,8 @@ func (t *Torrent) verifyPiece(piece int) { // Return the connections that touched a piece, and clear the entry while // doing it. -func (t *Torrent) reapPieceTouches(piece int) (ret []*connection) { - for _, c := range t.conns { +func (t *Torrent) reapPieceTouchers(piece int) (ret []*connection) { + for c := range t.conns { if _, ok := c.peerTouchedPieces[piece]; ok { ret = append(ret, c) delete(c.peerTouchedPieces, piece) @@ -1521,3 +1509,10 @@ func (t *Torrent) reapPieceTouches(piece int) (ret []*connection) { } return } + +func (t *Torrent) connsAsSlice() (ret []*connection) { + for c := range t.conns { + ret = append(ret, c) + } + return +}