"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/storage"
- "github.com/anacrolix/torrent/tracker"
)
// Currently doesn't really queue, but should in the future.
// include ourselves if we end up trying to connect to our own address
// through legitimate channels.
dopplegangerAddrs map[string]struct{}
+ badPeerIPs map[string]struct{}
defaultStorage storage.Client
fmt.Fprintln(w, "Not listening!")
}
fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
+ fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPs))
if cl.dHT != nil {
dhtStats := cl.dHT.Stats()
fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
acceptTCP.Add(1)
}
cl.mu.RLock()
- doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
- _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
+ reject := cl.badPeerIPPort(
+ missinggo.AddrIP(conn.RemoteAddr()),
+ missinggo.AddrPort(conn.RemoteAddr()))
cl.mu.RUnlock()
- if blocked || doppleganger {
+ if reject {
acceptReject.Add(1)
- // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
conn.Close()
continue
}
if peer.Id == cl.peerID {
return
}
- addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
- if cl.dopplegangerAddr(addr) || t.addrActive(addr) {
- duplicateConnsAvoided.Add(1)
+ if cl.badPeerIPPort(peer.IP, peer.Port) {
return
}
- if r, ok := cl.ipBlockRange(peer.IP); ok {
- log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
+ addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
+ if t.addrActive(addr) {
return
}
t.halfOpen[addr] = struct{}{}
func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
if c.PeerID == cl.peerID {
- // Only if we initiated the connection is the remote address a
- // listen addr for a doppleganger.
connsToSelf.Add(1)
addr := c.conn.RemoteAddr().String()
cl.dopplegangerAddrs[addr] = struct{}{}
cl.mu.Lock()
defer cl.mu.Unlock()
if c.PeerID == cl.peerID {
+ // Because the remote address is not necessarily the same as its
+ // client's torrent listen address, we won't record the remote address
+ // as a doppleganger. Instead, the initiator can record *us* as the
+ // doppleganger.
return
}
cl.runHandshookConn(c, t)
}
func (cl *Client) openNewConns(t *Torrent) {
+ defer t.updateWantPeersEvent()
for len(t.peers) != 0 {
if !cl.wantConns(t) {
return
delete(t.peers, k)
cl.initiateConn(p, t)
}
- t.wantPeers.Broadcast()
+}
+
+func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
+ if port == 0 {
+ return true
+ }
+ if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
+ return true
+ }
+ if _, ok := cl.ipBlockRange(ip); ok {
+ return true
+ }
+ if _, ok := cl.badPeerIPs[ip.String()]; ok {
+ return true
+ }
+ return false
}
func (cl *Client) addPeers(t *Torrent, peers []Peer) {
for _, p := range peers {
- if cl.dopplegangerAddr(net.JoinHostPort(
- p.IP.String(),
- strconv.FormatInt(int64(p.Port), 10),
- )) {
- continue
- }
- if _, ok := cl.ipBlockRange(p.IP); ok {
- continue
- }
- if p.Port == 0 {
- // The spec says to scrub these yourselves. Fine.
+ if cl.badPeerIPPort(p.IP, p.Port) {
continue
}
t.addPeer(p, cl)
storageOpener: cl.defaultStorage,
}
- t.wantPeers.L = &cl.mu
return
}
}
}
-func copyTrackers(base []trackerTier) (copy []trackerTier) {
- for _, tier := range base {
- copy = append(copy, append(trackerTier(nil), tier...))
- }
- return
-}
-
-func mergeTier(tier trackerTier, newURLs []string) trackerTier {
-nextURL:
- for _, url := range newURLs {
- for _, trURL := range tier {
- if trURL == url {
- continue nextURL
- }
- }
- tier = append(tier, url)
- }
- return tier
-}
-
// A file-like handle to some torrent data resource.
type Handle interface {
io.Reader
DisplayName: mi.Info.Name,
InfoHash: mi.Info.Hash(),
}
- if len(spec.Trackers) == 0 {
- spec.Trackers = [][]string{[]string{mi.Announce}}
- } else {
- spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
+ if spec.Trackers == nil && mi.Announce != "" {
+ spec.Trackers = [][]string{{mi.Announce}}
}
return
}
}
new = true
t = cl.newTorrent(infoHash)
- if !cl.config.DisableTrackers {
- go cl.announceTorrentTrackers(t)
- }
if cl.dHT != nil {
go cl.announceTorrentDHT(t, true)
}
cl.torrents[infoHash] = t
+ t.updateWantPeersEvent()
return
}
}
func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
- for t.waitWantPeers() {
+ for {
+ select {
+ case <-t.wantPeersEvent.LockedChan(&cl.mu):
+ case <-t.closed.LockedChan(&cl.mu):
+ return
+ }
// log.Printf("getting peers for %q from DHT", t)
ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
if err != nil {
}
}
-func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
- url_, err := url.Parse(trRawURL)
+func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
+ _url, err := url.Parse(announceURL)
if err != nil {
return
}
- host, _, err := net.SplitHostPort(url_.Host)
- if err != nil {
- host = url_.Host
+ hmp := missinggo.SplitHostMaybePort(_url.Host)
+ if hmp.Err != nil {
+ err = hmp.Err
+ return
}
- addr, err := net.ResolveIPAddr("ip", host)
+ addr, err := net.ResolveIPAddr("ip", hmp.Host)
if err != nil {
return
}
cl.mu.RLock()
_, blocked = cl.ipBlockRange(addr.IP)
cl.mu.RUnlock()
+ host = _url.Host
+ hmp.Host = addr.String()
+ _url.Host = hmp.String()
+ urlToUse = _url.String()
return
}
-func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) (interval time.Duration, err error) {
- blocked, err := cl.trackerBlockedUnlocked(tr)
- if err != nil {
- err = fmt.Errorf("error determining if tracker blocked: %s", err)
- return
- }
- if blocked {
- err = errors.New("tracker has blocked IP")
- return
- }
- resp, err := tracker.Announce(tr, req)
- if err != nil {
- return
- }
- var peers []Peer
- for _, peer := range resp.Peers {
- peers = append(peers, Peer{
- IP: peer.IP,
- Port: peer.Port,
- })
- }
- t.AddPeers(peers)
- interval = time.Second * time.Duration(resp.Interval)
- return
-}
-
-func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
- oks := make(chan bool)
- outstanding := 0
- for _, tier := range trackers {
- for _, tr := range tier {
- outstanding++
- go func(tr string) {
- _, err := cl.announceTorrentSingleTracker(tr, req, t)
- oks <- err == nil
- }(tr)
- }
- }
- for outstanding > 0 {
- ok := <-oks
- outstanding--
- if ok {
- atLeastOne = true
- }
- }
- return
-}
-
-// Announce torrent to its trackers.
-func (cl *Client) announceTorrentTrackers(t *Torrent) {
- req := tracker.AnnounceRequest{
- Event: tracker.Started,
- NumWant: -1,
- Port: uint16(cl.incomingPeerPort()),
- PeerId: cl.peerID,
- InfoHash: t.infoHash,
- }
- if !t.waitWantPeers() {
- return
- }
- cl.mu.RLock()
- req.Left = t.bytesLeftAnnounce()
- trackers := t.trackers
- cl.mu.RUnlock()
- if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
- req.Event = tracker.None
- }
-newAnnounce:
- for t.waitWantPeers() {
- cl.mu.RLock()
- req.Left = t.bytesLeftAnnounce()
- trackers = t.trackers
- cl.mu.RUnlock()
- numTrackersTried := 0
- for _, tier := range trackers {
- for trIndex, tr := range tier {
- numTrackersTried++
- interval, err := cl.announceTorrentSingleTracker(tr, &req, t)
- if err != nil {
- // Try the next tracker.
- continue
- }
- // Float the successful announce to the top of the tier. If
- // the trackers list has been changed, we'll be modifying an
- // old copy so it won't matter.
- cl.mu.Lock()
- tier[0], tier[trIndex] = tier[trIndex], tier[0]
- cl.mu.Unlock()
-
- req.Event = tracker.None
- // Wait the interval before attempting another announce.
- time.Sleep(interval)
- continue newAnnounce
- }
- }
- if numTrackersTried != 0 {
- log.Printf("%s: all trackers failed", t)
- }
- // TODO: Wait until trackers are added if there are none.
- time.Sleep(10 * time.Second)
- }
-}
-
func (cl *Client) allTorrentsCompleted() bool {
for _, t := range cl.torrents {
if !t.haveInfo() {
p.EverHashed = true
touchers := cl.reapPieceTouches(t, piece)
if correct {
+ for _, c := range touchers {
+ c.goodPiecesDirtied++
+ }
err := p.Storage().MarkComplete()
if err != nil {
log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
}
t.updatePieceCompletion(piece)
} else if len(touchers) != 0 {
- log.Printf("dropping %d conns that touched piece", len(touchers))
+ log.Printf("dropping and banning %d conns that touched piece", len(touchers))
for _, c := range touchers {
+ c.badPiecesDirtied++
+ t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
t.dropConnection(c)
}
}
cl.DHT().AddNode(ni)
}
}
+
+func (cl *Client) banPeerIP(ip net.IP) {
+ if cl.badPeerIPs == nil {
+ cl.badPeerIPs = make(map[string]struct{})
+ }
+ cl.badPeerIPs[ip.String()] = struct{}{}
+}