diff --git a/examples/upload/client.rb b/examples/upload/client.rb index 5d2c1614..558581dc 100644 --- a/examples/upload/client.rb +++ b/examples/upload/client.rb @@ -9,10 +9,27 @@ require 'async' require 'protocol/http/body/file' -require 'async/http/body/delayed' require 'async/http/client' require 'async/http/endpoint' +class Delayed < ::Protocol::HTTP::Body::Wrapper + def initialize(body, delay = 0.01) + super(body) + + @delay = delay + end + + def ready? + false + end + + def read + sleep(@delay) + + return super + end +end + Async do endpoint = Async::HTTP::Endpoint.parse("http://localhost:9222") client = Async::HTTP::Client.new(endpoint, protocol: Async::HTTP::Protocol::HTTP2) @@ -21,7 +38,7 @@ ['accept', 'text/plain'], ] - body = Async::HTTP::Body::Delayed.new(Protocol::HTTP::Body::File.open(File.join(__dir__, "data.txt"), block_size: 32)) + body = Delayed.new(Protocol::HTTP::Body::File.open(File.join(__dir__, "data.txt"), block_size: 32)) response = client.post(endpoint.path, headers, body) diff --git a/fixtures/async/http/a_protocol.rb b/fixtures/async/http/a_protocol.rb index 0dcde419..5d3870ca 100644 --- a/fixtures/async/http/a_protocol.rb +++ b/fixtures/async/http/a_protocol.rb @@ -162,7 +162,7 @@ module HTTP request_received.wait headers.add('etag', 'abcd') - body.close + body.close_write end response = client.post("/", headers, body) @@ -187,7 +187,7 @@ module HTTP response_received.wait headers.add('etag', 'abcd') - body.close + body.close_write end ::Protocol::HTTP::Response[200, headers, body] @@ -395,9 +395,9 @@ module HTTP let(:app) do ::Protocol::HTTP::Middleware.for do |request| Async::HTTP::Body::Hijack.response(request, 200, {}) do |stream| - stream.write content - stream.write content - stream.close + stream.write(content) + stream.write(content) + stream.close_write end end end diff --git a/fixtures/async/http/body/a_writable_body.rb b/fixtures/async/http/body/a_writable_body.rb deleted file mode 100644 index 24c18c38..00000000 --- a/fixtures/async/http/body/a_writable_body.rb +++ /dev/null @@ -1,105 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2019-2023, by Samuel Williams. - -require 'protocol/http/body/deflate' - -module Async - module HTTP - module Body - AWritableBody = Sus::Shared("a writable body") do - it "can write and read data" do - 3.times do |i| - body.write("Hello World #{i}") - expect(body.read).to be == "Hello World #{i}" - end - end - - it "can buffer data in order" do - 3.times do |i| - body.write("Hello World #{i}") - end - - 3.times do |i| - expect(body.read).to be == "Hello World #{i}" - end - end - - with '#join' do - it "can join chunks" do - 3.times do |i| - body.write("#{i}") - end - - body.close - - expect(body.join).to be == "012" - end - end - - with '#each' do - it "can read all data in order" do - 3.times do |i| - body.write("Hello World #{i}") - end - - body.close - - 3.times do |i| - chunk = body.read - expect(chunk).to be == "Hello World #{i}" - end - end - - it "can propagate failures" do - reactor.async do - expect do - body.each do |chunk| - raise RuntimeError.new("It was too big!") - end - end.to raise_exception(RuntimeError, message: be =~ /big/) - end - - expect{ - body.write("Beep boop") # This will cause a failure. - ::Async::Task.current.yield - body.write("Beep boop") # This will fail. - }.to raise_exception(RuntimeError, message: be =~ /big/) - end - - it "can propagate failures in nested bodies" do - nested = ::Protocol::HTTP::Body::Deflate.for(body) - - reactor.async do - expect do - nested.each do |chunk| - raise RuntimeError.new("It was too big!") - end - end.to raise_exception(RuntimeError, message: be =~ /big/) - end - - expect{ - body.write("Beep boop") # This will cause a failure. - ::Async::Task.current.yield - body.write("Beep boop") # This will fail. - }.to raise_exception(RuntimeError, message: be =~ /big/) - end - - it "can consume chunks" do - body.write("Hello World!") - body.close - - expect(body).not.to be(:empty?) - - body.each do |chunk| - expect(chunk).to be == "Hello World!" - end - - expect(body).to be(:empty?) - end - end - end - end - end -end diff --git a/gems.rb b/gems.rb index 4094e8aa..52fa32c2 100644 --- a/gems.rb +++ b/gems.rb @@ -20,6 +20,8 @@ # gem "protocol-http2", path: "../protocol-http2" # gem "protocol-hpack", path: "../protocol-hpack" +gem "protocol-http", git: "https://github.com/socketry/protocol-http.git" + group :maintenance, optional: true do gem "bake-modernize" gem "bake-gem" diff --git a/lib/async/http/body/delayed.rb b/lib/async/http/body/delayed.rb deleted file mode 100644 index 7b0f57b5..00000000 --- a/lib/async/http/body/delayed.rb +++ /dev/null @@ -1,32 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2018-2023, by Samuel Williams. -# Copyright, 2020, by Bruno Sutic. -# Copyright, 2023, by Thomas Morgan. - -require 'protocol/http/body/wrapper' - -module Async - module HTTP - module Body - class Delayed < ::Protocol::HTTP::Body::Wrapper - def initialize(body, delay = 0.01) - super(body) - - @delay = delay - end - - def ready? - false - end - - def read - Async::Task.current.sleep(@delay) - - return super - end - end - end - end -end diff --git a/lib/async/http/body/hijack.rb b/lib/async/http/body/hijack.rb index b07fcda6..769c45b7 100644 --- a/lib/async/http/body/hijack.rb +++ b/lib/async/http/body/hijack.rb @@ -36,7 +36,7 @@ def stream? end def call(stream) - return @block.call(stream) + @block.call(stream) end attr :input diff --git a/lib/async/http/body/pipe.rb b/lib/async/http/body/pipe.rb index 6ef1c0e1..93d7a45d 100644 --- a/lib/async/http/body/pipe.rb +++ b/lib/async/http/body/pipe.rb @@ -17,7 +17,7 @@ def initialize(input, output = Writable.new, task: Task.current) head, tail = ::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM) - @head = ::IO::Stream::Buffered.new(head) + @head = ::IO::Stream(head) @tail = tail @reader = nil @@ -52,8 +52,10 @@ def reader(task) end @head.close_write + rescue => error + raise ensure - @input.close($!) + @input.close(error) close_head if @writer&.finished? end @@ -68,8 +70,10 @@ def writer(task) while chunk = @head.read_partial @output.write(chunk) end + rescue => error + raise ensure - @output.close($!) + @output.close_write(error) close_head if @reader&.finished? end diff --git a/lib/async/http/body/slowloris.rb b/lib/async/http/body/slowloris.rb deleted file mode 100644 index 6a3d412f..00000000 --- a/lib/async/http/body/slowloris.rb +++ /dev/null @@ -1,55 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2019-2023, by Samuel Williams. - -require_relative 'writable' - -require 'async/clock' - -module Async - module HTTP - module Body - # A dynamic body which you can write to and read from. - class Slowloris < Writable - class ThroughputError < StandardError - def initialize(throughput, minimum_throughput, time_since_last_write) - super("Slow write: #{throughput.round(1)}bytes/s less than required #{minimum_throughput.round}bytes/s.") - end - end - - # In order for this implementation to work correctly, you need to use a LimitedQueue. - # @param minimum_throughput [Integer] the minimum bytes per second otherwise this body will be forcefully closed. - def initialize(*arguments, minimum_throughput: 1024, **options) - super(*arguments, **options) - - @minimum_throughput = minimum_throughput - - @last_write_at = nil - @last_chunk_size = nil - end - - attr :minimum_throughput - - # If #read is called regularly to maintain throughput, that is good. If #read is not called, that is a problem. Throughput is dependent on data being available, from #write, so it doesn't seem particularly problimatic to do this check in #write. - def write(chunk) - if @last_chunk_size - time_since_last_write = Async::Clock.now - @last_write_at - throughput = @last_chunk_size / time_since_last_write - - if throughput < @minimum_throughput - error = ThroughputError.new(throughput, @minimum_throughput, time_since_last_write) - - self.close(error) - end - end - - super.tap do - @last_write_at = Async::Clock.now - @last_chunk_size = chunk&.bytesize - end - end - end - end - end -end diff --git a/lib/async/http/protocol/http1/server.rb b/lib/async/http/protocol/http1/server.rb index 19078a61..3463984c 100644 --- a/lib/async/http/protocol/http1/server.rb +++ b/lib/async/http/protocol/http1/server.rb @@ -68,11 +68,23 @@ def each(task: Task.current) stream = write_upgrade_body(protocol) # At this point, the request body is hijacked, so we don't want to call #finish below. - request = nil unless request.body + request = nil response = nil # We must return here as no further request processing can be done: return body.call(stream) + elsif response.status == 101 + # This code path is to support legacy behavior where the response status is set to 101, but the protocol is not upgraded. This may not be a valid use case, but it is supported for compatibility. We expect the response headers to contain the `upgrade` header. + write_response(@version, response.status, response.headers) + + stream = write_tunnel_body(request.version) + + # Same as above: + request = nil + response = nil + + # We must return here as no further request processing can be done: + return body&.call(stream) else write_response(@version, response.status, response.headers) @@ -80,7 +92,7 @@ def each(task: Task.current) stream = write_tunnel_body(request.version) # Same as above: - request = nil unless request.body + request = nil response = nil # We must return here as no further request processing can be done: diff --git a/lib/async/http/protocol/http2/input.rb b/lib/async/http/protocol/http2/input.rb index b29d3849..116681b6 100644 --- a/lib/async/http/protocol/http2/input.rb +++ b/lib/async/http/protocol/http2/input.rb @@ -3,14 +3,14 @@ # Released under the MIT License. # Copyright, 2020-2023, by Samuel Williams. -require_relative '../../body/writable' +require 'protocol/http/body/writable' module Async module HTTP module Protocol module HTTP2 # A writable body which requests window updates when data is read from it. - class Input < Body::Writable + class Input < ::Protocol::HTTP::Body::Writable def initialize(stream, length) super(length) diff --git a/lib/async/http/protocol/http2/output.rb b/lib/async/http/protocol/http2/output.rb index dee8b1ba..4e3d5cfa 100644 --- a/lib/async/http/protocol/http2/output.rb +++ b/lib/async/http/protocol/http2/output.rb @@ -50,18 +50,25 @@ def write(chunk) end end - # This method should only be called from within the context of the output task. - def close(error = nil) - if @stream - @stream.finish_output(error) + def close_write(error = nil) + if stream = @stream @stream = nil + stream.finish_output(error) end end + # This method should only be called from within the context of the output task. + def close(error = nil) + close_write(error) + stop(error) + end + # This method should only be called from within the context of the HTTP/2 stream. def stop(error) - @task&.stop - @task = nil + if task = @task + @task = nil + task.stop(error) + end end private @@ -70,10 +77,12 @@ def stream(task) task.annotate("Streaming #{@body} to #{@stream}.") input = @stream.wait_for_input + stream = ::Protocol::HTTP::Body::Stream.new(input, self) - @body.call(::Protocol::HTTP::Body::Stream.new(input, self)) - rescue Async::Stop - # Ignore. + @body.call(stream) + rescue => error + self.close(error) + raise end # Reads chunks from the given body and writes them to the stream as fast as possible. @@ -86,11 +95,17 @@ def passthrough(task) # chunk.clear unless chunk.frozen? # GC.start end - - self.close + rescue => error + raise ensure - @body&.close($!) - @body = nil + # Ensure the body we are reading from is fully closed: + if body = @body + @body = nil + body.close(error) + end + + # Ensure the output of this body is closed: + self.close_write(error) end # Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk. diff --git a/lib/async/http/protocol/http2/stream.rb b/lib/async/http/protocol/http2/stream.rb index 49d25871..60f3fadd 100644 --- a/lib/async/http/protocol/http2/stream.rb +++ b/lib/async/http/protocol/http2/stream.rb @@ -59,7 +59,7 @@ def process_headers(frame) # TODO this might need to be in an ensure block: if @input and frame.end_stream? - @input.close($!) + @input.close_write @input = nil end rescue ::Protocol::HTTP2::HeaderError => error @@ -98,7 +98,7 @@ def process_data(frame) end if frame.end_stream? - @input.close + @input.close_write @input = nil end end @@ -149,7 +149,7 @@ def closed(error) super if @input - @input.close(error) + @input.close_write(error) @input = nil end diff --git a/test/async/http/body.rb b/test/async/http/body.rb index 37d5adb8..1235a541 100644 --- a/test/async/http/body.rb +++ b/test/async/http/body.rb @@ -23,7 +23,7 @@ output.write(chunk.reverse) end - output.close + output.close_write end Protocol::HTTP::Response[200, [], output] @@ -35,7 +35,7 @@ reactor.async do |task| output.write("Hello World!") - output.close + output.close_write end response = client.post("/", {}, output) @@ -58,7 +58,7 @@ notification.wait end - body.close + body.close_write end Protocol::HTTP::Response[200, {}, body] diff --git a/test/async/http/body/hijack.rb b/test/async/http/body/hijack.rb index 73cad02d..9d994a2a 100644 --- a/test/async/http/body/hijack.rb +++ b/test/async/http/body/hijack.rb @@ -15,7 +15,7 @@ 3.times do stream.write(content) end - stream.close + stream.close_write end end diff --git a/test/async/http/body/pipe.rb b/test/async/http/body/pipe.rb index 0760de24..8e05263c 100644 --- a/test/async/http/body/pipe.rb +++ b/test/async/http/body/pipe.rb @@ -20,7 +20,7 @@ include Sus::Fixtures::Async::ReactorContext let(:input_write_duration) {0} - let(:io) { pipe.to_io } + let(:io) {pipe.to_io} def before super @@ -31,14 +31,12 @@ def before input.write("#{first} ") sleep(input_write_duration) if input_write_duration > 0 input.write(second) - input.close + input.close_write end end - def aftrer + after do io.close - - super end it "returns an io socket" do diff --git a/test/async/http/body/slowloris.rb b/test/async/http/body/slowloris.rb deleted file mode 100644 index dc3e48be..00000000 --- a/test/async/http/body/slowloris.rb +++ /dev/null @@ -1,35 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2019-2023, by Samuel Williams. - -require 'async/http/body/slowloris' - -require 'sus/fixtures/async' -require 'async/http/body/a_writable_body' - -describe Async::HTTP::Body::Slowloris do - include Sus::Fixtures::Async::ReactorContext - - let(:body) {subject.new} - - it_behaves_like Async::HTTP::Body::AWritableBody - - it "closes body with error if throughput is not maintained" do - body.write("Hello World") - - sleep 0.1 - - expect do - body.write("Hello World") - end.to raise_exception(Async::HTTP::Body::Slowloris::ThroughputError, message: be =~ /Slow write/) - end - - it "doesn't close body if throughput is exceeded" do - body.write("Hello World") - - expect do - body.write("Hello World") - end.not.to raise_exception - end -end diff --git a/test/async/http/body/writable.rb b/test/async/http/body/writable.rb deleted file mode 100644 index 9d553a58..00000000 --- a/test/async/http/body/writable.rb +++ /dev/null @@ -1,17 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2018-2023, by Samuel Williams. - -require 'async/http/body/slowloris' - -require 'sus/fixtures/async' -require 'async/http/body/a_writable_body' - -describe Async::HTTP::Body::Writable do - include Sus::Fixtures::Async::ReactorContext - - let(:body) {subject.new} - - it_behaves_like Async::HTTP::Body::AWritableBody -end diff --git a/test/async/http/proxy.rb b/test/async/http/proxy.rb index b3b4708e..7b37ba39 100644 --- a/test/async/http/proxy.rb +++ b/test/async/http/proxy.rb @@ -57,7 +57,7 @@ expect(response).to be(:success?) input.write(data) - input.close + input.close_write expect(response.read).to be == data end @@ -74,7 +74,7 @@ stream.flush end - stream.close + stream.close_write end end end