From: Matt Joiner <anacrolix@gmail.com>
Date: Thu, 3 Apr 2014 12:16:59 +0000 (+1100)
Subject: Break up client.go into several files and a few fixes
X-Git-Tag: v1.0.0~1761
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=9ba3af19ba811a8513c235712a3dc8d7b990099a;p=btrtrc.git

Break up client.go into several files and a few fixes

Suppress expected errors when initiating connections.
Add an explicit timeout to initiate connections.
Put a lock on closing a connection to prevent data race warnings.
---

diff --git a/client.go b/client.go
index a0e375c6..fac59b75 100644
--- a/client.go
+++ b/client.go
@@ -3,7 +3,6 @@ package torrent
 import (
 	"bufio"
 	"container/list"
-	"crypto"
 	"crypto/rand"
 	"encoding"
 	"errors"
@@ -13,9 +12,8 @@ import (
 	mathRand "math/rand"
 	"net"
 	"os"
-	"path/filepath"
-	"sort"
 	"sync"
+	"syscall"
 	"time"
 
 	metainfo "github.com/nsf/libtorgo/torrent"
@@ -23,290 +21,8 @@ import (
 	"bitbucket.org/anacrolix/go.torrent/peer_protocol"
 	"bitbucket.org/anacrolix/go.torrent/tracker"
 	_ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
-	"launchpad.net/gommap"
 )
 
-const (
-	PieceHash   = crypto.SHA1
-	maxRequests = 250
-	chunkSize   = 0x4000 // 16KiB
-	BEP20       = "-GT0000-"
-)
-
-type InfoHash [20]byte
-
-type pieceSum [20]byte
-
-func copyHashSum(dst, src []byte) {
-	if len(dst) != len(src) || copy(dst, src) != len(dst) {
-		panic("hash sum sizes differ")
-	}
-}
-
-func BytesInfoHash(b []byte) (ih InfoHash) {
-	if len(b) != len(ih) || copy(ih[:], b) != len(ih) {
-		panic("bad infohash bytes")
-	}
-	return
-}
-
-type piece struct {
-	Hash              pieceSum
-	PendingChunkSpecs map[ChunkSpec]struct{}
-	Hashing           bool
-	QueuedForHash     bool
-	EverHashed        bool
-}
-
-func (p *piece) Complete() bool {
-	return len(p.PendingChunkSpecs) == 0 && p.EverHashed
-}
-
-func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) {
-	cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
-	cs.Length = pieceLength - cs.Begin
-	return
-}
-
-func (t *Torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_protocol.Integer) {
-	pendingChunks := t.Pieces[index].PendingChunkSpecs
-	count = peer_protocol.Integer(len(pendingChunks)) * chunkSize
-	_lastChunkSpec := lastChunkSpec(t.PieceLength(index))
-	if _lastChunkSpec.Length != chunkSize {
-		if _, ok := pendingChunks[_lastChunkSpec]; ok {
-			count += _lastChunkSpec.Length - chunkSize
-		}
-	}
-	return
-}
-
-type ChunkSpec struct {
-	Begin, Length peer_protocol.Integer
-}
-
-type Request struct {
-	Index peer_protocol.Integer
-	ChunkSpec
-}
-
-type Connection struct {
-	Socket net.Conn
-	Closed bool
-	post   chan encoding.BinaryMarshaler
-	write  chan []byte
-
-	// Stuff controlled by the local peer.
-	Interested bool
-	Choked     bool
-	Requests   map[Request]struct{}
-
-	// Stuff controlled by the remote peer.
-	PeerId         [20]byte
-	PeerInterested bool
-	PeerChoked     bool
-	PeerRequests   map[Request]struct{}
-	PeerExtensions [8]byte
-	PeerPieces     []bool
-}
-
-func (c *Connection) Close() {
-	if c.Closed {
-		return
-	}
-	c.Socket.Close()
-	close(c.post)
-	c.Closed = true
-}
-
-func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool {
-	if c.PeerPieces == nil {
-		return false
-	}
-	return c.PeerPieces[index]
-}
-
-func (c *Connection) Post(msg encoding.BinaryMarshaler) {
-	c.post <- msg
-}
-
-// Returns true if more requests can be sent.
-func (c *Connection) Request(chunk Request) bool {
-	if len(c.Requests) >= maxRequests {
-		return false
-	}
-	if !c.PeerPieces[chunk.Index] {
-		return true
-	}
-	c.SetInterested(true)
-	if c.PeerChoked {
-		return false
-	}
-	if _, ok := c.Requests[chunk]; !ok {
-		c.Post(peer_protocol.Message{
-			Type:   peer_protocol.Request,
-			Index:  chunk.Index,
-			Begin:  chunk.Begin,
-			Length: chunk.Length,
-		})
-	}
-	if c.Requests == nil {
-		c.Requests = make(map[Request]struct{}, maxRequests)
-	}
-	c.Requests[chunk] = struct{}{}
-	return true
-}
-
-func (c *Connection) Unchoke() {
-	if !c.Choked {
-		return
-	}
-	c.Post(peer_protocol.Message{
-		Type: peer_protocol.Unchoke,
-	})
-	c.Choked = false
-}
-
-func (c *Connection) SetInterested(interested bool) {
-	if c.Interested == interested {
-		return
-	}
-	c.Post(peer_protocol.Message{
-		Type: func() peer_protocol.MessageType {
-			if interested {
-				return peer_protocol.Interested
-			} else {
-				return peer_protocol.NotInterested
-			}
-		}(),
-	})
-	c.Interested = interested
-}
-
-var (
-	keepAliveBytes [4]byte
-)
-
-func (conn *Connection) writer() {
-	timer := time.NewTimer(0)
-	defer timer.Stop()
-	for {
-		if !timer.Reset(time.Minute) {
-			<-timer.C
-		}
-		var b []byte
-		select {
-		case <-timer.C:
-			b = keepAliveBytes[:]
-		case b = <-conn.write:
-			if b == nil {
-				return
-			}
-		}
-		n, err := conn.Socket.Write(b)
-		if err != nil {
-			log.Print(err)
-			break
-		}
-		if n != len(b) {
-			panic("didn't write all bytes")
-		}
-	}
-}
-
-func (conn *Connection) writeOptimizer() {
-	pending := list.New()
-	var nextWrite []byte
-	defer close(conn.write)
-	for {
-		write := conn.write
-		if pending.Len() == 0 {
-			write = nil
-		} else {
-			var err error
-			nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
-			if err != nil {
-				panic(err)
-			}
-		}
-		select {
-		case msg, ok := <-conn.post:
-			if !ok {
-				return
-			}
-			pending.PushBack(msg)
-		case write <- nextWrite:
-			pending.Remove(pending.Front())
-		}
-	}
-}
-
-type Torrent struct {
-	InfoHash   InfoHash
-	Pieces     []*piece
-	Data       MMapSpan
-	MetaInfo   *metainfo.MetaInfo
-	Conns      []*Connection
-	Peers      []Peer
-	Priorities *list.List
-	// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
-	// mirror their respective URLs from the announce-list key.
-	Trackers [][]tracker.Client
-}
-
-func (t *Torrent) NumPieces() int {
-	return len(t.MetaInfo.Pieces) / PieceHash.Size()
-}
-
-func (t *Torrent) NumPiecesCompleted() (num int) {
-	for _, p := range t.Pieces {
-		if p.Complete() {
-			num++
-		}
-	}
-	return
-}
-
-func (t *Torrent) Length() int64 {
-	return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
-}
-
-func (t *Torrent) Close() (err error) {
-	t.Data.Close()
-	for _, conn := range t.Conns {
-		conn.Close()
-	}
-	return
-}
-
-type pieceByBytesPendingSlice struct {
-	Pending, Indices []peer_protocol.Integer
-}
-
-func (pcs pieceByBytesPendingSlice) Len() int {
-	return len(pcs.Indices)
-}
-
-func (me pieceByBytesPendingSlice) Less(i, j int) bool {
-	return me.Pending[me.Indices[i]] < me.Pending[me.Indices[j]]
-}
-
-func (me pieceByBytesPendingSlice) Swap(i, j int) {
-	me.Indices[i], me.Indices[j] = me.Indices[j], me.Indices[i]
-}
-
-func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
-	slice := pieceByBytesPendingSlice{
-		Pending: make([]peer_protocol.Integer, 0, len(t.Pieces)),
-		Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)),
-	}
-	for i := range t.Pieces {
-		slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(peer_protocol.Integer(i)))
-		slice.Indices = append(slice.Indices, peer_protocol.Integer(i))
-	}
-	sort.Sort(sort.Reverse(slice))
-	return slice.Indices
-}
-
 // Currently doesn't really queue, but should in the future.
 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
 	piece := t.Pieces[pieceIndex]
