]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Move half-open tracking into per-torrent
authorMatt Joiner <anacrolix@gmail.com>
Sun, 16 Nov 2014 19:30:44 +0000 (13:30 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 16 Nov 2014 19:30:44 +0000 (13:30 -0600)
client.go
client_test.go
torrent.go
torrent_test.go

index 4a0bfd244d7b7cc0a463198940874cc7228a1dbc..89a7bffb761ffdc25bda76580b3a05f9e0918976 100644 (file)
--- a/client.go
+++ b/client.go
@@ -127,9 +127,9 @@ type Client struct {
        event sync.Cond
        quit  chan struct{}
 
-       halfOpen    int
        handshaking int
-       torrents    map[InfoHash]*torrent
+
+       torrents map[InfoHash]*torrent
 
        dataWaits map[*torrent][]dataWait
 }
@@ -149,7 +149,6 @@ func (cl *Client) WriteStatus(w io.Writer) {
        defer cl.mu.Unlock()
        fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
        fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
-       fmt.Fprintf(w, "Half open outgoing connections: %d\n", cl.halfOpen)
        fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
        if cl.dHT != nil {
                fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
@@ -373,18 +372,12 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
        if peer.Id == me.peerID {
                return
        }
-       addr := &net.TCPAddr{
-               IP:   peer.IP,
-               Port: peer.Port,
-       }
-       // Don't connect to the same address twice for the same torrent.
-       for _, c := range torrent.Conns {
-               if c.Socket.RemoteAddr().String() == addr.String() {
-                       duplicateConnsAvoided.Add(1)
-                       return
-               }
+       addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
+       if t.addrActive(addr) {
+               duplicateConnsAvoided.Add(1)
+               return
        }
-       me.halfOpen++
+       t.HalfOpen[addr] = struct{}{}
        go func() {
                // Binding to the listener address and dialing via net.Dialer gives
                // "address in use" error. It seems it's not possible to dial out from
@@ -401,10 +394,10 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
                go func() {
                        me.mu.Lock()
                        defer me.mu.Unlock()
-                       if me.halfOpen == 0 {
-                               panic("assert")
+                       if _, ok := t.HalfOpen[addr]; !ok {
+                               panic("invariant broken")
                        }
-                       me.halfOpen--
+                       delete(t.HalfOpen, addr)
                        me.openNewConns()
                }()
 
@@ -1078,10 +1071,10 @@ func (me *Client) openNewConns() {
                default:
                }
                for len(t.Peers) != 0 {
-                       if me.halfOpen >= me.halfOpenLimit {
+                       if len(t.HalfOpen) >= me.halfOpenLimit {
                                return
                        }
-                       if me.halfOpen+me.handshaking+len(t.Conns) >= socketsPerTorrent {
+                       if len(t.HalfOpen)+me.handshaking+len(t.Conns) >= socketsPerTorrent {
                                break
                        }
                        var (
@@ -1152,7 +1145,7 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e
 // Prepare a Torrent without any attachment to a Client. That means we can
 // initialize fields all fields that don't require the Client without locking
 // it.
-func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) {
+func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) {
        t = &torrent{
                InfoHash: ih,
                Peers:    make(map[peersKey]Peer, 2000),
@@ -1161,6 +1154,8 @@ func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) {
                ceasingNetworking: make(chan struct{}),
 
                gotMetainfo: make(chan *metainfo.MetaInfo, 1),
+
+               HalfOpen: make(map[string]struct{}, halfOpenLimit),
        }
        t.GotMetainfo = t.gotMetainfo
        t.Trackers = make([][]tracker.Client, len(announceList))
@@ -1191,7 +1186,7 @@ func (cl *Client) AddMagnet(uri string) (t *torrent, err error) {
        if err != nil {
                return
        }
-       t, err = newTorrent(m.InfoHash, [][]string{m.Trackers})
+       t, err = newTorrent(m.InfoHash, [][]string{m.Trackers}, cl.halfOpenLimit)
        if err != nil {
                return
        }
@@ -1245,7 +1240,7 @@ func (me *Client) addTorrent(t *torrent) (err error) {
 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
        var ih InfoHash
        CopyExact(&ih, metaInfo.Info.Hash)
-       t, err := newTorrent(ih, metaInfo.AnnounceList)
+       t, err := newTorrent(ih, metaInfo.AnnounceList, me.halfOpenLimit)
        if err != nil {
                return
        }
index 6287b65cfc6fe246772c291490b9cd11115016c5..3dcb6c34a1419d8793a9932f5c7509bbcb448480 100644 (file)
@@ -41,7 +41,7 @@ func TestTorrentInitialState(t *testing.T) {
        tor, err := newTorrent(func() (ih InfoHash) {
                util.CopyExact(ih[:], mi.Info.Hash)
                return
-       }(), nil)
+       }(), nil, 0)
        if err != nil {
                t.Fatal(err)
        }
index c4686edadec51cca491db1f28af95a2136dcca2f..3e42eec0d696d7acf06c44477b776e20d56daa2b 100644 (file)
@@ -58,9 +58,16 @@ type torrent struct {
        dataLock sync.RWMutex
        Data     mmap_span.MMapSpan
 
-       Info  *metainfo.Info
+       Info *metainfo.Info
+       // Active peer connections.
        Conns []*connection
+       // Set of addrs to which we're attempting to connect.
+       HalfOpen map[string]struct{}
+       // Reserve of peers to connect to. A peer can be both here and in the
+       // active connections if were told about the peer after connecting with
+       // them. That encourages us to reconnect to peers that are well known.
        Peers map[peersKey]Peer
+
        // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
        // mirror their respective URLs from the announce-list key.
        Trackers     [][]tracker.Client
@@ -72,6 +79,18 @@ type torrent struct {
        GotMetainfo <-chan *metainfo.MetaInfo
 }
 
+func (t *torrent) addrActive(addr string) bool {
+       if _, ok := t.HalfOpen[addr]; ok {
+               return true
+       }
+       for _, c := range t.Conns {
+               if c.Socket.RemoteAddr().String() == addr {
+                       return true
+               }
+       }
+       return false
+}
+
 func (t *torrent) worstConnsHeap() (wcs *worstConns) {
        wcs = &worstConns{
                c: append([]*connection{}, t.Conns...),
@@ -301,6 +320,7 @@ func (t *torrent) WriteStatus(w io.Writer) {
        }
        fmt.Fprintln(w)
        fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
+       fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
        fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
        sort.Sort(&worstConns{
                c: t.Conns,
index 302a97515c0f7d4d9dd7b853b1ca86fc25b9774f..eedb4beadd8a9274adb31428a41831220327d587 100644 (file)
@@ -44,7 +44,7 @@ func TestTorrentRequest(t *testing.T) {
 }
 
 func TestTorrentDoubleClose(t *testing.T) {
-       tt, err := newTorrent(InfoHash{}, nil)
+       tt, err := newTorrent(InfoHash{}, nil, 0)
        if err != nil {
                t.Fatal(err)
        }