for _, cn := range t.Conns {
me.replenishConnRequests(t, cn)
}
+ me.openNewConns(t)
return nil
}
noUpload: cfg.NoUpload,
disableTrackers: cfg.DisableTrackers,
downloadStrategy: cfg.DownloadStrategy,
- halfOpenLimit: 100,
+ halfOpenLimit: socketsPerTorrent,
dataDir: cfg.DataDir,
disableUTP: cfg.DisableUTP,
panic("invariant broken")
}
delete(t.HalfOpen, addr)
- me.openNewConns()
+ me.openNewConns(t)
}()
if res.Conn == nil {
return
torrent.Conns[i0] = torrent.Conns[i1]
}
torrent.Conns = torrent.Conns[:i1]
- me.openNewConns()
+ me.openNewConns(torrent)
return
}
panic("connection not found")
}
}
t.Conns = append(t.Conns, c)
+ // TODO: This should probably be done by a routine that kills off bad
+ // connections, and extra connections killed here instead.
if len(t.Conns) > socketsPerTorrent {
wcs := t.worstConnsHeap()
heap.Pop(wcs).(*connection).Close()
return true
}
-func (me *Client) openNewConns() {
-	for _, t := range me.torrents {
-		select {
-		case <-t.ceasingNetworking:
-			continue
-		default:
+// openNewConns initiates connections to known peers for the given torrent,
+// stopping at the per-torrent socket limit or the client's half-open limit.
+// It returns early if the torrent is ceasing networking, or if the metainfo
+// is available and the download strategy reports no pending data.
+// NOTE(review): presumably called with the client lock held — confirm.
+func (me *Client) openNewConns(t *torrent) {
+	select {
+	case <-t.ceasingNetworking:
+		return
+	default:
+	}
+	if t.haveInfo() && !me.downloadStrategy.PendingData(t) {
+		return
+	}
+	for len(t.Peers) != 0 {
+		if len(t.Conns) >= socketsPerTorrent {
+			break
		}
-		for len(t.Peers) != 0 {
-			if len(t.HalfOpen) >= me.halfOpenLimit {
-				return
-			}
-			if len(t.HalfOpen)+me.handshaking+len(t.Conns) >= socketsPerTorrent {
-				break
-			}
-			var (
-				k peersKey
-				p Peer
-			)
-			for k, p = range t.Peers {
-				break
-			}
-			delete(t.Peers, k)
-			me.initiateConn(p, t)
+		if len(t.HalfOpen)+me.handshaking >= me.halfOpenLimit {
+			break
+		}
+		// Go map iteration order is unspecified, so this single-step
+		// range picks an arbitrary peer from the swarm.
+		var (
+			k peersKey
+			p Peer
+		)
+		for k, p = range t.Peers {
+			break
		}
+		delete(t.Peers, k)
+		me.initiateConn(p, t)
	}
+	// Peers may have been consumed above; wake any routine blocked in
+	// waitWantPeers so it can fetch more.
+	t.wantPeers.Broadcast()
}
// Adds peers to the swarm for the torrent corresponding to infoHash.
if t == nil {
return errors.New("no such torrent")
}
- // for _, p := range peers {
- // log.Printf("adding peer for %q: %s", infoHash, p)
- // }
t.AddPeers(peers)
- me.openNewConns()
+ me.openNewConns(t)
return nil
}
HalfOpen: make(map[string]struct{}, halfOpenLimit),
}
+ t.wantPeers.L = &t.stateMu
t.GotMetainfo = t.gotMetainfo
- t.Trackers = make([][]tracker.Client, len(announceList))
- for tierIndex := range announceList {
- tier := t.Trackers[tierIndex]
- for _, url := range announceList[tierIndex] {
- tr, err := tracker.New(url)
- if err != nil {
- log.Print(err)
- continue
+ t.addTrackers(announceList)
+ return
+}
+
+// The trackers within each tier must be shuffled before use.
+// http://stackoverflow.com/a/12267471/149482
+// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
+// shuffleTier shuffles the tier in place (Fisher–Yates).
+func shuffleTier(tier []tracker.Client) {
+	for i := range tier {
+		j := mathRand.Intn(i + 1)
+		tier[i], tier[j] = tier[j], tier[i]
+	}
+}
+
+// copyTrackers returns a fresh top-level slice referring to the same tier
+// slices as base.
+// NOTE(review): this is a shallow copy — the inner tiers are shared with
+// base, and the named result shadows the builtin copy. Since mergeTier
+// appends to a tier, a shared backing array could be mutated under readers
+// holding the old t.Trackers value — confirm callers only swap whole-slice
+// values and never mutate tiers in place.
+func copyTrackers(base [][]tracker.Client) (copy [][]tracker.Client) {
+	for _, tier := range base {
+		copy = append(copy, tier)
+	}
+	return
+}
+
+// mergeTier returns tier extended with a tracker client for each URL in
+// newURLs not already present (deduplicated by URL). URLs for which a
+// client can't be created are logged and skipped.
+func mergeTier(tier []tracker.Client, newURLs []string) []tracker.Client {
+nextURL:
+	for _, url := range newURLs {
+		// Skip URLs the tier already contains.
+		for _, tr := range tier {
+			if tr.URL() == url {
+				continue nextURL
			}
-			tier = append(tier, tr)
		}
-		// The trackers within each tier must be shuffled before use.
-		// http://stackoverflow.com/a/12267471/149482
-		// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
-		for i := range tier {
-			j := mathRand.Intn(i + 1)
-			tier[i], tier[j] = tier[j], tier[i]
+		tr, err := tracker.New(url)
+		if err != nil {
+			log.Printf("error creating tracker client for %q: %s", url, err)
+			continue
		}
-		t.Trackers[tierIndex] = tier
+		tier = append(tier, tr)
	}
-	return
+	return tier
+}
+
+// addTrackers merges announceList (BEP 12 announce-list tiers) into the
+// torrent's tracker tiers. A new top-level slice is built via copyTrackers
+// and then swapped in, so code that copied t.Trackers earlier keeps a
+// consistent value.
+// NOTE(review): presumably requires the client lock — verify at call sites.
+func (t *torrent) addTrackers(announceList [][]string) {
+	newTrackers := copyTrackers(t.Trackers)
+	for tierIndex, tier := range announceList {
+		if tierIndex < len(newTrackers) {
+			newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
+		} else {
+			newTrackers = append(newTrackers, mergeTier(nil, tier))
+		}
+	}
+	t.Trackers = newTrackers
+}
type Torrent struct {
return me.AddTorrent(mi)
}
-func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
+// Returns true when peers are required, or false if the torrent is closing.
+// waitWantPeers blocks until the torrent's known-peer count drops below the
+// low-water mark (socketsPerTorrent*5) or networking ceases.
+func (cl *Client) waitWantPeers(t *torrent) bool {
+	cl.mu.Lock()
+	defer cl.mu.Unlock()
+	t.stateMu.Lock()
+	defer t.stateMu.Unlock()
	for {
+		select {
+		case <-t.ceasingNetworking:
+			return false
+		default:
+		}
+		if len(t.Peers) < socketsPerTorrent*5 {
+			return true
+		}
+		// wantPeers.L is t.stateMu, so Wait atomically releases stateMu
+		// and reacquires it on wakeup. cl.mu is dropped before waiting;
+		// after waking, stateMu is dropped too so both locks can be
+		// retaken in the client-lock-first order, avoiding deadlock.
+		cl.mu.Unlock()
+		t.wantPeers.Wait()
+		t.stateMu.Unlock()
+		cl.mu.Lock()
+		t.stateMu.Lock()
	}
+}
+
+// announceTorrentDHT loops while the torrent wants more peers: it scrapes
+// the DHT for peers via GetPeers, then announces our port on nodes that
+// returned an announce token. Exits when waitWantPeers reports the torrent
+// is closing, or on a GetPeers error.
+func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
+	for cl.waitWantPeers(t) {
+		log.Printf("announcing torrent %q to DHT", t)
		ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
		if err != nil {
			log.Printf("error getting peers from dht: %s", err)
			return
		}
-		nextScrape := time.After(1 * time.Minute)
	getPeers:
		for {
			select {
-			case <-nextScrape:
-				break getPeers
			case v, ok := <-ps.Values:
				if !ok {
					break getPeers
				}
			}
		ps.Close()
+		log.Printf("finished DHT peer scrape for %s", t)
		// After a GetPeers, we can announce on the best nodes that gave us an
		// announce token.
		if port != 0 {
			// We can't allow the port to be implied as long as the UTP and
			// DHT ports are different.
-			cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
+			err := cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
+			if err != nil {
+				log.Printf("error announcing torrent to DHT: %s", err)
+			} else {
+				log.Printf("announced %q to DHT", t)
+			}
		}
	}
}
InfoHash: t.InfoHash,
}
newAnnounce:
- for {
- select {
- case <-t.ceasingNetworking:
- return
- default:
- }
+ for cl.waitWantPeers(t) {
cl.mu.Lock()
req.Left = t.BytesLeft()
+ trackers := t.Trackers
cl.mu.Unlock()
- for _, tier := range t.Trackers {
+ for _, tier := range trackers {
for trIndex, tr := range tier {
if err := tr.Connect(); err != nil {
log.Print(err)