// typed_store/rocks/options.rs

use std::{collections::BTreeMap, env};

use rocksdb::{BlockBasedOptions, Cache, ReadOptions};
use tap::TapFallible;
use tracing::{info, warn};

// Env var overriding the DB-wide aggregate memtable budget, in MiB.
const ENV_VAR_DB_WRITE_BUFFER_SIZE: &str = "DB_WRITE_BUFFER_SIZE_MB";
const DEFAULT_DB_WRITE_BUFFER_SIZE: usize = 1024;

// Env var overriding the cap on total WAL size, in MiB.
const ENV_VAR_DB_WAL_SIZE: &str = "DB_WAL_SIZE_MB";
const DEFAULT_DB_WAL_SIZE: usize = 1024;

// Env var overriding the number of L0 files that triggers compaction
// (slowdown/stop triggers are derived as 12x/16x this value).
const ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER: &str = "L0_NUM_FILES_COMPACTION_TRIGGER";
const DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 4;
// Higher default L0 trigger used with universal compaction, where many more
// L0 files are expected before a compaction runs.
const DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER: usize = 80;
// Per-memtable size, in MiB.
const ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB: &str = "MAX_WRITE_BUFFER_SIZE_MB";
const DEFAULT_MAX_WRITE_BUFFER_SIZE_MB: usize = 256;
// Maximum number of memtables kept per column family.
const ENV_VAR_MAX_WRITE_BUFFER_NUMBER: &str = "MAX_WRITE_BUFFER_NUMBER";
const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 6;
// Target SST file size, in MiB.
const ENV_VAR_TARGET_FILE_SIZE_BASE_MB: &str = "TARGET_FILE_SIZE_BASE_MB";
const DEFAULT_TARGET_FILE_SIZE_BASE_MB: usize = 128;

// Presence of this env var (any value) disables the blob-file optimization
// applied by `optimize_for_large_values_no_scan`.
const ENV_VAR_DISABLE_BLOB_STORAGE: &str = "DISABLE_BLOB_STORAGE";
// Env var overriding the background-thread parallelism (default 8).
const ENV_VAR_DB_PARALLELISM: &str = "DB_PARALLELISM";
36
/// Per-table read/write tuning knobs carried alongside the RocksDB options.
#[derive(Clone, Debug, Default)]
pub struct ReadWriteOptions {
    // When true, a hash of written values may be logged. The flag is only
    // stored here; it is consumed by write paths outside this file —
    // NOTE(review): confirm against callers.
    pub log_value_hash: bool,
}
43
44impl ReadWriteOptions {
45 pub fn readopts(&self) -> ReadOptions {
46 ReadOptions::default()
47 }
48
49 pub fn set_log_value_hash(mut self, log_value_hash: bool) -> Self {
50 self.log_value_hash = log_value_hash;
51 self
52 }
53}
54
/// Bundle of the raw RocksDB options and this crate's read/write options
/// for a single table (column family).
#[derive(Default, Clone)]
pub struct DBOptions {
    pub options: rocksdb::Options,
    pub rw_options: ReadWriteOptions,
}
60
/// Mapping from table (column family) name to its `DBOptions`.
#[derive(Clone)]
pub struct DBMapTableConfigMap(BTreeMap<String, DBOptions>);
impl DBMapTableConfigMap {
    pub fn new(map: BTreeMap<String, DBOptions>) -> Self {
        Self(map)
    }

