event sync.Cond
quit chan struct{}
- halfOpen int
handshaking int
- torrents map[InfoHash]*torrent
+
+ torrents map[InfoHash]*torrent
dataWaits map[*torrent][]dataWait
}
defer cl.mu.Unlock()
fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
- fmt.Fprintf(w, "Half open outgoing connections: %d\n", cl.halfOpen)
fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
if cl.dHT != nil {
fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
if peer.Id == me.peerID {
return
}
- addr := &net.TCPAddr{
- IP: peer.IP,
- Port: peer.Port,
- }
- // Don't connect to the same address twice for the same torrent.
- for _, c := range torrent.Conns {
- if c.Socket.RemoteAddr().String() == addr.String() {
- duplicateConnsAvoided.Add(1)
- return
- }
+ addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
+ if t.addrActive(addr) {
+ duplicateConnsAvoided.Add(1)
+ return
}
- me.halfOpen++
+ t.HalfOpen[addr] = struct{}{}
go func() {
// Binding to the listener address and dialing via net.Dialer gives
// "address in use" error. It seems it's not possible to dial out from
go func() {
me.mu.Lock()
defer me.mu.Unlock()
- if me.halfOpen == 0 {
- panic("assert")
+ if _, ok := t.HalfOpen[addr]; !ok {
+ panic("invariant broken")
}
- me.halfOpen--
+ delete(t.HalfOpen, addr)
me.openNewConns()
}()
default:
}
for len(t.Peers) != 0 {
- if me.halfOpen >= me.halfOpenLimit {
+ if len(t.HalfOpen) >= me.halfOpenLimit {
return
}
- if me.halfOpen+me.handshaking+len(t.Conns) >= socketsPerTorrent {
+ if len(t.HalfOpen)+me.handshaking+len(t.Conns) >= socketsPerTorrent {
break
}
var (
// Prepare a Torrent without any attachment to a Client. That means we can
// initialize fields all fields that don't require the Client without locking
// it.
-func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) {
+func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) {
t = &torrent{
InfoHash: ih,
Peers: make(map[peersKey]Peer, 2000),
ceasingNetworking: make(chan struct{}),
gotMetainfo: make(chan *metainfo.MetaInfo, 1),
+
+ HalfOpen: make(map[string]struct{}, halfOpenLimit),
}
t.GotMetainfo = t.gotMetainfo
t.Trackers = make([][]tracker.Client, len(announceList))
if err != nil {
return
}
- t, err = newTorrent(m.InfoHash, [][]string{m.Trackers})
+ t, err = newTorrent(m.InfoHash, [][]string{m.Trackers}, cl.halfOpenLimit)
if err != nil {
return
}
func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
var ih InfoHash
CopyExact(&ih, metaInfo.Info.Hash)
- t, err := newTorrent(ih, metaInfo.AnnounceList)
+ t, err := newTorrent(ih, metaInfo.AnnounceList, me.halfOpenLimit)
if err != nil {
return
}
dataLock sync.RWMutex
Data mmap_span.MMapSpan
- Info *metainfo.Info
+ Info *metainfo.Info
+ // Active peer connections.
Conns []*connection
+ // Set of addrs to which we're attempting to connect.
+ HalfOpen map[string]struct{}
+ // Reserve of peers to connect to. A peer can be both here and in the
+ // active connections if were told about the peer after connecting with
+ // them. That encourages us to reconnect to peers that are well known.
Peers map[peersKey]Peer
+
// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
// mirror their respective URLs from the announce-list key.
Trackers [][]tracker.Client
GotMetainfo <-chan *metainfo.MetaInfo
}
+func (t *torrent) addrActive(addr string) bool {
+ if _, ok := t.HalfOpen[addr]; ok {
+ return true
+ }
+ for _, c := range t.Conns {
+ if c.Socket.RemoteAddr().String() == addr {
+ return true
+ }
+ }
+ return false
+}
+
func (t *torrent) worstConnsHeap() (wcs *worstConns) {
wcs = &worstConns{
c: append([]*connection{}, t.Conns...),
}
fmt.Fprintln(w)
fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
+ fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
sort.Sort(&worstConns{
c: t.Conns,