在levelDB中所有KV数据都是存储在Memtable,Immutable Memtable和SSTable中的,ImmutableMemtable从结构上讲和Memtable是完全一样的,区别仅仅在于其是只读的,不允许写入操作,而Memtable则是允许写入和读取的。当Memtable写入的数据占用内存到达指定数量,则自动转换为ImmutableMemtable,等待Dump到磁盘中,系统会自动生成新的Memtable供写操作写入新数据,理解了Memtable,那么ImmutableMemtable自然不在话下。
LevelDb的MemTable提供了将KV数据写入,删除以及读取KV记录的操作接口,但是事实上Memtable并不存在真正的删除操作,删除某个Key的Value在Memtable内是作为插入一条记录实施的,但是会打上一个Key的删除标记,真正的删除操作是Lazy的,会在以后的Compaction过程中去掉这个KV。需要注意的是,LevelDb的Memtable中KV对是根据Key大小有序存储的,在系统插入新的KV时,LevelDb要把这个KV插到合适的位置上以保持这种Key有序性。其实,LevelDb的Memtable类只是一个接口类,真正的操作是通过背后的SkipList来做的,包括插入操作和读取操作等,所以Memtable的核心数据结构是一个SkipList。
db数据在内存中的存储格式。写操作的数据都会先写到memtable中。memtable的size有限制最大值(write_buffer_size)。memtable的实现是skiplist,当一个memtablesize达到阀值时,会变成只读的memtable(immutable memtable),同时生成一个新的memtable供新的写入。后台的compact进程会负责将immutable memtable dump成sstable。所以,同时最多会存在两个memtable(正在写的memtable和immutablememtable)。
class MemTable { public: // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. explicit MemTable(const InternalKeyComparator& comparator); // Increase reference count. void Ref() { ++refs_; } // Drop reference count. Delete if no more references exist. void Unref() { --refs_; assert(refs_ >= 0); if (refs_ <= 0) { delete this; } } // Returns an estimate of the number of bytes of data in use by this // data structure. // // REQUIRES: external synchronization to prevent simultaneous // operations on the same MemTable. size_t ApproximateMemoryUsage(); // Return an iterator that yields the contents of the memtable. // // The caller must ensure that the underlying MemTable remains live // while the returned iterator is live. The keys returned by this // iterator are internal keys encoded by AppendInternalKey in the // db/format.{h,cc} module. Iterator* NewIterator(); // Add an entry into memtable that maps key to value at the // specified sequence number and with the specified type. // Typically value will be empty if type==kTypeDeletion. void Add(SequenceNumber seq, ValueType type, const Slice& key, const Slice& value); // If memtable contains a value for key, store it in *value and return true. // If memtable contains a deletion for key, store a NotFound() error // in *status and return true. // Else, return false. bool Get(const LookupKey& key, std::string* value, Status* s); private: ~MemTable(); // Private since only Unref() should be used to delete it struct KeyComparator { const InternalKeyComparator comparator; explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } int operator()(const char* a, const char* b) const; }; friend class MemTableIterator; friend class MemTableBackwardIterator; typedef SkipList<const char*, KeyComparator> Table; KeyComparator comparator_; int refs_; Arena arena_; Table table_; // No copying allowed MemTable(const MemTable&); void operator=(const MemTable&);};
MemTable只是一层封装,实际的数据结构是一个SkipListtypedef SkipList<const char*, KeyComparator> Table;
MemTable有一个公有的构造函数,Ref()和unRef()和一个返回迭代器的成员函数,除此之外,就只有3个公有的成员函数了,分别是:
其中,第一个成员函数只是简单的封装,最后调用的是Arena中的成员函数
size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); }
Arena是一个内存管理类,可以参考这里。
下面来分析另外两个成员函数:
void Add(SequenceNumber seq, ValueType type, const Slice& key, const Slice& value);
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, const Slice& value) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] // value_size : varint32 of value.size() // value bytes : char[value.size()] size_t key_size = key.size(); size_t val_size = value.size(); size_t internal_key_size = key_size + 8; const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; char* buf = arena_.Allocate(encoded_len); char* p = EncodeVarint32(buf, internal_key_size); memcpy(p, key.data(), key_size); p += key_size; EncodeFixed64(p, (s << 8) | type); p += 8; p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert((p + val_size) - buf == encoded_len); table_.Insert(buf); }
MemTable::add只是简单的将SequenceNumber和ValueType以及消息编码成一个字符串,存放在buf数组中,然后调用table.Insert(buf)插入数据。
这里要解释的是buf的内容:它包括
其中,internal_key只是一个结构体,封装了key,SequenceNumber和Type;因为SequenceNumber和Type一起存放在一个64位的整型里面,所以才有:
internal_key_size = key.size + 8
该函数里的其他语句就是将internal_key_size和val.isze()编码成varint,然后把数据依次存放到buf中。
bool Get(const LookupKey& key, std::string* value, Status* s);
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); iter.Seek(memkey.data()); if (iter.Valid()) { // entry format is: // klength varint32 // userkey char[klength] // tag uint64 // vlength varint32 // value char[vlength] // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. const char* entry = iter.key(); uint32_t key_length; const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length); if (comparator_.comparator.user_comparator()->Compare( Slice(key_ptr, key_length - 8), key.user_key()) == 0) { // Correct user key const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast<ValueType>(tag & 0xff)) { case kTypeValue: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); value->assign(v.data(), v.size()); return true; } case kTypeDeletion: *s = Status::NotFound(Slice()); return true; } } } return false; }
理解MemTable 更多的是理解消息封装和编码,在MemTable::Get函数中又出现了LookupKey ,我们暂时不要管他,先看函数体。函数体通过在Table 中查找key,找到就返回一个迭代器,然后获取迭代器的值,解码得到信息。
通过下面的语句获取internal_key_size ,因为varint最多不会超过5个字节,所以有起点是entry 终点是 entry +5,具体怎么在一个字符串数组中获取一个int ,请参考这里。
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
再次比较返回的key与user_key 是否相同,这里之所以会有 key_length - 8是因为key_length包含了key.size()和SequenceNumber(8个字节,这8个字节里前7个字节是序列号,最后一个字节是消息类型)。
if (comparator_.comparator.user_comparator()->Compare( Slice(key_ptr, key_length - 8), key.user_key()) == 0) { // Correct user key
对key后面的8个字节解码,然后获取这8个字节的最后一个字节,判断消息是什么类型:
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast<ValueType>(tag & 0xff)) { case kTypeValue: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); value->assign(v.data(), v.size()); return true; } case kTypeDeletion: *s = Status::NotFound(Slice()); return true; }
如果消息不是kTypeDeletion类型,那么消息就是有效的,接下来就获取value的size,再获取value的值。
联系客服