    /// Returns a clone of the underlying name -> options map.
    pub fn to_map(&self) -> BTreeMap<String, DBOptions> {
        self.0.clone()
    }
}
72
73impl DBOptions {
74 pub fn optimize_for_point_lookup(mut self, block_cache_size_mb: usize) -> DBOptions {
78 self.options
80 .optimize_for_point_lookup(block_cache_size_mb as u64);
81 self
82 }
83
84 pub fn optimize_for_large_values_no_scan(mut self, min_blob_size: u64) -> DBOptions {
87 if env::var(ENV_VAR_DISABLE_BLOB_STORAGE).is_ok() {
88 info!("Large value blob storage optimization is disabled via env var.");
89 return self;
90 }
91
92 self.options.set_enable_blob_files(true);
94 self.options
95 .set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
96 self.options.set_enable_blob_gc(true);
97 self.options.set_min_blob_size(min_blob_size);
101
102 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
104 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
105 * 1024
106 * 1024;
107 self.options.set_write_buffer_size(write_buffer_size);
108 let target_file_size_base = 64 << 20;
111 self.options
112 .set_target_file_size_base(target_file_size_base);
113 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
115 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
116 self.options
117 .set_max_bytes_for_level_base(target_file_size_base * max_level_zero_file_num as u64);
118
119 self
120 }
121
122 pub fn optimize_for_read(mut self, block_cache_size_mb: usize) -> DBOptions {
124 self.options
125 .set_block_based_table_factory(&get_block_options(block_cache_size_mb, 16 << 10));
126 self
127 }
128
129 pub fn optimize_db_for_write_throughput(mut self, db_max_write_buffer_gb: u64) -> DBOptions {
131 self.options
132 .set_db_write_buffer_size(db_max_write_buffer_gb as usize * 1024 * 1024 * 1024);
133 self.options
134 .set_max_total_wal_size(db_max_write_buffer_gb * 1024 * 1024 * 1024);
135 self
136 }
137
138 pub fn optimize_for_write_throughput(mut self) -> DBOptions {
140 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
142 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
143 * 1024
144 * 1024;
145 self.options.set_write_buffer_size(write_buffer_size);
146 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
148 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
149 self.options
150 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
151 self.options
153 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
154
155 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
157 .unwrap_or(DEFAULT_L0_NUM_FILES_COMPACTION_TRIGGER);
158 self.options.set_level_zero_file_num_compaction_trigger(
159 max_level_zero_file_num.try_into().unwrap(),
160 );
161 self.options.set_level_zero_slowdown_writes_trigger(
162 (max_level_zero_file_num * 12).try_into().unwrap(),
163 );
164 self.options
165 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
166
167 self.options.set_target_file_size_base(
169 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
170 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
171 * 1024
172 * 1024,
173 );
174
175 self.options
177 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
178
179 self
180 }
181
182 pub fn optimize_for_write_throughput_no_deletion(mut self) -> DBOptions {
186 let write_buffer_size = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_SIZE_MB)
188 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_SIZE_MB)
189 * 1024
190 * 1024;
191 self.options.set_write_buffer_size(write_buffer_size);
192 let max_write_buffer_number = read_size_from_env(ENV_VAR_MAX_WRITE_BUFFER_NUMBER)
194 .unwrap_or(DEFAULT_MAX_WRITE_BUFFER_NUMBER);
195 self.options
196 .set_max_write_buffer_number(max_write_buffer_number.try_into().unwrap());
197 self.options
199 .set_max_write_buffer_size_to_maintain((write_buffer_size).try_into().unwrap());
200
201 self.options
203 .set_compaction_style(rocksdb::DBCompactionStyle::Universal);
204 let mut compaction_options = rocksdb::UniversalCompactOptions::default();
205 compaction_options.set_max_size_amplification_percent(10000);
206 compaction_options.set_stop_style(rocksdb::UniversalCompactionStopStyle::Similar);
207 self.options
208 .set_universal_compaction_options(&compaction_options);
209
210 let max_level_zero_file_num = read_size_from_env(ENV_VAR_L0_NUM_FILES_COMPACTION_TRIGGER)
211 .unwrap_or(DEFAULT_UNIVERSAL_COMPACTION_L0_NUM_FILES_COMPACTION_TRIGGER);
212 self.options.set_level_zero_file_num_compaction_trigger(
213 max_level_zero_file_num.try_into().unwrap(),
214 );
215 self.options.set_level_zero_slowdown_writes_trigger(
216 (max_level_zero_file_num * 12).try_into().unwrap(),
217 );
218 self.options
219 .set_level_zero_stop_writes_trigger((max_level_zero_file_num * 16).try_into().unwrap());
220
221 self.options.set_target_file_size_base(
223 read_size_from_env(ENV_VAR_TARGET_FILE_SIZE_BASE_MB)
224 .unwrap_or(DEFAULT_TARGET_FILE_SIZE_BASE_MB) as u64
225 * 1024
226 * 1024,
227 );
228
229 self.options
231 .set_max_bytes_for_level_base((write_buffer_size * max_level_zero_file_num) as u64);
232
233 self
234 }
235
236 pub fn set_block_options(
238 mut self,
239 block_cache_size_mb: usize,
240 block_size_bytes: usize,
241 ) -> DBOptions {
242 self.options
243 .set_block_based_table_factory(&get_block_options(
244 block_cache_size_mb,
245 block_size_bytes,
246 ));
247 self
248 }
249
250 pub fn disable_write_throttling(mut self) -> DBOptions {
252 self.options.set_soft_pending_compaction_bytes_limit(0);
253 self.options.set_hard_pending_compaction_bytes_limit(0);
254 self
255 }
256}
257
/// Builds the default `DBOptions` applied to every table unless overridden.
pub fn default_db_options() -> DBOptions {
    let mut opt = rocksdb::Options::default();

    // Raise the process fd limit and give RocksDB only a fraction of it, so
    // open SST files cannot exhaust the process's descriptors.
    if let Some(limit) = fdlimit::raise_fd_limit() {
        opt.set_max_open_files((limit / 8) as i32);
    }

    // Shard the table cache (2^10 shards) to reduce lock contention.
    opt.set_table_cache_num_shard_bits(10);

    // LZ4 for upper levels; Zstd with up to 1 MiB of dictionary training
    // for the bottommost (coldest, largest) level.
    opt.set_compression_type(rocksdb::DBCompressionType::Lz4);
    opt.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd);
    opt.set_bottommost_zstd_max_train_bytes(1024 * 1024, true);

    // DB-wide memtable budget and WAL cap, overridable via env vars (MiB).
    opt.set_db_write_buffer_size(
        read_size_from_env(ENV_VAR_DB_WRITE_BUFFER_SIZE).unwrap_or(DEFAULT_DB_WRITE_BUFFER_SIZE)
            * 1024
            * 1024,
    );
    opt.set_max_total_wal_size(
        read_size_from_env(ENV_VAR_DB_WAL_SIZE).unwrap_or(DEFAULT_DB_WAL_SIZE) as u64 * 1024 * 1024,
    );

    // Background flush/compaction parallelism; 8 threads by default.
    opt.increase_parallelism(read_size_from_env(ENV_VAR_DB_PARALLELISM).unwrap_or(8) as i32);

    opt.set_enable_pipelined_write(true);

    // Default block cache: 128 MiB with 16 KiB blocks.
    opt.set_block_based_table_factory(&get_block_options(128, 16 << 10));

    // Small memtable prefix bloom filter to speed up point lookups.
    opt.set_memtable_prefix_bloom_ratio(0.02);

    DBOptions {
        options: opt,
        rw_options: ReadWriteOptions::default(),
    }
}
317
318fn get_block_options(block_cache_size_mb: usize, block_size_bytes: usize) -> BlockBasedOptions {
319 let mut block_options = BlockBasedOptions::default();
324 block_options.set_block_size(block_size_bytes);
326 block_options.set_block_cache(&Cache::new_lru_cache(block_cache_size_mb << 20));
328 block_options.set_bloom_filter(10.0, false);
330 block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
332 block_options
333}
334
335pub fn list_tables(path: std::path::PathBuf) -> eyre::Result<Vec<String>> {
336 const DB_DEFAULT_CF_NAME: &str = "default";
337
338 let opts = rocksdb::Options::default();
339 rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&opts, path)
340 .map_err(|e| e.into())
341 .map(|q| {
342 q.iter()
343 .filter_map(|s| {
344 if s != DB_DEFAULT_CF_NAME {
346 Some(s.clone())
347 } else {
348 None
349 }
350 })
351 .collect()
352 })
353}
354
355pub fn read_size_from_env(var_name: &str) -> Option<usize> {
356 env::var(var_name)
357 .ok()?
358 .parse::<usize>()
359 .tap_err(|e| {
360 warn!(
361 "Env var {} does not contain valid usize integer: {}",
362 var_name, e
363 )
364 })
365 .ok()
366}