[Docs] Update spark-getting-started docs page to make the example valid #11923

Open: wants to merge 5 commits into base: main
17 changes: 12 additions & 5 deletions docs/docs/spark-getting-started.md
@@ -77,21 +77,28 @@ Once your table is created, insert data using [`INSERT INTO`](spark-writes.md#in

```sql
INSERT INTO local.db.table VALUES (1, 'a'), (2, 'b'), (3, 'c');
INSERT INTO local.db.table SELECT id, data FROM source WHERE length(data) = 1;
Author: This statement does not add much to the simple example here; remove it.

```

Iceberg also adds row-level SQL updates to Spark, [`MERGE INTO`](spark-writes.md#merge-into) and [`DELETE FROM`](spark-writes.md#delete-from):

```sql
MERGE INTO local.db.target t USING (SELECT * FROM updates) u ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.count = t.count + u.count
Comment on lines -86 to -87

Author: Before this PR, `updates` does not exist, nor do `t.count` or `u.count`.

CREATE TABLE local.db.updates (id bigint, data string) USING iceberg;

INSERT INTO local.db.updates VALUES (1, 'x'), (2, 'y'), (4, 'z');
Contributor: Same as below, let's update the values so the example hits all branches of the MERGE INTO statement.

nit: also add the values as a comment to track the table state.

Author: About the merge branches, commented here: https://github.com/apache/iceberg/pull/11923/files?diff=unified&w=0#r1906122418

The table states are straightforward until after the MERGE query (one insert per table), so I have added the table state as a comment after the MERGE only; otherwise there is a lot of duplication. Let me know your thoughts.

Contributor: Looks good, thanks!


MERGE INTO local.db.table t
USING (SELECT * FROM local.db.updates) u ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.data = u.data
WHEN NOT MATCHED THEN INSERT *;

-- ((1, 'x'), (2, 'y'), (3, 'c'), (4, 'z'))
SELECT * FROM local.db.table;
```
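
For readers following the thread above, a compact sketch of the table states the example walks through (same `local` catalog and tables as in the resolved example; the inline branch annotations restate the review discussion and are not part of the diff):

```sql
-- local.db.table before the merge:  (1, 'a'), (2, 'b'), (3, 'c')
-- local.db.updates:                 (1, 'x'), (2, 'y'), (4, 'z')

MERGE INTO local.db.table t
USING (SELECT * FROM local.db.updates) u ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.data = u.data   -- ids 1 and 2 take this branch
WHEN NOT MATCHED THEN INSERT *;                -- id 4 takes this branch; id 3 is left unchanged

-- local.db.table after the merge:   (1, 'x'), (2, 'y'), (3, 'c'), (4, 'z')
SELECT * FROM local.db.table;
```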

Iceberg supports writing DataFrames using the new [v2 DataFrame write API](spark-writes.md#writing-with-dataframes):

```scala
spark.table("source").select("id", "data")
spark.table("local.db.updates").select("id", "data")
.writeTo("local.db.table").append()
```
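
Beyond `append()`, Spark's `DataFrameWriterV2` offers a few related operations that can be useful alongside the example above; a minimal sketch (the `local.db.table2` name is hypothetical, and the methods come from the Spark API rather than from this diff):

```scala
// Assumes the same Spark session and `local` Iceberg catalog as the earlier setup.
val updates = spark.table("local.db.updates").select("id", "data")

updates.writeTo("local.db.table").append()                // add rows to an existing table
updates.writeTo("local.db.table2").create()               // create a new table from the DataFrame
updates.writeTo("local.db.table").overwritePartitions()   // dynamically overwrite matching partitions
```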

@@ -160,7 +167,7 @@ This type conversion table describes how Spark types are converted to the Iceberg
| map | map | |

!!! info
The table is based on representing conversion during creating table. In fact, broader supports are applied on write. Here're some points on write:
Broader type conversions are applied on write:

* Iceberg numeric types (`integer`, `long`, `float`, `double`, `decimal`) support promotion during writes. e.g. You can write Spark types `short`, `byte`, `integer`, `long` to Iceberg type `long`.
* You can write to Iceberg `fixed` type using Spark `binary` type. Note that assertion on the length will be performed.
@@ -40,8 +40,6 @@ public void dropTable() {
}

// Run through our Doc's Getting Started Example
// TODO Update doc example so that it can actually be run, modifications were required for this
// test suite to run
@Test
public void testGettingStarted() throws IOException {
// Creating a table
@@ -66,25 +64,25 @@ public void testGettingStarted() throws IOException {
sql(
"CREATE TABLE updates (id bigint, data string) USING parquet LOCATION '%s'",
temp.newFolder());
sql("INSERT INTO updates VALUES (1, 'x'), (2, 'x'), (4, 'z')");
sql("INSERT INTO updates VALUES (1, 'x'), (2, 'y'), (4, 'z')");
Author: To make the example more interesting to users, set unique values of `data` so that the effect of the MERGE is clearer in the result.

Contributor: I like the original example since it hits all branches of the MERGE INTO statement. Also, it'd be nice to keep track of the table state in a comment.

Author: The example still hits all branches of the MERGE:

  • ids 1 and 2 are updated
  • ids 3, 10, and 11 are unchanged
  • id 4 does not match and is inserted

The change here is to provide unique `data` values in the results, which helps explain the example in the docs better.


sql(
"MERGE INTO %s t USING (SELECT * FROM updates) u ON t.id = u.id\n"
+ "WHEN MATCHED THEN UPDATE SET t.data = u.data\n"
+ "WHEN NOT MATCHED THEN INSERT *",
tableName);

// Reading
Assert.assertEquals(
"Table should now have 5 rows", 5L, scalarSql("SELECT COUNT(*) FROM %s", tableName));
Assert.assertEquals(
"Record 1 should now have data x",
"x",
scalarSql("SELECT data FROM %s WHERE id = 1", tableName));

// Reading
Assert.assertEquals(
"There should be 2 records with data x",
2L,
scalarSql("SELECT count(1) as count FROM %s WHERE data = 'x' GROUP BY data ", tableName));
"Record 2 should now have data y",
"y",
scalarSql("SELECT data FROM %s WHERE id = 2", tableName));

// Not supported because of Spark limitation
if (!catalogName.equals("spark_catalog")) {
@@ -42,8 +42,6 @@ public void dropTable() {
}

// Run through our Doc's Getting Started Example
// TODO Update doc example so that it can actually be run, modifications were required for this
// test suite to run
@Test
public void testGettingStarted() throws IOException {
// Creating a table
@@ -68,25 +66,25 @@ public void testGettingStarted() throws IOException {
sql(
"CREATE TABLE updates (id bigint, data string) USING parquet LOCATION '%s'",
temp.newFolder());
sql("INSERT INTO updates VALUES (1, 'x'), (2, 'x'), (4, 'z')");
sql("INSERT INTO updates VALUES (1, 'x'), (2, 'y'), (4, 'z')");

sql(
"MERGE INTO %s t USING (SELECT * FROM updates) u ON t.id = u.id\n"
+ "WHEN MATCHED THEN UPDATE SET t.data = u.data\n"
+ "WHEN NOT MATCHED THEN INSERT *",
tableName);

// Reading
Assert.assertEquals(
"Table should now have 5 rows", 5L, scalarSql("SELECT COUNT(*) FROM %s", tableName));
Assert.assertEquals(
"Record 1 should now have data x",
"x",
scalarSql("SELECT data FROM %s WHERE id = 1", tableName));

// Reading
Assert.assertEquals(
"There should be 2 records with data x",
2L,
scalarSql("SELECT count(1) as count FROM %s WHERE data = 'x' GROUP BY data ", tableName));
"Record 2 should now have data y",
"y",
scalarSql("SELECT data FROM %s WHERE id = 2", tableName));

// Not supported because of Spark limitation
if (!catalogName.equals("spark_catalog")) {
@@ -38,8 +38,6 @@ public void dropTable() {
}

// Run through our Doc's Getting Started Example
// TODO Update doc example so that it can actually be run, modifications were required for this
// test suite to run
@TestTemplate
public void testGettingStarted() throws IOException {
// Creating a table
@@ -66,26 +64,24 @@ public void testGettingStarted() throws IOException {
sql(
"CREATE TABLE updates (id bigint, data string) USING parquet LOCATION '%s'",
Files.createTempDirectory(temp, "junit"));
sql("INSERT INTO updates VALUES (1, 'x'), (2, 'x'), (4, 'z')");
sql("INSERT INTO updates VALUES (1, 'x'), (2, 'y'), (4, 'z')");

sql(
"MERGE INTO %s t USING (SELECT * FROM updates) u ON t.id = u.id\n"
+ "WHEN MATCHED THEN UPDATE SET t.data = u.data\n"
+ "WHEN NOT MATCHED THEN INSERT *",
tableName);

// Reading
assertThat(scalarSql("SELECT COUNT(*) FROM %s", tableName))
.as("Table should now have 5 rows")
.isEqualTo(5L);
assertThat(scalarSql("SELECT data FROM %s WHERE id = 1", tableName))
.as("Record 1 should now have data x")
.isEqualTo("x");

// Reading
assertThat(
scalarSql(
"SELECT count(1) as count FROM %s WHERE data = 'x' GROUP BY data ", tableName))
.as("There should be 2 records with data x")
.isEqualTo(2L);
assertThat(scalarSql("SELECT data FROM %s WHERE id = 2", tableName))
.as("Record 2 should now have data y")
.isEqualTo("y");

// Not supported because of Spark limitation
if (!catalogName.equals("spark_catalog")) {