Skip to content

Commit

Permalink
Close read-only segments after timeout (#360)
Browse files Browse the repository at this point in the history
Avoid keeping all segments mapped in memory until they are deleted. Instead, keep
only a few of them mapped at a time.
  • Loading branch information
merlimat committed Jul 6, 2023
1 parent 996c226 commit e28f543
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 21 deletions.
68 changes: 68 additions & 0 deletions common/ref_count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"io"
"sync/atomic"
)

// RefCount is a reference-counted handle over a closable value of type T.
//
// All handles derived from the same NewRefCount call (directly or through
// Acquire) share a single counter and a single wrapped value.
type RefCount[T io.Closer] interface {
	// Close decrements the shared counter by one. The wrapped value's own
	// Close is invoked, and its error returned, only by the call that brings
	// the counter to exactly zero; every other call returns nil.
	io.Closer

	// Acquire increments the shared counter by one and returns a new handle
	// that refers to the same counter and the same wrapped value.
	Acquire() RefCount[T]

	// RefCnt returns the current value of the shared counter.
	RefCnt() int32

	// Get returns the wrapped value. It does not modify the counter.
	Get() T
}

// refCount is the RefCount implementation. The counter is held by pointer so
// that every handle produced by Acquire observes the same count.
type refCount[T io.Closer] struct {
	refs  *atomic.Int32
	value T
}

// NewRefCount wraps t in a handle whose counter starts at 1.
func NewRefCount[T io.Closer](t T) RefCount[T] {
	refs := new(atomic.Int32)
	refs.Store(1)
	return &refCount[T]{refs: refs, value: t}
}

// RefCnt returns the current value of the shared counter.
func (r *refCount[T]) RefCnt() int32 {
	return r.refs.Load()
}

// Close drops one reference and closes the wrapped value when the counter
// lands on zero.
func (r *refCount[T]) Close() error {
	if remaining := r.refs.Add(-1); remaining != 0 {
		// Other references are still outstanding (or the counter has already
		// gone past zero): nothing to close here.
		return nil
	}

	return r.value.Close()
}

// Acquire adds one reference and hands back a fresh handle sharing the same
// counter and wrapped value.
func (r *refCount[T]) Acquire() RefCount[T] {
	r.refs.Add(1)
	return &refCount[T]{refs: r.refs, value: r.value}
}

// Get returns the wrapped value.
func (r *refCount[T]) Get() T {
	return r.value
}
54 changes: 54 additions & 0 deletions common/ref_count_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"github.com/stretchr/testify/assert"
"testing"
)

// testRc is a minimal closable test double: closing it runs a caller-supplied
// callback.
type testRc struct {
	closeFunc func()
}

// Close runs the configured callback and always returns a nil error.
func (c *testRc) Close() error {
	c.closeFunc()
	return nil
}

