Skip to content

Commit

Permalink
update to tokio 0.3 (#491)
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Oct 23, 2020
1 parent 676a068 commit cbbdd30
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 61 deletions.
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ members = [
futures-core = { version = "0.3", default-features = false }
futures-sink = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false }
tokio-util = { version = "0.3.1", features = ["codec"] }
tokio = { version = "0.2", features = ["io-util"] }
tokio-util = { version = "0.4.0", features = ["codec"] }
tokio = { version = "0.3", features = ["io-util"] }
bytes = "0.5.2"
http = "0.2"
tracing = { version = "0.1.13", default-features = false, features = ["std", "log"] }
tracing-futures = { version = "0.2", default-features = false, features = ["std-future"]}
fnv = "1.0.5"
slab = "0.4.0"
slab = "0.4.2"
indexmap = "1.0"

[dev-dependencies]
Expand All @@ -68,9 +68,9 @@ serde = "1.0.0"
serde_json = "1.0.0"

# Examples
tokio = { version = "0.2", features = ["dns", "macros", "rt-core", "sync", "tcp"] }
tokio = { version = "0.3", features = ["rt-multi-thread", "macros", "sync", "net"] }
env_logger = { version = "0.5.3", default-features = false }
rustls = "0.16"
tokio-rustls = "0.12.0"
rustls = "0.18"
tokio-rustls = "0.20.0"
webpki = "0.21"
webpki-roots = "0.17"
2 changes: 1 addition & 1 deletion examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::net::{TcpListener, TcpStream};
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let _ = env_logger::try_init();

let mut listener = TcpListener::bind("127.0.0.1:5928").await?;
let listener = TcpListener::bind("127.0.0.1:5928").await?;

println!("listening on {:?}", listener.local_addr());

Expand Down
43 changes: 21 additions & 22 deletions src/codec/framed_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ use crate::codec::UserError::*;
use crate::frame::{self, Frame, FrameSize};
use crate::hpack;

use bytes::{
buf::{BufExt, BufMutExt},
Buf, BufMut, BytesMut,
};
use bytes::{buf::BufMutExt, Buf, BufMut, BytesMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

use std::io::{self, Cursor};

Expand Down Expand Up @@ -193,12 +190,26 @@ where
match self.next {
Some(Next::Data(ref mut frame)) => {
tracing::trace!(queued_data_frame = true);
let mut buf = (&mut self.buf).chain(frame.payload_mut());
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut buf))?;

if self.buf.has_remaining() {
let n =
ready!(Pin::new(&mut self.inner).poll_write(cx, self.buf.bytes()))?;
self.buf.advance(n);
}

let buf = frame.payload_mut();

if !self.buf.has_remaining() && buf.has_remaining() {
let n = ready!(Pin::new(&mut self.inner).poll_write(cx, buf.bytes()))?;
buf.advance(n);
}
}
_ => {
tracing::trace!(queued_data_frame = false);
ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut self.buf))?;
let n = ready!(
Pin::new(&mut self.inner).poll_write(cx, &mut self.buf.bytes())
)?;
self.buf.advance(n);
}
}
}
Expand Down Expand Up @@ -290,25 +301,13 @@ impl<T, B> FramedWrite<T, B> {
}

impl<T: AsyncRead + Unpin, B> AsyncRead for FramedWrite<T, B> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit<u8>]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}

fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
}

fn poll_read_buf<Buf: BufMut>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut Buf,
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_read_buf(cx, buf)
}
}

// We never project the Pin to `B`.
Expand Down
8 changes: 5 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{convert, fmt, io, mem};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing_futures::{Instrument, Instrumented};

/// In progress HTTP/2.0 connection handshake future.
Expand Down Expand Up @@ -1158,16 +1158,18 @@ where
let mut rem = PREFACE.len() - self.pos;

while rem > 0 {
let n = ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf[..rem]))
let mut buf = ReadBuf::new(&mut buf[..rem]);
ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf))
.map_err(crate::Error::from_io)?;
let n = buf.filled().len();
if n == 0 {
return Poll::Ready(Err(crate::Error::from_io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"connection closed before reading preface",
))));
}

if PREFACE[self.pos..self.pos + n] != buf[..n] {
if &PREFACE[self.pos..self.pos + n] != buf.filled() {
proto_err!(conn: "read_preface: invalid preface");
// TODO: Should this just write the GO_AWAY frame directly?
return Poll::Ready(Err(Reason::PROTOCOL_ERROR.into()));
Expand Down
2 changes: 1 addition & 1 deletion tests/h2-fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ env_logger = { version = "0.5.3", default-features = false }
futures = { version = "0.3", default-features = false, features = ["std"] }
honggfuzz = "0.5"
http = "0.2"
tokio = { version = "0.2", features = [] }
tokio = { version = "0.3", features = [] }
4 changes: 2 additions & 2 deletions tests/h2-support/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ tracing = "0.1"
tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "chrono", "ansi"] }
futures = { version = "0.3", default-features = false }
http = "0.2"
tokio = { version = "0.2", features = ["time"] }
tokio-test = "0.2"
tokio = { version = "0.3", features = ["time"] }
tokio-test = "0.3"
41 changes: 21 additions & 20 deletions tests/h2-support/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use h2::{self, RecvError, SendError};
use futures::future::poll_fn;
use futures::{ready, Stream, StreamExt};

