Skip to content

Make it possible to change in-memory setting for tables #12099

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/tablet_flat/flat_dbase_apply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ bool TSchemeModifier::Apply(const TAlterRecord &delta)
ui32 large = delta.HasLarge() ? delta.GetLarge() : family.Large;

Y_ABORT_UNLESS(ui32(cache) <= 2, "Invalid pages cache policy value");
if (family.Cache != cache && cache == ECache::Ever) {
ChangeTableSetting(table, tableInfo.PendingCacheEnable, true);
}
changes |= ChangeTableSetting(table, family.Cache, cache);
changes |= ChangeTableSetting(table, family.Codec, codec);
changes |= ChangeTableSetting(table, family.Small, small);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tablet_flat/flat_dbase_scheme.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class TScheme {
bool EraseCacheEnabled = false;
ui32 EraseCacheMinRows = 0; // 0 means use default
ui32 EraseCacheMaxBytes = 0; // 0 means use default

// When true this table has an in-memory caching enabled that has not been processed yet
mutable bool PendingCacheEnable = false;
};

struct TRedo {
Expand Down
22 changes: 16 additions & 6 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ void TExecutor::ActivateFollower(const TActorContext &ctx) {
RecreatePageCollectionsCache();
ReflectSchemeSettings();

RequestInMemPagesForDatabase();

Become(&TThis::StateFollower);
Stats->IsActive = true;
Stats->FollowerId = FollowerId;
Expand Down Expand Up @@ -663,16 +665,21 @@ void TExecutor::TranslateCacheTouchesToSharedCache() {
Send(MakeSharedPageCacheId(), new NSharedCache::TEvTouch(std::move(touches)));
}

void TExecutor::RequestInMemPagesForDatabase() {
const auto &scheme = Scheme();
for (auto &sxpair : scheme.Tables) {
auto stickyColumns = GetStickyColumns(sxpair.first);
void TExecutor::RequestInMemPagesForDatabase(bool pendingOnly) {
const auto& scheme = Scheme();
for (auto& pr : scheme.Tables) {
const ui32 tid = pr.first;
if (pendingOnly && !pr.second.PendingCacheEnable) {
continue;
}
auto stickyColumns = GetStickyColumns(tid);
if (stickyColumns) {
auto subset = Database->Subset(sxpair.first, NTable::TEpoch::Max(), { } , { });
auto subset = Database->Subset(tid, NTable::TEpoch::Max(), { } , { });

for (auto &partView: subset->Flatten)
RequestInMemPagesForPartStore(sxpair.first, partView, stickyColumns);
RequestInMemPagesForPartStore(tid, partView, stickyColumns);
}
pr.second.PendingCacheEnable = false;
}
}

Expand Down Expand Up @@ -984,6 +991,7 @@ void TExecutor::ApplyFollowerUpdate(THolder<TEvTablet::TFUpdateBody> update) {
if (schemeUpdate) {
ReadResourceProfile();
ReflectSchemeSettings();
RequestInMemPagesForDatabase(/* pendingOnly */ true);
Owner->OnFollowerSchemaUpdated();
}

Expand Down Expand Up @@ -1361,6 +1369,7 @@ void TExecutor::RequestInMemPagesForPartStore(ui32 tableId, const NTable::TPartV
for (ui32 pageId : req->Pages)
PrivatePageCache->MarkSticky(pageId, info);

// TODO: only request missing pages
RequestFromSharedCache(req, NBlockIO::EPriority::Bkgr, EPageCollectionRequest::CacheSync);
}
}
Expand Down Expand Up @@ -2081,6 +2090,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv

ReadResourceProfile();
ReflectSchemeSettings();
RequestInMemPagesForDatabase(/* pendingOnly */ true);

// For every table that changed strategy we need to generate a
// special part switch that notifies bootlogic about new strategy
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ class TExecutor
void DropSingleCache(const TLogoBlobID&) noexcept;

void TranslateCacheTouchesToSharedCache();
void RequestInMemPagesForDatabase();
void RequestInMemPagesForDatabase(bool pendingOnly = false);
void RequestInMemPagesForPartStore(ui32 tableId, const NTable::TPartView &partView, const THashSet<NTable::TTag> &stickyColumns);
THashSet<NTable::TTag> GetStickyColumns(ui32 tableId);
void RequestFromSharedCache(TAutoPtr<NPageCollection::TFetch> fetch,
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tablet_flat/flat_executor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6140,7 +6140,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorStickyPages) {

int failedAttempts = 0;
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(failedAttempts) });
UNIT_ASSERT_GE(failedAttempts, 20); // old parts aren't sticky before restart
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 0); // parts become sticky soon after it's enabled

// restart tablet
env.SendSync(new TEvents::TEvPoison, false, true);
Expand Down Expand Up @@ -6174,7 +6174,7 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorStickyPages) {

int failedAttempts = 0;
env.SendSync(new NFake::TEvExecute{ new TTxFullScan(failedAttempts) });
UNIT_ASSERT_GE(failedAttempts, 20); // old parts aren't sticky before restart
UNIT_ASSERT_VALUES_EQUAL(failedAttempts, 0); // parts become sticky soon after it's enabled

// restart tablet
env.SendSync(new TEvents::TEvPoison, false, true);
Expand Down
14 changes: 8 additions & 6 deletions ydb/core/ydb_convert/column_families.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,15 @@ namespace NKikimr {
case Ydb::FeatureFlag::STATUS_UNSPECIFIED:
break;
case Ydb::FeatureFlag::ENABLED:
*code = Ydb::StatusIds::BAD_REQUEST;
*error = TStringBuilder()
<< "Setting keep_in_memory to ENABLED is not supported in column family '"
<< familySettings.name() << "'";
return false;
if (!AppData()->FeatureFlags.GetEnablePublicApiKeepInMemory()) {
*code = Ydb::StatusIds::BAD_REQUEST;
*error = "Setting keep_in_memory to ENABLED is not allowed";
return false;
}
family->SetColumnCache(NKikimrSchemeOp::ColumnCacheEver);
break;
case Ydb::FeatureFlag::DISABLED:
family->ClearColumnCache();
family->SetColumnCache(NKikimrSchemeOp::ColumnCacheNone);
break;
default:
*code = Ydb::StatusIds::BAD_REQUEST;
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_table/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,11 @@ TColumnFamilyBuilder& TColumnFamilyBuilder::SetCompression(EColumnFamilyCompress
return *this;
}

TColumnFamilyBuilder& TColumnFamilyBuilder::SetKeepInMemory(bool enabled) {
Impl_->Proto.set_keep_in_memory(enabled ? Ydb::FeatureFlag::ENABLED : Ydb::FeatureFlag::DISABLED);
return *this;
}

TColumnFamilyDescription TColumnFamilyBuilder::Build() const {
return TColumnFamilyDescription(Impl_->Proto);
}
Expand Down
11 changes: 11 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_table/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ class TColumnFamilyBuilder {

TColumnFamilyBuilder& SetData(const TString& media);
TColumnFamilyBuilder& SetCompression(EColumnFamilyCompression compression);
TColumnFamilyBuilder& SetKeepInMemory(bool enabled);

TColumnFamilyDescription Build() const;

Expand Down Expand Up @@ -868,6 +869,11 @@ class TTableColumnFamilyBuilder {
return *this;
}

TTableColumnFamilyBuilder& SetKeepInMemory(bool enabled) {
Builder_.SetKeepInMemory(enabled);
return *this;
}

TTableBuilder& EndColumnFamily();

private:
Expand Down Expand Up @@ -1486,6 +1492,11 @@ class TAlterColumnFamilyBuilder {
return *this;
}

TAlterColumnFamilyBuilder& SetKeepInMemory(bool enabled) {
Builder_.SetKeepInMemory(enabled);
return *this;
}

TAlterTableSettings& EndAddColumnFamily();
TAlterTableSettings& EndAlterColumnFamily();

Expand Down
81 changes: 81 additions & 0 deletions ydb/services/ydb/ydb_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,87 @@ Y_UNIT_TEST_SUITE(TGRpcNewClient) {
client.CreateSession().Apply(createSessionHandler).Wait();
UNIT_ASSERT(done);
}

Y_UNIT_TEST(InMemoryTables) {
TKikimrWithGrpcAndRootSchemaNoSystemViews server;
server.Server_->GetRuntime()->GetAppData().FeatureFlags.SetEnablePublicApiKeepInMemory(true);

ui16 grpc = server.GetPort();
TString location = TStringBuilder() << "localhost:" << grpc;

auto connection = NYdb::TDriver(
TDriverConfig()
.SetEndpoint(location));

auto client = NYdb::NTable::TTableClient(connection);
auto createSessionResult = client.CreateSession().ExtractValueSync();
UNIT_ASSERT(!createSessionResult.IsTransportError());
auto session = createSessionResult.GetSession();

auto createTableResult = session.CreateTable("/Root/Table", client.GetTableBuilder()
.AddNullableColumn("Key", EPrimitiveType::Int32)
.AddNullableColumn("Value", EPrimitiveType::String)
.SetPrimaryKeyColumn("Key")
// Note: only needed because this test doesn't initial table profiles
.BeginStorageSettings()
.SetTabletCommitLog0("ssd")
.SetTabletCommitLog1("ssd")
.EndStorageSettings()
.BeginColumnFamily("default")
.SetData("ssd")
.SetKeepInMemory(true)
.EndColumnFamily()
.Build()).ExtractValueSync();
UNIT_ASSERT_C(createTableResult.IsSuccess(), (NYdb::TStatus&)createTableResult);

{
auto describeTableResult = session.DescribeTable("/Root/Table").ExtractValueSync();
UNIT_ASSERT_C(describeTableResult.IsSuccess(), (NYdb::TStatus&)describeTableResult);
auto desc = describeTableResult.GetTableDescription();
auto families = desc.GetColumnFamilies();
UNIT_ASSERT_VALUES_EQUAL(families.size(), 1u);
auto family = families.at(0);
UNIT_ASSERT_VALUES_EQUAL(family.GetKeepInMemory(), true);
}

{
auto alterTableResult = session.AlterTable("/Root/Table", NYdb::NTable::TAlterTableSettings()
.BeginAlterColumnFamily("default")
.SetKeepInMemory(false)
.EndAlterColumnFamily()).ExtractValueSync();
UNIT_ASSERT_C(alterTableResult.IsSuccess(), (NYdb::TStatus&)alterTableResult);
}

{
auto describeTableResult = session.DescribeTable("/Root/Table").ExtractValueSync();
UNIT_ASSERT_C(describeTableResult.IsSuccess(), (NYdb::TStatus&)describeTableResult);
auto desc = describeTableResult.GetTableDescription();
auto families = desc.GetColumnFamilies();
UNIT_ASSERT_VALUES_EQUAL(families.size(), 1u);
auto family = families.at(0);
// Note: server cannot currently distinguish between implicitly
// unset and explicitly disabled, so it returns the former.
UNIT_ASSERT_VALUES_EQUAL(family.GetKeepInMemory(), Nothing());
}

{
auto alterTableResult = session.AlterTable("/Root/Table", NYdb::NTable::TAlterTableSettings()
.BeginAlterColumnFamily("default")
.SetKeepInMemory(true)
.EndAlterColumnFamily()).ExtractValueSync();
UNIT_ASSERT_C(alterTableResult.IsSuccess(), (NYdb::TStatus&)alterTableResult);
}

{
auto describeTableResult = session.DescribeTable("/Root/Table").ExtractValueSync();
UNIT_ASSERT_C(describeTableResult.IsSuccess(), (NYdb::TStatus&)describeTableResult);
auto desc = describeTableResult.GetTableDescription();
auto families = desc.GetColumnFamilies();
UNIT_ASSERT_VALUES_EQUAL(families.size(), 1u);
auto family = families.at(0);
UNIT_ASSERT_VALUES_EQUAL(family.GetKeepInMemory(), true);
}
}
}

static TString CreateSession(std::shared_ptr<grpc::Channel> channel) {
Expand Down
Loading