]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Break up client.go into several files and a few fixes
authorMatt Joiner <anacrolix@gmail.com>
Thu, 3 Apr 2014 12:16:59 +0000 (23:16 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 3 Apr 2014 12:16:59 +0000 (23:16 +1100)
Suppress expected errors when initiating connections.
Add an explicit timeout to initiate connections.
Put a lock on closing a connection to prevent data race warnings.

client.go
connection.go [new file with mode: 0644]
misc.go [new file with mode: 0644]
torrent.go [new file with mode: 0644]

index a0e375c646d9ef3faeb29c4e6578e520eb3587eb..fac59b759636b80353f4683cdbd9a3b61e332f5b 100644 (file)
--- a/client.go
+++ b/client.go
@@ -3,7 +3,6 @@ package torrent
 import (
        "bufio"
        "container/list"
-       "crypto"
        "crypto/rand"
        "encoding"
        "errors"
@@ -13,9 +12,8 @@ import (
        mathRand "math/rand"
        "net"
        "os"
-       "path/filepath"
-       "sort"
        "sync"
+       "syscall"
        "time"
 
        metainfo "github.com/nsf/libtorgo/torrent"
@@ -23,290 +21,8 @@ import (
        "bitbucket.org/anacrolix/go.torrent/peer_protocol"
        "bitbucket.org/anacrolix/go.torrent/tracker"
        _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
-       "launchpad.net/gommap"
 )
 
-const (
-       PieceHash   = crypto.SHA1
-       maxRequests = 250
-       chunkSize   = 0x4000 // 16KiB
-       BEP20       = "-GT0000-"
-)
-
-type InfoHash [20]byte
-
-type pieceSum [20]byte
-
-func copyHashSum(dst, src []byte) {
-       if len(dst) != len(src) || copy(dst, src) != len(dst) {
-               panic("hash sum sizes differ")
-       }
-}
-
-func BytesInfoHash(b []byte) (ih InfoHash) {
-       if len(b) != len(ih) || copy(ih[:], b) != len(ih) {
-               panic("bad infohash bytes")
-       }
-       return
-}
-
-type piece struct {
-       Hash              pieceSum
-       PendingChunkSpecs map[ChunkSpec]struct{}
-       Hashing           bool
-       QueuedForHash     bool
-       EverHashed        bool
-}
-
-func (p *piece) Complete() bool {
-       return len(p.PendingChunkSpecs) == 0 && p.EverHashed
-}
-
-func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) {
-       cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
-       cs.Length = pieceLength - cs.Begin
-       return
-}
-
-func (t *Torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_protocol.Integer) {
-       pendingChunks := t.Pieces[index].PendingChunkSpecs
-       count = peer_protocol.Integer(len(pendingChunks)) * chunkSize
-       _lastChunkSpec := lastChunkSpec(t.PieceLength(index))
-       if _lastChunkSpec.Length != chunkSize {
-               if _, ok := pendingChunks[_lastChunkSpec]; ok {
-                       count += _lastChunkSpec.Length - chunkSize
-               }
-       }
-       return
-}
-
-type ChunkSpec struct {
-       Begin, Length peer_protocol.Integer
-}
-
-type Request struct {
-       Index peer_protocol.Integer
-       ChunkSpec
-}
-
-type Connection struct {
-       Socket net.Conn
-       Closed bool
-       post   chan encoding.BinaryMarshaler
-       write  chan []byte
-
-       // Stuff controlled by the local peer.
-       Interested bool
-       Choked     bool
-       Requests   map[Request]struct{}
-
-       // Stuff controlled by the remote peer.
-       PeerId         [20]byte
-       PeerInterested bool
-       PeerChoked     bool
-       PeerRequests   map[Request]struct{}
-       PeerExtensions [8]byte
-       PeerPieces     []bool
-}
-
-func (c *Connection) Close() {
-       if c.Closed {
-               return
-       }
-       c.Socket.Close()
-       close(c.post)
-       c.Closed = true
-}
-
-func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool {
-       if c.PeerPieces == nil {
-               return false
-       }
-       return c.PeerPieces[index]
-}
-
-func (c *Connection) Post(msg encoding.BinaryMarshaler) {
-       c.post <- msg
-}
-
-// Returns true if more requests can be sent.
-func (c *Connection) Request(chunk Request) bool {
-       if len(c.Requests) >= maxRequests {
-               return false
-       }
-       if !c.PeerPieces[chunk.Index] {
-               return true
-       }
-       c.SetInterested(true)
-       if c.PeerChoked {
-               return false
-       }
-       if _, ok := c.Requests[chunk]; !ok {
-               c.Post(peer_protocol.Message{
-                       Type:   peer_protocol.Request,
-                       Index:  chunk.Index,
-                       Begin:  chunk.Begin,
-                       Length: chunk.Length,
-               })
-       }
-       if c.Requests == nil {
-               c.Requests = make(map[Request]struct{}, maxRequests)
-       }
-       c.Requests[chunk] = struct{}{}
-       return true
-}
-
-func (c *Connection) Unchoke() {
-       if !c.Choked {
-               return
-       }
-       c.Post(peer_protocol.Message{
-               Type: peer_protocol.Unchoke,
-       })
-       c.Choked = false
-}
-
-func (c *Connection) SetInterested(interested bool) {
-       if c.Interested == interested {
-               return
-       }
-       c.Post(peer_protocol.Message{
-               Type: func() peer_protocol.MessageType {
-                       if interested {
-                               return peer_protocol.Interested
-                       } else {
-                               return peer_protocol.NotInterested
-                       }
-               }(),
-       })
-       c.Interested = interested
-}
-
-var (
-       keepAliveBytes [4]byte
-)
-
-func (conn *Connection) writer() {
-       timer := time.NewTimer(0)
-       defer timer.Stop()
-       for {
-               if !timer.Reset(time.Minute) {
-                       <-timer.C
-               }
-               var b []byte
-               select {
-               case <-timer.C:
-                       b = keepAliveBytes[:]
-               case b = <-conn.write:
-                       if b == nil {
-                               return
-                       }
-               }
-               n, err := conn.Socket.Write(b)
-               if err != nil {
-                       log.Print(err)
-                       break
-               }
-               if n != len(b) {
-                       panic("didn't write all bytes")
-               }
-       }
-}
-
-func (conn *Connection) writeOptimizer() {
-       pending := list.New()
-       var nextWrite []byte
-       defer close(conn.write)
-       for {
-               write := conn.write
-               if pending.Len() == 0 {
-                       write = nil
-               } else {
-                       var err error
-                       nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
-                       if err != nil {
-                               panic(err)
-                       }
-               }
-               select {
-               case msg, ok := <-conn.post:
-                       if !ok {
-                               return
-                       }
-                       pending.PushBack(msg)
-               case write <- nextWrite:
-                       pending.Remove(pending.Front())
-               }
-       }
-}
-
-type Torrent struct {
-       InfoHash   InfoHash
-       Pieces     []*piece
-       Data       MMapSpan
-       MetaInfo   *metainfo.MetaInfo
-       Conns      []*Connection
-       Peers      []Peer
-       Priorities *list.List
-       // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
-       // mirror their respective URLs from the announce-list key.
-       Trackers [][]tracker.Client
-}
-
-func (t *Torrent) NumPieces() int {
-       return len(t.MetaInfo.Pieces) / PieceHash.Size()
-}
-
-func (t *Torrent) NumPiecesCompleted() (num int) {
-       for _, p := range t.Pieces {
-               if p.Complete() {
-                       num++
-               }
-       }
-       return
-}
-
-func (t *Torrent) Length() int64 {
-       return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
-}
-
-func (t *Torrent) Close() (err error) {
-       t.Data.Close()
-       for _, conn := range t.Conns {
-               conn.Close()
-       }
-       return
-}
-
-type pieceByBytesPendingSlice struct {
-       Pending, Indices []peer_protocol.Integer
-}
-
-func (pcs pieceByBytesPendingSlice) Len() int {
-       return len(pcs.Indices)
-}
-
-func (me pieceByBytesPendingSlice) Less(i, j int) bool {
-       return me.Pending[me.Indices[i]] < me.Pending[me.Indices[j]]
-}
-
-func (me pieceByBytesPendingSlice) Swap(i, j int) {
-       me.Indices[i], me.Indices[j] = me.Indices[j], me.Indices[i]
-}
-
-func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
-       slice := pieceByBytesPendingSlice{
-               Pending: make([]peer_protocol.Integer, 0, len(t.Pieces)),
-               Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)),
-       }
-       for i := range t.Pieces {
-               slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(peer_protocol.Integer(i)))
-               slice.Indices = append(slice.Indices, peer_protocol.Integer(i))
-       }
-       sort.Sort(sort.Reverse(slice))
-       return slice.Indices
-}
-
 // Currently doesn't really queue, but should in the future.
 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
        piece := t.Pieces[pieceIndex]
