import (
"encoding/binary"
+ "errors"
"fmt"
"io"
"io/ioutil"
"log"
+ "math/rand"
"net"
"net/http"
"os"
"path/filepath"
+ "strings"
"sync"
"testing"
"time"
}
func (me badData) WriteSectionTo(w io.Writer, off, n int64) (int64, error) {
- written, err := w.Write([]byte("hello"))
- if err == nil {
- err = io.ErrUnexpectedEOF
- }
+ p := []byte(me.randomlyTruncatedDataString())
+ written, err := w.Write(p)
return int64(written), err
}
}
func (me badData) PieceCompleted(piece int) error {
- return nil
+ return errors.New("psyyyyyyyche")
+}
+
+func (me badData) randomlyTruncatedDataString() string {
+ return "hello, world\n"[:rand.Intn(14)]
}
func (me badData) ReadAt(b []byte, off int64) (n int, err error) {
- if off >= 5 {
- err = io.EOF
- return
- }
- n = copy(b, []byte("hello")[off:])
- return
+ r := strings.NewReader(me.randomlyTruncatedDataString())
+ return r.ReadAt(b, off)
}
// We read from a piece which is marked completed, but is missing data.
func TestCompletedPieceWrongSize(t *testing.T) {
+ t.Parallel()
cfg := TestingConfig
cfg.TorrentDataOpener = func(*metainfo.Info) Data {
return badData{}
},
},
})
- if err != nil {
- t.Fatal(err)
- }
- if !new {
- t.Fatal("expected new")
- }
+ require.NoError(t, err)
+ defer tt.Drop()
+ assert.True(t, new)
r := tt.NewReader()
defer r.Close()
- b := make([]byte, 20)
- n, err := io.ReadFull(r, b)
- if n != 5 || err != io.ErrUnexpectedEOF {
- t.Fatal(n, err)
- }
- defer tt.Drop()
+ b, err := ioutil.ReadAll(r)
+ assert.Len(t, b, 13)
+ assert.NoError(t, err)
}
func BenchmarkAddLargeTorrent(b *testing.B) {
import (
"errors"
"io"
+ "log"
"os"
"sync"
)
}
func (r *Reader) readable(off int64) (ret bool) {
- // log.Println("readable", off)
- // defer func() {
- // log.Println("readable", ret)
- // }()
- if r.t.torrent.isClosed() {
+ if r.torrentClosed() {
return true
}
req, ok := r.t.torrent.offsetRequest(off)
return
}
-// Must only return EOF at the end of the torrent.
-func (r *Reader) readAt(b []byte, pos int64) (n int, err error) {
- // defer func() {
- // log.Println(pos, n, err)
- // }()
- maxLen := r.t.torrent.Info.TotalLength() - pos
- if maxLen <= 0 {
- err = io.EOF
- return
- }
- if int64(len(b)) > maxLen {
- b = b[:maxLen]
- }
-again:
+// Safe to call with or without client lock.
+func (r *Reader) torrentClosed() bool {
+ return r.t.torrent.isClosed()
+}
+
+// Wait until some data should be available to read. Tickles the client if it
+// isn't. Returns how much should be readable without blocking.
+func (r *Reader) waitAvailable(pos, wanted int64) (avail int64) {
r.t.cl.mu.Lock()
+ defer r.t.cl.mu.Unlock()
for !r.readable(pos) {
r.waitReadable(pos)
}
- avail := r.available(pos, int64(len(b)))
- // log.Println("available", avail)
- r.t.cl.mu.Unlock()
- b1 := b[:avail]
- pi := int(pos / r.t.Info().PieceLength)
- tp := &r.t.torrent.Pieces[pi]
- ip := r.t.Info().Piece(pi)
- po := pos % ip.Length()
- if int64(len(b1)) > ip.Length()-po {
- b1 = b1[:ip.Length()-po]
- }
- tp.waitNoPendingWrites()
- n, err = dataReadAt(r.t.torrent.data, b1, pos)
- if n != 0 {
- err = nil
+ return r.available(pos, wanted)
+}
+
+// Performs at most one successful read to torrent storage.
+func (r *Reader) readOnceAt(b []byte, pos int64) (n int, err error) {
+ if pos >= r.t.torrent.length {
+ err = io.EOF
return
}
- if r.t.torrent.isClosed() {
- if err == nil {
- err = errors.New("torrent closed")
+ for {
+ avail := r.waitAvailable(pos, int64(len(b)))
+ if avail == 0 {
+ if r.torrentClosed() {
+ err = errors.New("torrent closed")
+ return
+ }
}
- return
+ b1 := b[:avail]
+ pi := int(pos / r.t.Info().PieceLength)
+ tp := &r.t.torrent.Pieces[pi]
+ ip := r.t.Info().Piece(pi)
+ po := pos % ip.Length()
+ if int64(len(b1)) > ip.Length()-po {
+ b1 = b1[:ip.Length()-po]
+ }
+ tp.waitNoPendingWrites()
+ n, err = dataReadAt(r.t.torrent.data, b1, pos)
+ if n != 0 {
+ return
+ }
+ log.Printf("error reading from torrent storage: %s", err)
+ r.t.torrent.updatePieceCompletion(pi)
+ r.t.torrent.updatePiecePriority(pi)
}
- if err == io.ErrUnexpectedEOF {
- goto again
+}
+
+// Must only return EOF at the end of the torrent. Fills b until error or
+// valid EOF. Note that the Reader pos is not updated until the read
+// completes, this may reduce piece priority recalculation, but also the
+// effectiveness of readahead.
+func (r *Reader) readAt(b []byte, pos int64) (n int, err error) {
+ for len(b) != 0 {
+ var n1 int
+ n1, err = r.readOnceAt(b, pos)
+ if n1 == 0 {
+ if err == nil {
+ panic("expected error")
+ }
+ break
+ }
+ b = b[n1:]
+ n += n1
+ pos += int64(n1)
+ }
+ if pos >= r.t.torrent.length {
+ err = io.EOF
+ } else if err == io.EOF {
+ err = io.ErrUnexpectedEOF
}
return
}