Skip to content

Commit

Permalink
Stateful HTTP/1 connection handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 19, 2024
1 parent 2f3180a commit 43940ca
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 28 deletions.
4 changes: 2 additions & 2 deletions async-http.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ Gem::Specification.new do |spec|
spec.add_dependency "async-pool", "~> 0.7"
spec.add_dependency "io-endpoint", "~> 0.11"
spec.add_dependency "io-stream", "~> 0.4"
spec.add_dependency "protocol-http", "~> 0.35"
spec.add_dependency "protocol-http1", "~> 0.20"
spec.add_dependency "protocol-http", "~> 0.37"
spec.add_dependency "protocol-http1", "~> 0.25"
spec.add_dependency "protocol-http2", "~> 0.18"
spec.add_dependency "traces", ">= 0.10"
end
5 changes: 3 additions & 2 deletions fixtures/async/http/a_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
require 'tempfile'

require 'protocol/http/body/file'
require "protocol/http/body/buffered"

require 'sus/fixtures/async/http'

Expand Down Expand Up @@ -100,7 +101,7 @@ module HTTP
end

with 'buffered body' do
let(:body) {Async::HTTP::Body::Buffered.new(["Hello World"])}
let(:body) {::Protocol::HTTP::Body::Buffered.new(["Hello World"])}
let(:response) {::Protocol::HTTP::Response[200, {}, body]}

let(:app) do
Expand Down Expand Up @@ -410,7 +411,7 @@ module HTTP
end

with 'body with incorrect length' do
let(:bad_body) {Async::HTTP::Body::Buffered.new(["Borked"], 10)}
let(:bad_body) {::Protocol::HTTP::Body::Buffered.new(["Borked"], 10)}

let(:app) do
::Protocol::HTTP::Middleware.for do |request|
Expand Down
19 changes: 18 additions & 1 deletion lib/async/http/body/finishable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,25 @@
module Async
module HTTP
module Body
# Keeps track of whether a body is being read, and if so, waits for it to be closed.
class Finishable < ::Protocol::HTTP::Body::Wrapper
def initialize(body)
super(body)

@closed = Async::Variable.new
@error = nil

@reading = false
end

def reading?
@reading
end

def read
@reading = true

super
end

def close(error = nil)
Expand All @@ -27,7 +40,11 @@ def close(error = nil)
end

def wait
@closed.wait
if @reading
@closed.wait
else
self.discard
end
end

def inspect
Expand Down
11 changes: 3 additions & 8 deletions lib/async/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
require 'traces/provider'

require_relative 'protocol'
require_relative 'body/finishable'

module Async
module HTTP
Expand Down Expand Up @@ -140,7 +141,7 @@ def call(request)
def inspect
"#<#{self.class} authority=#{@authority.inspect}>"
end

