"os"
"path/filepath"
"sort"
+ "sync"
+ "time"
)
const (
PieceHash = crypto.SHA1
- maxRequests = 10
+ maxRequests = 250
chunkSize = 0x4000 // 16KiB
BEP20 = "-GT0000-"
)
type pieceState uint8
-const (
- pieceStateUnknown = iota
- pieceStateComplete
- pieceStateIncomplete
-)
-
type piece struct {
- State pieceState
Hash pieceSum
PendingChunkSpecs map[ChunkSpec]struct{}
+ Hashing bool
+ EverHashed bool
+}
+
+func (p *piece) Complete() bool {
+ return len(p.PendingChunkSpecs) == 0 && !p.Hashing && p.EverHashed
+}
+
+func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) {
+ cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
+ cs.Length = pieceLength - cs.Begin
+ return
}
-func (p piece) NumPendingBytes() (count peer_protocol.Integer) {
- for cs, _ := range p.PendingChunkSpecs {
- count += cs.Length
+func (t *Torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_protocol.Integer) {
+ pendingChunks := t.Pieces[index].PendingChunkSpecs
+ count = peer_protocol.Integer(len(pendingChunks)) * chunkSize
+ _lastChunkSpec := lastChunkSpec(t.PieceLength(index))
+ if _lastChunkSpec.Length != chunkSize {
+ if _, ok := pendingChunks[_lastChunkSpec]; ok {
+ count += _lastChunkSpec.Length - chunkSize
+ }
}
return
}
c.Interested = interested
}
+var (
+ keepAliveBytes [4]byte
+)
+
func (conn *connection) writer() {
for {
- b := <-conn.write
- if b == nil {
- break
+ timer := time.NewTimer(time.Minute)
+ var b []byte
+ select {
+ case <-timer.C:
+ b = keepAliveBytes[:]
+ case b = <-conn.write:
+ timer.Stop()
+ if b == nil {
+ return
+ }
}
n, err := conn.Socket.Write(b)
if err != nil {
type Torrent struct {
InfoHash InfoHash
- Pieces []piece
+ Pieces []*piece
Data MMapSpan
MetaInfo *metainfo.MetaInfo
Conns []*connection
Priorities *list.List
}
+func (t *Torrent) Close() (err error) {
+ t.Data.Close()
+ for _, conn := range t.Conns {
+ conn.Close()
+ }
+ return
+}
+
type pieceByBytesPendingSlice struct {
- Torrent *Torrent
- Indices []peer_protocol.Integer
+ Pending, Indices []peer_protocol.Integer
}
func (pcs pieceByBytesPendingSlice) Len() int {
}
func (me pieceByBytesPendingSlice) Less(i, j int) bool {
- return me.Torrent.Pieces[i].NumPendingBytes() < me.Torrent.Pieces[j].NumPendingBytes()
+ return me.Pending[me.Indices[i]] < me.Pending[me.Indices[j]]
}
func (me pieceByBytesPendingSlice) Swap(i, j int) {
}
func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
- slice := pieceByBytesPendingSlice{
- Torrent: t,
- Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)),
- }
+ slice := pieceByBytesPendingSlice{}
for i := range t.Pieces {
+ slice.Pending = append(slice.Pending, t.PieceNumPendingBytes(peer_protocol.Integer(i)))
slice.Indices = append(slice.Indices, peer_protocol.Integer(i))
}
sort.Sort(sort.Reverse(slice))
return slice.Indices
}
-func (t *Torrent) PrioritizeDataRegion(off, len_ int64) {
+func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
+ piece := t.Pieces[pieceIndex]
+ if piece.Hashing {
+ return
+ }
+ piece.Hashing = true
+ go cl.verifyPiece(t, pieceIndex)
+}
+
+func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ t := cl.torrent(ih)
newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
for len_ > 0 {
index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
piece := t.Pieces[index]
- if piece.State == pieceStateComplete {
- adv := int64(t.PieceLength(index) - pieceOff)
- off += adv
- len_ -= adv
- continue
+ if !piece.EverHashed {
+ cl.queuePieceCheck(t, index)
}
chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
adv := int64(chunkSize - pieceOff%chunkSize)
off += adv
len_ -= adv
- switch piece.State {
- case pieceStateIncomplete:
- if _, ok := piece.PendingChunkSpecs[chunk]; !ok {
- continue
- }
- case pieceStateUnknown:
- default:
- panic("unexpected piece state")
+ if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing {
+ continue
}
newPriorities = append(newPriorities, Request{index, chunk})
}
for _, req := range newPriorities[1:] {
t.Priorities.PushBack(req)
}
+ for _, cn := range t.Conns {
+ cl.replenishConnRequests(t, cn)
+ }
}
func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
func (t *Torrent) bitfield() (bf []bool) {
for _, p := range t.Pieces {
- bf = append(bf, p.State == pieceStateComplete)
+ bf = append(bf, p.EverHashed && !p.Hashing && len(p.PendingChunkSpecs) == 0)
}
return
}
PeerId [20]byte
DataReady chan DataSpec
+ mu sync.Mutex
+ event sync.Cond
+
halfOpen int
torrents map[InfoHash]*Torrent
-
- noTorrents chan struct{}
- addTorrent chan *Torrent
- torrentFinished chan InfoHash
- actorTask chan func()
}
var (
)
func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
- done := make(chan struct{})
- cl.withContext(func() {
- defer func() {
- close(done)
- }()
- t := cl.torrent(ih)
- if t == nil {
- err = errors.New("unknown torrent")
- return
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ t := cl.torrent(ih)
+ if t == nil {
+ err = errors.New("unknown torrent")
+ return
+ }
+ index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
+ piece := t.Pieces[index]
+ if !piece.EverHashed {
+ cl.queuePieceCheck(t, index)
+ }
+ if piece.Hashing {
+ err = ErrDataNotReady
+ return
+ }
+ pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
+ high := int(t.PieceLength(index) - pieceOff)
+ if high < len(p) {
+ p = p[:high]
+ }
+ for cs, _ := range piece.PendingChunkSpecs {
+ chunkOff := int64(pieceOff) - int64(cs.Begin)
+ if chunkOff >= int64(t.PieceLength(index)) {
+ panic(chunkOff)
}
- index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
- piece := t.Pieces[index]
- pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
- switch piece.State {
- case pieceStateComplete:
- high := int(t.PieceLength(index) - peer_protocol.Integer(off%int64(t.PieceLength(0))))
- if high < len(p) {
- p = p[:high]
- }
- case pieceStateIncomplete:
- for cs, _ := range piece.PendingChunkSpecs {
- chunkOff := int64(pieceOff - cs.Begin)
- if 0 <= chunkOff && chunkOff < int64(cs.Length) {
- // read begins in a pending chunk
- err = ErrDataNotReady
- return
- }
- // pending chunk caps available data
- if chunkOff < 0 && int64(len(p)) > -chunkOff {
- p = p[:-chunkOff]
- }
- }
- default:
+ if 0 <= chunkOff && chunkOff < int64(cs.Length) {
+ // read begins in a pending chunk
err = ErrDataNotReady
return
}
- n, err = t.Data.ReadAt(p, off)
- })
- <-done
- return
+ // pending chunk caps available data
+ if chunkOff < 0 && int64(len(p)) > -chunkOff {
+ p = p[:-chunkOff]
+ }
+ }
+ return t.Data.ReadAt(p, off)
}
func (c *Client) Start() {
c.torrents = make(map[InfoHash]*Torrent)
- c.noTorrents = make(chan struct{})
- c.addTorrent = make(chan *Torrent)
- c.torrentFinished = make(chan InfoHash)
- c.actorTask = make(chan func())
if c.HalfOpenLimit == 0 {
c.HalfOpenLimit = 10
}
if err != nil {
panic("error generating peer id")
}
- go c.run()
}
func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
IP: peer.IP,
Port: peer.Port,
})
- me.withContext(func() {
- me.halfOpen--
- me.openNewConns()
- })
+
+ me.mu.Lock()
+ me.halfOpen--
+ me.openNewConns()
+ me.mu.Unlock()
+
if err != nil {
log.Printf("error connecting to peer: %s", err)
return
}
log.Printf("connected to %s", conn.RemoteAddr())
- err = me.handshake(conn, torrent, peer.Id)
+ err = me.runConnection(conn, torrent, peer.Id)
if err != nil {
log.Print(err)
}
func (me *Torrent) haveAnyPieces() bool {
for _, piece := range me.Pieces {
- if piece.State == pieceStateComplete {
+ if piece.Complete() {
return true
}
}
return false
}
-func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) {
+func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) {
conn := &connection{
Socket: sock,
Choked: true,
}
var b [28]byte
_, err = io.ReadFull(conn.Socket, b[:])
- switch err {
- case nil:
- case io.EOF:
- return
- default:
+ if err != nil {
err = fmt.Errorf("when reading protocol and extensions: %s", err)
return
}
var infoHash [20]byte
_, err = io.ReadFull(conn.Socket, infoHash[:])
if err != nil {
- return
+ return fmt.Errorf("reading peer info hash: %s", err)
}
_, err = io.ReadFull(conn.Socket, conn.PeerId[:])
if err != nil {
- return
+ return fmt.Errorf("reading peer id: %s", err)
}
if torrent == nil {
torrent = me.torrent(infoHash)
conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
conn.post <- peer_protocol.Bytes(me.PeerId[:])
}
- done := make(chan struct{})
- me.withContext(func() {
- me.addConnection(torrent, conn)
- if torrent.haveAnyPieces() {
- conn.Post(peer_protocol.Message{
- Type: peer_protocol.Bitfield,
- Bitfield: torrent.bitfield(),
- })
- }
- close(done)
- })
- <-done
- defer me.withContext(func() {
- me.dropConnection(torrent, conn)
- })
- err = me.runConnection(torrent, conn)
+ me.mu.Lock()
+ me.addConnection(torrent, conn)
+ if torrent.haveAnyPieces() {
+ conn.Post(peer_protocol.Message{
+ Type: peer_protocol.Bitfield,
+ Bitfield: torrent.bitfield(),
+ })
+ }
+ err = me.connectionLoop(torrent, conn)
+ if err != nil {
+ err = fmt.Errorf("during connection loop: %s", err)
+ }
+ me.dropConnection(torrent, conn)
+ me.mu.Unlock()
return
}
}
func (t *Torrent) wantPiece(index int) bool {
- return t.Pieces[index].State == pieceStateIncomplete
+ return !t.Pieces[index].Complete()
}
func (me *Client) peerUnchoked(torrent *Torrent, conn *connection) {
me.replenishConnRequests(torrent, conn)
}
-func (me *Client) runConnection(torrent *Torrent, conn *connection) error {
+func (me *Client) connectionLoop(torrent *Torrent, conn *connection) error {
decoder := peer_protocol.Decoder{
R: bufio.NewReader(conn.Socket),
MaxLength: 256 * 1024,
}
for {
+ me.mu.Unlock()
msg := new(peer_protocol.Message)
err := decoder.Decode(msg)
+ me.mu.Lock()
if err != nil {
return err
}
+ log.Print(msg.Type)
if msg.Keepalive {
continue
}
- go me.withContext(func() {
- // log.Print(msg)
- var err error
- switch msg.Type {
- case peer_protocol.Choke:
- conn.PeerChoked = true
- conn.Requests = nil
- case peer_protocol.Unchoke:
- conn.PeerChoked = false
- me.peerUnchoked(torrent, conn)
- case peer_protocol.Interested:
- conn.PeerInterested = true
- case peer_protocol.NotInterested:
- conn.PeerInterested = false
- case peer_protocol.Have:
- me.peerGotPiece(torrent, conn, int(msg.Index))
- case peer_protocol.Request:
- conn.PeerRequests[Request{
- Index: msg.Index,
- ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
- }] = struct{}{}
- case peer_protocol.Bitfield:
- if len(msg.Bitfield) < len(torrent.Pieces) {
- err = errors.New("received invalid bitfield")
- break
- }
- if conn.PeerPieces != nil {
- err = errors.New("received unexpected bitfield")
- break
- }
- conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)]
- for index, has := range conn.PeerPieces {
- if has {
- me.peerGotPiece(torrent, conn, index)
- }
- }
- case peer_protocol.Piece:
- request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
- if _, ok := conn.Requests[request_]; !ok {
- err = errors.New("unexpected piece")
- break
+ switch msg.Type {
+ case peer_protocol.Choke:
+ conn.PeerChoked = true
+ conn.Requests = nil
+ case peer_protocol.Unchoke:
+ conn.PeerChoked = false
+ me.peerUnchoked(torrent, conn)
+ case peer_protocol.Interested:
+ conn.PeerInterested = true
+ case peer_protocol.NotInterested:
+ conn.PeerInterested = false
+ case peer_protocol.Have:
+ me.peerGotPiece(torrent, conn, int(msg.Index))
+ case peer_protocol.Request:
+ conn.PeerRequests[Request{
+ Index: msg.Index,
+ ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
+ }] = struct{}{}
+ case peer_protocol.Bitfield:
+ if len(msg.Bitfield) < len(torrent.Pieces) {
+ err = errors.New("received invalid bitfield")
+ break
+ }
+ if conn.PeerPieces != nil {
+ err = errors.New("received unexpected bitfield")
+ break
+ }
+ conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)]
+ for index, has := range conn.PeerPieces {
+ if has {
+ me.peerGotPiece(torrent, conn, index)
}
- delete(conn.Requests, request_)
- err = me.downloadedChunk(torrent, msg)
- default:
- log.Printf("received unknown message type: %#v", msg.Type)
}
- if err != nil {
- log.Print(err)
- me.dropConnection(torrent, conn)
- return
+ case peer_protocol.Piece:
+ request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
+ if _, ok := conn.Requests[request_]; !ok {
+ err = errors.New("unexpected piece")
+ break
}
- me.replenishConnRequests(torrent, conn)
- })
+ delete(conn.Requests, request_)
+ err = me.downloadedChunk(torrent, msg)
+ default:
+ log.Printf("received unknown message type: %#v", msg.Type)
+ }
+ if err != nil {
+ return err
+ }
+ me.replenishConnRequests(torrent, conn)
}
}
}
}
-func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) (err error) {
- me.withContext(func() {
- t := me.torrent(infoHash)
- if t == nil {
- err = errors.New("no such torrent")
- return
- }
- t.Peers = append(t.Peers, peers...)
- me.openNewConns()
- })
- return
+func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
+ me.mu.Lock()
+ t := me.torrent(infoHash)
+ if t == nil {
+ return errors.New("no such torrent")
+ }
+ t.Peers = append(t.Peers, peers...)
+ me.openNewConns()
+ me.mu.Unlock()
+ return nil
}
func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
if len(hash) != PieceHash.Size() {
return errors.New("bad piece hash in metainfo")
}
- piece := piece{}
+ piece := &piece{}
copyHashSum(piece.Hash[:], hash)
torrent.Pieces = append(torrent.Pieces, piece)
}
if err != nil {
return err
}
- me.addTorrent <- torrent
+ me.mu.Lock()
+ defer me.mu.Unlock()
+ if _, ok := me.torrents[torrent.InfoHash]; ok {
+ return torrent.Close()
+ }
+ me.torrents[torrent.InfoHash] = torrent
return nil
}
func (me *Client) WaitAll() {
- <-me.noTorrents
+ me.mu.Lock()
+ for len(me.torrents) != 0 {
+ me.event.Wait()
+ }
+ me.mu.Unlock()
}
func (me *Client) Stop() {
}
-func (me *Client) withContext(f func()) {
- me.actorTask <- f
-}
-
func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) {
requestHeatMap := torrent.requestHeat()
addRequest := func(req Request) (again bool) {
- if !conn.PeerPieces[req.Index] {
+ piece := torrent.Pieces[req.Index]
+ if piece.Hashing {
return true
}
- if torrent.Pieces[req.Index].State != pieceStateIncomplete {
+ if piece.Complete() {
return true
}
if requestHeatMap[req] > 0 {
}
}
for _, index := range torrent.piecesByPendingBytesDesc() {
- if torrent.Pieces[index].NumPendingBytes() == torrent.PieceLength(index) {
+ if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
continue
}
for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
}
delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
- torrent.Pieces[request.Index].State = pieceStateUnknown
- go me.verifyPiece(torrent, request.Index)
+ me.queuePieceCheck(torrent, request.Index)
return
}
var next *list.Element
}()
}
-func (me *Client) pieceHashed(ih InfoHash, piece peer_protocol.Integer, correct bool) {
- torrent := me.torrents[ih]
- newState := func() pieceState {
- if correct {
- return pieceStateComplete
- } else {
- return pieceStateIncomplete
- }
- }()
- // oldState := torrent.Pieces[piece].State
- // if newState == oldState {
- // return
- // }
- torrent.Pieces[piece].State = newState
- switch newState {
- case pieceStateIncomplete:
- torrent.Pieces[int(piece)].PendingChunkSpecs = torrent.pieceChunkSpecs(piece)
- case pieceStateComplete:
+func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct bool) {
+ p := t.Pieces[piece]
+ if !p.Hashing {
+ panic("invalid state")
+ }
+ p.Hashing = false
+ p.EverHashed = true
+ if correct {
+ p.PendingChunkSpecs = nil
var next *list.Element
- if torrent.Priorities != nil {
- for e := torrent.Priorities.Front(); e != nil; e = next {
+ if t.Priorities != nil {
+ for e := t.Priorities.Front(); e != nil; e = next {
next = e.Next()
if e.Value.(Request).Index == piece {
- torrent.Priorities.Remove(e)
+ t.Priorities.Remove(e)
}
}
}
me.dataReady(DataSpec{
- torrent.InfoHash,
+ t.InfoHash,
Request{
peer_protocol.Integer(piece),
- ChunkSpec{0, peer_protocol.Integer(torrent.PieceLength(piece))},
+ ChunkSpec{0, peer_protocol.Integer(t.PieceLength(piece))},
},
})
- torrent.Pieces[piece].PendingChunkSpecs = nil
+ } else {
+ if len(p.PendingChunkSpecs) == 0 {
+ p.PendingChunkSpecs = t.pieceChunkSpecs(piece)
+ }
}
- for _, conn := range torrent.Conns {
+ for _, conn := range t.Conns {
if correct {
conn.Post(peer_protocol.Message{
Type: peer_protocol.Have,
})
} else {
if conn.PeerHasPiece(piece) {
- me.replenishConnRequests(torrent, conn)
+ me.replenishConnRequests(t, conn)
}
}
}
}
-func (me *Client) verifyPiece(torrent *Torrent, index peer_protocol.Integer) {
- sum := torrent.HashPiece(index)
- done := make(chan struct{})
- me.withContext(func() {
- me.pieceHashed(torrent.InfoHash, index, sum == torrent.Pieces[index].Hash)
- close(done)
- })
- <-done
-}
-
-func (me *Client) run() {
- for {
- noTorrents := me.noTorrents
- if len(me.torrents) != 0 {
- noTorrents = nil
- }
- select {
- case noTorrents <- struct{}{}:
- case torrent := <-me.addTorrent:
- if _, ok := me.torrents[torrent.InfoHash]; ok {
- break
- }
- me.torrents[torrent.InfoHash] = torrent
- go func() {
- for index := range torrent.Pieces {
- me.verifyPiece(torrent, peer_protocol.Integer(index))
- }
- }()
- case infoHash := <-me.torrentFinished:
- delete(me.torrents, infoHash)
- case task := <-me.actorTask:
- task()
- }
- }
+func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) {
+ sum := t.HashPiece(index)
+ cl.mu.Lock()
+ piece := t.Pieces[index]
+ cl.pieceHashed(t, index, sum == piece.Hash)
+ piece.Hashing = false
+ cl.mu.Unlock()
}
func (me *Client) Torrents() (ret []*Torrent) {
- done := make(chan struct{})
- me.withContext(func() {
- for _, t := range me.torrents {
- ret = append(ret, t)
- }
- close(done)
- })
- <-done
+ me.mu.Lock()
+ for _, t := range me.torrents {
+ ret = append(ret, t)
+ }
+ me.mu.Unlock()
return
}
-
-func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
- done := make(chan struct{})
- cl.withContext(func() {
- t := cl.torrent(ih)
- t.PrioritizeDataRegion(off, len_)
- for _, cn := range t.Conns {
- cl.replenishConnRequests(t, cn)
- }
- close(done)
- })
- <-done
-}