@@ -317,25 +33,6 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer)
        go cl.verifyPiece(t, pieceIndex)
 }
 
-func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
-       req.Index = peer_protocol.Integer(off / t.MetaInfo.PieceLength)
-       if req.Index < 0 || int(req.Index) >= len(t.Pieces) {
-               return
-       }
-       off %= t.MetaInfo.PieceLength
-       pieceLeft := t.PieceLength(req.Index) - peer_protocol.Integer(off)
-       if pieceLeft <= 0 {
-               return
-       }
-       req.Begin = chunkSize * (peer_protocol.Integer(off) / chunkSize)
-       req.Length = chunkSize
-       if req.Length > pieceLeft {
-               req.Length = pieceLeft
-       }
-       ok = true
-       return
-}
-
 func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
        cl.mu.Lock()
        defer cl.mu.Unlock()
@@ -369,79 +66,6 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
        }
 }
 
-func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
-       _, err = t.Data.WriteAt(data, int64(piece)*t.MetaInfo.PieceLength+begin)
-       return
-}
-
-func (t *Torrent) bitfield() (bf []bool) {
-       for _, p := range t.Pieces {
-               bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
-       }
-       return
-}
-
-func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) {
-       piece := t.Pieces[index]
-       if piece.PendingChunkSpecs == nil {
-               piece.PendingChunkSpecs = make(
-                       map[ChunkSpec]struct{},
-                       (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
-       }
-       c := ChunkSpec{
-               Begin: 0,
-       }
-       cs := piece.PendingChunkSpecs
-       for left := peer_protocol.Integer(t.PieceLength(index)); left != 0; left -= c.Length {
-               c.Length = left
-               if c.Length > chunkSize {
-                       c.Length = chunkSize
-               }
-               cs[c] = struct{}{}
-               c.Begin += c.Length
-       }
-       return
-}
-
-func (t *Torrent) requestHeat() (ret map[Request]int) {
-       ret = make(map[Request]int)
-       for _, conn := range t.Conns {
-               for req, _ := range conn.Requests {
-                       ret[req]++
-               }
-       }
-       return
-}
-
-type Peer struct {
-       Id   [20]byte
-       IP   net.IP
-       Port int
-}
-
-func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
-       if int(piece) == t.NumPieces()-1 {
-               len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
-       }
-       if len_ == 0 {
-               len_ = peer_protocol.Integer(t.MetaInfo.PieceLength)
-       }
-       return
-}
-
-func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) {
-       hash := PieceHash.New()
-       n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength)
-       if err != nil {
-               panic(err)
-       }
-       if peer_protocol.Integer(n) != t.PieceLength(piece) {
-               panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece))
-       }
-       copyHashSum(ps[:], hash.Sum(nil))
-       return
-}
-
 type DataSpec struct {
        InfoHash
        Request
@@ -464,10 +88,6 @@ type Client struct {
        dataWaiter chan struct{}
 }
 
-var (
-       ErrDataNotReady = errors.New("data not ready")
-)
-
 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
        cl.mu.Lock()
        defer cl.mu.Unlock()
@@ -576,54 +196,6 @@ func (cl *Client) acceptConnections() {
        }
 }
 
