From: Matt Joiner Date: Sat, 29 Nov 2014 01:41:53 +0000 (-0600) Subject: Make use of IP blocklist and change the client lock to RWMutex X-Git-Tag: v1.0.0~1474 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=17a8b8aa17967ff043720e88a0d20319611f7488;p=btrtrc.git Make use of IP blocklist and change the client lock to RWMutex The blocklist is loaded through from the file at TORRENT_BLOCKLIST_FILE or $HOME/.config/torrent/blocklist. --- diff --git a/client.go b/client.go index d00c2c43..7336b623 100644 --- a/client.go +++ b/client.go @@ -30,6 +30,7 @@ import ( mathRand "math/rand" "net" "os" + "path/filepath" "sort" "strings" "sync" @@ -42,11 +43,11 @@ import ( "github.com/anacrolix/libtorgo/metainfo" "bitbucket.org/anacrolix/go.torrent/dht" + "bitbucket.org/anacrolix/go.torrent/iplist" pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" "bitbucket.org/anacrolix/go.torrent/tracker" _ "bitbucket.org/anacrolix/go.torrent/tracker/udp" . "bitbucket.org/anacrolix/go.torrent/util" - "bitbucket.org/anacrolix/go.torrent/util/levelmu" ) var ( @@ -128,8 +129,9 @@ type Client struct { downloadStrategy DownloadStrategy dHT *dht.Server disableUTP bool + ipBlockList *iplist.IPList - mu levelmu.LevelMutex + mu sync.RWMutex event sync.Cond quit chan struct{} @@ -140,6 +142,12 @@ type Client struct { dataWaits map[*torrent][]dataWait } +func (me *Client) SetIPBlockList(list *iplist.IPList) { + me.mu.Lock() + defer me.mu.Unlock() + me.ipBlockList = list +} + func (me *Client) PeerID() string { return string(me.peerID[:]) } @@ -183,8 +191,8 @@ func (cl *Client) sortedTorrents() (ret []*torrent) { } func (cl *Client) WriteStatus(_w io.Writer) { - cl.mu.LevelLock(1) - defer cl.mu.Unlock() + cl.mu.RLock() + defer cl.mu.RUnlock() w := bufio.NewWriter(_w) defer w.Flush() fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr()) @@ -221,8 +229,8 @@ func (cl *Client) WriteStatus(_w io.Writer) { // Read torrent data at the given offset. Returns ErrDataNotReady if the data // isn't available. func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) { - cl.mu.LevelLock(1) - defer cl.mu.Unlock() + cl.mu.RLock() + defer cl.mu.RUnlock() t := cl.torrent(ih) if t == nil { err = errors.New("unknown torrent") @@ -269,6 +277,42 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er return t.Data.ReadAt(p, off) } +func (cl *Client) setEnvBlocklist() (err error) { + filename := os.Getenv("TORRENT_BLOCKLIST_FILE") + defaultBlocklist := filename == "" + if defaultBlocklist { + filename = filepath.Join(os.Getenv("HOME"), ".config/torrent/blocklist") + } + f, err := os.Open(filename) + if err != nil { + if defaultBlocklist { + err = nil + } + return + } + defer f.Close() + var ranges []iplist.Range + scanner := bufio.NewScanner(f) + for scanner.Scan() { + r, ok, lineErr := iplist.ParseBlocklistP2PLine(scanner.Text()) + if lineErr != nil { + err = fmt.Errorf("error reading torrent blocklist line: %s", lineErr) + return + } + if !ok { + continue + } + ranges = append(ranges, r) + } + err = scanner.Err() + if err != nil { + err = fmt.Errorf("error reading torrent blocklist: %s", err) + return + } + cl.ipBlockList = iplist.New(ranges) + return +} + func NewClient(cfg *Config) (cl *Client, err error) { if cfg == nil { cfg = &Config{} @@ -288,7 +332,11 @@ func NewClient(cfg *Config) (cl *Client, err error) { dataWaits: make(map[*torrent][]dataWait), } cl.event.L = &cl.mu - cl.mu.Init(2) + + err = cl.setEnvBlocklist() + if err != nil { + return + } if cfg.PeerID != "" { CopyExact(&cl.peerID, cfg.PeerID) @@ -344,6 +392,9 @@ func NewClient(cfg *Config) (cl *Client, err error) { dhtCfg.Conn = utpL.RawConn } cl.dHT, err = dht.NewServer(dhtCfg) + if cl.ipBlockList != nil { + cl.dHT.SetIPBlockList(cl.ipBlockList) + } if err != nil { return } @@ -376,9 +427,20 @@ func (me *Client) Stop() { me.mu.Unlock() } +func (cl *Client) ipBlocked(ip net.IP) bool { + if cl.ipBlockList == nil { + return false + } + if r := cl.ipBlockList.Lookup(ip); r != nil { + log.Printf("IP blocked: %s in range %s-%s: %s", ip, r.First, r.Last, r.Description) + return true + } + return false +} + func (cl *Client) acceptConnections(l net.Listener, utp bool) { for { - // We accept all connections immediately, because we don't what + // We accept all connections immediately, because we don't know what // torrent they're for. conn, err := l.Accept() select { @@ -394,6 +456,12 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) { return } acceptedConns.Add(1) + cl.mu.RLock() + blocked := cl.ipBlocked(AddrIP(conn.RemoteAddr())) + cl.mu.RUnlock() + if blocked { + continue + } go func() { if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil { log.Print(err) @@ -461,6 +529,9 @@ func (me *Client) initiateConn(peer Peer, t *torrent) { duplicateConnsAvoided.Add(1) return } + if me.ipBlocked(peer.IP) { + return + } dialTimeout := reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers)) t.HalfOpen[addr] = struct{}{} go func() { diff --git a/dht/dht.go b/dht/dht.go index 65ff49bb..311730bc 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -14,6 +14,8 @@ import ( "sync" "time" + "bitbucket.org/anacrolix/go.torrent/iplist" + "bitbucket.org/anacrolix/go.torrent/logonce" "bitbucket.org/anacrolix/go.torrent/util" "github.com/anacrolix/libtorgo/bencode" @@ -28,6 +30,7 @@ type Server struct { mu sync.Mutex closed chan struct{} passive bool // Don't respond to queries. + ipBlockList *iplist.IPList NumConfirmedAnnounces int } @@ -92,7 +95,7 @@ func NewServer(c *ServerConfig) (s *Server, err error) { go func() { err := s.bootstrap() if err != nil { - panic(err) + log.Printf("error bootstrapping DHT: %s", err) } }() return @@ -267,6 +270,12 @@ func (s *Server) setDefaults() (err error) { return } +func (s *Server) SetIPBlockList(list *iplist.IPList) { + s.mu.Lock() + defer s.mu.Unlock() + s.ipBlockList = list +} + func (s *Server) init() (err error) { err = s.setDefaults() if err != nil { @@ -325,6 +334,13 @@ func (s *Server) serve() error { } } +func (s *Server) ipBlocked(ip net.IP) bool { + if s.ipBlockList == nil { + return false + } + return s.ipBlockList.Lookup(ip) != nil +} + func (s *Server) AddNode(ni NodeInfo) { s.mu.Lock() defer s.mu.Unlock() @@ -424,7 +440,7 @@ func (s *Server) reply(addr dHTAddr, t string, r map[string]interface{}) { } err = s.writeToNode(b, addr) if err != nil { - panic(err) + log.Printf("error replying to %s: %s", addr, err) } } @@ -451,6 +467,12 @@ func (s *Server) getNode(addr dHTAddr) (n *Node) { } func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) { + if list := s.ipBlockList; list != nil { + if r := list.Lookup(util.AddrIP(node)); r != nil { + err = fmt.Errorf("write to %s blocked: %s", node, r.Description) + return + } + } n, err := s.socket.WriteTo(b, node) if err != nil { err = fmt.Errorf("error writing %d bytes to %s: %s", len(b), node, err) @@ -735,6 +757,9 @@ func (s *Server) liftNodes(d Msg) { // TODO: Why would people even do this? continue } + if s.ipBlocked(util.AddrIP(cni.Addr)) { + continue + } n := s.getNode(cni.Addr) n.id = string(cni.ID[:]) }