Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage controller: make proxying of GETs to pageservers more robust #9065

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions storage_controller/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ async fn handle_tenant_timeline_passthrough(
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);

// Find the node that holds shard zero
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?;
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;

// Callers will always pass an unsharded tenant ID. Before proxying, we must
// rewrite this to a shard-aware shard zero ID.
Expand Down Expand Up @@ -431,10 +431,10 @@ async fn handle_tenant_timeline_passthrough(
let _timer = latency.start_timer(labels.clone());

let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
let resp = client.get_raw(path).await.map_err(|_e|
// FIXME: give APiError a proper Unavailable variant. We return 503 here because
// if we can't successfully send a request to the pageserver, we aren't available.
ApiError::ShuttingDown)?;
let resp = client.get_raw(path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.
ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {}: {e}", node).into()))?;

if !resp.status().is_success() {
let error_counter = &METRICS_REGISTRY
Expand All @@ -443,6 +443,19 @@ async fn handle_tenant_timeline_passthrough(
error_counter.inc(labels);
}

// Transform 404 into 503 if we raced with a migration
if resp.status() == reqwest::StatusCode::NOT_FOUND {
// Look up node again: if we migrated it will be different
let (new_node, _tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
if new_node.get_id() != node.get_id() {
// Rather than retry here, send the client a 503 to prompt a retry: this matches
// the pageserver's use of 503, and all clients calling this API should retry on 503.
return Err(ApiError::ResourceUnavailable(
format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
));
}
}

// We have a reqest::Response, would like a http::Response
let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
for (k, v) in resp.headers() {
Expand Down
74 changes: 53 additions & 21 deletions storage_controller/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3498,34 +3498,66 @@ impl Service {

/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
/// function looks up and returns node. If the tenant isn't found, returns Err(ApiError::NotFound)
pub(crate) fn tenant_shard0_node(
pub(crate) async fn tenant_shard0_node(
&self,
tenant_id: TenantId,
) -> Result<(Node, TenantShardId), ApiError> {
let locked = self.inner.read().unwrap();
let Some((tenant_shard_id, shard)) = locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.next()
else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {tenant_id} not found").into(),
));
// Look up in-memory state and maybe use the node from there.
{
let locked = self.inner.read().unwrap();
let Some((tenant_shard_id, shard)) = locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.next()
else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {tenant_id} not found").into(),
));
};

let Some(intent_node_id) = shard.intent.get_attached() else {
tracing::warn!(
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
"Shard not scheduled (policy {:?}), cannot generate pass-through URL",
shard.policy
);
return Err(ApiError::Conflict(
"Cannot call timeline API on non-attached tenant".to_string(),
));
};

if shard.reconciler.is_none() {
// Optimization: while no reconcile is in flight, we may trust our in-memory state
// to tell us which pageserver to use. Otherwise we will fall through and hit the database
let Some(node) = locked.nodes.get(intent_node_id) else {
// This should never happen
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Shard refers to nonexistent node"
)));
};
return Ok((node.clone(), *tenant_shard_id));
}
};

// TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
// point to somewhere we haven't attached yet.
let Some(node_id) = shard.intent.get_attached() else {
tracing::warn!(
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
"Shard not scheduled (policy {:?}), cannot generate pass-through URL",
shard.policy
);
return Err(ApiError::Conflict(
"Cannot call timeline API on non-attached tenant".to_string(),
// Look up the latest attached pageserver location from the database
// generation state: this will reflect the progress of any ongoing migration.
// Note that it is not guaranteed to _stay_ here, our caller must still handle
// the case where they call through to the pageserver and get a 404.
let db_result = self.persistence.tenant_generations(tenant_id).await?;
let Some(ShardGenerationState {
tenant_shard_id,
generation: _,
generation_pageserver: Some(node_id),
}) = db_result.first()
else {
// This can happen if we raced with a tenant deletion or a shard split. On a retry
// the caller will either succeed (shard split case), get a proper 404 (deletion case),
// or a conflict response (case where tenant was detached in background)
return Err(ApiError::ResourceUnavailable(
"Shard {} not found in database, or is not attached".into(),
));
};

let locked = self.inner.read().unwrap();
let Some(node) = locked.nodes.get(node_id) else {
// This should never happen
return Err(ApiError::InternalServerError(anyhow::anyhow!(
Expand Down
Loading