Skip to content

Commit

Permalink
new: implemented ssl support for AMQP module
Browse files Browse the repository at this point in the history
  • Loading branch information
evilsocket committed Oct 30, 2023
1 parent 23ddeb3 commit 3f04d13
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 20 deletions.
3 changes: 3 additions & 0 deletions src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ pub(crate) struct Options {
#[clap(long, default_value_t = false)]
pub quiet: bool,

#[cfg(feature = "amqp")]
#[clap(flatten, next_help_heading = "AMQP")]
pub amqp: crate::plugins::amqp::options::Options,
#[cfg(feature = "http")]
#[clap(flatten, next_help_heading = "HTTP")]
pub http: crate::plugins::http::options::Options,
Expand Down
68 changes: 48 additions & 20 deletions src/plugins/amqp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use crate::Plugin;

use crate::creds::Credentials;

pub(crate) mod options;

const PROTOCOL_HEADER_091: &[u8] = &[b'A', b'M', b'Q', b'P', 0, 0, 9, 1];

#[ctor]
Expand All @@ -24,6 +26,7 @@ pub(crate) struct AMQP {
host: String,
port: u16,
address: String,
ssl: bool,
}

impl AMQP {
Expand All @@ -32,29 +35,18 @@ impl AMQP {
host: String::new(),
port: 5672,
address: String::new(),
ssl: false,
}
}
}

#[async_trait]
impl Plugin for AMQP {
fn description(&self) -> &'static str {
"AMQP password authentication (ActiveMQ, RabbitMQ, Qpid, JORAM and Solace)."
}

fn setup(&mut self, opts: &Options) -> Result<(), Error> {
(self.host, self.port) = utils::parse_target(opts.target.as_ref(), 5672)?;
self.address = format!("{}:{}", &self.host, self.port);
Ok(())
}

async fn attempt(&self, creds: &Credentials, timeout: Duration) -> Result<Option<Loot>, Error> {
// TODO: SSL
let mut stream = tokio::time::timeout(timeout, TcpStream::connect(&self.address))
.await
.map_err(|e| e.to_string())?
.map_err(|e| e.to_string())?;

async fn attempt_with_stream<S>(
&self,
creds: &Credentials,
mut stream: S,
) -> Result<Option<Loot>, Error>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
// send proto header
stream
.write_all(PROTOCOL_HEADER_091)
Expand Down Expand Up @@ -120,3 +112,39 @@ impl Plugin for AMQP {
}
}
}

#[async_trait]
impl Plugin for AMQP {
fn description(&self) -> &'static str {
"AMQP password authentication (ActiveMQ, RabbitMQ, Qpid, JORAM and Solace)."
}

fn setup(&mut self, opts: &Options) -> Result<(), Error> {
(self.host, self.port) = utils::parse_target(opts.target.as_ref(), 5672)?;
self.address = format!("{}:{}", &self.host, self.port);
self.ssl = opts.amqp.amqp_ssl;
Ok(())
}

async fn attempt(&self, creds: &Credentials, timeout: Duration) -> Result<Option<Loot>, Error> {
let tcp_stream = tokio::time::timeout(timeout, TcpStream::connect(&self.address))
.await
.map_err(|e| e.to_string())?
.map_err(|e| e.to_string())?;

if !self.ssl {
self.attempt_with_stream(creds, tcp_stream).await
} else {
let tls = async_native_tls::TlsConnector::new()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true);

let stream = tokio::time::timeout(timeout, tls.connect(&self.host, tcp_stream))
.await
.map_err(|e| e.to_string())?
.map_err(|e| e.to_string())?;

self.attempt_with_stream(creds, stream).await
}
}
}
9 changes: 9 additions & 0 deletions src/plugins/amqp/options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use clap::Parser;
use serde::{Deserialize, Serialize};

#[derive(Parser, Debug, Serialize, Deserialize, Clone, Default)]
pub(crate) struct Options {
#[clap(long, default_value_t = false)]
/// Enable SSL for AMQP.
pub amqp_ssl: bool,
}

0 comments on commit 3f04d13

Please sign in to comment.