From bae791a5a290d08d58acc4892af279d804ebc523 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 22 Feb 2020 19:40:50 +1100 Subject: [PATCH] Disable data downloading on storage write errors Also add a hook and Torrent methods to modify this. --- peerconn.go | 7 +-- test/issue377_test.go | 101 +++++++++++++++++++++++++++++++++--------- torrent.go | 43 +++++++++++++++++- 3 files changed, 127 insertions(+), 24 deletions(-) diff --git a/peerconn.go b/peerconn.go index 9ff1a693..ee5cdfd9 100644 --- a/peerconn.go +++ b/peerconn.go @@ -509,7 +509,7 @@ func (cn *PeerConn) request(r request, mw messageWriter) bool { } func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) { - if !cn.t.networkingEnabled { + if !cn.t.networkingEnabled || cn.t.dataDownloadDisallowed { if !cn.setInterested(false, msg) { return } @@ -1246,9 +1246,10 @@ func (c *PeerConn) receiveChunk(msg *pp.Message) error { piece.decrementPendingWrites() if err != nil { - panic(fmt.Sprintf("error writing chunk: %v", err)) + c.logger.Printf("error writing received chunk %v: %v", req, err) t.pendRequest(req) - t.updatePieceCompletion(pieceIndex(msg.Index)) + //t.updatePieceCompletion(pieceIndex(msg.Index)) + t.onWriteChunkErr(err) return nil } diff --git a/test/issue377_test.go b/test/issue377_test.go index 0c2e3837..40ed8916 100644 --- a/test/issue377_test.go +++ b/test/issue377_test.go @@ -2,7 +2,9 @@ package test import ( "errors" + "log" "os" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -14,11 +16,17 @@ import ( "github.com/anacrolix/torrent/storage" ) +func justOneNetwork(cc *torrent.ClientConfig) { + cc.DisableTCP = true + cc.DisableIPv4 = true +} + func TestReceiveChunkStorageFailure(t *testing.T) { seederDataDir, metainfo := testutil.GreetingTestTorrent() defer os.RemoveAll(seederDataDir) seederClientConfig := torrent.TestingConfig() seederClientConfig.Debug = true + justOneNetwork(seederClientConfig) seederClientStorage := storage.NewMMap(seederDataDir) defer seederClientStorage.Close() seederClientConfig.DefaultStorage = seederClientStorage @@ -29,57 +37,110 @@ func TestReceiveChunkStorageFailure(t *testing.T) { defer testutil.ExportStatusWriter(seederClient, "s")() leecherClientConfig := torrent.TestingConfig() leecherClientConfig.Debug = true + justOneNetwork(leecherClientConfig) leecherClient, err := torrent.NewClient(leecherClientConfig) require.NoError(t, err) defer testutil.ExportStatusWriter(leecherClient, "l")() + info, err := metainfo.UnmarshalInfo() + require.NoError(t, err) + leecherStorage := diskFullStorage{ + pieces: make([]pieceState, info.NumPieces()), + data: make([]byte, info.TotalLength()), + } + defer leecherStorage.Close() leecherTorrent, new, err := leecherClient.AddTorrentSpec(&torrent.TorrentSpec{ InfoHash: metainfo.HashInfoBytes(), - Storage: diskFullStorage{}, + Storage: &leecherStorage, }) + leecherStorage.t = leecherTorrent require.NoError(t, err) assert.True(t, new) seederTorrent, err := seederClient.AddTorrent(metainfo) require.NoError(t, err) // Tell the seeder to find the leecher. Is it guaranteed seeders will always try to do this? seederTorrent.AddClientPeer(leecherClient) - //leecherTorrent.AddClientPeer(seederClient) <-leecherTorrent.GotInfo() assertReadAllGreeting(t, leecherTorrent.NewReader()) } -type diskFullStorage struct{} +type pieceState struct { + complete bool +} -func (me diskFullStorage) ReadAt(p []byte, off int64) (n int, err error) { - panic("implement me") +type diskFullStorage struct { + pieces []pieceState + t *torrent.Torrent + defaultHandledWriteChunkError bool + data []byte + + mu sync.Mutex + diskNotFull bool } -func (me diskFullStorage) WriteAt(p []byte, off int64) (n int, err error) { - return 1, errors.New("disk full") +func (me *diskFullStorage) Piece(p metainfo.Piece) storage.PieceImpl { + return pieceImpl{ + mip: p, + diskFullStorage: me, + } } -func (me diskFullStorage) MarkComplete() error { - panic("implement me") +func (me diskFullStorage) Close() error { + return nil } -func (me diskFullStorage) MarkNotComplete() error { - panic("implement me") +func (d diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) { + return &d, nil } -func (me diskFullStorage) Completion() storage.Completion { - return storage.Completion{ - Complete: false, - Ok: true, +type pieceImpl struct { + mip metainfo.Piece + *diskFullStorage +} + +func (me pieceImpl) state() *pieceState { + return &me.diskFullStorage.pieces[me.mip.Index()] +} + +func (me pieceImpl) ReadAt(p []byte, off int64) (n int, err error) { + off += me.mip.Offset() + return copy(p, me.data[off:]), nil +} + +func (me pieceImpl) WriteAt(p []byte, off int64) (int, error) { + off += me.mip.Offset() + if !me.defaultHandledWriteChunkError { + go func() { + me.t.SetOnWriteChunkError(func(err error) { + log.Printf("got write chunk error to custom handler: %v", err) + me.mu.Lock() + me.diskNotFull = true + me.mu.Unlock() + me.t.AllowDataDownload() + }) + me.t.AllowDataDownload() + }() + me.defaultHandledWriteChunkError = true } + me.mu.Lock() + defer me.mu.Unlock() + if me.diskNotFull { + return copy(me.data[off:], p), nil + } + return copy(me.data[off:], p[:1]), errors.New("disk full") } -func (me diskFullStorage) Piece(metainfo.Piece) storage.PieceImpl { - return me +func (me pieceImpl) MarkComplete() error { + me.state().complete = true + return nil } -func (me diskFullStorage) Close() error { +func (me pieceImpl) MarkNotComplete() error { panic("implement me") } -func (d diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) { - return d, nil +func (me pieceImpl) Completion() storage.Completion { + return storage.Completion{ + Complete: me.state().complete, + Ok: true, + } } diff --git a/torrent.go b/torrent.go index 74688c0b..cd136523 100644 --- a/torrent.go +++ b/torrent.go @@ -40,7 +40,9 @@ type Torrent struct { cl *Client logger log.Logger - networkingEnabled bool + networkingEnabled bool + dataDownloadDisallowed bool + userOnWriteChunkErr func(error) // Determines what chunks to request from peers. requestStrategy requestStrategy @@ -1810,3 +1812,42 @@ func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) { func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks { return torrentRequestStrategyCallbacks{t} } + +func (t *Torrent) onWriteChunkErr(err error) { + if t.userOnWriteChunkErr != nil { + go t.userOnWriteChunkErr(err) + return + } + t.disallowDataDownloadLocked() +} + +func (t *Torrent) DisallowDataDownload() { + t.cl.lock() + defer t.cl.unlock() + t.disallowDataDownloadLocked() +} + +func (t *Torrent) disallowDataDownloadLocked() { + log.Printf("disallowing data download") + t.dataDownloadDisallowed = true + for c := range t.conns { + c.updateRequests() + } +} + +func (t *Torrent) AllowDataDownload() { + t.cl.lock() + defer t.cl.unlock() + log.Printf("AllowDataDownload") + t.dataDownloadDisallowed = false + for c := range t.conns { + c.updateRequests() + } + +} + +func (t *Torrent) SetOnWriteChunkError(f func(error)) { + t.cl.lock() + defer t.cl.unlock() + t.userOnWriteChunkErr = f +} -- 2.48.1