Skip to content

GetSubmatrix & GetCell methods #556

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions ydb/core/scheme/scheme_tablecell.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,34 @@ TSerializedCellMatrix::TSerializedCellMatrix(TConstArrayRef<TCell> cells, ui32 r
SerializeCellMatrix(cells, rowCount, colCount, Buf, &Cells);
}

const TCell& TSerializedCellMatrix::GetCell(ui32 row, ui16 column) const {
Y_ABORT_UNLESS(row < RowCount && column < ColCount);
return Cells.at(CalcIndex(row, column));
}


void TSerializedCellMatrix::GetSubmatrix(ui32 firstRow, ui32 lastRow, ui16 firstColumn, ui16 lastColumn, TVector<TCell>& resultCells) const {
Y_ABORT_UNLESS(firstColumn < ColCount &&
lastColumn < ColCount &&
firstRow < RowCount &&
lastRow < RowCount &&
firstColumn <= lastColumn &&
firstRow <= lastRow);

ui32 rowCount = (lastRow - firstRow + 1);
ui16 colCount = (lastColumn - firstColumn + 1);
size_t cellCount = colCount * rowCount;
resultCells.clear();
resultCells.resize_uninitialized(cellCount);

for (ui32 row = firstRow; row <= lastRow; ++row) {
for (ui16 col = firstColumn; col <= lastColumn; ++col) {
resultCells[CalcIndex(row - firstRow, col - firstColumn, colCount)] = GetCell(row, col);
}
}
}


void TSerializedCellMatrix::Serialize(TString& res, TConstArrayRef<TCell> cells, ui32 rowCount, ui16 colCount) {
SerializeCellMatrix(cells, rowCount, colCount, res, nullptr /*resultCells*/);
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/scheme/scheme_tablecell.h
Original file line number Diff line number Diff line change
Expand Up @@ -611,10 +611,16 @@ class TSerializedCellMatrix {
}

TConstArrayRef<TCell> GetCells() const { return Cells; }
const TCell& GetCell(ui32 row, ui16 column) const;

ui32 GetRowCount() const { return RowCount; }
ui16 GetColCount() const { return ColCount; }

static size_t CalcIndex(ui32 row, ui16 column, ui16 columnCount) { return row * columnCount + column; }
size_t CalcIndex(ui32 row, ui16 column) const { return CalcIndex(row, column, ColCount); }

void GetSubmatrix(ui32 firstRow, ui32 lastRow, ui16 firstColumn, ui16 lastColumn, TVector<TCell>& resultCells) const;

static void Serialize(TString& res, TConstArrayRef<TCell> cells, ui32 rowCount, ui16 colCount);

static TString Serialize(TConstArrayRef<TCell> cells, ui32 rowCount, ui16 colCount);
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/scheme/scheme_tablecell_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ Y_UNIT_TEST_SUITE(Scheme) {

UNIT_ASSERT_VALUES_EQUAL(matrix.GetBuffer().size(), 2146);

//test submatrix
{
TVector<TCell> submatrix;
matrix.GetSubmatrix(1, 2, 3, 5, submatrix);
UNIT_ASSERT_VALUES_EQUAL(submatrix.size(), 6);
}

TSerializedCellMatrix matrix2(matrix.GetBuffer());
CompareTypedCellMatrix(matrix2, cells, types, hash);

Expand Down
12 changes: 4 additions & 8 deletions ydb/core/tx/datashard/datashard_write_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,14 @@ TVector<TEngineBay::TColumnWriteMeta> GetColumnWrites(const ::google::protobuf::

void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags, const NScheme::TTypeRegistry& typeRegistry, const TActorContext& ctx)
{
TVector<TCell> keyCells;
for (ui32 rowIdx = 0; rowIdx < Matrix_.GetRowCount(); ++rowIdx)
{
//TODO zero copy necessary keys from TableInfo_->KeyColumnTypes
KeyCells_.clear();
for (ui16 colIdx = 0; colIdx < TableInfo_->KeyColumnIds.size(); ++colIdx)
KeyCells_.push_back(Matrix_.GetCells()[rowIdx * columnTags.size() + colIdx]);

TTableRange tableRange(KeyCells_);
Matrix_.GetSubmatrix(rowIdx, rowIdx, 0, TableInfo_->KeyColumnIds.size() - 1, keyCells);

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo_->Path << ", shard: " << TabletId_ << ", "
<< "write point " << DebugPrintPoint(TableInfo_->KeyColumnTypes, KeyCells_, typeRegistry));

<< "write point " << DebugPrintPoint(TableInfo_->KeyColumnTypes, keyCells, typeRegistry));
TTableRange tableRange(keyCells);
EngineBay.AddWriteRange(TableId_, tableRange, TableInfo_->KeyColumnTypes, GetColumnWrites(columnTags), false);
}
}
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/tx/datashard/datashard_write_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ class TValidatedWriteTx: TNonCopyable {
return Matrix_;
}

