if !cl.addConnection(t, c) {
return
}
- defer cl.dropConnection(t, c)
+ defer t.dropConnection(c)
go c.writer(time.Minute)
cl.sendInitialMessages(c, t)
err = cl.connectionLoop(t, c)
if !c.PeerInterested {
return
}
- seeding := cl.seeding(t)
+ seeding := t.seeding()
if !seeding && !t.connHasWantedPieces(c) {
return
}
}
}
-// Returns true if connection is removed from torrent.Conns.
-func (cl *Client) deleteConnection(t *Torrent, c *connection) bool {
- for i0, _c := range t.conns {
- if _c != c {
- continue
- }
- i1 := len(t.conns) - 1
- if i0 != i1 {
- t.conns[i0] = t.conns[i1]
- }
- t.conns = t.conns[:i1]
- return true
- }
- return false
-}
-
-func (cl *Client) dropConnection(t *Torrent, c *connection) {
- cl.event.Broadcast()
- c.Close()
- if cl.deleteConnection(t, c) {
- cl.openNewConns(t)
- }
-}
-
// Returns true if the connection is added.
func (cl *Client) addConnection(t *Torrent, c *connection) bool {
if cl.closed.IsSet() {
return false
}
- select {
- case <-t.ceasingNetworking:
- return false
- default:
- }
if !cl.wantConns(t) {
return false
}
log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
}
c.Close()
- cl.deleteConnection(t, c)
+ t.deleteConnection(c)
}
if len(t.conns) >= socketsPerTorrent {
panic(len(t.conns))
if !t.haveInfo() {
return c.supportsExtension("ut_metadata")
}
- if cl.seeding(t) {
+ if t.seeding() {
return c.PeerInterested
}
return t.connHasWantedPieces(c)
}
func (cl *Client) wantConns(t *Torrent) bool {
- if !cl.seeding(t) && !t.needData() {
+ if !t.seeding() && !t.needData() {
return false
}
if len(t.conns) < socketsPerTorrent {
}
func (cl *Client) openNewConns(t *Torrent) {
- select {
- case <-t.ceasingNetworking:
- return
- default:
- }
for len(t.peers) != 0 {
if !cl.wantConns(t) {
return
chunkSize: defaultChunkSize,
peers: make(map[peersKey]Peer),
- closing: make(chan struct{}),
- ceasingNetworking: make(chan struct{}),
-
halfOpen: make(map[string]struct{}),
pieceStateChanges: pubsub.NewPubSub(),
return
}
-// Returns true when peers are required, or false if the torrent is closing.
-func (cl *Client) waitWantPeers(t *Torrent) bool {
- cl.mu.Lock()
- defer cl.mu.Unlock()
- for {
- select {
- case <-t.ceasingNetworking:
- return false
- default:
- }
- if len(t.peers) > torrentPeersLowWater {
- goto wait
- }
- if t.needData() || cl.seeding(t) {
- return true
- }
- wait:
- t.wantPeers.Wait()
- }
-}
-
-// Returns whether the client should make effort to seed the torrent.
-func (cl *Client) seeding(t *Torrent) bool {
- if cl.config.NoUpload {
- return false
- }
- if !cl.config.Seed {
- return false
- }
- if t.needData() {
- return false
- }
- return true
-}
-
func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
- for cl.waitWantPeers(t) {
+ for t.waitWantPeers() {
// log.Printf("getting peers for %q from DHT", t)
ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
if err != nil {
if numPeers >= torrentPeersHighWater {
break getPeers
}
- case <-t.ceasingNetworking:
+ case <-t.closed.LockedChan(&cl.mu):
ps.Close()
return
}
PeerId: cl.peerID,
InfoHash: t.infoHash,
}
- if !cl.waitWantPeers(t) {
+ if !t.waitWantPeers() {
return
}
cl.mu.RLock()
req.Event = tracker.None
}
newAnnounce:
- for cl.waitWantPeers(t) {
+ for t.waitWantPeers() {
cl.mu.RLock()
req.Left = t.bytesLeftAnnounce()
trackers = t.trackers
} else if len(touchers) != 0 {
log.Printf("dropping %d conns that touched piece", len(touchers))
for _, c := range touchers {
- cl.dropConnection(t, c)
+ t.dropConnection(c)
}
}
cl.pieceChanged(t, piece)
cl.event.Wait()
}
p.QueuedForHash = false
- if t.isClosed() || t.pieceComplete(piece) {
+ if t.closed.IsSet() || t.pieceComplete(piece) {
t.updatePiecePriority(piece)
t.publishPieceChange(piece)
return
cl.mu.Unlock()
sum := t.hashPiece(piece)
cl.mu.Lock()
- select {
- case <-t.closing:
- return
- default:
- }
p.Hashing = false
cl.pieceHashed(t, piece, sum == p.Hash)
}
type Torrent struct {
cl *Client
- closing chan struct{}
-
- // Closed when no more network activity is desired. This includes
- // announcing, and communicating with peers.
- ceasingNetworking chan struct{}
-
+ closed missinggo.Event
infoHash metainfo.Hash
pieces []piece
// Values are the piece indices that changed.
return
}
-func (t *Torrent) ceaseNetworking() {
- select {
- case <-t.ceasingNetworking:
- return
- default:
- }
- close(t.ceasingNetworking)
- for _, c := range t.conns {
- c.Close()
- }
-}
-
func (t *Torrent) addPeer(p Peer, cl *Client) {
cl.openNewConns(t)
if len(t.peers) >= torrentPeersHighWater {
return t.completedPieces.Len()
}
-// Safe to call with or without client lock.
-func (t *Torrent) isClosed() bool {
- select {
- case <-t.closing:
- return true
- default:
- return false
- }
-}
-
func (t *Torrent) close() (err error) {
- if t.isClosed() {
- return
- }
- t.ceaseNetworking()
- close(t.closing)
+ t.closed.Set()
if c, ok := t.storage.(io.Closer); ok {
c.Close()
}
}
return t.setInfoBytes(b)
}
+
+// Returns true if connection is removed from torrent.Conns.
+func (t *Torrent) deleteConnection(c *connection) bool {
+ for i0, _c := range t.conns {
+ if _c != c {
+ continue
+ }
+ i1 := len(t.conns) - 1
+ if i0 != i1 {
+ t.conns[i0] = t.conns[i1]
+ }
+ t.conns = t.conns[:i1]
+ return true
+ }
+ return false
+}
+
+func (t *Torrent) dropConnection(c *connection) {
+ t.cl.event.Broadcast()
+ c.Close()
+ if t.deleteConnection(c) {
+ t.openNewConns()
+ }
+}
+
+// Returns true when peers are required, or false if the torrent is closing.
+func (t *Torrent) waitWantPeers() bool {
+ t.cl.mu.Lock()
+ defer t.cl.mu.Unlock()
+ for {
+ if t.closed.IsSet() {
+ return false
+ }
+ if len(t.peers) > torrentPeersLowWater {
+ goto wait
+ }
+ if t.needData() || t.seeding() {
+ return true
+ }
+ wait:
+ t.wantPeers.Wait()
+ }
+}
+
+// Returns whether the client should make effort to seed the torrent.
+func (t *Torrent) seeding() bool {
+ cl := t.cl
+ if cl.config.NoUpload {
+ return false
+ }
+ if !cl.config.Seed {
+ return false
+ }
+ if t.needData() {
+ return false
+ }
+ return true
+}