import (
"bufio"
"container/list"
- "crypto"
"crypto/rand"
"encoding"
"errors"
mathRand "math/rand"
"net"
"os"
- "path/filepath"
- "sort"
"sync"
+ "syscall"
"time"
metainfo "github.com/nsf/libtorgo/torrent"
"bitbucket.org/anacrolix/go.torrent/peer_protocol"
"bitbucket.org/anacrolix/go.torrent/tracker"
_ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
- "launchpad.net/gommap"
)
-const (
- PieceHash = crypto.SHA1
- maxRequests = 250
- chunkSize = 0x4000 // 16KiB
- BEP20 = "-GT0000-"
-)
-
-type InfoHash [20]byte
-
-type pieceSum [20]byte
-
-func copyHashSum(dst, src []byte) {
- if len(dst) != len(src) || copy(dst, src) != len(dst) {
- panic("hash sum sizes differ")
- }
-}
-
-func BytesInfoHash(b []byte) (ih InfoHash) {
- if len(b) != len(ih) || copy(ih[:], b) != len(ih) {
- panic("bad infohash bytes")
- }
- return
-}
-
-type piece struct {
- Hash pieceSum
- PendingChunkSpecs map[ChunkSpec]struct{}
- Hashing bool
- QueuedForHash bool
- EverHashed bool
-}
-
-func (p *piece) Complete() bool {
- return len(p.PendingChunkSpecs) == 0 && p.EverHashed
-}
-
-func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) {
- cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
- cs.Length = pieceLength - cs.Begin
- return
-}
-
-func (t *Torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_protocol.Integer) {
- pendingChunks := t.Pieces[index].PendingChunkSpecs
- count = peer_protocol.Integer(len(pendingChunks)) * chunkSize
- _lastChunkSpec := lastChunkSpec(t.PieceLength(index))
- if _lastChunkSpec.Length != chunkSize {
- if _, ok := pendingChunks[_lastChunkSpec]; ok {
- count += _lastChunkSpec.Length - chunkSize
- }
- }
- return
-}
-
-type ChunkSpec struct {
- Begin, Length peer_protocol.Integer
-}
-
-type Request struct {
- Index peer_protocol.Integer
- ChunkSpec
-}
-
-type Connection struct {
- Socket net.Conn
- Closed bool
- post chan encoding.BinaryMarshaler
- write chan []byte
-
- // Stuff controlled by the local peer.
- Interested bool
- Choked bool
- Requests map[Request]struct{}
-
- // Stuff controlled by the remote peer.
- PeerId [20]byte
- PeerInterested bool
- PeerChoked bool
- PeerRequests map[Request]struct{}
- PeerExtensions [8]byte
- PeerPieces []bool
-}
-
-func (c *Connection) Close() {
- if c.Closed {
- return
- }
- c.Socket.Close()
- close(c.post)
- c.Closed = true
-}
-
-func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool {
- if c.PeerPieces == nil {
- return false
- }
- return c.PeerPieces[index]
-}
-
-func (c *Connection) Post(msg encoding.BinaryMarshaler) {
- c.post <- msg
-}
-
-// Returns true if more requests can be sent.
-func (c *Connection) Request(chunk Request) bool {
- if len(c.Requests) >= maxRequests {
- return false
- }
- if !c.PeerPieces[chunk.Index] {
- return true
- }
- c.SetInterested(true)
- if c.PeerChoked {
- return false
- }
- if _, ok := c.Requests[chunk]; !ok {
- c.Post(peer_protocol.Message{
- Type: peer_protocol.Request,
- Index: chunk.Index,
- Begin: chunk.Begin,
- Length: chunk.Length,
- })
- }
- if c.Requests == nil {
- c.Requests = make(map[Request]struct{}, maxRequests)
- }
- c.Requests[chunk] = struct{}{}
- return true
-}
-
-func (c *Connection) Unchoke() {
- if !c.Choked {
- return
- }
- c.Post(peer_protocol.Message{
- Type: peer_protocol.Unchoke,
- })
- c.Choked = false
-}
-
-func (c *Connection) SetInterested(interested bool) {
- if c.Interested == interested {
- return
- }
- c.Post(peer_protocol.Message{
- Type: func() peer_protocol.MessageType {
- if interested {
- return peer_protocol.Interested
- } else {
- return peer_protocol.NotInterested
- }
- }(),
- })
- c.Interested = interested
-}
-
-var (
- keepAliveBytes [4]byte
-)
-
-func (conn *Connection) writer() {
- timer := time.NewTimer(0)
- defer timer.Stop()
- for {
- if !timer.Reset(time.Minute) {
- <-timer.C
- }
- var b []byte
- select {
- case <-timer.C:
- b = keepAliveBytes[:]
- case b = <-conn.write:
- if b == nil {
- return
- }
- }
- n, err := conn.Socket.Write(b)
- if err != nil {
- log.Print(err)
- break
- }
- if n != len(b) {
- panic("didn't write all bytes")
- }
- }
-}
-
-func (conn *Connection) writeOptimizer() {
- pending := list.New()
- var nextWrite []byte
- defer close(conn.write)
- for {
- write := conn.write
- if pending.Len() == 0 {
- write = nil
- } else {
- var err error
- nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
- if err != nil {
- panic(err)
- }
- }
- select {
- case msg, ok := <-conn.post:
- if !ok {
- return
- }
- pending.PushBack(msg)
- case write <- nextWrite:
- pending.Remove(pending.Front())
- }
- }
-}
-
-type Torrent struct {
- InfoHash InfoHash
- Pieces []*piece
- Data MMapSpan
- MetaInfo *metainfo.MetaInfo
- Conns []*Connection
- Peers []Peer
- Priorities *list.List
- // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
- // mirror their respective URLs from the announce-list key.
- Trackers [][]tracker.Client
-}
-
-func (t *Torrent) NumPieces() int {
- return len(t.MetaInfo.Pieces) / PieceHash.Size()
-}
-
-func (t *Torrent) NumPiecesCompleted() (num int) {
- for _, p := range t.Pieces {
- if p.Complete() {
- num++
- }
- }
- return
-}
-
-func (t *Torrent) Length() int64 {
- return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
-}
-
-func (t *Torrent) Close() (err error) {
- t.Data.Close()
- for _, conn := range t.Conns {
- conn.Close()
- }
- return
-}
-
-type pieceByBytesPendingSlice struct {
- Pending, Indices []peer_protocol.Integer
-}
-
-func (pcs pieceByBytesPendingSlice) Len() int {
- return len(pcs.Indices)
-}
-
-func (me pieceByBytesPendingSlice) Less(i, j int) bool {
- return me.Pending[me.Indices[i]] < me.Pending[me.Indices[j]]
-}
-
-func (me pieceByBytesPendingSlice) Swap(i, j int) {
- me.Indices[i], me.Indices[j] = me.Indices[j], me.Indices[i]
-}
-
-func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
- slice := pieceByBytesPendingSlice{
- Pending: make([]peer_protocol.Integer, 0, len(t.Pieces)),
- Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)),
- }
- for i := range t.Pieces {
- slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(peer_protocol.Integer(i)))
- slice.Indices = append(slice.Indices, peer_protocol.Integer(i))
- }
- sort.Sort(sort.Reverse(slice))
- return slice.Indices
-}
-
// Currently doesn't really queue, but should in the future.
func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
piece := t.Pieces[pieceIndex]
go cl.verifyPiece(t, pieceIndex)
}
-func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
- req.Index = peer_protocol.Integer(off / t.MetaInfo.PieceLength)
- if req.Index < 0 || int(req.Index) >= len(t.Pieces) {
- return
- }
- off %= t.MetaInfo.PieceLength
- pieceLeft := t.PieceLength(req.Index) - peer_protocol.Integer(off)
- if pieceLeft <= 0 {
- return
- }
- req.Begin = chunkSize * (peer_protocol.Integer(off) / chunkSize)
- req.Length = chunkSize
- if req.Length > pieceLeft {
- req.Length = pieceLeft
- }
- ok = true
- return
-}
-
func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
cl.mu.Lock()
defer cl.mu.Unlock()
}
}
-func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
- _, err = t.Data.WriteAt(data, int64(piece)*t.MetaInfo.PieceLength+begin)
- return
-}
-
-func (t *Torrent) bitfield() (bf []bool) {
- for _, p := range t.Pieces {
- bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
- }
- return
-}
-
-func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) {
- piece := t.Pieces[index]
- if piece.PendingChunkSpecs == nil {
- piece.PendingChunkSpecs = make(
- map[ChunkSpec]struct{},
- (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
- }
- c := ChunkSpec{
- Begin: 0,
- }
- cs := piece.PendingChunkSpecs
- for left := peer_protocol.Integer(t.PieceLength(index)); left != 0; left -= c.Length {
- c.Length = left
- if c.Length > chunkSize {
- c.Length = chunkSize
- }
- cs[c] = struct{}{}
- c.Begin += c.Length
- }
- return
-}
-
-func (t *Torrent) requestHeat() (ret map[Request]int) {
- ret = make(map[Request]int)
- for _, conn := range t.Conns {
- for req, _ := range conn.Requests {
- ret[req]++
- }
- }
- return
-}
-
-type Peer struct {
- Id [20]byte
- IP net.IP
- Port int
-}
-
-func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
- if int(piece) == t.NumPieces()-1 {
- len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
- }
- if len_ == 0 {
- len_ = peer_protocol.Integer(t.MetaInfo.PieceLength)
- }
- return
-}
-
-func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) {
- hash := PieceHash.New()
- n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength)
- if err != nil {
- panic(err)
- }
- if peer_protocol.Integer(n) != t.PieceLength(piece) {
- panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece))
- }
- copyHashSum(ps[:], hash.Sum(nil))
- return
-}
-
type DataSpec struct {
InfoHash
Request
dataWaiter chan struct{}
}
-var (
- ErrDataNotReady = errors.New("data not ready")
-)
-
func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
cl.mu.Lock()
defer cl.mu.Unlock()
}
}
-func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
- defer func() {
- if err != nil {
- mms.Close()
- mms = nil
- }
- }()
- for _, miFile := range metaInfo.Files {
- fileName := filepath.Join(append([]string{location, metaInfo.Name}, miFile.Path...)...)
- err = os.MkdirAll(filepath.Dir(fileName), 0777)
- if err != nil {
- return
- }
- var file *os.File
- file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
- if err != nil {
- return
- }
- func() {
- defer file.Close()
- var fi os.FileInfo
- fi, err = file.Stat()
- if err != nil {
- return
- }
- if fi.Size() < miFile.Length {
- err = file.Truncate(miFile.Length)
- if err != nil {
- return
- }
- }
- var mMap gommap.MMap
- mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
- if err != nil {
- return
- }
- if int64(len(mMap)) != miFile.Length {
- panic("mmap has wrong length")
- }
- mms = append(mms, MMap{mMap})
- }()
- if err != nil {
- return
- }
- }
- return
-}
-
func (me *Client) torrent(ih InfoHash) *Torrent {
for _, t := range me.torrents {
if t.InfoHash == ih {
}
me.halfOpen++
go func() {
- conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
+ addr := &net.TCPAddr{
IP: peer.IP,
Port: peer.Port,
- })
+ }
+ conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
- me.mu.Lock()
- me.halfOpen--
- me.openNewConns()
- me.mu.Unlock()
+ go func() {
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ if me.halfOpen == 0 {
+ panic("assert")
+ }
+ me.halfOpen--
+ me.openNewConns()
+ }()
+ if netOpErr, ok := err.(*net.OpError); ok {
+ if netOpErr.Timeout() {
+ return
+ }
+ switch netOpErr.Err {
+ case syscall.ECONNREFUSED:
+ return
+ }
+ }
if err != nil {
- log.Printf("error connecting to peer: %s", err)
+ log.Printf("error connecting to peer: %s %#v", err, err)
return
}
log.Printf("connected to %s", conn.RemoteAddr())
}()
}
-func (t *Torrent) haveAllPieces() bool {
- for _, piece := range t.Pieces {
- if !piece.Complete() {
- return false
- }
- }
- return true
-}
-
-func (me *Torrent) haveAnyPieces() bool {
- for _, piece := range me.Pieces {
- if piece.Complete() {
- return true
- }
- }
- return false
-}
-
func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) {
conn := &Connection{
Socket: sock,
}
}
-func (t *Torrent) wantPiece(index int) bool {
- p := t.Pieces[index]
- return p.EverHashed && len(p.PendingChunkSpecs) != 0
-}
-
func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) {
me.replenishConnRequests(torrent, conn)
}
case peer_protocol.Piece:
request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
if _, ok := conn.Requests[request_]; !ok {
- err = errors.New("unexpected piece")
+ err = fmt.Errorf("unexpected piece: %s", request_)
break
}
delete(conn.Requests, request_)
}
}
}
- // Then finish of incomplete pieces in order of bytes remaining.
+ // Then finish off incomplete pieces in order of bytes remaining.
for _, index := range torrent.piecesByPendingBytesDesc() {
if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
continue
--- /dev/null
+package torrent
+
+import (
+ "container/list"
+ "encoding"
+ "log"
+ "net"
+ "sync"
+ "time"
+
+ "bitbucket.org/anacrolix/go.torrent/peer_protocol"
+)
+
+// Maintains the state of a connection with a peer.
+type Connection struct {
+ Socket net.Conn
+ closed bool
+ mu sync.Mutex // Only for closing.
+ post chan encoding.BinaryMarshaler
+ write chan []byte
+
+ // Stuff controlled by the local peer.
+ Interested bool
+ Choked bool
+ Requests map[Request]struct{}
+
+ // Stuff controlled by the remote peer.
+ PeerId [20]byte
+ PeerInterested bool
+ PeerChoked bool
+ PeerRequests map[Request]struct{}
+ PeerExtensions [8]byte
+ PeerPieces []bool
+}
+
+func (c *Connection) Close() {
+ c.mu.Lock()
+ if c.closed {
+ return
+ }
+ c.Socket.Close()
+ close(c.post)
+ c.closed = true
+ c.mu.Unlock()
+}
+
+func (c *Connection) getClosed() bool {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ return c.closed
+}
+
+func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool {
+ if c.PeerPieces == nil {
+ return false
+ }
+ return c.PeerPieces[index]
+}
+
+func (c *Connection) Post(msg encoding.BinaryMarshaler) {
+ c.post <- msg
+}
+
+// Returns true if more requests can be sent.
+func (c *Connection) Request(chunk Request) bool {
+ if len(c.Requests) >= maxRequests {
+ return false
+ }
+ if !c.PeerPieces[chunk.Index] {
+ return true
+ }
+ c.SetInterested(true)
+ if c.PeerChoked {
+ return false
+ }
+ if _, ok := c.Requests[chunk]; !ok {
+ c.Post(peer_protocol.Message{
+ Type: peer_protocol.Request,
+ Index: chunk.Index,
+ Begin: chunk.Begin,
+ Length: chunk.Length,
+ })
+ }
+ if c.Requests == nil {
+ c.Requests = make(map[Request]struct{}, maxRequests)
+ }
+ c.Requests[chunk] = struct{}{}
+ return true
+}
+
+func (c *Connection) Unchoke() {
+ if !c.Choked {
+ return
+ }
+ c.Post(peer_protocol.Message{
+ Type: peer_protocol.Unchoke,
+ })
+ c.Choked = false
+}
+
+func (c *Connection) SetInterested(interested bool) {
+ if c.Interested == interested {
+ return
+ }
+ c.Post(peer_protocol.Message{
+ Type: func() peer_protocol.MessageType {
+ if interested {
+ return peer_protocol.Interested
+ } else {
+ return peer_protocol.NotInterested
+ }
+ }(),
+ })
+ c.Interested = interested
+}
+
+var (
+ // Four consecutive zero bytes that comprise a keep alive on the wire.
+ keepAliveBytes [4]byte
+)
+
+func (conn *Connection) writer() {
+ timer := time.NewTimer(0)
+ defer timer.Stop()
+ for {
+ if !timer.Reset(time.Minute) {
+ <-timer.C
+ }
+ var b []byte
+ select {
+ case <-timer.C:
+ b = keepAliveBytes[:]
+ case b = <-conn.write:
+ if b == nil {
+ return
+ }
+ }
+ _, err := conn.Socket.Write(b)
+ if conn.getClosed() {
+ break
+ }
+ if err != nil {
+ log.Print(err)
+ break
+ }
+ }
+}
+
+func (conn *Connection) writeOptimizer() {
+ pending := list.New()
+ var nextWrite []byte
+ defer close(conn.write)
+ for {
+ write := conn.write
+ if pending.Len() == 0 {
+ write = nil
+ } else {
+ var err error
+ nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
+ if err != nil {
+ panic(err)
+ }
+ }
+ select {
+ case msg, ok := <-conn.post:
+ if !ok {
+ return
+ }
+ pending.PushBack(msg)
+ case write <- nextWrite:
+ pending.Remove(pending.Front())
+ }
+ }
+}
--- /dev/null
+package torrent
+
+import (
+ "crypto"
+ "errors"
+ "os"
+ "path/filepath"
+ "time"
+
+ "bitbucket.org/anacrolix/go.torrent/peer_protocol"
+ metainfo "github.com/nsf/libtorgo/torrent"
+ "launchpad.net/gommap"
+)
+
+const (
+ PieceHash = crypto.SHA1
+ maxRequests = 250
+ chunkSize = 0x4000 // 16KiB
+ BEP20 = "-GT0000-"
+ dialTimeout = time.Second * 15
+)
+
+type InfoHash [20]byte
+
+type pieceSum [20]byte
+
+func copyHashSum(dst, src []byte) {
+ if len(dst) != len(src) || copy(dst, src) != len(dst) {
+ panic("hash sum sizes differ")
+ }
+}
+
+func BytesInfoHash(b []byte) (ih InfoHash) {
+ if len(b) != len(ih) || copy(ih[:], b) != len(ih) {
+ panic("bad infohash bytes")
+ }
+ return
+}
+
+type piece struct {
+ Hash pieceSum
+ PendingChunkSpecs map[ChunkSpec]struct{}
+ Hashing bool
+ QueuedForHash bool
+ EverHashed bool
+}
+
+func (p *piece) Complete() bool {
+ return len(p.PendingChunkSpecs) == 0 && p.EverHashed
+}
+
+func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) {
+ cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
+ cs.Length = pieceLength - cs.Begin
+ return
+}
+
+type ChunkSpec struct {
+ Begin, Length peer_protocol.Integer
+}
+
+type Request struct {
+ Index peer_protocol.Integer
+ ChunkSpec
+}
+
+type pieceByBytesPendingSlice struct {
+ Pending, Indices []peer_protocol.Integer
+}
+
+func (pcs pieceByBytesPendingSlice) Len() int {
+ return len(pcs.Indices)
+}
+
+func (me pieceByBytesPendingSlice) Less(i, j int) bool {
+ return me.Pending[me.Indices[i]] < me.Pending[me.Indices[j]]
+}
+
+func (me pieceByBytesPendingSlice) Swap(i, j int) {
+ me.Indices[i], me.Indices[j] = me.Indices[j], me.Indices[i]
+}
+
+var (
+ ErrDataNotReady = errors.New("data not ready")
+)
+
+func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
+ defer func() {
+ if err != nil {
+ mms.Close()
+ mms = nil
+ }
+ }()
+ for _, miFile := range metaInfo.Files {
+ fileName := filepath.Join(append([]string{location, metaInfo.Name}, miFile.Path...)...)
+ err = os.MkdirAll(filepath.Dir(fileName), 0777)
+ if err != nil {
+ return
+ }
+ var file *os.File
+ file, err = os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
+ if err != nil {
+ return
+ }
+ func() {
+ defer file.Close()
+ var fi os.FileInfo
+ fi, err = file.Stat()
+ if err != nil {
+ return
+ }
+ if fi.Size() < miFile.Length {
+ err = file.Truncate(miFile.Length)
+ if err != nil {
+ return
+ }
+ }
+ var mMap gommap.MMap
+ mMap, err = gommap.MapRegion(file.Fd(), 0, miFile.Length, gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
+ if err != nil {
+ return
+ }
+ if int64(len(mMap)) != miFile.Length {
+ panic("mmap has wrong length")
+ }
+ mms = append(mms, MMap{mMap})
+ }()
+ if err != nil {
+ return
+ }
+ }
+ return
+}
--- /dev/null
+package torrent
+
+import (
+ "container/list"
+ "fmt"
+ "net"
+ "sort"
+
+ "bitbucket.org/anacrolix/go.torrent/peer_protocol"
+ "bitbucket.org/anacrolix/go.torrent/tracker"
+ metainfo "github.com/nsf/libtorgo/torrent"
+)
+
+func (t *Torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_protocol.Integer) {
+ pendingChunks := t.Pieces[index].PendingChunkSpecs
+ count = peer_protocol.Integer(len(pendingChunks)) * chunkSize
+ _lastChunkSpec := lastChunkSpec(t.PieceLength(index))
+ if _lastChunkSpec.Length != chunkSize {
+ if _, ok := pendingChunks[_lastChunkSpec]; ok {
+ count += _lastChunkSpec.Length - chunkSize
+ }
+ }
+ return
+}
+
+type Torrent struct {
+ InfoHash InfoHash
+ Pieces []*piece
+ Data MMapSpan
+ MetaInfo *metainfo.MetaInfo
+ Conns []*Connection
+ Peers []Peer
+ Priorities *list.List
+ // BEP 12 Multitracker Metadata Extension. The tracker.Client instances
+ // mirror their respective URLs from the announce-list key.
+ Trackers [][]tracker.Client
+}
+
+func (t *Torrent) NumPieces() int {
+ return len(t.MetaInfo.Pieces) / PieceHash.Size()
+}
+
+func (t *Torrent) NumPiecesCompleted() (num int) {
+ for _, p := range t.Pieces {
+ if p.Complete() {
+ num++
+ }
+ }
+ return
+}
+
+func (t *Torrent) Length() int64 {
+ return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
+}
+
+func (t *Torrent) Close() (err error) {
+ t.Data.Close()
+ for _, conn := range t.Conns {
+ conn.Close()
+ }
+ return
+}
+
+func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
+ slice := pieceByBytesPendingSlice{
+ Pending: make([]peer_protocol.Integer, 0, len(t.Pieces)),
+ Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)),
+ }
+ for i := range t.Pieces {
+ slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(peer_protocol.Integer(i)))
+ slice.Indices = append(slice.Indices, peer_protocol.Integer(i))
+ }
+ sort.Sort(sort.Reverse(slice))
+ return slice.Indices
+}
+
+func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
+ req.Index = peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+ if req.Index < 0 || int(req.Index) >= len(t.Pieces) {
+ return
+ }
+ off %= t.MetaInfo.PieceLength
+ pieceLeft := t.PieceLength(req.Index) - peer_protocol.Integer(off)
+ if pieceLeft <= 0 {
+ return
+ }
+ req.Begin = chunkSize * (peer_protocol.Integer(off) / chunkSize)
+ req.Length = chunkSize
+ if req.Length > pieceLeft {
+ req.Length = pieceLeft
+ }
+ ok = true
+ return
+}
+
+func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
+ _, err = t.Data.WriteAt(data, int64(piece)*t.MetaInfo.PieceLength+begin)
+ return
+}
+
+func (t *Torrent) bitfield() (bf []bool) {
+ for _, p := range t.Pieces {
+ bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
+ }
+ return
+}
+
+func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) {
+ piece := t.Pieces[index]
+ if piece.PendingChunkSpecs == nil {
+ piece.PendingChunkSpecs = make(
+ map[ChunkSpec]struct{},
+ (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+ }
+ c := ChunkSpec{
+ Begin: 0,
+ }
+ cs := piece.PendingChunkSpecs
+ for left := peer_protocol.Integer(t.PieceLength(index)); left != 0; left -= c.Length {
+ c.Length = left
+ if c.Length > chunkSize {
+ c.Length = chunkSize
+ }
+ cs[c] = struct{}{}
+ c.Begin += c.Length
+ }
+ return
+}
+
+func (t *Torrent) requestHeat() (ret map[Request]int) {
+ ret = make(map[Request]int)
+ for _, conn := range t.Conns {
+ for req, _ := range conn.Requests {
+ ret[req]++
+ }
+ }
+ return
+}
+
+type Peer struct {
+ Id [20]byte
+ IP net.IP
+ Port int
+}
+
+func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
+ if int(piece) == t.NumPieces()-1 {
+ len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
+ }
+ if len_ == 0 {
+ len_ = peer_protocol.Integer(t.MetaInfo.PieceLength)
+ }
+ return
+}
+
+func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) {
+ hash := PieceHash.New()
+ n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength)
+ if err != nil {
+ panic(err)
+ }
+ if peer_protocol.Integer(n) != t.PieceLength(piece) {
+ panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece))
+ }
+ copyHashSum(ps[:], hash.Sum(nil))
+ return
+}
+func (t *Torrent) haveAllPieces() bool {
+ for _, piece := range t.Pieces {
+ if !piece.Complete() {
+ return false
+ }
+ }
+ return true
+}
+
+func (me *Torrent) haveAnyPieces() bool {
+ for _, piece := range me.Pieces {
+ if piece.Complete() {
+ return true
+ }
+ }
+ return false
+}
+
+func (t *Torrent) wantPiece(index int) bool {
+ p := t.Pieces[index]
+ return p.EverHashed && len(p.PendingChunkSpecs) != 0
+}