Fix column filtering for DML decompression
When a unique constraint forces an index scan during INSERT
decompression, the scan keys must be built from the constraint's key
columns. Deriving them from the constraint rather than from the
compression settings makes the code work with any kind of index found
on the chunk.
antekresic committed Jun 25, 2024
1 parent 64af307 commit 2908b15
Showing 2 changed files with 28 additions and 38 deletions.
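
The heart of the fix is a membership test: an index on the compressed chunk qualifies only if every one of its key columns appears in the bitmapset of constraint key columns built by compressed_insert_key_columns(). A minimal sketch of that test, assuming key_columns holds the chunk's plain attribute numbers; the helper name is illustrative, not taken from the commit:

    #include <postgres.h>

    #include <access/attnum.h>
    #include <nodes/bitmapset.h>
    #include <parser/parse_relation.h>
    #include <utils/lsyscache.h>
    #include <utils/rel.h>

    /* Illustrative helper: does every key column of index_rel belong to key_columns? */
    static bool
    is_candidate_index(Relation index_rel, Relation out_rel, Bitmapset *key_columns)
    {
        for (int i = 0; i < index_rel->rd_index->indnkeyatts; i++)
        {
            /* Map the index attribute back to a column name on the chunk */
            const NameData *attname = attnumAttName(index_rel, AttrOffsetGetAttrNumber(i));

            /* Reject the index as soon as one key column is not a constraint key column */
            if (!bms_is_member(get_attnum(RelationGetRelid(out_rel), NameStr(*attname)),
                               key_columns))
                return false;
        }
        return true;
    }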
63 changes: 27 additions & 36 deletions tsl/src/compression/compression.c
@@ -26,6 +26,7 @@
 #include <nodes/print.h>
 #include <optimizer/optimizer.h>
 #include <parser/parse_coerce.h>
+#include <parser/parse_relation.h>
 #include <parser/parsetree.h>
 #include <storage/lmgr.h>
 #include <storage/predicate.h>
@@ -2245,8 +2246,9 @@ compressed_insert_key_columns(Relation relation)
  */
 static ScanKeyData *
 build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation out_rel,
-                                CompressionSettings *settings, TupleTableSlot *slot,
-                                Relation *result_index_rel, int *num_scan_keys)
+                                Bitmapset *key_columns, TupleTableSlot *slot,
+                                Relation *result_index_rel, Bitmapset **index_columns,
+                                int *num_scan_keys)
 {
     List *index_oids;
     ListCell *lc;
@@ -2259,7 +2261,6 @@ build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation
     {
         Relation index_rel = index_open(lfirst_oid(lc), AccessShareLock);
         IndexInfo *index_info = BuildIndexInfo(index_rel);
-        bool matches = false;
 
         /* Can't use partial or expression indexes */
         if (index_info->ii_Predicate != NIL || index_info->ii_Expressions != NIL)
@@ -2294,26 +2295,16 @@ build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation
         for (int i = 0; i < index_rel->rd_index->indnkeyatts; i++)
         {
             AttrNumber attnum = AttrOffsetGetAttrNumber(i);
-            char *attname = get_attname(RelationGetRelid(index_rel), attnum, false);
+            const NameData *attname = attnumAttName(index_rel, attnum);
 
-            /* If we are at the last attribute, check its the sequence number attribute.
-             * This means we found all other attributes on the hypertable and this could be our
-             * index.
-             */
-            if (index_rel->rd_index->indnatts - 1 == i)
-            {
-                if (strcmp(attname, COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME) == 0)
-                    matches = true;
-                break;
-            }
-
-            if (!ts_array_is_member(settings->fd.segmentby, attname))
+            /* Make sure we find columns in key columns in order to select the right index */
+            if (!bms_is_member(get_attnum(out_rel->rd_id, NameStr(*attname)), key_columns))
             {
                 break;
             }
 
             bool isnull;
-            AttrNumber ht_attno = get_attnum(hypertable_relid, attname);
+            AttrNumber ht_attno = get_attnum(hypertable_relid, NameStr(*attname));
             Datum value = slot_getattr(slot, ht_attno, &isnull);
 
             if (isnull)
@@ -2369,7 +2360,7 @@ build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation
                                    value);
         }
 
-        if (matches)
+        if (*num_scan_keys > 0)
         {
             *result_index_rel = index_rel;
             break;
@@ -2672,6 +2663,16 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
 
         write_logical_replication_msg_decompression_start();
         result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple);
+        /* skip reporting error if isolation level is < Repeatable Read
+         * since somebody decompressed the data concurrently, we need to take
+         * that data into account as well when in Read Committed level
+         */
+        if (result == TM_Deleted && !IsolationUsesXactSnapshot())
+        {
+            write_logical_replication_msg_decompression_end();
+            stats.batches_decompressed++;
+            continue;
+        }
         if (result != TM_Ok)
         {
             table_endscan(scan);
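
The TM_Deleted branch above leans on PostgreSQL's stock isolation test: IsolationUsesXactSnapshot() holds only for REPEATABLE READ and SERIALIZABLE, so only those levels treat a concurrently deleted compressed tuple as an error, while READ COMMITTED counts the batch as decompressed by the other transaction and moves on. A sketch of the decision in isolation, with an illustrative function name:

    #include <postgres.h>

    #include <access/tableam.h>
    #include <access/xact.h>

    /*
     * Illustrative helper: a concurrent delete of the compressed tuple is
     * tolerable exactly when we run below REPEATABLE READ, i.e. when the
     * transaction does not rely on a single snapshot.
     */
    static bool
    tolerate_concurrent_decompression(TM_Result result)
    {
        return result == TM_Deleted && !IsolationUsesXactSnapshot();
    }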
@@ -2732,6 +2733,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
     Assert(settings);
 
     Bitmapset *key_columns = compressed_insert_key_columns(out_rel);
+    Bitmapset *index_columns = NULL;
     Bitmapset *null_columns = NULL;
     struct decompress_batches_stats stats;
 
@@ -2740,37 +2742,26 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
     ScanKeyData *index_scankeys = build_index_scankeys_using_slot(cis->hypertable_relid,
                                                                   in_rel,
                                                                   out_rel,
-                                                                  settings,
+                                                                  key_columns,
                                                                   slot,
                                                                   &index_rel,
+                                                                  &index_columns,
                                                                   &num_index_scankeys);
 
     if (index_rel)
     {
         /*
-         * Prepare the heap scan keys if any
-         * This assumes that columns in segmentby are
-         * handled by the index scan keys and potentially
-         * might need to be handled by going through
-         * index scan keys instead
+         * Prepare the heap scan keys for all
+         * key columns not found in the index
          */
-        Bitmapset *filtered_key_columns = NULL;
-        int i = -1;
-        while ((i = bms_next_member(key_columns, i)) > 0)
-        {
-            AttrNumber attno = i + FirstLowInvalidHeapAttributeNumber;
-            char *attname = get_attname(out_rel->rd_id, attno, false);
-            if (!ts_array_is_member(settings->fd.segmentby, attname))
-            {
-                filtered_key_columns = bms_add_member(filtered_key_columns, i);
-            }
-        }
+        key_columns = bms_difference(key_columns, index_columns);
+
         int num_heap_scankeys;
         ScanKeyData *heap_scankeys = build_heap_scankeys(cis->hypertable_relid,
                                                          in_rel,
                                                          out_rel,
                                                          settings,
-                                                         filtered_key_columns,
+                                                         key_columns,
                                                          &null_columns,
                                                          slot,
                                                          &num_heap_scankeys);
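With the index chosen, the new index_columns out-parameter records which key columns the index scan already constrains, and bms_difference() leaves only the remainder for the heap scan keys. A toy sketch of that set arithmetic, with hypothetical attribute numbers:

    #include <postgres.h>

    #include <nodes/bitmapset.h>

    /*
     * Illustrative only: key columns {1, 2, 3}, of which the chosen index
     * covers {1, 2}; column 3 must then be filtered via heap scan keys.
     */
    static Bitmapset *
    heap_filter_columns(void)
    {
        Bitmapset *key_columns = bms_make_singleton(1);
        key_columns = bms_add_member(key_columns, 2);
        key_columns = bms_add_member(key_columns, 3);

        Bitmapset *index_columns = bms_make_singleton(1);
        index_columns = bms_add_member(index_columns, 2);

        /* Everything the index scan did not already constrain => {3} */
        return bms_difference(key_columns, index_columns);
    }
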
3 changes: 1 addition & 2 deletions tsl/test/isolation/expected/compression_conflicts_iso.out
@@ -2121,8 +2121,7 @@ time|device|location|value
    7|     1|     100|   20
    8|     1|     100|   20
    9|     1|     100|   20
-   1|     1|     200|  100
-(11 rows)
+(10 rows)
 
 step SChunkStat: SELECT status from _timescaledb_catalog.chunk
 WHERE id = ( select min(ch.id) FROM _timescaledb_catalog.hypertable ht, _timescaledb_catalog.chunk ch WHERE ch.hypertable_id = ht.id AND ht.table_name like 'ts_device_table');