From e28f5439345af324392e81b7358e82d8b55a1a77 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 6 Jul 2023 07:59:06 +0200 Subject: [PATCH] Close read-only segments after timeout (#360) Avoid keeping all segments mapped in memory until deleted. Instead keep only few of them at a time. --- common/ref_count.go | 68 +++++++++++++++++++++++++++++ common/ref_count_test.go | 54 +++++++++++++++++++++++ server/wal/wal_impl.go | 25 ++++++++--- server/wal/wal_ro_segment.go | 85 +++++++++++++++++++++++++++++------- server/wal/wal_rw_segment.go | 5 +++ 5 files changed, 216 insertions(+), 21 deletions(-) create mode 100644 common/ref_count.go create mode 100644 common/ref_count_test.go diff --git a/common/ref_count.go b/common/ref_count.go new file mode 100644 index 00000000..09f3b0bd --- /dev/null +++ b/common/ref_count.go @@ -0,0 +1,68 @@ +// Copyright 2023 StreamNative, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "io" + "sync/atomic" +) + +type RefCount[T io.Closer] interface { + io.Closer + + Acquire() RefCount[T] + + RefCnt() int32 + + Get() T +} + +func NewRefCount[T io.Closer](t T) RefCount[T] { + res := &refCount[T]{ + rc: &atomic.Int32{}, + t: t, + } + res.rc.Store(1) + return res +} + +func (r refCount[T]) RefCnt() int32 { + return r.rc.Load() +} + +type refCount[T io.Closer] struct { + rc *atomic.Int32 + t T +} + +func (r refCount[T]) Close() error { + if count := r.rc.Add(-1); count == 0 { + return r.t.Close() + } + + return nil +} + +func (r refCount[T]) Acquire() RefCount[T] { + r.rc.Add(1) + return &refCount[T]{ + rc: r.rc, + t: r.t, + } +} + +func (r refCount[T]) Get() T { + return r.t +} diff --git a/common/ref_count_test.go b/common/ref_count_test.go new file mode 100644 index 00000000..0af9c4fe --- /dev/null +++ b/common/ref_count_test.go @@ -0,0 +1,54 @@ +// Copyright 2023 StreamNative, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +type testRc struct { + closeFunc func() +} + +func (t *testRc) Close() error { + t.closeFunc() + return nil +} + +func TestRefCount(t *testing.T) { + var done bool + rc := NewRefCount[*testRc](&testRc{ + closeFunc: func() { + done = true + }, + }) + + assert.EqualValues(t, 1, rc.RefCnt()) + + rc2 := rc.Acquire() + assert.EqualValues(t, 2, rc.RefCnt()) + assert.EqualValues(t, 2, rc2.RefCnt()) + + assert.NoError(t, rc.Close()) + assert.EqualValues(t, 1, rc.RefCnt()) + assert.EqualValues(t, 1, rc2.RefCnt()) + assert.False(t, done) + + assert.NoError(t, rc.Close()) + assert.EqualValues(t, 0, rc.RefCnt()) + assert.EqualValues(t, 0, rc2.RefCnt()) + assert.True(t, done) +} diff --git a/server/wal/wal_impl.go b/server/wal/wal_impl.go index 5ea357a6..673eb499 100644 --- a/server/wal/wal_impl.go +++ b/server/wal/wal_impl.go @@ -160,13 +160,20 @@ func (t *wal) readAtIndex(index int64) (*proto.LogEntry, error) { defer timer.Done() var err error + var rc common.RefCount[ReadOnlySegment] var segment ReadOnlySegment if index >= t.currentSegment.BaseOffset() { segment = t.currentSegment } else { - if segment, err = t.readOnlySegments.Get(index); err != nil { + rc, err = t.readOnlySegments.Get(index) + if err != nil { return nil, err } + + defer func(rc common.RefCount[ReadOnlySegment]) { + err = multierr.Append(err, rc.Close()) + }(rc) + segment = rc.Get() } var val []byte @@ -181,7 +188,7 @@ func (t *wal) readAtIndex(index int64) (*proto.LogEntry, error) { return nil, err } t.readBytes.Add(len(val)) - return entry, nil + return entry, err } func (t *wal) LastOffset() int64 { @@ -448,24 +455,30 @@ func (t *wal) TruncateLog(lastSafeOffset int64) (int64, error) { return InvalidOffset, err } return t.LastOffset(), nil - } else if lastSafeOffset >= segment.BaseOffset() { + } else if lastSafeOffset >= segment.Get().BaseOffset() { // The truncation will happen in the middle of this segment, // and this will also become the new current segment if err = segment.Close(); err != nil { return InvalidOffset, err } - if t.currentSegment, err = newReadWriteSegment(t.walPath, segment.BaseOffset(), t.segmentSize); err != nil { + if t.currentSegment, err = newReadWriteSegment(t.walPath, segment.Get().BaseOffset(), t.segmentSize); err != nil { + err = multierr.Append(err, segment.Close()) return InvalidOffset, err } if err := t.currentSegment.Truncate(lastSafeOffset); err != nil { + err = multierr.Append(err, segment.Close()) return InvalidOffset, err } - return lastSafeOffset, nil + err = segment.Close() + return lastSafeOffset, err } else { // The entire segment can be discarded - if err := segment.Delete(); err != nil { + if err := segment.Get().Delete(); err != nil { + err = multierr.Append(err, segment.Close()) + return InvalidOffset, err + } else if err := segment.Close(); err != nil { return InvalidOffset, err } } diff --git a/server/wal/wal_ro_segment.go b/server/wal/wal_ro_segment.go index 17b194bb..96989e48 100644 --- a/server/wal/wal_ro_segment.go +++ b/server/wal/wal_ro_segment.go @@ -24,8 +24,10 @@ import ( "go.uber.org/multierr" "io" "os" + "oxia/common" "path/filepath" "sync" + "time" ) const ( @@ -54,6 +56,8 @@ type ReadOnlySegment interface { Read(offset int64) ([]byte, error) Delete() error + + OpenTimestamp() time.Time } type readonlySegment struct { @@ -61,6 +65,7 @@ type readonlySegment struct { idxPath string baseOffset int64 lastOffset int64 + closed bool txnFile *os.File txnMappedFile mmap.MMap @@ -68,13 +73,15 @@ type readonlySegment struct { // Index file maps a logical "offset" to a physical file offset within the wal segment idxFile *os.File idxMappedFile mmap.MMap + openTimestamp time.Time } func newReadOnlySegment(basePath string, baseOffset int64) (ReadOnlySegment, error) { ms := &readonlySegment{ - txnPath: segmentPath(basePath, baseOffset) + txnExtension, - idxPath: segmentPath(basePath, baseOffset) + idxExtension, - baseOffset: baseOffset, + txnPath: segmentPath(basePath, baseOffset) + txnExtension, + idxPath: segmentPath(basePath, baseOffset) + idxExtension, + baseOffset: baseOffset, + openTimestamp: time.Now(), } var err error @@ -120,6 +127,11 @@ func (ms *readonlySegment) Read(offset int64) ([]byte, error) { } func (ms *readonlySegment) Close() error { + if ms.closed { + return nil + } + + ms.closed = true return multierr.Combine( ms.txnMappedFile.Unmap(), ms.txnFile.Close(), @@ -136,18 +148,27 @@ func (ms *readonlySegment) Delete() error { ) } +func (ms *readonlySegment) OpenTimestamp() time.Time { + return ms.openTimestamp +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +const ( + maxReadOnlySegmentsInCacheCount = 5 + maxReadOnlySegmentsInCacheTime = 5 * time.Minute +) + type ReadOnlySegmentsGroup interface { io.Closer - Get(offset int64) (ReadOnlySegment, error) + Get(offset int64) (common.RefCount[ReadOnlySegment], error) TrimSegments(offset int64) error AddedNewSegment(baseOffset int64) - PollHighestSegment() (ReadOnlySegment, error) + PollHighestSegment() (common.RefCount[ReadOnlySegment], error) } type readOnlySegmentsGroup struct { @@ -193,7 +214,7 @@ func (r *readOnlySegmentsGroup) Close() error { var err error r.openSegments.Each(func(id any, segment any) { - err = multierr.Append(err, segment.(ReadOnlySegment).Close()) + err = multierr.Append(err, segment.(io.Closer).Close()) }) r.openSegments.Clear() @@ -201,13 +222,13 @@ func (r *readOnlySegmentsGroup) Close() error { return err } -func (r *readOnlySegmentsGroup) Get(offset int64) (ReadOnlySegment, error) { +func (r *readOnlySegmentsGroup) Get(offset int64) (common.RefCount[ReadOnlySegment], error) { r.Lock() defer r.Unlock() _, segment := r.openSegments.Floor(offset) - if segment != nil && offset <= segment.(ReadOnlySegment).LastOffset() { - return segment.(ReadOnlySegment), nil + if segment != nil && offset <= segment.(common.RefCount[ReadOnlySegment]).Get().LastOffset() { + return segment.(common.RefCount[ReadOnlySegment]).Acquire(), nil } // Check if we have a segment file on disk @@ -218,8 +239,14 @@ func (r *readOnlySegmentsGroup) Get(offset int64) (ReadOnlySegment, error) { return nil, err } - r.openSegments.Put(segment.BaseOffset(), segment) - return segment, nil + rc := common.NewRefCount(segment) + res := rc.Acquire() + + r.openSegments.Put(segment.BaseOffset(), rc) + if err := r.cleanSegmentsCache(); err != nil { + return nil, err + } + return res, nil } return nil, ErrorOffsetOutOfBounds @@ -248,7 +275,7 @@ func (r *readOnlySegmentsGroup) TrimSegments(offset int64) error { r.allSegments.Remove(s) if segment, ok := r.openSegments.Get(s); ok { - err = multierr.Append(err, segment.(ReadOnlySegment).Delete()) + err = multierr.Append(err, segment.(common.RefCount[ReadOnlySegment]).Get().Delete()) r.openSegments.Remove(s) } else { if segment, err2 := newReadOnlySegment(r.basePath, s.(int64)); err != nil { @@ -262,7 +289,7 @@ func (r *readOnlySegmentsGroup) TrimSegments(offset int64) error { return err } -func (r *readOnlySegmentsGroup) PollHighestSegment() (ReadOnlySegment, error) { +func (r *readOnlySegmentsGroup) PollHighestSegment() (common.RefCount[ReadOnlySegment], error) { r.Lock() defer r.Unlock() @@ -274,8 +301,36 @@ func (r *readOnlySegmentsGroup) PollHighestSegment() (ReadOnlySegment, error) { r.allSegments.Remove(offset) segment, found := r.openSegments.Get(offset) if found { - return segment.(ReadOnlySegment), nil + return segment.(common.RefCount[ReadOnlySegment]).Acquire(), nil + } + + roSegment, err := newReadOnlySegment(r.basePath, offset.(int64)) + if err != nil { + return nil, err } - return newReadOnlySegment(r.basePath, offset.(int64)) + return common.NewRefCount(roSegment), err +} + +func (r *readOnlySegmentsGroup) cleanSegmentsCache() error { + var err error + + // Delete based on open-timestamp + it := r.openSegments.Iterator() + for it.Next() { + ts := it.Value().(common.RefCount[ReadOnlySegment]).Get().OpenTimestamp() + if time.Since(ts) > maxReadOnlySegmentsInCacheTime { + err = multierr.Append(err, it.Value().(common.RefCount[ReadOnlySegment]).Close()) + r.openSegments.Remove(it.Key()) + } + } + + // Delete based on max-count + it = r.openSegments.Iterator() + for it.Next() && r.openSegments.Size() > maxReadOnlySegmentsInCacheCount { + err = multierr.Append(err, it.Value().(common.RefCount[ReadOnlySegment]).Close()) + r.openSegments.Remove(it.Key()) + } + + return err } diff --git a/server/wal/wal_rw_segment.go b/server/wal/wal_rw_segment.go index 2897a881..f01eeec5 100644 --- a/server/wal/wal_rw_segment.go +++ b/server/wal/wal_rw_segment.go @@ -21,6 +21,7 @@ import ( "go.uber.org/multierr" "os" "sync" + "time" ) type ReadWriteSegment interface { @@ -166,6 +167,10 @@ func (ms *readWriteSegment) rebuildIdx() error { return nil } +func (ms *readWriteSegment) OpenTimestamp() time.Time { + return time.Now() +} + func (ms *readWriteSegment) Close() error { ms.Lock() defer ms.Unlock()