@@ -317,25 +33,6 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer)
 	go cl.verifyPiece(t, pieceIndex)
 }
 
-func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
-	req.Index = peer_protocol.Integer(off / t.MetaInfo.PieceLength)
-	if req.Index < 0 || int(req.Index) >= len(t.Pieces) {
-		return
-	}
-	off %= t.MetaInfo.PieceLength
-	pieceLeft := t.PieceLength(req.Index) - peer_protocol.Integer(off)
-	if pieceLeft <= 0 {
-		return
-	}
-	req.Begin = chunkSize * (peer_protocol.Integer(off) / chunkSize)
-	req.Length = chunkSize
-	if req.Length > pieceLeft {
-		req.Length = pieceLeft
-	}
-	ok = true
-	return
-}
-
 func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
 	cl.mu.Lock()
 	defer cl.mu.Unlock()
@@ -369,79 +66,6 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
 	}
 }
 
-func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
-	_, err = t.Data.WriteAt(data, int64(piece)*t.MetaInfo.PieceLength+begin)
-	return
-}
-
-func (t *Torrent) bitfield() (bf []bool) {
-	for _, p := range t.Pieces {
-		bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
-	}
-	return
-}
-
-func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) {
-	piece := t.Pieces[index]
-	if piece.PendingChunkSpecs == nil {
-		piece.PendingChunkSpecs = make(
-			map[ChunkSpec]struct{},
-			(t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
-	}
-	c := ChunkSpec{
-		Begin: 0,
-	}
-	cs := piece.PendingChunkSpecs
-	for left := peer_protocol.Integer(t.PieceLength(index)); left != 0; left -= c.Length {
-		c.Length = left
-		if c.Length > chunkSize {
-			c.Length = chunkSize
-		}
-		cs[c] = struct{}{}
-		c.Begin += c.Length
-	}
-	return
-}
-
-func (t *Torrent) requestHeat() (ret map[Request]int) {
-	ret = make(map[Request]int)
-	for _, conn := range t.Conns {
-		for req, _ := range conn.Requests {
-			ret[req]++
-		}
-	}
-	return
-}
-
-type Peer struct {
-	Id   [20]byte
-	IP   net.IP
-	Port int
-}
-
-func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
-	if int(piece) == t.NumPieces()-1 {
-		len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
-	}
-	if len_ == 0 {
-		len_ = peer_protocol.Integer(t.MetaInfo.PieceLength)
-	}
-	return
-}
-
-func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) {
-	hash := PieceHash.New()
-	n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength)
-	if err != nil {
-		panic(err)
-	}
-	if peer_protocol.Integer(n) != t.PieceLength(piece) {
-		panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece))
-	}
-	copyHashSum(ps[:], hash.Sum(nil))
-	return
-}
-
 type DataSpec struct {
 	InfoHash
 	Request
@@ -464,10 +88,6 @@ type Client struct {
 	dataWaiter chan struct{}
 }
 
-var (
-	ErrDataNotReady = errors.New("data not ready")
-)
-
 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
 	cl.mu.Lock()
 	defer cl.mu.Unlock()
@@ -576,54 +196,6 @@ func (cl *Client) acceptConnections() {
 	}
 }
 
