import (
"errors"
+ "log"
"os"
+ "sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/anacrolix/torrent/storage"
)
+func justOneNetwork(cc *torrent.ClientConfig) {
+ cc.DisableTCP = true
+ cc.DisableIPv4 = true
+}
+
func TestReceiveChunkStorageFailure(t *testing.T) {
seederDataDir, metainfo := testutil.GreetingTestTorrent()
defer os.RemoveAll(seederDataDir)
seederClientConfig := torrent.TestingConfig()
seederClientConfig.Debug = true
+ justOneNetwork(seederClientConfig)
seederClientStorage := storage.NewMMap(seederDataDir)
defer seederClientStorage.Close()
seederClientConfig.DefaultStorage = seederClientStorage
defer testutil.ExportStatusWriter(seederClient, "s")()
leecherClientConfig := torrent.TestingConfig()
leecherClientConfig.Debug = true
+ justOneNetwork(leecherClientConfig)
leecherClient, err := torrent.NewClient(leecherClientConfig)
require.NoError(t, err)
defer testutil.ExportStatusWriter(leecherClient, "l")()
+ info, err := metainfo.UnmarshalInfo()
+ require.NoError(t, err)
+ leecherStorage := diskFullStorage{
+ pieces: make([]pieceState, info.NumPieces()),
+ data: make([]byte, info.TotalLength()),
+ }
+ defer leecherStorage.Close()
leecherTorrent, new, err := leecherClient.AddTorrentSpec(&torrent.TorrentSpec{
InfoHash: metainfo.HashInfoBytes(),
- Storage: diskFullStorage{},
+ Storage: &leecherStorage,
})
+ leecherStorage.t = leecherTorrent
require.NoError(t, err)
assert.True(t, new)
seederTorrent, err := seederClient.AddTorrent(metainfo)
require.NoError(t, err)
// Tell the seeder to find the leecher. Is it guaranteed seeders will always try to do this?
seederTorrent.AddClientPeer(leecherClient)
- //leecherTorrent.AddClientPeer(seederClient)
<-leecherTorrent.GotInfo()
assertReadAllGreeting(t, leecherTorrent.NewReader())
}
// pieceState tracks the leecher-side status of a single piece; complete
// is set by pieceImpl.MarkComplete after the piece hashes correctly.
type pieceState struct {
	complete bool
}
// diskFullStorage is a storage client implementation that simulates a full
// disk: chunk writes fail with "disk full" until the write-chunk error
// handler installed from WriteAt flips diskNotFull. Pieces are backed by a
// single shared in-memory buffer.
type diskFullStorage struct {
	pieces []pieceState // per-piece completion flags, indexed by piece index
	t      *torrent.Torrent
	// set once by WriteAt to install the custom error handler a single time
	defaultHandledWriteChunkError bool
	data                          []byte // whole-torrent backing buffer

	mu sync.Mutex
	// diskNotFull is guarded by mu; true once the simulated disk "empties"
	diskNotFull bool
}
-func (me diskFullStorage) WriteAt(p []byte, off int64) (n int, err error) {
- return 1, errors.New("disk full")
+func (me *diskFullStorage) Piece(p metainfo.Piece) storage.PieceImpl {
+ return pieceImpl{
+ mip: p,
+ diskFullStorage: me,
+ }
}
-func (me diskFullStorage) MarkComplete() error {
- panic("implement me")
+func (me diskFullStorage) Close() error {
+ return nil
}
-func (me diskFullStorage) MarkNotComplete() error {
- panic("implement me")
+func (d diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
+ return &d, nil
}
// pieceImpl is the storage.PieceImpl for one piece; it embeds
// *diskFullStorage so all pieces share the same buffer, flags, and lock.
type pieceImpl struct {
	mip metainfo.Piece
	*diskFullStorage
}
+func (me pieceImpl) state() *pieceState {
+ return &me.diskFullStorage.pieces[me.mip.Index()]
+}
+
+func (me pieceImpl) ReadAt(p []byte, off int64) (n int, err error) {
+ off += me.mip.Offset()
+ return copy(p, me.data[off:]), nil
+}
+
// WriteAt simulates a full disk: until diskNotFull is set it writes only a
// single byte and returns a "disk full" error. The very first write also
// installs (once) a custom write-chunk error handler on the torrent that
// marks the disk as no longer full and re-allows data download.
func (me pieceImpl) WriteAt(p []byte, off int64) (int, error) {
	off += me.mip.Offset()
	if !me.defaultHandledWriteChunkError {
		// Installed from a goroutine, presumably because SetOnWriteChunkError
		// takes the client lock which may be held around this write —
		// TODO confirm against the client's locking in receiveChunk.
		go func() {
			me.t.SetOnWriteChunkError(func(err error) {
				log.Printf("got write chunk error to custom handler: %v", err)
				me.mu.Lock()
				me.diskNotFull = true
				me.mu.Unlock()
				me.t.AllowDataDownload()
			})
			me.t.AllowDataDownload()
		}()
		// NOTE(review): this flag is read and written without holding mu;
		// this assumes chunk writes for the torrent are serialized — verify.
		me.defaultHandledWriteChunkError = true
	}
	me.mu.Lock()
	defer me.mu.Unlock()
	if me.diskNotFull {
		return copy(me.data[off:], p), nil
	}
	// Short write (1 byte) plus error exercises the caller's error path.
	return copy(me.data[off:], p[:1]), errors.New("disk full")
}
-func (me diskFullStorage) Piece(metainfo.Piece) storage.PieceImpl {
- return me
+func (me pieceImpl) MarkComplete() error {
+ me.state().complete = true
+ return nil
}
// MarkNotComplete is not expected to be reached in this scenario and is
// deliberately left unimplemented.
func (me pieceImpl) MarkNotComplete() error {
	panic("implement me")
}
-func (d diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
- return d, nil
+func (me pieceImpl) Completion() storage.Completion {
+ return storage.Completion{
+ Complete: me.state().complete,
+ Ok: true,
+ }
}
cl *Client
logger log.Logger
- networkingEnabled bool
+ networkingEnabled bool
+ dataDownloadDisallowed bool
+ userOnWriteChunkErr func(error)
// Determines what chunks to request from peers.
requestStrategy requestStrategy
// requestStrategyCallbacks adapts this torrent to the callback interface
// consumed by the request strategy.
func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks {
	return torrentRequestStrategyCallbacks{t}
}
+
+func (t *Torrent) onWriteChunkErr(err error) {
+ if t.userOnWriteChunkErr != nil {
+ go t.userOnWriteChunkErr(err)
+ return
+ }
+ t.disallowDataDownloadLocked()
+}
+
// DisallowDataDownload pauses requesting of torrent data from peers.
// Safe for external callers: takes the client lock.
func (t *Torrent) DisallowDataDownload() {
	t.cl.lock()
	defer t.cl.unlock()
	t.disallowDataDownloadLocked()
}
+
// disallowDataDownloadLocked sets the pause flag and nudges every
// connection to recompute its requests. Caller must hold the client lock.
func (t *Torrent) disallowDataDownloadLocked() {
	// NOTE(review): unconditional log.Printf looks like leftover debug
	// output — consider routing through t.logger.
	log.Printf("disallowing data download")
	t.dataDownloadDisallowed = true
	for c := range t.conns {
		c.updateRequests()
	}
}
+
+func (t *Torrent) AllowDataDownload() {
+ t.cl.lock()
+ defer t.cl.unlock()
+ log.Printf("AllowDataDownload")
+ t.dataDownloadDisallowed = false
+ for c := range t.conns {
+ c.updateRequests()
+ }
+
+}
+
// SetOnWriteChunkError installs f as the handler invoked when writing a
// received chunk to storage fails, replacing the default behaviour of
// disallowing data download (see onWriteChunkErr).
func (t *Torrent) SetOnWriteChunkError(f func(error)) {
	t.cl.lock()
	defer t.cl.unlock()
	t.userOnWriteChunkErr = f
}