use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};

use super::assert::assert_frame_eq;
use std::pin::Pin;
Expand Down Expand Up @@ -147,10 +147,11 @@ impl Handle {
poll_fn(move |cx| {
while buf.has_remaining() {
let res = Pin::new(self.codec.get_mut())
.poll_write_buf(cx, &mut buf)
.poll_write(cx, &mut buf.bytes())
.map_err(|e| panic!("write err={:?}", e));

ready!(res).unwrap();
let n = ready!(res).unwrap();
buf.advance(n);
}

Poll::Ready(())
Expand Down Expand Up @@ -294,8 +295,8 @@ impl AsyncRead for Handle {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
Pin::new(self.codec.get_mut()).poll_read(cx, buf)
}
}
Expand Down Expand Up @@ -344,29 +345,29 @@ impl AsyncRead for Mock {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
assert!(
buf.len() > 0,
buf.remaining() > 0,
"attempted read with zero length buffer... wut?"
);

let mut me = self.pipe.inner.lock().unwrap();

if me.rx.is_empty() {
if me.closed {
return Poll::Ready(Ok(0));
return Poll::Ready(Ok(()));
}

me.rx_task = Some(cx.waker().clone());
return Poll::Pending;
}

let n = cmp::min(buf.len(), me.rx.len());
buf[..n].copy_from_slice(&me.rx[..n]);
let n = cmp::min(buf.remaining(), me.rx.len());
buf.put_slice(&me.rx[..n]);
me.rx.drain(..n);

Poll::Ready(Ok(n))
Poll::Ready(Ok(()))
}
}

Expand Down Expand Up @@ -427,29 +428,29 @@ impl AsyncRead for Pipe {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
assert!(
buf.len() > 0,
buf.remaining() > 0,
"attempted read with zero length buffer... wut?"
);

let mut me = self.inner.lock().unwrap();

if me.tx.is_empty() {
if me.closed {
return Poll::Ready(Ok(0));
return Poll::Ready(Ok(()));
}

me.tx_task = Some(cx.waker().clone());
return Poll::Pending;
}

let n = cmp::min(buf.len(), me.tx.len());
buf[..n].copy_from_slice(&me.tx[..n]);
let n = cmp::min(buf.remaining(), me.tx.len());
buf.put_slice(&me.tx[..n]);
me.tx.drain(..n);

Poll::Ready(Ok(n))
Poll::Ready(Ok(()))
}
}

Expand Down Expand Up @@ -479,5 +480,5 @@ impl AsyncWrite for Pipe {
}

pub async fn idle_ms(ms: u64) {
tokio::time::delay_for(Duration::from_millis(ms)).await
tokio::time::sleep(Duration::from_millis(ms)).await
}
2 changes: 1 addition & 1 deletion tests/h2-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ edition = "2018"
h2-support = { path = "../h2-support" }
tracing = "0.1.13"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
tokio = { version = "0.2", features = ["macros", "tcp"] }
tokio = { version = "0.3", features = ["macros", "net", "rt", "io-util"] }
5 changes: 5 additions & 0 deletions tests/h2-tests/tests/codec_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ async fn read_continuation_frames() {
#[tokio::test]
async fn update_max_frame_len_at_rest() {
use futures::StreamExt;
use tokio::io::AsyncReadExt;

h2_support::trace_init!();
// TODO: add test for updating max frame length in flight as well?
Expand All @@ -211,6 +212,10 @@ async fn update_max_frame_len_at_rest() {
codec.next().await.unwrap().unwrap_err().to_string(),
"frame with invalid size"
);

// drain codec buffer
let mut buf = Vec::new();
codec.get_mut().read_to_end(&mut buf).await.unwrap();
}

#[tokio::test]
Expand Down
4 changes: 2 additions & 2 deletions tests/h2-tests/tests/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ async fn settings_lowered_capacity_returns_capacity_to_connection() {
//
// A timeout is used here to avoid blocking forever if there is a
// failure
let result = select(rx2, tokio::time::delay_for(Duration::from_secs(5))).await;
let result = select(rx2, tokio::time::sleep(Duration::from_secs(5))).await;
if let Either::Right((_, _)) = result {
panic!("Timed out");
}
Expand Down Expand Up @@ -1004,7 +1004,7 @@ async fn settings_lowered_capacity_returns_capacity_to_connection() {
});

// Wait for server handshake to complete.
let result = select(rx1, tokio::time::delay_for(Duration::from_secs(5))).await;
let result = select(rx1, tokio::time::sleep(Duration::from_secs(5))).await;
if let Either::Right((_, _)) = result {
panic!("Timed out");
}
Expand Down
6 changes: 3 additions & 3 deletions tests/h2-tests/tests/hammer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ impl Server {
{
let mk_data = Arc::new(mk_data);

let mut rt = tokio::runtime::Runtime::new().unwrap();
let mut listener = rt
let rt = tokio::runtime::Runtime::new().unwrap();
let listener = rt
.block_on(TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))))
.unwrap();
let addr = listener.local_addr().unwrap();
Expand Down Expand Up @@ -140,7 +140,7 @@ fn hammer_client_concurrency() {
})
});

let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(tcp);
println!("...done");
}
Expand Down

0 comments on commit cbbdd30

Please sign in to comment.