From: Matt Joiner Date: Sun, 16 Nov 2014 19:30:44 +0000 (-0600) Subject: Move half-open tracking into per-torrent X-Git-Tag: v1.0.0~1542 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=e37d36986491041f92698419b6f8611e205cba5b;p=btrtrc.git Move half-open tracking into per-torrent --- diff --git a/client.go b/client.go index 4a0bfd24..89a7bffb 100644 --- a/client.go +++ b/client.go @@ -127,9 +127,9 @@ type Client struct { event sync.Cond quit chan struct{} - halfOpen int handshaking int - torrents map[InfoHash]*torrent + + torrents map[InfoHash]*torrent dataWaits map[*torrent][]dataWait } @@ -149,7 +149,6 @@ func (cl *Client) WriteStatus(w io.Writer) { defer cl.mu.Unlock() fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr()) fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID) - fmt.Fprintf(w, "Half open outgoing connections: %d\n", cl.halfOpen) fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking) if cl.dHT != nil { fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes()) @@ -373,18 +372,12 @@ func (me *Client) initiateConn(peer Peer, t *torrent) { if peer.Id == me.peerID { return } - addr := &net.TCPAddr{ - IP: peer.IP, - Port: peer.Port, - } - // Don't connect to the same address twice for the same torrent. - for _, c := range torrent.Conns { - if c.Socket.RemoteAddr().String() == addr.String() { - duplicateConnsAvoided.Add(1) - return - } + addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port)) + if t.addrActive(addr) { + duplicateConnsAvoided.Add(1) + return } - me.halfOpen++ + t.HalfOpen[addr] = struct{}{} go func() { // Binding to the listener address and dialing via net.Dialer gives // "address in use" error. It seems it's not possible to dial out from @@ -401,10 +394,10 @@ func (me *Client) initiateConn(peer Peer, t *torrent) { go func() { me.mu.Lock() defer me.mu.Unlock() - if me.halfOpen == 0 { - panic("assert") + if _, ok := t.HalfOpen[addr]; !ok { + panic("invariant broken") } - me.halfOpen-- + delete(t.HalfOpen, addr) me.openNewConns() }() @@ -1078,10 +1071,10 @@ func (me *Client) openNewConns() { default: } for len(t.Peers) != 0 { - if me.halfOpen >= me.halfOpenLimit { + if len(t.HalfOpen) >= me.halfOpenLimit { return } - if me.halfOpen+me.handshaking+len(t.Conns) >= socketsPerTorrent { + if len(t.HalfOpen)+me.handshaking+len(t.Conns) >= socketsPerTorrent { break } var ( @@ -1152,7 +1145,7 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e // Prepare a Torrent without any attachment to a Client. That means we can // initialize fields all fields that don't require the Client without locking // it. -func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) { +func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) { t = &torrent{ InfoHash: ih, Peers: make(map[peersKey]Peer, 2000), @@ -1161,6 +1154,8 @@ func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) { ceasingNetworking: make(chan struct{}), gotMetainfo: make(chan *metainfo.MetaInfo, 1), + + HalfOpen: make(map[string]struct{}, halfOpenLimit), } t.GotMetainfo = t.gotMetainfo t.Trackers = make([][]tracker.Client, len(announceList)) @@ -1191,7 +1186,7 @@ func (cl *Client) AddMagnet(uri string) (t *torrent, err error) { if err != nil { return } - t, err = newTorrent(m.InfoHash, [][]string{m.Trackers}) + t, err = newTorrent(m.InfoHash, [][]string{m.Trackers}, cl.halfOpenLimit) if err != nil { return } @@ -1245,7 +1240,7 @@ func (me *Client) addTorrent(t *torrent) (err error) { func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) { var ih InfoHash CopyExact(&ih, metaInfo.Info.Hash) - t, err := newTorrent(ih, metaInfo.AnnounceList) + t, err := newTorrent(ih, metaInfo.AnnounceList, me.halfOpenLimit) if err != nil { return } diff --git a/client_test.go b/client_test.go index 6287b65c..3dcb6c34 100644 --- a/client_test.go +++ b/client_test.go @@ -41,7 +41,7 @@ func TestTorrentInitialState(t *testing.T) { tor, err := newTorrent(func() (ih InfoHash) { util.CopyExact(ih[:], mi.Info.Hash) return - }(), nil) + }(), nil, 0) if err != nil { t.Fatal(err) } diff --git a/torrent.go b/torrent.go index c4686eda..3e42eec0 100644 --- a/torrent.go +++ b/torrent.go @@ -58,9 +58,16 @@ type torrent struct { dataLock sync.RWMutex Data mmap_span.MMapSpan - Info *metainfo.Info + Info *metainfo.Info + // Active peer connections. Conns []*connection + // Set of addrs to which we're attempting to connect. + HalfOpen map[string]struct{} + // Reserve of peers to connect to. A peer can be both here and in the + // active connections if were told about the peer after connecting with + // them. That encourages us to reconnect to peers that are well known. Peers map[peersKey]Peer + // BEP 12 Multitracker Metadata Extension. The tracker.Client instances // mirror their respective URLs from the announce-list key. Trackers [][]tracker.Client @@ -72,6 +79,18 @@ type torrent struct { GotMetainfo <-chan *metainfo.MetaInfo } +func (t *torrent) addrActive(addr string) bool { + if _, ok := t.HalfOpen[addr]; ok { + return true + } + for _, c := range t.Conns { + if c.Socket.RemoteAddr().String() == addr { + return true + } + } + return false +} + func (t *torrent) worstConnsHeap() (wcs *worstConns) { wcs = &worstConns{ c: append([]*connection{}, t.Conns...), @@ -301,6 +320,7 @@ func (t *torrent) WriteStatus(w io.Writer) { } fmt.Fprintln(w) fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers)) + fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen)) fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns)) sort.Sort(&worstConns{ c: t.Conns, diff --git a/torrent_test.go b/torrent_test.go index 302a9751..eedb4bea 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -44,7 +44,7 @@ func TestTorrentRequest(t *testing.T) { } func TestTorrentDoubleClose(t *testing.T) { - tt, err := newTorrent(InfoHash{}, nil) + tt, err := newTorrent(InfoHash{}, nil, 0) if err != nil { t.Fatal(err) }