// TestRefCount checks that the wrapped value is closed only once every
// acquired handle has been released, and that all handles observe the same
// counter.
func TestRefCount(t *testing.T) {
	var done bool
	rc := NewRefCount[*testRc](&testRc{
		closeFunc: func() {
			done = true
		},
	})

	assert.EqualValues(t, 1, rc.RefCnt())

	rc2 := rc.Acquire()
	assert.EqualValues(t, 2, rc.RefCnt())
	assert.EqualValues(t, 2, rc2.RefCnt())

	// Releasing the first handle must leave the value open: rc2 still holds
	// a reference.
	assert.NoError(t, rc.Close())
	assert.EqualValues(t, 1, rc.RefCnt())
	assert.EqualValues(t, 1, rc2.RefCnt())
	assert.False(t, done)

	// Release the second handle (not rc again): each handle is closed exactly
	// once, and the last release closes the wrapped value.
	assert.NoError(t, rc2.Close())
	assert.EqualValues(t, 0, rc.RefCnt())
	assert.EqualValues(t, 0, rc2.RefCnt())
	assert.True(t, done)
}
25 changes: 19 additions & 6 deletions server/wal/wal_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,20 @@ func (t *wal) readAtIndex(index int64) (*proto.LogEntry, error) {
defer timer.Done()

var err error
var rc common.RefCount[ReadOnlySegment]
var segment ReadOnlySegment
if index >= t.currentSegment.BaseOffset() {
segment = t.currentSegment
} else {
if segment, err = t.readOnlySegments.Get(index); err != nil {
rc, err = t.readOnlySegments.Get(index)
if err != nil {
return nil, err
}

defer func(rc common.RefCount[ReadOnlySegment]) {
err = multierr.Append(err, rc.Close())
}(rc)
segment = rc.Get()
}

var val []byte
Expand All @@ -181,7 +188,7 @@ func (t *wal) readAtIndex(index int64) (*proto.LogEntry, error) {
return nil, err
}
t.readBytes.Add(len(val))
return entry, nil
return entry, err
}

func (t *wal) LastOffset() int64 {
Expand Down Expand Up @@ -448,24 +455,30 @@ func (t *wal) TruncateLog(lastSafeOffset int64) (int64, error) {
return InvalidOffset, err
}
return t.LastOffset(), nil
} else if lastSafeOffset >= segment.BaseOffset() {
} else if lastSafeOffset >= segment.Get().BaseOffset() {
// The truncation will happen in the middle of this segment,
// and this will also become the new current segment
if err = segment.Close(); err != nil {
return InvalidOffset, err
}

if t.currentSegment, err = newReadWriteSegment(t.walPath, segment.BaseOffset(), t.segmentSize); err != nil {
if t.currentSegment, err = newReadWriteSegment(t.walPath, segment.Get().BaseOffset(), t.segmentSize); err != nil {
err = multierr.Append(err, segment.Close())
return InvalidOffset, err
}
if err := t.currentSegment.Truncate(lastSafeOffset); err != nil {
err = multierr.Append(err, segment.Close())
return InvalidOffset, err
}

return lastSafeOffset, nil
err = segment.Close()
return lastSafeOffset, err
} else {
// The entire segment can be discarded
if err := segment.Delete(); err != nil {
if err := segment.Get().Delete(); err != nil {
err = multierr.Append(err, segment.Close())
return InvalidOffset, err
} else if err := segment.Close(); err != nil {
return InvalidOffset, err
}
}
Expand Down
85 changes: 70 additions & 15 deletions server/wal/wal_ro_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"go.uber.org/multierr"
"io"
"os"
"oxia/common"
"path/filepath"
"sync"
"time"
)

const (
Expand Down Expand Up @@ -54,27 +56,32 @@ type ReadOnlySegment interface {
Read(offset int64) ([]byte, error)

Delete() error

OpenTimestamp() time.Time
}

// readonlySegment is the ReadOnlySegment implementation returned by
// newReadOnlySegment. It keeps the segment's txn and idx files open and
// memory-mapped until Close is called.
type readonlySegment struct {
// txnPath is the segment path for baseOffset with txnExtension appended.
txnPath string
// idxPath is the segment path for baseOffset with idxExtension appended.
idxPath string
// baseOffset is the offset this segment was opened with.
baseOffset int64
// lastOffset is presumably the last offset stored in the segment; it is
// populated outside the code visible here (TODO confirm).
lastOffset int64
// closed is set by Close so that repeated Close calls become no-ops.
closed bool

// txnFile and txnMappedFile are the open txn file and its memory mapping;
// both are released by Close.
txnFile *os.File
txnMappedFile mmap.MMap

// Index file maps a logical "offset" to a physical file offset within the wal segment
idxFile *os.File
idxMappedFile mmap.MMap
// openTimestamp is the time.Now() value captured when the segment was
// constructed; it is exposed through OpenTimestamp.
openTimestamp time.Time
}

func newReadOnlySegment(basePath string, baseOffset int64) (ReadOnlySegment, error) {
ms := &readonlySegment{
txnPath: segmentPath(basePath, baseOffset) + txnExtension,
idxPath: segmentPath(basePath, baseOffset) + idxExtension,
baseOffset: baseOffset,
txnPath: segmentPath(basePath, baseOffset) + txnExtension,
idxPath: segmentPath(basePath, baseOffset) + idxExtension,
baseOffset: baseOffset,
openTimestamp: time.Now(),
}

var err error
Expand Down Expand Up @@ -120,6 +127,11 @@ func (ms *readonlySegment) Read(offset int64) ([]byte, error) {
}

func (ms *readonlySegment) Close() error {
if ms.closed {
return nil
}

ms.closed = true
return multierr.Combine(
ms.txnMappedFile.Unmap(),
ms.txnFile.Close(),
Expand All @@ -136,18 +148,27 @@ func (ms *readonlySegment) Delete() error {
)
}

// OpenTimestamp returns the timestamp stored in the segment when it was
// constructed by newReadOnlySegment.
func (ms *readonlySegment) OpenTimestamp() time.Time {
	openedAt := ms.openTimestamp
	return openedAt
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// Limits applied by cleanSegmentsCache to the set of open read-only segments.
const (
// Upper bound on the number of open segments left in the cache after cleanup.
maxReadOnlySegmentsInCacheCount = 5
// Segments whose OpenTimestamp is older than this are closed and dropped from the cache.
maxReadOnlySegmentsInCacheTime = 5 * time.Minute
)

type ReadOnlySegmentsGroup interface {
io.Closer

Get(offset int64) (ReadOnlySegment, error)
Get(offset int64) (common.RefCount[ReadOnlySegment], error)

TrimSegments(offset int64) error

AddedNewSegment(baseOffset int64)

PollHighestSegment() (ReadOnlySegment, error)
PollHighestSegment() (common.RefCount[ReadOnlySegment], error)
}

type readOnlySegmentsGroup struct {
Expand Down Expand Up @@ -193,21 +214,21 @@ func (r *readOnlySegmentsGroup) Close() error {

var err error
r.openSegments.Each(func(id any, segment any) {
err = multierr.Append(err, segment.(ReadOnlySegment).Close())
err = multierr.Append(err, segment.(io.Closer).Close())
})

r.openSegments.Clear()
r.allSegments.Clear()
return err
}

func (r *readOnlySegmentsGroup) Get(offset int64) (ReadOnlySegment, error) {
func (r *readOnlySegmentsGroup) Get(offset int64) (common.RefCount[ReadOnlySegment], error) {
r.Lock()
defer r.Unlock()

_, segment := r.openSegments.Floor(offset)
if segment != nil && offset <= segment.(ReadOnlySegment).LastOffset() {
return segment.(ReadOnlySegment), nil
if segment != nil && offset <= segment.(common.RefCount[ReadOnlySegment]).Get().LastOffset() {
return segment.(common.RefCount[ReadOnlySegment]).Acquire(), nil
}

// Check if we have a segment file on disk
Expand All @@ -218,8 +239,14 @@ func (r *readOnlySegmentsGroup) Get(offset int64) (ReadOnlySegment, error) {
return nil, err
}

r.openSegments.Put(segment.BaseOffset(), segment)
return segment, nil
rc := common.NewRefCount(segment)
res := rc.Acquire()

r.openSegments.Put(segment.BaseOffset(), rc)
if err := r.cleanSegmentsCache(); err != nil {
return nil, err
}
return res, nil
}

return nil, ErrorOffsetOutOfBounds
Expand Down Expand Up @@ -248,7 +275,7 @@ func (r *readOnlySegmentsGroup) TrimSegments(offset int64) error {

r.allSegments.Remove(s)
if segment, ok := r.openSegments.Get(s); ok {
err = multierr.Append(err, segment.(ReadOnlySegment).Delete())
err = multierr.Append(err, segment.(common.RefCount[ReadOnlySegment]).Get().Delete())
r.openSegments.Remove(s)
} else {
if segment, err2 := newReadOnlySegment(r.basePath, s.(int64)); err != nil {
Expand All @@ -262,7 +289,7 @@ func (r *readOnlySegmentsGroup) TrimSegments(offset int64) error {
return err
}

func (r *readOnlySegmentsGroup) PollHighestSegment() (ReadOnlySegment, error) {
func (r *readOnlySegmentsGroup) PollHighestSegment() (common.RefCount[ReadOnlySegment], error) {
r.Lock()
defer r.Unlock()

Expand All @@ -274,8 +301,36 @@ func (r *readOnlySegmentsGroup) PollHighestSegment() (ReadOnlySegment, error) {
r.allSegments.Remove(offset)
segment, found := r.openSegments.Get(offset)
if found {
return segment.(ReadOnlySegment), nil
return segment.(common.RefCount[ReadOnlySegment]).Acquire(), nil
}

roSegment, err := newReadOnlySegment(r.basePath, offset.(int64))
if err != nil {
return nil, err
}

return newReadOnlySegment(r.basePath, offset.(int64))
return common.NewRefCount(roSegment), err
}

// cleanSegmentsCache releases the cache's reference to open read-only
// segments that should no longer stay cached, and removes them from
// r.openSegments. It runs two passes:
//
//  1. every segment whose OpenTimestamp is older than
//     maxReadOnlySegmentsInCacheTime is evicted;
//  2. if more than maxReadOnlySegmentsInCacheCount segments remain, entries
//     are evicted in the map's iteration order (presumably ascending base
//     offset, since entries are keyed by BaseOffset) until the limit is met.
//
// Eviction only calls Close on the cache's own RefCount handle, so the
// underlying segment is closed once the last outstanding handle is released.
// All Close errors are combined and returned; eviction continues past errors.
//
// The visible caller (Get) invokes this while holding r's lock; the method
// does no locking of its own.
func (r *readOnlySegmentsGroup) cleanSegmentsCache() error {
	type cacheEntry struct {
		key    any
		handle common.RefCount[ReadOnlySegment]
	}

	var err error

	// evict is always called after iteration has finished, so the map is
	// never modified while an iterator over it is still in use.
	evict := func(entries []cacheEntry) {
		for _, e := range entries {
			err = multierr.Append(err, e.handle.Close())
			r.openSegments.Remove(e.key)
		}
	}

	// Evict based on open-timestamp
	var expired []cacheEntry
	it := r.openSegments.Iterator()
	for it.Next() {
		handle := it.Value().(common.RefCount[ReadOnlySegment])
		if time.Since(handle.Get().OpenTimestamp()) > maxReadOnlySegmentsInCacheTime {
			expired = append(expired, cacheEntry{key: it.Key(), handle: handle})
		}
	}
	evict(expired)

	// Evict based on max-count
	if excess := r.openSegments.Size() - maxReadOnlySegmentsInCacheCount; excess > 0 {
		surplus := make([]cacheEntry, 0, excess)
		it = r.openSegments.Iterator()
		for len(surplus) < excess && it.Next() {
			surplus = append(surplus, cacheEntry{
				key:    it.Key(),
				handle: it.Value().(common.RefCount[ReadOnlySegment]),
			})
		}
		evict(surplus)
	}

	return err
}
5 changes: 5 additions & 0 deletions server/wal/wal_rw_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.uber.org/multierr"
"os"
"sync"
"time"
)

type ReadWriteSegment interface {
Expand Down Expand Up @@ -166,6 +167,10 @@ func (ms *readWriteSegment) rebuildIdx() error {
return nil
}

// OpenTimestamp returns the current time on every call; the read-write
// segment does not record when it was opened.
func (ms *readWriteSegment) OpenTimestamp() time.Time {
	now := time.Now()
	return now
}

func (ms *readWriteSegment) Close() error {
ms.Lock()
defer ms.Unlock()
Expand Down

0 comments on commit e28f543

Please sign in to comment.