Skip to content

Commit

Permalink
refactor(storage): part1. use arc sst info in hummock version
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Sep 18, 2024
1 parent 06d5cde commit 852b9a7
Show file tree
Hide file tree
Showing 70 changed files with 1,171 additions and 717 deletions.
23 changes: 12 additions & 11 deletions src/ctl/src/cmd_impl/hummock/sst_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAw
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_frontend::TableCatalog;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::level::Level;
use risingwave_hummock_sdk::level::LevelCommon;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::sstable_info_ref::SstableInfoReader;
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_object_store::object::{ObjectMetadata, ObjectStoreImpl};
use risingwave_rpc_client::MetaClient;
Expand Down Expand Up @@ -84,13 +85,13 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
for level in version.get_combined_levels() {
for sstable_info in &level.table_infos {
if let Some(object_id) = &args.object_id {
if *object_id == sstable_info.object_id {
if *object_id == sstable_info.object_id() {
print_level(level, sstable_info);
sst_dump_via_sstable_store(
&sstable_store,
sstable_info.object_id,
sstable_info.meta_offset,
sstable_info.file_size,
sstable_info.object_id(),
sstable_info.meta_offset(),
sstable_info.file_size(),
&table_data,
&args,
)
Expand All @@ -101,9 +102,9 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
print_level(level, sstable_info);
sst_dump_via_sstable_store(
&sstable_store,
sstable_info.object_id,
sstable_info.meta_offset,
sstable_info.file_size,
sstable_info.object_id(),
sstable_info.meta_offset(),
sstable_info.file_size(),
&table_data,
&args,
)
Expand Down Expand Up @@ -161,14 +162,14 @@ pub async fn sst_dump(context: &CtlContext, args: SstDumpArgs) -> anyhow::Result
Ok(())
}

fn print_level(level: &Level, sst_info: &SstableInfo) {
fn print_level<T: SstableInfoReader>(level: &LevelCommon<T>, sst_info: &T) {
println!("Level Type: {}", level.level_type.as_str_name());
println!("Level Idx: {}", level.level_idx);
if level.level_idx == 0 {
println!("L0 Sub-Level Idx: {}", level.sub_level_id);
}
println!("SST id: {}", sst_info.sst_id);
println!("SST table_ids: {:?}", sst_info.table_ids);
println!("SST id: {}", sst_info.sst_id());
println!("SST table_ids: {:?}", sst_info.table_ids());
}

fn print_object(obj: &ObjectMetadata) {
Expand Down
9 changes: 7 additions & 2 deletions src/ctl/src/cmd_impl/hummock/validate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext;
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::sstable_info_ref::SstableInfoReader;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{version_archive_dir, HummockSstableObjectId, HummockVersionId};
use risingwave_object_store::object::ObjectStoreRef;
Expand Down Expand Up @@ -102,7 +103,7 @@ async fn print_user_key_in_version(
for cg in version.levels.values() {
for level in cg.l0.sub_levels.iter().rev().chain(cg.levels.iter()) {
for sstable_info in &level.table_infos {
let key_range = &sstable_info.key_range;
let key_range = &sstable_info.key_range();
let left_user_key = FullKey::decode(&key_range.left);
let right_user_key = FullKey::decode(&key_range.right);
if left_user_key.user_key > *target_key || *target_key > right_user_key.user_key {
Expand Down Expand Up @@ -146,7 +147,11 @@ async fn print_user_key_in_sst(
let date_time = DateTime::<Utc>::from(epoch.as_system_time());
if is_first {
is_first = false;
println!("\t\tSST id: {}, object id: {}", sst.sst_id, sst.object_id);
println!(
"\t\tSST id: {}, object id: {}",
sst.sst_id(),
sst.object_id()
);
}
println!("\t\t key: {:?}, len={}", full_key, full_key.encoded_len());
println!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::collections::HashMap;
use itertools::Itertools;
use risingwave_common::types::{Fields, JsonbVal};
use risingwave_frontend_macro::system_catalog;
use risingwave_hummock_sdk::sstable_info_ref::SstableInfoReader;
use risingwave_hummock_sdk::version::HummockVersion;
use serde_json::json;

Expand Down Expand Up @@ -114,28 +115,28 @@ fn version_to_sstable_rows(version: HummockVersion) -> Vec<RwHummockSstable> {
for cg in version.levels.into_values() {
for level in cg.levels.into_iter().chain(cg.l0.sub_levels) {
for sst in level.table_infos {
let key_range = sst.key_range;
let key_range = sst.key_range();
sstables.push(RwHummockSstable {
sstable_id: sst.sst_id as _,
object_id: sst.object_id as _,
sstable_id: sst.sst_id() as _,
object_id: sst.object_id() as _,
compaction_group_id: cg.group_id as _,
level_id: level.level_idx as _,
sub_level_id: (level.level_idx == 0).then_some(level.sub_level_id as _),
level_type: level.level_type as _,
key_range_left: key_range.left.to_vec(),
key_range_right: key_range.right.to_vec(),
right_exclusive: key_range.right_exclusive,
file_size: sst.file_size as _,
meta_offset: sst.meta_offset as _,
stale_key_count: sst.stale_key_count as _,
total_key_count: sst.total_key_count as _,
min_epoch: sst.min_epoch as _,
max_epoch: sst.max_epoch as _,
uncompressed_file_size: sst.uncompressed_file_size as _,
range_tombstone_count: sst.range_tombstone_count as _,
bloom_filter_kind: sst.bloom_filter_kind as _,
table_ids: json!(sst.table_ids).into(),
sst_size: sst.sst_size as _,
file_size: sst.file_size() as _,
meta_offset: sst.meta_offset() as _,
stale_key_count: sst.stale_key_count() as _,
total_key_count: sst.total_key_count() as _,
min_epoch: sst.min_epoch() as _,
max_epoch: sst.max_epoch() as _,
uncompressed_file_size: sst.uncompressed_file_size() as _,
range_tombstone_count: sst.range_tombstone_count() as _,
bloom_filter_kind: sst.bloom_filter_kind() as _,
table_ids: json!(sst.table_ids()).into(),
sst_size: sst.sst_size() as _,
});
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/java_binding/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::row::OwnedRow;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::sstable_info_ref::HummockVersionType;
use risingwave_jni_core::HummockJavaBindingIterator;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
Expand Down Expand Up @@ -141,7 +141,7 @@ pub(crate) async fn new_hummock_java_binding_iter(
let mut streams = Vec::with_capacity(read_plan.vnode_ids.len());
let key_range = read_plan.key_range.unwrap();
let pin_version = PinnedVersion::new(
HummockVersion::from_rpc_protobuf(&read_plan.version.unwrap()),
HummockVersionType::from_rpc_protobuf(&read_plan.version.unwrap()),
unbounded_channel().0,
);
let table_id = read_plan.table_id.into();
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use picker::{LevelCompactionPicker, TierCompactionPicker};
use risingwave_hummock_sdk::sstable_info_ref::SstableInfoReader;
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
Expand Down Expand Up @@ -179,7 +180,7 @@ impl CompactStatus {
let exist_table_ids = HashSet::<u32>::from_iter(task.existing_table_ids.clone());
task.input_ssts.iter().all(|level| {
level.table_infos.iter().all(|sst| {
sst.table_ids
sst.table_ids()
.iter()
.all(|table_id| !exist_table_ids.contains(table_id))
})
Expand Down
16 changes: 9 additions & 7 deletions src/meta/src/hummock/compaction/overlap_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::ops::Range;
use itertools::Itertools;
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::sstable_info_ref::SstableInfoReader;
use risingwave_hummock_sdk::KeyComparator;

pub trait OverlapInfo: Debug {
Expand Down Expand Up @@ -85,14 +86,14 @@ impl OverlapInfo for RangeOverlapInfo {
match self.target_range.as_ref() {
Some(key_range) => {
let overlap_begin = others.partition_point(|table_status| {
table_status.key_range.compare_right_with(&key_range.left)
table_status.key_range().compare_right_with(&key_range.left)
== cmp::Ordering::Less
});
if overlap_begin >= others.len() {
return overlap_begin..overlap_begin;
}
let overlap_end = others.partition_point(|table_status| {
key_range.compare_right_with(&table_status.key_range.left)
key_range.compare_right_with(&table_status.key_range().left)
!= cmp::Ordering::Less
});
overlap_begin..overlap_end
Expand All @@ -106,7 +107,7 @@ impl OverlapInfo for RangeOverlapInfo {
Some(key_range) => {
let overlap_begin = others.partition_point(|table_status| {
KeyComparator::compare_encoded_full_key(
&table_status.key_range.left,
&table_status.key_range().left,
&key_range.left,
) == cmp::Ordering::Less
});
Expand All @@ -115,7 +116,8 @@ impl OverlapInfo for RangeOverlapInfo {
}
let mut overlap_end = overlap_begin;
for table in &others[overlap_begin..] {
if key_range.compare_right_with(&table.key_range.right) == cmp::Ordering::Less {
if key_range.compare_right_with(&table.key_range().right) == cmp::Ordering::Less
{
break;
}
overlap_end += 1;
Expand All @@ -127,7 +129,7 @@ impl OverlapInfo for RangeOverlapInfo {
}

fn update(&mut self, table: &SstableInfo) {
let other = &table.key_range;
let other = table.key_range();
if let Some(range) = self.target_range.as_mut() {
range.full_key_extend(other);
return;
Expand All @@ -141,7 +143,7 @@ pub struct RangeOverlapStrategy {}

impl OverlapStrategy for RangeOverlapStrategy {
fn check_overlap(&self, a: &SstableInfo, b: &SstableInfo) -> bool {
check_table_overlap(&a.key_range, b)
check_table_overlap(a.key_range(), b)
}

fn create_overlap_info(&self) -> Box<dyn OverlapInfo> {
Expand All @@ -150,5 +152,5 @@ impl OverlapStrategy for RangeOverlapStrategy {
}

fn check_table_overlap(key_range: &KeyRange, table: &SstableInfo) -> bool {
key_range.sstable_overlap(&table.key_range)
key_range.sstable_overlap(table.key_range())
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use itertools::Itertools;
use risingwave_common::config::default::compaction_config;
use risingwave_hummock_sdk::level::{InputLevel, Level, Levels, OverlappingLevel};
use risingwave_hummock_sdk::sstable_info_ref::SstableInfoReader;
use risingwave_pb::hummock::{CompactionConfig, LevelType};

use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker;
Expand Down Expand Up @@ -213,12 +214,13 @@ impl LevelCompactionPicker {
let mut target_level_size = 0;
let mut pending_compact = false;
for sst in &target_level_ssts {
if level_handlers[target_level.level_idx as usize].is_pending_compact(&sst.sst_id) {
if level_handlers[target_level.level_idx as usize].is_pending_compact(&sst.sst_id())
{
pending_compact = true;
break;
}

target_level_size += sst.sst_size;
target_level_size += sst.sst_size();
}

if pending_compact {
Expand Down Expand Up @@ -346,8 +348,8 @@ pub mod tests {
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(ret.input_levels[0].table_infos.len(), 1);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 4);
assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 1);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id(), 4);
assert_eq!(ret.input_levels[1].table_infos[0].sst_id(), 1);

ret.add_pending_task(0, &mut levels_handler);
{
Expand All @@ -358,14 +360,14 @@ pub mod tests {
.unwrap();

assert_eq!(ret2.input_levels[0].table_infos.len(), 1);
assert_eq!(ret2.input_levels[0].table_infos[0].sst_id, 6);
assert_eq!(ret2.input_levels[1].table_infos[0].sst_id, 5);
assert_eq!(ret2.input_levels[0].table_infos[0].sst_id(), 6);
assert_eq!(ret2.input_levels[1].table_infos[0].sst_id(), 5);
}

levels.l0.sub_levels[0]
.table_infos
.retain(|table| table.sst_id != 4);
levels.l0.total_file_size -= ret.input_levels[0].table_infos[0].file_size;
.retain(|table| table.sst_id() != 4);
levels.l0.total_file_size -= ret.input_levels[0].table_infos[0].file_size();

levels_handler[0].remove_task(0);
levels_handler[1].remove_task(0);
Expand All @@ -374,11 +376,11 @@ pub mod tests {
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(ret.input_levels.len(), 3);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id(), 6);
assert_eq!(ret.input_levels[1].table_infos[0].sst_id(), 5);
assert_eq!(ret.input_levels[2].table_infos.len(), 2);
assert_eq!(ret.input_levels[2].table_infos[0].sst_id, 3);
assert_eq!(ret.input_levels[2].table_infos[1].sst_id, 2);
assert_eq!(ret.input_levels[2].table_infos[0].sst_id(), 3);
assert_eq!(ret.input_levels[2].table_infos[1].sst_id(), 2);
ret.add_pending_task(1, &mut levels_handler);

let mut local_stats = LocalPickerStatistic::default();
Expand All @@ -395,8 +397,8 @@ pub mod tests {
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(ret.input_levels.len(), 3);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
assert_eq!(ret.input_levels[1].table_infos[0].sst_id, 5);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id(), 6);
assert_eq!(ret.input_levels[1].table_infos[0].sst_id(), 5);
assert_eq!(ret.input_levels[2].table_infos.len(), 2);
}
#[test]
Expand Down Expand Up @@ -455,7 +457,7 @@ pub mod tests {
ret.input_levels[0]
.table_infos
.iter()
.map(|t| t.sst_id)
.map(|t| t.sst_id())
.collect_vec(),
vec![1]
);
Expand All @@ -464,7 +466,7 @@ pub mod tests {
ret.input_levels[1]
.table_infos
.iter()
.map(|t| t.sst_id)
.map(|t| t.sst_id())
.collect_vec(),
vec![3,]
);
Expand Down Expand Up @@ -610,7 +612,7 @@ pub mod tests {
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 7);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id(), 7);
assert_eq!(
3,
ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
Expand Down Expand Up @@ -638,7 +640,7 @@ pub mod tests {
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(ret.input_levels[0].table_infos[0].sst_id, 6);
assert_eq!(ret.input_levels[0].table_infos[0].sst_id(), 6);
assert_eq!(
2,
ret.input_levels.iter().filter(|l| l.level_idx == 0).count()
Expand Down Expand Up @@ -754,13 +756,13 @@ pub mod tests {
// 1. trivial_move
assert_eq!(2, ret.input_levels.len());
assert!(ret.input_levels[1].table_infos.is_empty());
assert_eq!(5, ret.input_levels[0].table_infos[0].sst_id);
assert_eq!(5, ret.input_levels[0].table_infos[0].sst_id());
ret.add_pending_task(0, &mut levels_handler);

let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(3, ret.input_levels.len());
assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id);
assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use risingwave_common::config::default::compaction_config;
use risingwave_hummock_sdk::sstable_info_ref::SstableInfoReader;
use risingwave_pb::hummock::CompactionConfig;

use super::{CompactionInput, LocalPickerStatistic};
Expand Down Expand Up @@ -157,7 +158,7 @@ impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
let level_select_size = select_level
.table_infos
.iter()
.map(|sst| sst.sst_size)
.map(|sst| sst.sst_size())
.sum::<u64>();

max_level_size = std::cmp::max(max_level_size, level_select_size);
Expand Down
Loading

0 comments on commit 852b9a7

Please sign in to comment.