import (
"bufio"
"bytes"
- "container/list"
"errors"
"expvar"
"fmt"
pieceInclination []int
pieceRequestOrder prioritybitmap.PriorityBitmap
- outgoingUnbufferedMessages *list.List
- outgoingUnbufferedMessagesNotEmpty missinggo.Event
+ postedBuffer bytes.Buffer
+ writerCond sync.Cond
}
func (cn *connection) mu() sync.Locker {
}
func (cn *connection) Post(msg pp.Message) {
- switch msg.Type {
- case pp.Cancel:
- for e := cn.outgoingUnbufferedMessages.Back(); e != nil; e = e.Prev() {
- elemMsg := e.Value.(pp.Message)
- if elemMsg.Type == pp.Request && elemMsg.Index == msg.Index && elemMsg.Begin == msg.Begin && elemMsg.Length == msg.Length {
- cn.outgoingUnbufferedMessages.Remove(e)
- optimizedCancels.Add(1)
- return
- }
- }
- }
- if cn.outgoingUnbufferedMessages == nil {
- cn.outgoingUnbufferedMessages = list.New()
- }
- cn.outgoingUnbufferedMessages.PushBack(msg)
- cn.outgoingUnbufferedMessagesNotEmpty.Set()
postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+ cn.postedBuffer.Write(msg.MustMarshalBinary())
+ cn.writerCond.Broadcast()
}
func (cn *connection) RequestPending(r request) bool {
// Writes buffers to the socket from the write channel.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
- defer func() {
+ var (
+ buf bytes.Buffer
+ lastWrite time.Time = time.Now()
+ )
+ var keepAliveTimer *time.Timer
+ keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
cn.mu().Lock()
defer cn.mu().Unlock()
- cn.Close()
- }()
- // Reduce write syscalls.
- buf := bufio.NewWriter(cn.w)
- keepAliveTimer := time.NewTimer(keepAliveTimeout)
+ if time.Since(lastWrite) >= keepAliveTimeout {
+ cn.writerCond.Broadcast()
+ }
+ keepAliveTimer.Reset(keepAliveTimeout)
+ })
+ cn.mu().Lock()
+ defer cn.mu().Unlock()
+ defer cn.Close()
+ defer keepAliveTimer.Stop()
for {
- cn.mu().Lock()
- for cn.outgoingUnbufferedMessages != nil && cn.outgoingUnbufferedMessages.Len() != 0 {
- msg := cn.outgoingUnbufferedMessages.Remove(cn.outgoingUnbufferedMessages.Front()).(pp.Message)
- cn.mu().Unlock()
- b, err := msg.MarshalBinary()
- if err != nil {
- panic(err)
- }
- connectionWriterWrite.Add(1)
- n, err := buf.Write(b)
- if err != nil {
- return
- }
- keepAliveTimer.Reset(keepAliveTimeout)
- if n != len(b) {
- panic("short write")
- }
- cn.mu().Lock()
- cn.wroteMsg(&msg)
+ buf.Write(cn.postedBuffer.Bytes())
+ cn.postedBuffer.Reset()
+ if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
+ buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
+ postedKeepalives.Add(1)
+ }
+ if buf.Len() == 0 {
+ cn.writerCond.Wait()
+ continue
}
- cn.outgoingUnbufferedMessagesNotEmpty.Clear()
cn.mu().Unlock()
- connectionWriterFlush.Add(1)
- if buf.Buffered() != 0 {
- if buf.Flush() != nil {
- return
- }
+ // log.Printf("writing %d bytes", buf.Len())
+ n, err := cn.w.Write(buf.Bytes())
+ cn.mu().Lock()
+ if n != 0 {
+ lastWrite = time.Now()
keepAliveTimer.Reset(keepAliveTimeout)
}
- select {
- case <-cn.closed.LockedChan(cn.mu()):
+ if err != nil {
return
- case <-cn.outgoingUnbufferedMessagesNotEmpty.LockedChan(cn.mu()):
- case <-keepAliveTimer.C:
- cn.mu().Lock()
- cn.Post(pp.Message{Keepalive: true})
- cn.mu().Unlock()
- postedKeepalives.Add(1)
}
+ if n != buf.Len() {
+ panic("short write")
+ }
+ buf.Reset()
}
}