-func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
-       defer func() {
-               if err != nil {
-                       mms.Close()
-                       mms = nil
-               }
-       }()
-       for _, miFile := range metaInfo.Files {
-               fileName := filepath.Join(append([]string{location, metaInfo.Name}, miFile.Path...)...)
-               err = os.MkdirAll(filepath.Dir(fileName), 0777)
-               if err != nil {
-                       return
-               }
-               var file *os.File
-               file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
-               if err != nil {
-                       return
-               }
-               func() {
-                       defer file.Close()
-                       var fi os.FileInfo
-                       fi, err = file.Stat()
-                       if err != nil {
-                               return
-                       }
-                       if fi.Size() < miFile.Length {
-                               err = file.Truncate(miFile.Length)
-                               if err != nil {
-                                       return
-                               }
-                       }
-                       var mMap gommap.MMap
-                       mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
-                       if err != nil {
-                               return
-                       }
-                       if int64(len(mMap)) != miFile.Length {
-                               panic("mmap has wrong length")
-                       }
-                       mms = append(mms, MMap{mMap})
-               }()
-               if err != nil {
-                       return
-               }
-       }
-       return
-}
-
 func (me *Client) torrent(ih InfoHash) *Torrent {
        for _, t := range me.torrents {
                if t.InfoHash == ih {
@@ -639,18 +211,33 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
        }
        me.halfOpen++
        go func() {
-               conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
+               addr := &net.TCPAddr{
                        IP:   peer.IP,
                        Port: peer.Port,
-               })
+               }
+               conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
 
-               me.mu.Lock()
-               me.halfOpen--
-               me.openNewConns()
-               me.mu.Unlock()
+               go func() {
+                       me.mu.Lock()
+                       defer me.mu.Unlock()
+                       if me.halfOpen == 0 {
+                               panic("assert")
+                       }
+                       me.halfOpen--
+                       me.openNewConns()
+               }()
 
+               if netOpErr, ok := err.(*net.OpError); ok {
+                       if netOpErr.Timeout() {
+                               return
+                       }
+                       switch netOpErr.Err {
+                       case syscall.ECONNREFUSED:
+                               return
+                       }
+               }
                if err != nil {
-                       log.Printf("error connecting to peer: %s", err)
+                       log.Printf("error connecting to peer: %s %#v", err, err)
                        return
                }
                log.Printf("connected to %s", conn.RemoteAddr())
@@ -661,24 +248,6 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
        }()
 }
 