Traces::Provider(self) do
def call(request)
attributes = {
Expand Down Expand Up @@ -186,13 +187,7 @@ def call(request)
def make_response(request, connection)
response = request.call(connection)

# The connection won't be released until the body is completely read/released.
::Protocol::HTTP::Body::Completable.wrap(response) do
# TODO: We should probably wait until the request is fully consumed and/or the connection is ready before releasing it back into the pool.

# Release the connection back into the pool:
@pool.release(connection)
end
response.pool = @pool

return response
end
Expand Down
23 changes: 18 additions & 5 deletions lib/async/http/protocol/http1/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,25 @@ module HTTP
module Protocol
module HTTP1
class Client < Connection
def initialize(...)
super

@pool = nil
end

attr_accessor :pool

def closed!
super

if pool = @pool
@pool = nil
pool.release(self)
end
end

# Used by the client to send requests to the remote server.
def call(request, task: Task.current)
# We need to keep track of connections which are not in the initial "ready" state.
@ready = false

Console.logger.debug(self) {"#{request.method} #{request.path} #{request.headers.inspect}"}

# Mark the start of the trailers:
Expand Down Expand Up @@ -54,12 +68,11 @@ def call(request, task: Task.current)
end

response = Response.read(self, request)
@ready = true

return response
rescue
# This will ensure that #reusable? returns false.
@stream.close
self.close

raise
end
Expand Down
7 changes: 3 additions & 4 deletions lib/async/http/protocol/http1/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ class Connection < ::Protocol::HTTP1::Connection
def initialize(stream, version)
super(stream)

@ready = true
@version = version
end

def to_s
"\#<#{self.class} negotiated #{@version}, currently #{@ready ? 'ready' : 'in-use'}>"
"\#<#{self.class} negotiated #{@version}, #{@state}>"
end

def as_json(...)
Expand Down Expand Up @@ -62,11 +61,11 @@ def concurrency

# Can we use this connection to make requests?
def viable?
@ready && @stream&.readable?
self.idle? && @stream&.readable?
end

def reusable?
@ready && @persistent && @stream && !@stream.closed?
@persistent && @stream && !@stream.closed?
end
end
end
Expand Down
8 changes: 8 additions & 0 deletions lib/async/http/protocol/http1/response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ def initialize(connection, version, status, reason, headers, body)
super(version, status, headers, body, protocol)
end

def pool=(pool)
if @connection.idle? or @connection.closed?
pool.release(@connection)
else
@connection.pool = pool
end
end

def connection
@connection
end
Expand Down
4 changes: 2 additions & 2 deletions lib/async/http/protocol/http1/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def fail_request(status)
write_body(@version, nil)
rescue => error
# At this point, there is very little we can do to recover:
Console::Event::Failure.for(error).emit(self, "Failed to write failure response.", severity: :debug)
Console::Event::Failure.for(error).emit(self, "Failed to write failure response!", severity: :debug)
end

def next_request
Expand All @@ -37,7 +37,7 @@ def next_request
end

return request
rescue ::Protocol::HTTP1::BadRequest
rescue ::Protocol::HTTP1::BadRequest => error
fail_request(400)
# Conceivably we could retry here, but we don't really know how bad the error is, so it's better to just fail:
raise
Expand Down
10 changes: 10 additions & 0 deletions lib/async/http/protocol/http2/response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ def initialize(stream)
attr :stream
attr :request

def pool=(pool)
# If we are already closed, the stream can be released now:
if @stream.closed?
pool.release(@stream.connection)
else
# Otherwise, we will release the stream when it is closed:
@stream.pool = pool
end
end

def connection
@stream.connection
end
Expand Down
8 changes: 8 additions & 0 deletions lib/async/http/protocol/http2/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ def initialize(*)

@headers = nil

@pool = nil

# Input buffer, reading request body, or response body (receive_data):
@length = nil
@input = nil
Expand All @@ -30,6 +32,8 @@ def initialize(*)

attr_accessor :headers

attr_accessor :pool

attr :input

def add_header(key, value)
Expand Down Expand Up @@ -158,6 +162,10 @@ def closed(error)
@output = nil
end

if pool = @pool and @connection
pool.release(@connection)
end

return self
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/async/http/proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def connect(&block)
end
else
# This ensures we don't leave a response dangling:
input.close
response.close

raise ConnectFailure, response
Expand Down
2 changes: 0 additions & 2 deletions test/async/http/middleware/location_redirector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
with '301' do
let(:app) do
Protocol::HTTP::Middleware.for do |request|
request.finish # TODO: request.discard - or some default handling?

case request.path
when '/home'
Protocol::HTTP::Response[301, {'location' => '/'}, []]
Expand Down
4 changes: 2 additions & 2 deletions test/async/http/protocol/http11.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
response = client.get("/")
connection = response.connection

expect(connection.as_json).to be == "#<Async::HTTP::Protocol::HTTP1::Client negotiated HTTP/1.1, currently ready>"
expect(connection.as_json).to be =~ /Async::HTTP::Protocol::HTTP1::Client negotiated HTTP/
ensure
response&.close
end
Expand Down Expand Up @@ -109,7 +109,7 @@ def around
end

with 'full hijack with empty response' do
let(:body) {Async::HTTP::Body::Buffered.new([], 0)}
let(:body) {::Protocol::HTTP::Body::Buffered.new([], 0)}

let(:app) do
::Protocol::HTTP::Middleware.for do |request|
Expand Down
2 changes: 2 additions & 0 deletions test/async/http/proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
stream.close_read

stream.write(chunk)
stream.close_write
ensure
stream.close
end
end
Expand Down

0 comments on commit 43940ca

Please sign in to comment.