// Active peer connections, running message stream loops. TODO: Make this
// open (not-closed) connections only.
- conns map[*connection]struct{}
+ conns map[*PeerConn]struct{}
maxEstablishedConns int
// Set of addrs to which we're attempting to connect. Connections are
// half-open until all handshakes are completed.
halfOpen map[string]Peer
- fastestConn *connection
+ fastestConn *PeerConn
// Reserve of peers to connect to. A peer can be both here and in the
// active connections if were told about the peer after connecting with
return false
}
-func (t *Torrent) unclosedConnsAsSlice() (ret []*connection) {
- ret = make([]*connection, 0, len(t.conns))
+func (t *Torrent) unclosedConnsAsSlice() (ret []*PeerConn) {
+ ret = make([]*PeerConn, 0, len(t.conns))
for c := range t.conns {
if !c.closed.IsSet() {
ret = append(ret, c)
for conn := range t.conns {
if err := conn.setNumPieces(t.numPieces()); err != nil {
t.logger.Printf("closing connection: %s", err)
- conn.Close()
+ conn.close()
}
}
for i := range t.pieces {
return metadataPieceSize(len(t.metadataBytes), piece)
}
-func (t *Torrent) newMetadataExtensionMessage(c *connection, msgType int, piece int, data []byte) pp.Message {
+func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType int, piece int, data []byte) pp.Message {
d := map[string]int{
"msg_type": msgType,
"piece": piece,
slices.Sort(conns, worseConn)
for i, c := range conns {
fmt.Fprintf(w, "%2d. ", i+1)
- c.WriteStatus(w, t)
+ c.writeStatus(w, t)
}
}
t.storageLock.Unlock()
}
for conn := range t.conns {
- conn.Close()
+ conn.close()
}
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
// for the longest. A bad connection is one that usually sends us unwanted
// pieces, or has been in worser half of the established connections for more
// than a minute.
-func (t *Torrent) worstBadConn() *connection {
+func (t *Torrent) worstBadConn() *PeerConn {
wcs := worseConnSlice{t.unclosedConnsAsSlice()}
heap.Init(&wcs)
for wcs.Len() != 0 {
- c := heap.Pop(&wcs).(*connection)
+ c := heap.Pop(&wcs).(*PeerConn)
if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
return c
}
func (t *Torrent) numReceivedConns() (ret int) {
for c := range t.conns {
- if c.Discovery == peerSourceIncoming {
+ if c.Discovery == PeerSourceIncoming {
ret++
}
}
}
// Returns true if connection is removed from torrent.Conns.
-func (t *Torrent) deleteConnection(c *connection) (ret bool) {
+func (t *Torrent) deleteConnection(c *PeerConn) (ret bool) {
if !c.closed.IsSet() {
panic("connection is not closed")
// There are behaviours prevented by the closed state that will fail
//}
}
-func (t *Torrent) dropConnection(c *connection) {
+func (t *Torrent) dropConnection(c *PeerConn) {
t.cl.event.Broadcast()
- c.Close()
+ c.close()
if t.deleteConnection(c) {
t.openNewConns()
}
}
t.addPeer(Peer{
Addr: ipPortAddr{cp.IP, cp.Port},
- Source: peerSourceDhtGetPeers,
+ Source: PeerSourceDhtGetPeers,
})
}
cl.unlock()
// Reconcile bytes transferred before connection was associated with a
// torrent.
-func (t *Torrent) reconcileHandshakeStats(c *connection) {
+func (t *Torrent) reconcileHandshakeStats(c *PeerConn) {
if c._stats != (ConnStats{
// Handshakes should only increment these fields:
BytesWritten: c._stats.BytesWritten,
}
// Returns true if the connection is added.
-func (t *Torrent) addConnection(c *connection) (err error) {
+func (t *Torrent) addConnection(c *PeerConn) (err error) {
defer func() {
if err == nil {
torrent.Add("added connections", 1)
continue
}
if left, ok := c.hasPreferredNetworkOver(c0); ok && left {
- c0.Close()
+ c0.close()
t.deleteConnection(c0)
} else {
return errors.New("existing connection preferred")
if c == nil {
return errors.New("don't want conns")
}
- c.Close()
+ c.close()
t.deleteConnection(c)
}
if len(t.conns) >= t.maxEstablishedConns {
t.maxEstablishedConns = max
wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
- t.dropConnection(wcs.Pop().(*connection))
+ t.dropConnection(wcs.Pop().(*PeerConn))
}
t.openNewConns()
return oldMax
c.stats().incrementPiecesDirtiedBad()
}
- bannableTouchers := make([]*connection, 0, len(p.dirtiers))
+ bannableTouchers := make([]*PeerConn, 0, len(p.dirtiers))
for c := range p.dirtiers {
if !c.trusted {
bannableTouchers = append(bannableTouchers, c)
if len(bannableTouchers) >= 1 {
c := bannableTouchers[0]
t.cl.banPeerIP(c.remoteIp())
- c.Drop()
+ c.drop()
}
}
t.onIncompletePiece(piece)
t.pendAllChunkSpecs(piece)
t.cancelRequestsForPiece(piece)
for conn := range t.conns {
- conn.Have(piece)
+ conn.have(piece)
}
}
// for c := range t.conns {
// if c.sentHave(piece) {
- // c.Drop()
+ // c.drop()
// }
// }
for conn := range t.conns {
- if conn.PeerHasPiece(piece) {
+ if conn.peerHasPiece(piece) {
conn.updateRequests()
}
}
}
}
-func (t *Torrent) connsAsSlice() (ret []*connection) {
+func (t *Torrent) connsAsSlice() (ret []*PeerConn) {
for c := range t.conns {
ret = append(ret, c)
}
go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
}
-// Adds each a trusted, pending peer for each of the Client's addresses.
+// Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
+// quickly make one Client visible to the Torrent of another Client.
func (t *Torrent) AddClientPeer(cl *Client) {
t.AddPeers(func() (ps []Peer) {
for _, la := range cl.ListenAddrs() {
cb.t.cl.lock()
defer cb.t.cl.unlock()
for cn := range cb.t.conns {
- if cn.PeerHasPiece(pieceIndex(r.Index)) {
+ if cn.peerHasPiece(pieceIndex(r.Index)) {
cn.updateRequests()
}
}