-func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
-	defer func() {
-		if err != nil {
-			mms.Close()
-			mms = nil
-		}
-	}()
-	for _, miFile := range metaInfo.Files {
-		fileName := filepath.Join(append([]string{location, metaInfo.Name}, miFile.Path...)...)
-		err = os.MkdirAll(filepath.Dir(fileName), 0777)
-		if err != nil {
-			return
-		}
-		var file *os.File
-		file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
-		if err != nil {
-			return
-		}
-		func() {
-			defer file.Close()
-			var fi os.FileInfo
-			fi, err = file.Stat()
-			if err != nil {
-				return
-			}
-			if fi.Size() < miFile.Length {
-				err = file.Truncate(miFile.Length)
-				if err != nil {
-					return
-				}
-			}
-			var mMap gommap.MMap
-			mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
-			if err != nil {
-				return
-			}
-			if int64(len(mMap)) != miFile.Length {
-				panic("mmap has wrong length")
-			}
-			mms = append(mms, MMap{mMap})
-		}()
-		if err != nil {
-			return
-		}
-	}
-	return
-}
-
 func (me *Client) torrent(ih InfoHash) *Torrent {
 	for _, t := range me.torrents {
 		if t.InfoHash == ih {
@@ -639,18 +211,33 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
 	}
 	me.halfOpen++
 	go func() {
-		conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
+		addr := &net.TCPAddr{
 			IP:   peer.IP,
 			Port: peer.Port,
-		})
+		}
+		conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
 
-		me.mu.Lock()
-		me.halfOpen--
-		me.openNewConns()
-		me.mu.Unlock()
+		go func() {
+			me.mu.Lock()
+			defer me.mu.Unlock()
+			if me.halfOpen == 0 {
+				panic("assert")
+			}
+			me.halfOpen--
+			me.openNewConns()
+		}()
 
+		if netOpErr, ok := err.(*net.OpError); ok {
+			if netOpErr.Timeout() {
+				return
+			}
+			switch netOpErr.Err {
+			case syscall.ECONNREFUSED:
+				return
+			}
+		}
 		if err != nil {
-			log.Printf("error connecting to peer: %s", err)
+			log.Printf("error connecting to peer: %s %#v", err, err)
 			return
 		}
 		log.Printf("connected to %s", conn.RemoteAddr())
@@ -661,24 +248,6 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
 	}()
 }
 
