]> Sergey Matveev's repositories - btrtrc.git/blobdiff - client.go
Merge branch 'master' of https://github.com/lovedboy/torrent
[btrtrc.git] / client.go
index 3759bfd1dda63546de0e9d7d88084164fcbb301a..e03e3a3c698fa721fcf10102e403ee27a6330b60 100644 (file)
--- a/client.go
+++ b/client.go
@@ -33,7 +33,6 @@ import (
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/storage"
-       "github.com/anacrolix/torrent/tracker"
 )
 
 // Currently doesn't really queue, but should in the future.
@@ -74,6 +73,7 @@ type Client struct {
        // include ourselves if we end up trying to connect to our own address
        // through legitimate channels.
        dopplegangerAddrs map[string]struct{}
+       badPeerIPs        map[string]struct{}
 
        defaultStorage storage.Client
 
@@ -157,6 +157,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
                fmt.Fprintln(w, "Not listening!")
        }
        fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
+       fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPs))
        if cl.dHT != nil {
                dhtStats := cl.dHT.Stats()
                fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
@@ -397,12 +398,12 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) {
                        acceptTCP.Add(1)
                }
                cl.mu.RLock()
-               doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
-               _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
+               reject := cl.badPeerIPPort(
+                       missinggo.AddrIP(conn.RemoteAddr()),
+                       missinggo.AddrPort(conn.RemoteAddr()))
                cl.mu.RUnlock()