const TVector<TCell> KeyCells() const {
return KeyCells_;
}

ui64 LockTxId() const {
return Record().locktxid();
}
Expand Down Expand Up @@ -232,7 +228,6 @@ class TValidatedWriteTx: TNonCopyable {
const TUserTable* TableInfo_;
const NEvents::TDataEvents::TEvWrite::TPtr& Ev_;
TSerializedCellMatrix Matrix_;
TVector<TCell> KeyCells_;
TActorId Source_;
TEngineBay EngineBay;
NKikimrTxDataShard::TError::EKind ErrCode;
Expand Down
17 changes: 9 additions & 8 deletions ydb/core/tx/datashard/write_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,26 @@ class TWriteUnit : public TExecutionUnit {
TVector<TRawTypeValue> key;
TVector<NTable::TUpdateOp> value;

TVector<TCell> keyCells;

const TSerializedCellMatrix& matrix = writeTx->Matrix();
TConstArrayRef<TCell> cells = matrix.GetCells();
const ui32 rowCount = matrix.GetRowCount();
const ui16 colCount = matrix.GetColCount();

for (ui32 rowIdx = 0; rowIdx < rowCount; ++rowIdx)
for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx)
{
key.clear();
keyCells.clear();
ui64 keyBytes = 0;
for (ui16 keyColIdx = 0; keyColIdx < TableInfo_.KeyColumnIds.size(); ++keyColIdx) {
const auto& cellType = TableInfo_.KeyColumnTypes[keyColIdx];
const TCell& cell = cells[rowIdx * colCount + keyColIdx];
const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue<ui8>() > 127) {
tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited");
return;
}

keyBytes += cell.Size();
key.emplace_back(TRawTypeValue(cell.AsRef(), cellType));
keyCells.emplace_back(cell);
}

if (keyBytes > NLimits::MaxWriteKeySize) {
Expand All @@ -87,9 +88,9 @@ class TWriteUnit : public TExecutionUnit {
}

value.clear();
for (ui16 valueColIdx = TableInfo_.KeyColumnIds.size(); valueColIdx < colCount; ++valueColIdx) {
for (ui16 valueColIdx = TableInfo_.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx);
const TCell& cell = cells[rowIdx * colCount + valueColIdx];
const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
if (cell.Size() > NLimits::MaxWriteValueSize) {
tx->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize);
return;
Expand All @@ -102,7 +103,7 @@ class TWriteUnit : public TExecutionUnit {
}

txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion);
self->GetConflictsCache().GetTableCache(writeTableId).RemoveUncommittedWrites(writeTx->KeyCells(), txc.DB);
self->GetConflictsCache().GetTableCache(writeTableId).RemoveUncommittedWrites(keyCells, txc.DB);
}
//TODO: Counters
// self->IncCounter(COUNTER_UPLOAD_ROWS, rowCount);
Expand Down