src/net/http/h2_bundle.go | 137 +++++++++++++++++++++++++++++++++-------------------- diff --git a/src/net/http/h2_bundle.go b/src/net/http/h2_bundle.go index fd899034a7e747acee83edc2b0eee86837e34b52..2e0b3c905acf9395f91ea0f4ad0269678c4aaed1 100644 --- a/src/net/http/h2_bundle.go +++ b/src/net/http/h2_bundle.go @@ -855,10 +855,12 @@ // (currently 16MB) // If the limit is hit, MetaHeadersFrame.Truncated is set true. MaxHeaderListSize uint32 - logReads bool + logReads, logWrites bool - debugFramer *http2Framer // only use for logging written writes - debugFramerBuf *bytes.Buffer + debugFramer *http2Framer // only use for logging written writes + debugFramerBuf *bytes.Buffer + debugReadLoggerf func(string, ...interface{}) + debugWriteLoggerf func(string, ...interface{}) } func (fr *http2Framer) maxHeaderListSize() uint32 { @@ -892,7 +894,7 @@ _ = append(f.wbuf[:0], byte(length>>16), byte(length>>8), byte(length)) - if http2logFrameWrites { + if f.logWrites { f.logWrite() } @@ -914,10 +916,10 @@ } f.debugFramerBuf.Write(f.wbuf) fr, err := f.debugFramer.ReadFrame() if err != nil { - log.Printf("http2: Framer %p: failed to decode just-written frame", f) + f.debugWriteLoggerf("http2: Framer %p: failed to decode just-written frame", f) return } - log.Printf("http2: Framer %p: wrote %v", f, http2summarizeFrame(fr)) + f.debugWriteLoggerf("http2: Framer %p: wrote %v", f, http2summarizeFrame(fr)) } func (f *http2Framer) writeByte(v byte) { f.wbuf = append(f.wbuf, v) } @@ -938,9 +940,12 @@ // NewFramer returns a Framer that writes frames to w and reads them from r. func http2NewFramer(w io.Writer, r io.Reader) *http2Framer { fr := &http2Framer{ - w: w, - r: r, - logReads: http2logFrameReads, + w: w, + r: r, + logReads: http2logFrameReads, + logWrites: http2logFrameWrites, + debugReadLoggerf: log.Printf, + debugWriteLoggerf: log.Printf, } fr.getReadBuf = func(size uint32) []byte { if cap(fr.readBuf) >= int(size) { @@ -1022,7 +1027,7 @@ if err := fr.checkFrameOrder(f); err != nil { return nil, err } if fr.logReads { - log.Printf("http2: Framer %p: read %v", fr, http2summarizeFrame(f)) + fr.debugReadLoggerf("http2: Framer %p: read %v", fr, http2summarizeFrame(f)) } if fh.Type == http2FrameHeaders && fr.ReadMetaHeaders != nil { return fr.readMetaFrame(f.(*http2HeadersFrame)) @@ -1922,8 +1927,8 @@ hdec := fr.ReadMetaHeaders hdec.SetEmitEnabled(true) hdec.SetMaxStringLength(fr.maxHeaderStringLen()) hdec.SetEmitFunc(func(hf hpack.HeaderField) { - if http2VerboseLogs && http2logFrameReads { - log.Printf("http2: decoded hpack field %+v", hf) + if http2VerboseLogs && fr.logReads { + fr.debugReadLoggerf("http2: decoded hpack field %+v", hf) } if !httplex.ValidHeaderFieldValue(hf.Value) { invalid = http2headerFieldValueError(hf.Value) @@ -3285,8 +3290,7 @@ parent *http2stream // or nil numTrailerValues int64 weight uint8 state http2streamState - sentReset bool // only true once detached from streams map - gotReset bool // only true once detacted from streams map + resetQueued bool // RST_STREAM queued for write; set by sc.resetStream gotTrailerHeader bool // HEADER frame for trailers was seen wroteHeaders bool // whether we wrote headers (not status 100) reqBuf []byte // if non-nil, body pipe buffer to return later at EOF @@ -3682,13 +3686,25 @@ // If you're not on the serve goroutine, use writeFrameFromHandler instead. func (sc *http2serverConn) writeFrame(wr http2FrameWriteRequest) { sc.serveG.check() + // If true, wr will not be written and wr.done will not be signaled. var ignoreWrite bool + if wr.StreamID() != 0 { + _, isReset := wr.write.(http2StreamError) + if state, _ := sc.state(wr.StreamID()); state == http2stateClosed && !isReset { + ignoreWrite = true + } + } + switch wr.write.(type) { case *http2writeResHeaders: wr.stream.wroteHeaders = true case http2write100ContinueHeadersFrame: if wr.stream.wroteHeaders { + + if wr.done != nil { + panic("wr.done != nil for write100ContinueHeadersFrame") + } ignoreWrite = true } } @@ -3712,14 +3728,14 @@ st := wr.stream if st != nil { switch st.state { case http2stateHalfClosedLocal: - panic("internal error: attempt to send frame on half-closed-local stream") - case http2stateClosed: - if st.sentReset || st.gotReset { + switch wr.write.(type) { + case http2StreamError, http2handlerPanicRST, http2writeWindowUpdate: - sc.scheduleFrameWrite() - return + default: + panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr)) } - panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wr)) + case http2stateClosed: + panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr)) } } if wpp, ok := wr.write.(*http2writePushPromise); ok { @@ -3727,9 +3743,7 @@ var err error wpp.promisedID, err = wpp.allocatePromisedID() if err != nil { sc.writingFrameAsync = false - if wr.done != nil { - wr.done <- err - } + wr.replyToWriter(err) return } } @@ -3762,24 +3776,9 @@ sc.writingFrame = false sc.writingFrameAsync = false wr := res.wr - st := wr.stream - closeStream := http2endsStream(wr.write) - - if _, ok := wr.write.(http2handlerPanicRST); ok { - sc.closeStream(st, http2errHandlerPanicked) - } - - if ch := wr.done; ch != nil { - select { - case ch <- res.err: - default: - panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write)) - } - } - wr.write = nil - - if closeStream { + if http2writeEndsStream(wr.write) { + st := wr.stream if st == nil { panic("internal error: expecting non-nil stream") } @@ -3787,12 +3786,23 @@ switch st.state { case http2stateOpen: st.state = http2stateHalfClosedLocal - errCancel := http2streamError(st.id, http2ErrCodeCancel) - sc.resetStream(errCancel) + sc.resetStream(http2streamError(st.id, http2ErrCodeCancel)) case http2stateHalfClosedRemote: sc.closeStream(st, http2errHandlerComplete) } + } else { + switch v := wr.write.(type) { + case http2StreamError: + + if st, ok := sc.streams[v.StreamID]; ok { + sc.closeStream(st, v) + } + case http2handlerPanicRST: + sc.closeStream(wr.stream, http2errHandlerPanicked) + } } + + wr.replyToWriter(res.err) sc.scheduleFrameWrite() } @@ -3890,8 +3900,7 @@ func (sc *http2serverConn) resetStream(se http2StreamError) { sc.serveG.check() sc.writeFrame(http2FrameWriteRequest{write: se}) if st, ok := sc.streams[se.StreamID]; ok { - st.sentReset = true - sc.closeStream(st, se) + st.resetQueued = true } } @@ -4030,7 +4039,6 @@ return http2ConnectionError(http2ErrCodeProtocol) } if st != nil { - st.gotReset = true st.cancelCtx() sc.closeStream(st, http2streamError(f.StreamID, f.ErrCode)) } @@ -4145,7 +4153,7 @@ if id == 0 || state == http2stateIdle { return http2ConnectionError(http2ErrCodeProtocol) } - if st == nil || state != http2stateOpen || st.gotTrailerHeader { + if st == nil || state != http2stateOpen || st.gotTrailerHeader || st.resetQueued { if sc.inflow.available() < int32(f.Length) { return http2streamError(id, http2ErrCodeFlowControl) @@ -4154,6 +4162,10 @@ sc.inflow.take(int32(f.Length)) sc.sendWindowUpdate(nil, int(f.Length)) + if st != nil && st.resetQueued { + + return nil + } return http2streamError(id, http2ErrCodeStreamClosed) } if st.body == nil { @@ -4251,6 +4263,10 @@ return http2ConnectionError(http2ErrCodeProtocol) } if st := sc.streams[f.StreamID]; st != nil { + if st.resetQueued { + + return nil + } return st.processTrailerHeaders(f) } @@ -5216,7 +5232,7 @@ method: msg.method, scheme: msg.url.Scheme, authority: msg.url.Host, path: msg.url.RequestURI(), - header: msg.header, + header: http2cloneHeader(msg.header), }) if err != nil { @@ -5646,6 +5662,10 @@ host = authority } if a, err := idna.ToASCII(host); err == nil { host = a + } + + if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") { + return host + ":" + port } return net.JoinHostPort(host, port) } @@ -7376,9 +7396,10 @@ // returned buffer. HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) } -// endsStream reports whether the given frame writer w will locally -// close the stream. -func http2endsStream(w http2writeFramer) bool { +// writeEndsStream reports whether w writes a frame that will transition +// the stream to a half-closed local state. This returns false for RST_STREAM, +// which closes the entire stream (not just the local half). +func http2writeEndsStream(w http2writeFramer) bool { switch v := w.(type) { case *http2writeData: return v.endStream @@ -7386,7 +7407,7 @@ case *http2writeResHeaders: return v.endStream case nil: - panic("endsStream called on nil writeFramer") + panic("writeEndsStream called on nil writeFramer") } return false } @@ -7830,6 +7851,20 @@ } else { des = fmt.Sprintf("%T", wr.write) } return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des) +} + +// replyToWriter sends err to wr.done and panics if the send must block +// This does nothing if wr.done is nil. +func (wr *http2FrameWriteRequest) replyToWriter(err error) { + if wr.done == nil { + return + } + select { + case wr.done <- err: + default: + panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write)) + } + wr.write = nil } // writeQueue is used by implementations of WriteScheduler.