w.WriteString("<missing metainfo>")
}
fmt.Fprint(w, "\n")
- t.writeStatus(w)
+ t.writeStatus(w, cl)
fmt.Fprintln(w)
}
}
func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
if t.Pieces[piece].Priority < priority {
cl.prioritizePiece(t, piece, priority)
- cl.event.Broadcast()
}
}
}
}
+func (me *Client) upload(t *torrent, c *connection) {
+ if me.config.NoUpload {
+ return
+ }
+ if !c.PeerInterested {
+ return
+ }
+ if !me.seeding(t) && !t.connHasWantedPieces(c) {
+ return
+ }
+another:
+ for c.chunksSent < c.UsefulChunksReceived+6 {
+ c.Unchoke()
+ for r := range c.PeerRequests {
+ err := me.sendChunk(t, c, r)
+ if err != nil {
+ log.Printf("error sending chunk to peer: %s", err)
+ }
+ delete(c.PeerRequests, r)
+ goto another
+ }
+ return
+ }
+ c.Choke()
+}
+
+func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
+ b := make([]byte, r.Length)
+ p := t.Info.Piece(int(r.Index))
+ n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin))
+ if err != nil {
+ return err
+ }
+ if n != len(b) {
+ log.Fatal(b)
+ }
+ c.Post(pp.Message{
+ Type: pp.Piece,
+ Index: r.Index,
+ Begin: r.Begin,
+ Piece: b,
+ })
+ uploadChunksPosted.Add(1)
+ c.chunksSent++
+ c.lastChunkSent = time.Now()
+ return nil
+}
+
// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit.
func (me *Client) connectionLoop(t *torrent, c *connection) error {
me.peerUnchoked(t, c)
case pp.Interested:
c.PeerInterested = true
- // TODO: This should be done from a dedicated unchoking routine.
- if me.config.NoUpload {
- break
- }
- c.Unchoke()
+ me.upload(t, c)
case pp.NotInterested:
c.PeerInterested = false
c.Choke()
if c.Choked {
break
}
- request := newRequest(msg.Index, msg.Begin, msg.Length)
- // TODO: Requests should be satisfied from a dedicated upload
- // routine.
- // c.PeerRequests[request] = struct{}{}
- // if c.PeerRequests == nil {
- // c.PeerRequests = make(map[request]struct{}, maxRequests)
- // }
- p := make([]byte, msg.Length)
- n, err := dataReadAt(t.data, p, int64(t.pieceLength(0))*int64(msg.Index)+int64(msg.Begin))
- // TODO: Failing to read for a request should not be fatal to the connection.
- if err != nil {
- return fmt.Errorf("reading t data to serve request %q: %s", request, err)
+ if !c.PeerInterested {
+ err = errors.New("peer sent request but isn't interested")
+ break
}
- if n != int(msg.Length) {
- return fmt.Errorf("bad request: %v", msg)
+ if c.PeerRequests == nil {
+ c.PeerRequests = make(map[request]struct{}, maxRequests)
}
- c.Post(pp.Message{
- Type: pp.Piece,
- Index: msg.Index,
- Begin: msg.Begin,
- Piece: p,
- })
- uploadChunksPosted.Add(1)
- c.chunksSent++
+ c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
+ me.upload(t, c)
case pp.Cancel:
req := newRequest(msg.Index, msg.Begin, msg.Length)
if !c.PeerCancel(req) {
// TODO: This should probably be done by a routine that kills off bad
// connections, and extra connections killed here instead.
if len(t.Conns) > socketsPerTorrent {
- wcs := t.worstConnsHeap()
+ wcs := t.worstConnsHeap(me)
heap.Pop(wcs).(*connection).Close()
}
return true
return false
}
-// TODO: I'm sure there's something here to do with seeding.
-func (t *torrent) badConn(c *connection) bool {
+func (cl *Client) usefulConn(t *torrent, c *connection) bool {
// A 30 second grace for initial messages to go through.
if time.Since(c.completedHandshake) < 30*time.Second {
- return false
+ return true
}
if !t.haveInfo() {
if !c.supportsExtension("ut_metadata") {
- return true
+ return false
}
- if time.Since(c.completedHandshake) > 2*time.Minute {
+ if time.Since(c.completedHandshake) < 2*time.Minute {
return true
}
+ return false
}
- return !t.connHasWantedPieces(c)
+ if cl.seeding(t) {
+ return c.PeerInterested
+ }
+ return t.connHasWantedPieces(c)
}
-func (t *torrent) numGoodConns() (num int) {
+func (t *torrent) numGoodConns(cl *Client) (num int) {
for _, c := range t.Conns {
- if !t.badConn(c) {
+ if cl.usefulConn(t, c) {
num++
}
}
}
func (me *Client) wantConns(t *torrent) bool {
- if me.config.NoUpload && !t.needData() {
+ if !me.seeding(t) && !t.needData() {
return false
}
- if t.numGoodConns() >= socketsPerTorrent {
+ if t.numGoodConns(me) >= socketsPerTorrent {
return false
}
return true
// Returns whether the client should make effort to seed the torrent.
func (cl *Client) seeding(t *torrent) bool {
- return cl.config.Seed && !cl.config.NoUpload
+ if cl.config.NoUpload {
+ return false
+ }
+ if !cl.config.Seed {
+ return false
+ }
+ if t.needData() {
+ return false
+ }
+ return true
}
func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
c.UsefulChunksReceived++
c.lastUsefulChunkReceived = time.Now()
+ me.upload(t, c)
+
// Write the chunk out.
err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
if err != nil {
return false
}
-func (t *torrent) worstConnsHeap() (wcs *worstConns) {
+func (t *torrent) worstConnsHeap(cl *Client) (wcs *worstConns) {
wcs = &worstConns{
- c: append([]*connection{}, t.Conns...),
- t: t,
+ c: append([]*connection{}, t.Conns...),
+ t: t,
+ cl: cl,
}
heap.Init(wcs)
return
return
}
-func (t *torrent) writeStatus(w io.Writer) {
+func (t *torrent) writeStatus(w io.Writer, cl *Client) {
fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
fmt.Fprintf(w, "Metadata have: ")
fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
sort.Sort(&worstConns{
- c: t.Conns,
- t: t,
+ c: t.Conns,
+ t: t,
+ cl: cl,
})
for _, c := range t.Conns {
c.WriteStatus(w, t)
}
func (t *torrent) urgentChunkInPiece(piece int) bool {
+ p := pp.Integer(piece)
for req := range t.urgent {
- if int(req.Index) == piece {
+ if req.Index == p {
return true
}
}
// Implements heap functions such that [0] is the worst connection.
type worstConns struct {
- c []*connection
- t *torrent
+ c []*connection
+ t *torrent
+ cl *Client
}
-func (me worstConns) Len() int { return len(me.c) }
-func (me worstConns) Swap(i, j int) { me.c[i], me.c[j] = me.c[j], me.c[i] }
+func (me *worstConns) Len() int { return len(me.c) }
+func (me *worstConns) Swap(i, j int) { me.c[i], me.c[j] = me.c[j], me.c[i] }
func (me *worstConns) Pop() (ret interface{}) {
old := me.c
}
type worstConnsSortKey struct {
- // Peer has something we want.
- useless bool
- // A fabricated duration since peer was last helpful.
- age time.Duration
+ useful bool
+ lastHelpful time.Time
}
func (me worstConnsSortKey) Less(other worstConnsSortKey) bool {
- if me.useless != other.useless {
- return me.useless
+ if me.useful != other.useful {
+ return !me.useful
}
- return me.age > other.age
+ return me.lastHelpful.Before(other.lastHelpful)
}
-func (me worstConns) key(i int) (key worstConnsSortKey) {
+func (me *worstConns) key(i int) (key worstConnsSortKey) {
c := me.c[i]
- // Peer has had time to declare what they have.
- if time.Now().Sub(c.completedHandshake) >= 30*time.Second {
- if !me.t.haveInfo() {
- key.useless = !c.supportsExtension("ut_metadata")
- } else {
- if !me.t.connHasWantedPieces(c) {
- key.useless = true
- }
- }
+ key.useful = me.cl.usefulConn(me.t, c)
+ if me.cl.seeding(me.t) {
+ key.lastHelpful = c.lastChunkSent
+ } else {
+ key.lastHelpful = c.lastUsefulChunkReceived
}
- key.age = time.Duration(1+3*c.UnwantedChunksReceived) * time.Now().Sub(func() time.Time {
- if !c.lastUsefulChunkReceived.IsZero() {
- return c.lastUsefulChunkReceived
- }
- return c.completedHandshake.Add(-time.Minute)
- }()) / time.Duration(1+c.UsefulChunksReceived)
return
}