-func (t *Torrent) haveAllPieces() bool {
-       for _, piece := range t.Pieces {
-               if !piece.Complete() {
-                       return false
-               }
-       }
-       return true
-}
-
-func (me *Torrent) haveAnyPieces() bool {
-       for _, piece := range me.Pieces {
-               if piece.Complete() {
-                       return true
-               }
-       }
-       return false
-}
-
 func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) {
        conn := &Connection{
                Socket:     sock,
@@ -765,11 +334,6 @@ func (me *Client) peerGotPiece(torrent *Torrent, conn *Connection, piece int) {
        }
 }
 
-func (t *Torrent) wantPiece(index int) bool {
-       p := t.Pieces[index]
-       return p.EverHashed && len(p.PendingChunkSpecs) != 0
-}
-
 func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) {
        me.replenishConnRequests(torrent, conn)
 }
@@ -851,7 +415,7 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
                case peer_protocol.Piece:
                        request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
                        if _, ok := conn.Requests[request_]; !ok {
-                               err = errors.New("unexpected piece")
+                               err = fmt.Errorf("unexpected piece: %s", request_)
                                break
                        }
                        delete(conn.Requests, request_)
@@ -1086,7 +650,7 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
                        }
                }
        }
-       // Then finish of incomplete pieces in order of bytes remaining.
+       // Then finish off incomplete pieces in order of bytes remaining.
        for _, index := range torrent.piecesByPendingBytesDesc() {
                if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
                        continue
diff --git a/connection.go b/connection.go
new file mode 100644 (file)
index 0000000..cbe0e91
--- /dev/null
@@ -0,0 +1,174 @@
+package torrent
+
+import (
+       "container/list"
+       "encoding"
+       "log"
+       "net"
+       "sync"
+       "time"
+
+       "bitbucket.org/anacrolix/go.torrent/peer_protocol"
+)
+
+// Maintains the state of a connection with a peer.
+type Connection struct {
+       Socket net.Conn
+       closed bool
+       mu     sync.Mutex // Only for closing.
+       post   chan encoding.BinaryMarshaler
+       write  chan []byte
+
+       // Stuff controlled by the local peer.
+       Interested bool
+       Choked     bool
+       Requests   map[Request]struct{}
+
+       // Stuff controlled by the remote peer.
+       PeerId         [20]byte
+       PeerInterested bool
+       PeerChoked     bool
+       PeerRequests   map[Request]struct{}
+       PeerExtensions [8]byte
+       PeerPieces     []bool
+}
+
+func (c *Connection) Close() {
+       c.mu.Lock()
+       if c.closed {
+               return
+       }
+       c.Socket.Close()
+       close(c.post)
+       c.closed = true
+       c.mu.Unlock()
+}
+
+func (c *Connection) getClosed() bool {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       return c.closed
+}
+
+func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool {
+       if c.PeerPieces == nil {
+               return false
+       }
+       return c.PeerPieces[index]
+}
+
+func (c *Connection) Post(msg encoding.BinaryMarshaler) {
+       c.post <- msg
+}
+
+// Returns true if more requests can be sent.
+func (c *Connection) Request(chunk Request) bool {
+       if len(c.Requests) >= maxRequests {
+               return false
+       }
+       if !c.PeerPieces[chunk.Index] {
+               return true
+       }
+       c.SetInterested(true)
+       if c.PeerChoked {
+               return false
+       }
+       if _, ok := c.Requests[chunk]; !ok {
+               c.Post(peer_protocol.Message{
+                       Type:   peer_protocol.Request,
+                       Index:  chunk.Index,
+                       Begin:  chunk.Begin,
+                       Length: chunk.Length,
+               })
+       }
+       if c.Requests == nil {
+               c.Requests = make(map[Request]struct{}, maxRequests)
+       }
+       c.Requests[chunk] = struct{}{}
+       return true
+}
+
+func (c *Connection) Unchoke() {
+       if !c.Choked {
+               return
+       }
+       c.Post(peer_protocol.Message{
+               Type: peer_protocol.Unchoke,
+       })
+       c.Choked = false
+}
+
+func (c *Connection) SetInterested(interested bool) {
+       if c.Interested == interested {
+               return
+       }
+       c.Post(peer_protocol.Message{
+               Type: func() peer_protocol.MessageType {
+                       if interested {
+                               return peer_protocol.Interested
+                       } else {
+                               return peer_protocol.NotInterested
+                       }
+               }(),
+       })
+       c.Interested = interested
+}
+
+var (
+       // Four consecutive zero bytes that comprise a keep alive on the wire.
+       keepAliveBytes [4]byte
+)
+
+func (conn *Connection) writer() {
+       timer := time.NewTimer(0)
+       defer timer.Stop()
+       for {
+               if !timer.Reset(time.Minute) {
+                       <-timer.C
+               }
+               var b []byte
+               select {
+               case <-timer.C:
+                       b = keepAliveBytes[:]
+               case b = <-conn.write:
+                       if b == nil {
+                               return
+                       }
+               }
+               _, err := conn.Socket.Write(b)
+               if conn.getClosed() {
+                       break
+               }
+               if err != nil {
+                       log.Print(err)
+                       break
+               }
+       }
+}
+
+func (conn *Connection) writeOptimizer() {
+       pending := list.New()
+       var nextWrite []byte
+       defer close(conn.write)
+       for {
+               write := conn.write
+               if pending.Len() == 0 {
+                       write = nil
+               } else {
+                       var err error
+                       nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
+                       if err != nil {
+                               panic(err)
+                       }
+               }
+               select {
+               case msg, ok := <-conn.post:
+                       if !ok {
+                               return
+                       }
+                       pending.PushBack(msg)
+               case write <- nextWrite:
+                       pending.Remove(pending.Front())
+               }
+       }
+}
diff --git a/misc.go b/misc.go
new file mode 100644 (file)
index 0000000..e564400
--- /dev/null
+++ b/misc.go
@@ -0,0 +1,133 @@
+package torrent
+
+import (
+       "crypto"
+       "errors"
+       "os"
+       "path/filepath"
+       "time"
+
+       "bitbucket.org/anacrolix/go.torrent/peer_protocol"
+       metainfo "github.com/nsf/libtorgo/torrent"
+       "launchpad.net/gommap"
+)
+
+const (
+       PieceHash   = crypto.SHA1
+       maxRequests = 250
+       chunkSize   = 0x4000 // 16KiB
+       BEP20       = "-GT0000-"
+       dialTimeout = time.Second * 15
+)
+
+type InfoHash [20]byte
+
+type pieceSum [20]byte
+
+func copyHashSum(dst, src []byte) {
+       if len(dst) != len(src) || copy(dst, src) != len(dst) {
+               panic("hash sum sizes differ")
+       }
+}
+
+func BytesInfoHash(b []byte) (ih InfoHash) {
+       if len(b) != len(ih) || copy(ih[:], b) != len(ih) {
+               panic("bad infohash bytes")
+       }
+       return
+}
+
+type piece struct {
+       Hash              pieceSum
+       PendingChunkSpecs map[ChunkSpec]struct{}
+       Hashing           bool
+       QueuedForHash     bool
+       EverHashed        bool
+}
+
+func (p *piece) Complete() bool {
+       return len(p.PendingChunkSpecs) == 0 && p.EverHashed
+}
+
+func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) {
+       cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
+       cs.Length = pieceLength - cs.Begin
+       return
+}
+
+type ChunkSpec struct {
+       Begin, Length peer_protocol.Integer
+}
+
+type Request struct {
+       Index peer_protocol.Integer
+       ChunkSpec
+}
+
+type pieceByBytesPendingSlice struct {
+       Pending, Indices []peer_protocol.Integer
+}
+
+func (pcs pieceByBytesPendingSlice) Len() int {
+       return len(pcs.Indices)
+}
+
+func (me pieceByBytesPendingSlice) Less(i, j int) bool {
+       return me.Pending[me.Indices[i]] < me.Pending[me.Indices[j]]
+}
+
+func (me pieceByBytesPendingSlice) Swap(i, j int) {
+       me.Indices[i], me.Indices[j] = me.Indices[j], me.Indices[i]
+}
+
+var (
+       ErrDataNotReady = errors.New("data not ready")
+)
+
+func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
+       defer func() {
+               if err != nil {
+                       mms.Close()
+                       mms = nil
+               }
+       }()
+       for _, miFile := range metaInfo.Files {
+               fileName := filepath.Join(append([]string{location, metaInfo.Name}, miFile.Path...)...)
+               err = os.MkdirAll(filepath.Dir(fileName), 0777)
+               if err != nil {
+                       return
+               }
+               var file *os.File
+               file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
+               if err != nil {
+                       return
+               }
+               func() {
+                       defer file.Close()
+                       var fi os.FileInfo
+                       fi, err = file.Stat()
+                       if err != nil {
+                               return
+                       }
+                       if fi.Size() < miFile.Length {
+                               err = file.Truncate(miFile.Length)
+                               if err != nil {
+                                       return
+                               }
+                       }
+                       var mMap gommap.MMap
+                       mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
+                       if err != nil {
+                               return
+                       }
+                       if int64(len(mMap)) != miFile.Length {
+                               panic("mmap has wrong length")
+                       }
+                       mms = append(mms, MMap{mMap})
+               }()
+               if err != nil {
+                       return
+               }
+       }
+       return
+}
diff --git a/torrent.go b/torrent.go
new file mode 100644 (file)
index 0000000..19c64d2
--- /dev/null
@@ -0,0 +1,189 @@
+package torrent
+
+import (
+       "container/list"
+       "fmt"
+       "net"
+       "sort"
+
+       "bitbucket.org/anacrolix/go.torrent/peer_protocol"
+       "bitbucket.org/anacrolix/go.torrent/tracker"
+       metainfo "github.com/nsf/libtorgo/torrent"
+)
+
+func (t *Torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_protocol.Integer) {
+       pendingChunks := t.Pieces[index].PendingChunkSpecs
+       count = peer_protocol.Integer(len(pendingChunks)) * chunkSize
+       _lastChunkSpec := lastChunkSpec(t.PieceLength(index))
+       if _lastChunkSpec.Length != chunkSize {
+               if _, ok := pendingChunks[_lastChunkSpec]; ok {
+                       count += _lastChunkSpec.Length - chunkSize
+               }
+       }
+       return
+}
+
+type Torrent struct {
+       InfoHash   InfoHash
+       Pieces     []*piece
+       Data       MMapSpan
+       MetaInfo   *metainfo.MetaInfo
+       Conns      []*Connection
+       Peers      []Peer
+       Priorities *list.List
+       // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
+       // mirror their respective URLs from the announce-list key.
+       Trackers [][]tracker.Client
+}
+
+func (t *Torrent) NumPieces() int {
+       return len(t.MetaInfo.Pieces) / PieceHash.Size()
+}
+
+func (t *Torrent) NumPiecesCompleted() (num int) {
+       for _, p := range t.Pieces {
+               if p.Complete() {
+                       num++
+               }
+       }
+       return
+}
+
+func (t *Torrent) Length() int64 {
+       return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
+}
+
+func (t *Torrent) Close() (err error) {
+       t.Data.Close()
+       for _, conn := range t.Conns {
+               conn.Close()
+       }
+       return
+}
+
+func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
+       slice := pieceByBytesPendingSlice{
+               Pending: make([]peer_protocol.Integer, 0, len(t.Pieces)),
+               Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)),
+       }
+       for i := range t.Pieces {
+               slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(peer_protocol.Integer(i)))
+               slice.Indices = append(slice.Indices, peer_protocol.Integer(i))
+       }
+       sort.Sort(sort.Reverse(slice))
+       return slice.Indices
+}
+
+func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
+       req.Index = peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+       if req.Index < 0 || int(req.Index) >= len(t.Pieces) {
+               return
+       }
+       off %= t.MetaInfo.PieceLength
+       pieceLeft := t.PieceLength(req.Index) - peer_protocol.Integer(off)
+       if pieceLeft <= 0 {
+               return
+       }
+       req.Begin = chunkSize * (peer_protocol.Integer(off) / chunkSize)
+       req.Length = chunkSize
+       if req.Length > pieceLeft {
+               req.Length = pieceLeft
+       }
+       ok = true
+       return
+}
+
+func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
+       _, err = t.Data.WriteAt(data, int64(piece)*t.MetaInfo.PieceLength+begin)
+       return
+}
+
+func (t *Torrent) bitfield() (bf []bool) {
+       for _, p := range t.Pieces {
+               bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
+       }
+       return
+}
+
+func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) {
+       piece := t.Pieces[index]
+       if piece.PendingChunkSpecs == nil {
+               piece.PendingChunkSpecs = make(
+                       map[ChunkSpec]struct{},
+                       (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+       }
+       c := ChunkSpec{
+               Begin: 0,
+       }
+       cs := piece.PendingChunkSpecs
+       for left := peer_protocol.Integer(t.PieceLength(index)); left != 0; left -= c.Length {
+               c.Length = left
+               if c.Length > chunkSize {
+                       c.Length = chunkSize
+               }
+               cs[c] = struct{}{}
+               c.Begin += c.Length
+       }
+       return
+}
+
+func (t *Torrent) requestHeat() (ret map[Request]int) {
+       ret = make(map[Request]int)
+       for _, conn := range t.Conns {
+               for req, _ := range conn.Requests {
+                       ret[req]++
+               }
+       }
+       return
+}
+
+type Peer struct {
+       Id   [20]byte
+       IP   net.IP
+       Port int
+}
+
+func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
+       if int(piece) == t.NumPieces()-1 {
+               len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
+       }
+       if len_ == 0 {
+               len_ = peer_protocol.Integer(t.MetaInfo.PieceLength)
+       }
+       return
+}
+
+func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) {
+       hash := PieceHash.New()
+       n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength)
+       if err != nil {
+               panic(err)
+       }
+       if peer_protocol.Integer(n) != t.PieceLength(piece) {
+               panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece))
+       }
+       copyHashSum(ps[:], hash.Sum(nil))
+       return
+}
+func (t *Torrent) haveAllPieces() bool {
+       for _, piece := range t.Pieces {
+               if !piece.Complete() {
+                       return false
+               }
+       }
+       return true
+}
+
+func (me *Torrent) haveAnyPieces() bool {
+       for _, piece := range me.Pieces {
+               if piece.Complete() {
+                       return true
+               }
+       }
+       return false
+}
+
+func (t *Torrent) wantPiece(index int) bool {
+       p := t.Pieces[index]
+       return p.EverHashed && len(p.PendingChunkSpecs) != 0
+}