-func (t *Torrent) haveAllPieces() bool {
-	for _, piece := range t.Pieces {
-		if !piece.Complete() {
-			return false
-		}
-	}
-	return true
-}
-
-func (me *Torrent) haveAnyPieces() bool {
-	for _, piece := range me.Pieces {
-		if piece.Complete() {
-			return true
-		}
-	}
-	return false
-}
-
 func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) {
 	conn := &Connection{
 		Socket:     sock,
@@ -765,11 +334,6 @@ func (me *Client) peerGotPiece(torrent *Torrent, conn *Connection, piece int) {
 	}
 }
 
-func (t *Torrent) wantPiece(index int) bool {
-	p := t.Pieces[index]
-	return p.EverHashed && len(p.PendingChunkSpecs) != 0
-}
-
 func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) {
 	me.replenishConnRequests(torrent, conn)
 }
@@ -851,7 +415,7 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
 		case peer_protocol.Piece:
 			request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
 			if _, ok := conn.Requests[request_]; !ok {
-				err = errors.New("unexpected piece")
+				err = fmt.Errorf("unexpected piece: %s", request_)
 				break
 			}
 			delete(conn.Requests, request_)
@@ -1086,7 +650,7 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
 			}
 		}
 	}
-	// Then finish of incomplete pieces in order of bytes remaining.
+	// Then finish off incomplete pieces in order of bytes remaining.
 	for _, index := range torrent.piecesByPendingBytesDesc() {
 		if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
 			continue
diff --git a/connection.go b/connection.go
new file mode 100644
index 00000000..cbe0e91c
--- /dev/null
+++ b/connection.go
@@ -0,0 +1,174 @@
+package torrent
+
+import (
+	"container/list"
+	"encoding"
+	"log"
+	"net"
+	"sync"
+	"time"
+
+	"bitbucket.org/anacrolix/go.torrent/peer_protocol"
+)
+
+// Maintains the state of a connection with a peer.
+type Connection struct {
+	Socket net.Conn
+	closed bool
+	mu     sync.Mutex // Only for closing.
+	post   chan encoding.BinaryMarshaler
+	write  chan []byte
+
+	// Stuff controlled by the local peer.
+	Interested bool
+	Choked     bool
+	Requests   map[Request]struct{}
+
+	// Stuff controlled by the remote peer.
+	PeerId         [20]byte
+	PeerInterested bool
+	PeerChoked     bool
+	PeerRequests   map[Request]struct{}
+	PeerExtensions [8]byte
+	PeerPieces     []bool
+}
+
+func (c *Connection) Close() {
+	c.mu.Lock()
+	if c.closed {
+		return
+	}
+	c.Socket.Close()
+	close(c.post)
+	c.closed = true
+	c.mu.Unlock()
+}
+
+func (c *Connection) getClosed() bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.closed
+}
+
+func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool {
+	if c.PeerPieces == nil {
+		return false
+	}
+	return c.PeerPieces[index]
+}
+
+func (c *Connection) Post(msg encoding.BinaryMarshaler) {
+	c.post <- msg
+}
+
+// Returns true if more requests can be sent.
+func (c *Connection) Request(chunk Request) bool {
+	if len(c.Requests) >= maxRequests {
+		return false
+	}
+	if !c.PeerPieces[chunk.Index] {
+		return true
+	}
+	c.SetInterested(true)
+	if c.PeerChoked {
+		return false
+	}
+	if _, ok := c.Requests[chunk]; !ok {
+		c.Post(peer_protocol.Message{
+			Type:   peer_protocol.Request,
+			Index:  chunk.Index,
+			Begin:  chunk.Begin,
+			Length: chunk.Length,
+		})
+	}
+	if c.Requests == nil {
+		c.Requests = make(map[Request]struct{}, maxRequests)
+	}
+	c.Requests[chunk] = struct{}{}
+	return true
+}
+
+func (c *Connection) Unchoke() {
+	if !c.Choked {
+		return
+	}
+	c.Post(peer_protocol.Message{
+		Type: peer_protocol.Unchoke,
+	})
+	c.Choked = false
+}
+
+func (c *Connection) SetInterested(interested bool) {
+	if c.Interested == interested {
+		return
+	}
+	c.Post(peer_protocol.Message{
+		Type: func() peer_protocol.MessageType {
+			if interested {
+				return peer_protocol.Interested
+			} else {
+				return peer_protocol.NotInterested
+			}
+		}(),
+	})
+	c.Interested = interested
+}
+
+var (
+	// Four consecutive zero bytes that comprise a keep alive on the wire.
+	keepAliveBytes [4]byte
+)
+
+func (conn *Connection) writer() {
+	timer := time.NewTimer(0)
+	defer timer.Stop()
+	for {
+		if !timer.Reset(time.Minute) {
+			<-timer.C
+		}
+		var b []byte
+		select {
+		case <-timer.C:
+			b = keepAliveBytes[:]
+		case b = <-conn.write:
+			if b == nil {
+				return
+			}
+		}
+		_, err := conn.Socket.Write(b)
+		if conn.getClosed() {
+			break
+		}
+		if err != nil {
+			log.Print(err)
+			break
+		}
+	}
+}
+
+func (conn *Connection) writeOptimizer() {
+	pending := list.New()
+	var nextWrite []byte
+	defer close(conn.write)
+	for {
+		write := conn.write
+		if pending.Len() == 0 {
+			write = nil
+		} else {
+			var err error
+			nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
+			if err != nil {
+				panic(err)
+			}
+		}
+		select {
+		case msg, ok := <-conn.post:
+			if !ok {
+				return
+			}
+			pending.PushBack(msg)
+		case write <- nextWrite:
+			pending.Remove(pending.Front())
+		}
+	}
+}
diff --git a/misc.go b/misc.go
new file mode 100644
index 00000000..e5644005
--- /dev/null
+++ b/misc.go
@@ -0,0 +1,133 @@
+package torrent
+
+import (
+	"crypto"
+	"errors"
+	"os"
+	"path/filepath"
+	"time"
+
+	"bitbucket.org/anacrolix/go.torrent/peer_protocol"
+	metainfo "github.com/nsf/libtorgo/torrent"
+	"launchpad.net/gommap"
+)
+
+const (
+	PieceHash   = crypto.SHA1
+	maxRequests = 250
+	chunkSize   = 0x4000 // 16KiB
+	BEP20       = "-GT0000-"
+	dialTimeout = time.Second * 15
+)
+
+type InfoHash [20]byte
+
+type pieceSum [20]byte
+
+func copyHashSum(dst, src []byte) {
+	if len(dst) != len(src) || copy(dst, src) != len(dst) {
+		panic("hash sum sizes differ")
+	}
+}
+
+func BytesInfoHash(b []byte) (ih InfoHash) {
+	if len(b) != len(ih) || copy(ih[:], b) != len(ih) {
+		panic("bad infohash bytes")
+	}
+	return
+}
+
+type piece struct {
+	Hash              pieceSum
+	PendingChunkSpecs map[ChunkSpec]struct{}
+	Hashing           bool
+	QueuedForHash     bool
+	EverHashed        bool
+}
+
+func (p *piece) Complete() bool {
+	return len(p.PendingChunkSpecs) == 0 && p.EverHashed
+}
+
+func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) {
+	cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
+	cs.Length = pieceLength - cs.Begin
+	return
+}
+
+type ChunkSpec struct {
+	Begin, Length peer_protocol.Integer
+}
+
+type Request struct {
+	Index peer_protocol.Integer
+	ChunkSpec
+}
+
+type pieceByBytesPendingSlice struct {
+	Pending, Indices []peer_protocol.Integer
+}
+
+func (pcs pieceByBytesPendingSlice) Len() int {
+	return len(pcs.Indices)
+}
+
+func (me pieceByBytesPendingSlice) Less(i, j int) bool {
+	return me.Pending[me.Indices[i]] < me.Pending[me.Indices[j]]
+}
+
+func (me pieceByBytesPendingSlice) Swap(i, j int) {
+	me.Indices[i], me.Indices[j] = me.Indices[j], me.Indices[i]
+}
+
+var (
+	ErrDataNotReady = errors.New("data not ready")
+)
+
+func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
+	defer func() {
+		if err != nil {
+			mms.Close()
+			mms = nil
+		}
+	}()
+	for _, miFile := range metaInfo.Files {
+		fileName := filepath.Join(append([]string{location, metaInfo.Name}, miFile.Path...)...)
+		err = os.MkdirAll(filepath.Dir(fileName), 0777)
+		if err != nil {
+			return
+		}
+		var file *os.File
+		file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
+		if err != nil {
+			return
+		}
+		func() {
+			defer file.Close()
+			var fi os.FileInfo
+			fi, err = file.Stat()
+			if err != nil {
+				return
+			}
+			if fi.Size() < miFile.Length {
+				err = file.Truncate(miFile.Length)
+				if err != nil {
+					return
+				}
+			}
+			var mMap gommap.MMap
+			mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
+			if err != nil {
+				return
+			}
+			if int64(len(mMap)) != miFile.Length {
+				panic("mmap has wrong length")
+			}
+			mms = append(mms, MMap{mMap})
+		}()
+		if err != nil {
+			return
+		}
+	}
+	return
+}
diff --git a/torrent.go b/torrent.go
new file mode 100644
index 00000000..19c64d2f
--- /dev/null
+++ b/torrent.go
@@ -0,0 +1,189 @@
+package torrent
+
+import (
+	"container/list"
+	"fmt"
+	"net"
+	"sort"
+
+	"bitbucket.org/anacrolix/go.torrent/peer_protocol"
+	"bitbucket.org/anacrolix/go.torrent/tracker"
+	metainfo "github.com/nsf/libtorgo/torrent"
+)
+
+func (t *Torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_protocol.Integer) {
+	pendingChunks := t.Pieces[index].PendingChunkSpecs
+	count = peer_protocol.Integer(len(pendingChunks)) * chunkSize
+	_lastChunkSpec := lastChunkSpec(t.PieceLength(index))
+	if _lastChunkSpec.Length != chunkSize {
+		if _, ok := pendingChunks[_lastChunkSpec]; ok {
+			count += _lastChunkSpec.Length - chunkSize
+		}
+	}
+	return
+}
+
+type Torrent struct {
+	InfoHash   InfoHash
+	Pieces     []*piece
+	Data       MMapSpan
+	MetaInfo   *metainfo.MetaInfo
+	Conns      []*Connection
+	Peers      []Peer
+	Priorities *list.List
+	// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
+	// mirror their respective URLs from the announce-list key.
+	Trackers [][]tracker.Client
+}
+
+func (t *Torrent) NumPieces() int {
+	return len(t.MetaInfo.Pieces) / PieceHash.Size()
+}
+
+func (t *Torrent) NumPiecesCompleted() (num int) {
+	for _, p := range t.Pieces {
+		if p.Complete() {
+			num++
+		}
+	}
+	return
+}
+
+func (t *Torrent) Length() int64 {
+	return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
+}
+
+func (t *Torrent) Close() (err error) {
+	t.Data.Close()
+	for _, conn := range t.Conns {
+		conn.Close()
+	}
+	return
+}
+
+func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
+	slice := pieceByBytesPendingSlice{
+		Pending: make([]peer_protocol.Integer, 0, len(t.Pieces)),
+		Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)),
+	}
+	for i := range t.Pieces {
+		slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(peer_protocol.Integer(i)))
+		slice.Indices = append(slice.Indices, peer_protocol.Integer(i))
+	}
+	sort.Sort(sort.Reverse(slice))
+	return slice.Indices
+}
+
+func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
+	req.Index = peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+	if req.Index < 0 || int(req.Index) >= len(t.Pieces) {
+		return
+	}
+	off %= t.MetaInfo.PieceLength
+	pieceLeft := t.PieceLength(req.Index) - peer_protocol.Integer(off)
+	if pieceLeft <= 0 {
+		return
+	}
+	req.Begin = chunkSize * (peer_protocol.Integer(off) / chunkSize)
+	req.Length = chunkSize
+	if req.Length > pieceLeft {
+		req.Length = pieceLeft
+	}
+	ok = true
+	return
+}
+
+func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
+	_, err = t.Data.WriteAt(data, int64(piece)*t.MetaInfo.PieceLength+begin)
+	return
+}
+
+func (t *Torrent) bitfield() (bf []bool) {
+	for _, p := range t.Pieces {
+		bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
+	}
+	return
+}
+
+func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) {
+	piece := t.Pieces[index]
+	if piece.PendingChunkSpecs == nil {
+		piece.PendingChunkSpecs = make(
+			map[ChunkSpec]struct{},
+			(t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+	}
+	c := ChunkSpec{
+		Begin: 0,
+	}
+	cs := piece.PendingChunkSpecs
+	for left := peer_protocol.Integer(t.PieceLength(index)); left != 0; left -= c.Length {
+		c.Length = left
+		if c.Length > chunkSize {
+			c.Length = chunkSize
+		}
+		cs[c] = struct{}{}
+		c.Begin += c.Length
+	}
+	return
+}
+
+func (t *Torrent) requestHeat() (ret map[Request]int) {
+	ret = make(map[Request]int)
+	for _, conn := range t.Conns {
+		for req, _ := range conn.Requests {
+			ret[req]++
+		}
+	}
+	return
+}
+
+type Peer struct {
+	Id   [20]byte
+	IP   net.IP
+	Port int
+}
+
+func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
+	if int(piece) == t.NumPieces()-1 {
+		len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
+	}
+	if len_ == 0 {
+		len_ = peer_protocol.Integer(t.MetaInfo.PieceLength)
+	}
+	return
+}
+
+func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) {
+	hash := PieceHash.New()
+	n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength)
+	if err != nil {
+		panic(err)
+	}
+	if peer_protocol.Integer(n) != t.PieceLength(piece) {
+		panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece))
+	}
+	copyHashSum(ps[:], hash.Sum(nil))
+	return
+}
+func (t *Torrent) haveAllPieces() bool {
+	for _, piece := range t.Pieces {
+		if !piece.Complete() {
+			return false
+		}
+	}
+	return true
+}
+
+func (me *Torrent) haveAnyPieces() bool {
+	for _, piece := range me.Pieces {
+		if piece.Complete() {
+			return true
+		}
+	}
+	return false
+}
+
+func (t *Torrent) wantPiece(index int) bool {
+	p := t.Pieces[index]
+	return p.EverHashed && len(p.PendingChunkSpecs) != 0
+}