typed_store/rocks/
util.rs1use std::cmp::Ordering;
6
7use rocksdb::{CompactionDecision, MergeOperands};
8
9pub fn reference_count_merge_operator(
13 _key: &[u8],
14 stored_value: Option<&[u8]>,
15 operands: &MergeOperands,
16) -> Option<Vec<u8>> {
17 let (mut value, mut ref_count) = stored_value.map_or((None, 0), deserialize_ref_count_value);
18
19 for operand in operands {
20 let (new_value, delta) = deserialize_ref_count_value(operand);
21 assert!(value.is_none() || new_value.is_none() || value == new_value);
22 if value.is_none() && new_value.is_some() {
23 value = new_value;
24 }
25 ref_count += delta;
26 }
27 match ref_count.cmp(&0) {
28 Ordering::Greater => Some([value.unwrap_or(b""), &ref_count.to_le_bytes()].concat()),
29 Ordering::Equal => Some(vec![]),
30 Ordering::Less => Some(ref_count.to_le_bytes().to_vec()),
31 }
32}
33
34pub fn empty_compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> CompactionDecision {
35 if value.is_empty() {
36 CompactionDecision::Remove
37 } else {
38 CompactionDecision::Keep
39 }
40}
41
/// Returns `true` when `value` is either empty or exactly 8 bytes long,
/// i.e. it has the length of an entry that holds no payload in front of
/// the 8-byte counter.
pub fn is_ref_count_value(value: &[u8]) -> bool {
    matches!(value.len(), 0 | 8)
}
45
/// Splits an encoded entry into its optional payload and its counter.
///
/// The last 8 bytes are read as a little-endian `i64`; whatever precedes
/// them is the payload, reported as `None` when it is empty. Empty input
/// decodes to `(None, 0)`.
///
/// # Panics
///
/// Panics if `bytes` is non-empty but shorter than 8 bytes.
fn deserialize_ref_count_value(bytes: &[u8]) -> (Option<&[u8]>, i64) {
    if bytes.is_empty() {
        return (None, 0);
    }
    assert!(bytes.len() >= 8);

    // The counter occupies the trailing 8 bytes; the payload is the prefix.
    let payload_len = bytes.len() - 8;
    let mut counter = [0u8; 8];
    counter.copy_from_slice(&bytes[payload_len..]);

    let payload = Some(&bytes[..payload_len]).filter(|prefix| !prefix.is_empty());
    (payload, i64::from_le_bytes(counter))
}
55
#[cfg(test)]
mod tests {
    use super::deserialize_ref_count_value;

    /// Decoding covers empty input, bare counters (positive and negative)
    /// and a payload followed by its counter.
    #[test]
    fn deserialize_ref_count_value_test() {
        // Decodes `encoded` and compares the result with the expected parts.
        fn check(encoded: &[u8], payload: Option<&[u8]>, count: i64) {
            assert_eq!(deserialize_ref_count_value(encoded), (payload, count));
        }

        check(&[], None, 0);
        check(b"\x01\0\0\0\0\0\0\0", None, 1);
        check(b"\xff\xff\xff\xff\xff\xff\xff\xff", None, -1);
        check(b"\xfe\xff\xff\xff\xff\xff\xff\xff", None, -2);
        check(b"test\x04\0\0\0\0\0\0\0", Some(b"test".as_ref()), 4);
    }
}