-               if blocked || doppleganger {
+               if reject {
                        acceptReject.Add(1)
-                       // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
                        conn.Close()
                        continue
                }
@@ -474,13 +475,11 @@ func (cl *Client) initiateConn(peer Peer, t *Torrent) {
        if peer.Id == cl.peerID {
                return
        }
-       addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
-       if cl.dopplegangerAddr(addr) || t.addrActive(addr) {
-               duplicateConnsAvoided.Add(1)
+       if cl.badPeerIPPort(peer.IP, peer.Port) {
                return
        }
-       if r, ok := cl.ipBlockRange(peer.IP); ok {
-               log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
+       addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
+       if t.addrActive(addr) {
                return
        }
        t.halfOpen[addr] = struct{}{}
@@ -876,8 +875,6 @@ func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainf
 
 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
        if c.PeerID == cl.peerID {
-               // Only if we initiated the connection is the remote address a
-               // listen addr for a doppleganger.
                connsToSelf.Add(1)
                addr := c.conn.RemoteAddr().String()
                cl.dopplegangerAddrs[addr] = struct{}{}
@@ -904,6 +901,10 @@ func (cl *Client) runReceivedConn(c *connection) {
        cl.mu.Lock()
        defer cl.mu.Unlock()
        if c.PeerID == cl.peerID {
+               // Because the remote address is not necessarily the same as its
+               // client's torrent listen address, we won't record the remote address
+               // as a doppleganger. Instead, the initiator can record *us* as the
+               // doppleganger.
                return
        }
        cl.runHandshookConn(c, t)
@@ -1375,6 +1376,7 @@ func (cl *Client) wantConns(t *Torrent) bool {
 }
 
 func (cl *Client) openNewConns(t *Torrent) {
+       defer t.updateWantPeersEvent()
        for len(t.peers) != 0 {
                if !cl.wantConns(t) {
                        return
@@ -1392,22 +1394,27 @@ func (cl *Client) openNewConns(t *Torrent) {
                delete(t.peers, k)
                cl.initiateConn(p, t)
        }
-       t.wantPeers.Broadcast()
+}
+
+func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
+       if port == 0 {
+               return true
+       }
+       if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
+               return true
+       }
+       if _, ok := cl.ipBlockRange(ip); ok {
+               return true
+       }
+       if _, ok := cl.badPeerIPs[ip.String()]; ok {
+               return true
+       }
+       return false
 }
 
 func (cl *Client) addPeers(t *Torrent, peers []Peer) {
        for _, p := range peers {
-               if cl.dopplegangerAddr(net.JoinHostPort(
-                       p.IP.String(),
-                       strconv.FormatInt(int64(p.Port), 10),
-               )) {
-                       continue
-               }
-               if _, ok := cl.ipBlockRange(p.IP); ok {
-                       continue
-               }
-               if p.Port == 0 {
-                       // The spec says to scrub these yourselves. Fine.
+               if cl.badPeerIPPort(p.IP, p.Port) {
                        continue
                }
                t.addPeer(p, cl)
@@ -1429,7 +1436,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
 
                storageOpener: cl.defaultStorage,
        }
-       t.wantPeers.L = &cl.mu
        return
 }
 
@@ -1450,26 +1456,6 @@ func shuffleTier(tier trackerTier) {
        }
 }
 
-func copyTrackers(base []trackerTier) (copy []trackerTier) {
-       for _, tier := range base {
-               copy = append(copy, append(trackerTier(nil), tier...))
-       }
-       return
-}
-
-func mergeTier(tier trackerTier, newURLs []string) trackerTier {
-nextURL:
-       for _, url := range newURLs {
-               for _, trURL := range tier {
-                       if trURL == url {
-                               continue nextURL
-                       }
-               }
-               tier = append(tier, url)
-       }
-       return tier
-}
-
 // A file-like handle to some torrent data resource.
 type Handle interface {
        io.Reader
@@ -1513,10 +1499,8 @@ func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
                DisplayName: mi.Info.Name,
                InfoHash:    mi.Info.Hash(),
        }
-       if len(spec.Trackers) == 0 {
-               spec.Trackers = [][]string{[]string{mi.Announce}}
-       } else {
-               spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
+       if spec.Trackers == nil && mi.Announce != "" {
+               spec.Trackers = [][]string{{mi.Announce}}
        }
        return
 }
@@ -1530,13 +1514,11 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
        }
        new = true
        t = cl.newTorrent(infoHash)
-       if !cl.config.DisableTrackers {
-               go cl.announceTorrentTrackers(t)
-       }
        if cl.dHT != nil {
                go cl.announceTorrentDHT(t, true)
        }
        cl.torrents[infoHash] = t
+       t.updateWantPeersEvent()
        return
 }
 
@@ -1580,7 +1562,12 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
 }
 
 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
-       for t.waitWantPeers() {
+       for {
+               select {
+               case <-t.wantPeersEvent.LockedChan(&cl.mu):
+               case <-t.closed.LockedChan(&cl.mu):
+                       return
+               }
                // log.Printf("getting peers for %q from DHT", t)
                ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
                if err != nil {
@@ -1630,128 +1617,30 @@ func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
        }
 }
 
-func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
-       url_, err := url.Parse(trRawURL)
+func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
+       _url, err := url.Parse(announceURL)
        if err != nil {
                return
        }
-       host, _, err := net.SplitHostPort(url_.Host)
-       if err != nil {
-               host = url_.Host
+       hmp := missinggo.SplitHostMaybePort(_url.Host)
+       if hmp.Err != nil {
+               err = hmp.Err
+               return
        }
-       addr, err := net.ResolveIPAddr("ip", host)
+       addr, err := net.ResolveIPAddr("ip", hmp.Host)
        if err != nil {
                return
        }
        cl.mu.RLock()
        _, blocked = cl.ipBlockRange(addr.IP)
        cl.mu.RUnlock()
+       host = _url.Host
+       hmp.Host = addr.String()
+       _url.Host = hmp.String()
+       urlToUse = _url.String()
        return
 }
 
-func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) (interval time.Duration, err error) {
-       blocked, err := cl.trackerBlockedUnlocked(tr)
-       if err != nil {
-               err = fmt.Errorf("error determining if tracker blocked: %s", err)
-               return
-       }
-       if blocked {
-               err = errors.New("tracker has blocked IP")
-               return
-       }
-       resp, err := tracker.Announce(tr, req)
-       if err != nil {
-               return
-       }
-       var peers []Peer
-       for _, peer := range resp.Peers {
-               peers = append(peers, Peer{
-                       IP:   peer.IP,
-                       Port: peer.Port,
-               })
-       }
-       t.AddPeers(peers)
-       interval = time.Second * time.Duration(resp.Interval)
-       return
-}
-
-func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
-       oks := make(chan bool)
-       outstanding := 0
-       for _, tier := range trackers {
-               for _, tr := range tier {
-                       outstanding++
-                       go func(tr string) {
-                               _, err := cl.announceTorrentSingleTracker(tr, req, t)
-                               oks <- err == nil
-                       }(tr)
-               }
-       }
-       for outstanding > 0 {
-               ok := <-oks
-               outstanding--
-               if ok {
-                       atLeastOne = true
-               }
-       }
-       return
-}
-
-// Announce torrent to its trackers.
-func (cl *Client) announceTorrentTrackers(t *Torrent) {
-       req := tracker.AnnounceRequest{
-               Event:    tracker.Started,
-               NumWant:  -1,
-               Port:     uint16(cl.incomingPeerPort()),
-               PeerId:   cl.peerID,
-               InfoHash: t.infoHash,
-       }
-       if !t.waitWantPeers() {
-               return
-       }
-       cl.mu.RLock()
-       req.Left = t.bytesLeftAnnounce()
-       trackers := t.trackers
-       cl.mu.RUnlock()
-       if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
-               req.Event = tracker.None
-       }
-newAnnounce:
-       for t.waitWantPeers() {
-               cl.mu.RLock()
-               req.Left = t.bytesLeftAnnounce()
-               trackers = t.trackers
-               cl.mu.RUnlock()
-               numTrackersTried := 0
-               for _, tier := range trackers {
-                       for trIndex, tr := range tier {
-                               numTrackersTried++
-                               interval, err := cl.announceTorrentSingleTracker(tr, &req, t)
-                               if err != nil {
-                                       // Try the next tracker.
-                                       continue
-                               }
-                               // Float the successful announce to the top of the tier. If
-                               // the trackers list has been changed, we'll be modifying an
-                               // old copy so it won't matter.
-                               cl.mu.Lock()
-                               tier[0], tier[trIndex] = tier[trIndex], tier[0]
-                               cl.mu.Unlock()
-
-                               req.Event = tracker.None
-                               // Wait the interval before attempting another announce.
-                               time.Sleep(interval)
-                               continue newAnnounce
-                       }
-               }
-               if numTrackersTried != 0 {
-                       log.Printf("%s: all trackers failed", t)
-               }
-               // TODO: Wait until trackers are added if there are none.
-               time.Sleep(10 * time.Second)
-       }
-}
-
 func (cl *Client) allTorrentsCompleted() bool {
        for _, t := range cl.torrents {
                if !t.haveInfo() {
@@ -1877,14 +1766,19 @@ func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
        p.EverHashed = true
        touchers := cl.reapPieceTouches(t, piece)
        if correct {
+               for _, c := range touchers {
+                       c.goodPiecesDirtied++
+               }
                err := p.Storage().MarkComplete()
                if err != nil {
                        log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
                }
                t.updatePieceCompletion(piece)
        } else if len(touchers) != 0 {
-               log.Printf("dropping %d conns that touched piece", len(touchers))
+               log.Printf("dropping and banning %d conns that touched piece", len(touchers))
                for _, c := range touchers {
+                       c.badPiecesDirtied++
+                       t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
                        t.dropConnection(c)
                }
        }
@@ -2014,3 +1908,10 @@ func (cl *Client) AddDHTNodes(nodes []string) {
                cl.DHT().AddNode(ni)
        }
 }
+
+func (cl *Client) banPeerIP(ip net.IP) {
+       if cl.badPeerIPs == nil {
+               cl.badPeerIPs = make(map[string]struct{})
+       }
+       cl.badPeerIPs[ip.String()] = struct{}{}
+}