add_be_host_mapping_list_config #114

Open · wants to merge 2 commits into main
29 changes: 15 additions & 14 deletions docs/connector-read.md
@@ -155,21 +155,22 @@ This section describes the parameters you need to configure when you use the Spark connector to read data from StarRocks.

The following parameters apply to all three reading methods: Spark SQL, Spark DataFrame, and Spark RDD.

| Parameter | Default value | Description |
| ------------------------------------ | ----------------------- | ------------------------------------------------------------ |
| starrocks.fenodes | None | The HTTP URL of the FE in your StarRocks cluster. Format: `<fe_host>:<fe_http_port>`. You can specify multiple URLs, which must be separated by a comma (,). |
| starrocks.be.host.mapping.list | None | The list of mappings between the external IPs or domain names and the internal service names of the StarRocks BE nodes. Often used when StarRocks is deployed in Kubernetes and the BE nodes are reachable only through external addresses. Format: `<external_ip>:<port>,<internal_service_name>:<port>`. Separate multiple mappings with a semicolon (;). See the configuration sketch after this table. |
| starrocks.table.identifier | None | The name of the StarRocks table. Format: `<database_name>.<table_name>`. |
| starrocks.request.retries | 3 | The maximum number of times that Spark can retry sending a read request to StarRocks. |
| starrocks.request.connect.timeout.ms | 30000 | The maximum amount of time after which a read request sent to StarRocks times out. |
| starrocks.request.read.timeout.ms | 30000 | The maximum amount of time after which the reading for a request sent to StarRocks times out. |
| starrocks.request.query.timeout.s | 3600 | The maximum amount of time after which a query of data from StarRocks times out. The default timeout period is 1 hour. `-1` means that no timeout period is specified. |
| starrocks.request.tablet.size | Integer.MAX_VALUE | The number of StarRocks tablets grouped into each Spark RDD partition. A smaller value of this parameter indicates that a larger number of Spark RDD partitions will be generated. A larger number of Spark RDD partitions means higher parallelism on Spark but greater pressure on StarRocks. |
| starrocks.batch.size | 4096 | The maximum number of rows that can be read from BEs at a time. Increasing the value of this parameter can reduce the number of connections established between Spark and StarRocks, thereby mitigating extra time overheads caused by network latency. |
| starrocks.exec.mem.limit | 2147483648 | The maximum amount of memory allowed per query. Unit: bytes. The default memory limit is 2 GB. |
| starrocks.deserialize.arrow.async | false | Specifies whether to support asynchronously converting the Arrow memory format to RowBatches required for the iteration of the Spark connector. |
| starrocks.deserialize.queue.size | 64 | The size of the internal queue that holds tasks for asynchronously converting the Arrow memory format to RowBatches. This parameter is valid when `starrocks.deserialize.arrow.async` is set to `true`. |
| starrocks.filter.query | None | The condition based on which you want to filter data on StarRocks. You can specify multiple filter conditions, which must be joined by `and`. StarRocks filters the data from the StarRocks table based on the specified filter conditions before the data is read by Spark. |
| starrocks.timezone | Default timezone of JVM | Supported since 1.1.1. The timezone used to convert StarRocks `DATETIME` to Spark `TimestampType`. The default is the timezone of the JVM returned by `ZoneId#systemDefault()`. The format can be a timezone name such as `Asia/Shanghai`, or a zone offset such as `+08:00`. |
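
As an illustration, here is a minimal sketch of how `starrocks.be.host.mapping.list` could be set when reading through the Spark DataFrame API. The FE address, table name, and all BE host names are hypothetical placeholders:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadWithBeHostMapping {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("starrocks-read-with-be-mapping")
                .getOrCreate();

        // Two mappings, separated by ";": each maps an externally reachable
        // address to the internal Kubernetes service address of a BE node.
        Dataset<Row> df = spark.read()
                .format("starrocks")
                .option("starrocks.fenodes", "fe.example.com:8030")
                .option("starrocks.table.identifier", "mydb.mytable")
                .option("starrocks.be.host.mapping.list",
                        "be1.example.com:9060,be-0.be-svc.default.svc.cluster.local:9060;"
                                + "be2.example.com:9060,be-1.be-svc.default.svc.cluster.local:9060")
                .load();

        df.show();
        spark.stop();
    }
}
```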

### Parameters for Spark SQL and Spark DataFrame

ConfigurationOptions.java

@@ -23,6 +23,8 @@ public interface ConfigurationOptions {
    // starrocks fe node address
    String STARROCKS_FENODES = "starrocks.fenodes";

    String STARROCKS_BE_HOST_MAPPING_LIST = "starrocks.be.host.mapping.list";

    String STARROCKS_DEFAULT_CLUSTER = "default_cluster";

    String STARROCKS_TIMEZONE = "starrocks.timezone";
Routing.java

@@ -21,10 +21,15 @@

import static com.starrocks.connector.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;

import com.starrocks.connector.spark.cfg.ConfigurationOptions;
import com.starrocks.connector.spark.cfg.Settings;
import com.starrocks.connector.spark.exception.IllegalArgumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
* Represents a StarRocks BE address.
*/
@@ -34,12 +39,30 @@ public class Routing {
    private String host;
    private int port;

    public Routing(String routing, Settings settings) throws IllegalArgumentException {
        parseRouting(routing, settings);
    }

    private void parseRouting(String routing, Settings settings) throws IllegalArgumentException {
        logger.debug("Parse StarRocks BE address: '{}'.", routing);
        String beHostMappingList = settings.getProperty(ConfigurationOptions.STARROCKS_BE_HOST_MAPPING_LIST, "");
        if (!beHostMappingList.isEmpty()) {
            // Each mapping entry has the form "<external_host>:<port>,<internal_host>:<port>";
            // entries are separated by semicolons. Key the map on the internal address so
            // that the address reported by the FE can be rewritten to a reachable one.
            Map<String, String> mappingMap = new HashMap<>();
            for (String beHostMappingInfo : beHostMappingList.split(";")) {
                String[] mapping = beHostMappingInfo.split(",");
                if (mapping.length != 2) {
                    throw new RuntimeException("Invalid BE host mapping entry: '" + beHostMappingInfo + "'");
                }
                mappingMap.put(mapping[1].trim(), mapping[0].trim());
            }
            if (!mappingMap.containsKey(routing)) {
                throw new RuntimeException("Cannot find BE node '" + routing + "' in the BE host mapping list");
            }
            routing = mappingMap.get(routing);
            logger.info("Query data from BE using mapped address {}", routing);
        } else {
            logger.info("Query data from BE using address {}", routing);
        }

        String[] hostPort = routing.split(":");
        if (hostPort.length != 2) {
            logger.error("Format of StarRocks BE address '{}' is illegal.", routing);
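
For reference, here is a minimal sketch of how the mapping is resolved by `Routing`, using the `PropertiesSettings` helper that also appears in this PR's tests. The host names are hypothetical, and the import for `Routing` itself is omitted since the demo is assumed to sit in the connector's source tree:

```java
import com.starrocks.connector.spark.cfg.ConfigurationOptions;
import com.starrocks.connector.spark.cfg.PropertiesSettings;
import com.starrocks.connector.spark.cfg.Settings;

public class RoutingMappingDemo {
    public static void main(String[] args) throws Exception {
        Settings settings = new PropertiesSettings();
        // One mapping entry: external address first, internal address second.
        settings.setProperty(ConfigurationOptions.STARROCKS_BE_HOST_MAPPING_LIST,
                "be.example.com:9060,be-0.be-svc:9060");

        // "be-0.be-svc:9060" is the internal address the FE reports;
        // Routing rewrites it to the reachable "be.example.com:9060".
        Routing routing = new Routing("be-0.be-svc:9060", settings);
        System.out.println(routing.getHost() + ":" + routing.getPort());
    }
}
```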
ScalaValueReader.scala

@@ -48,7 +48,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
  protected val logger = Logger.getLogger(classOf[ScalaValueReader])

  protected val timeZone = ZoneId.of(settings.getProperty(STARROCKS_TIMEZONE, ZoneId.systemDefault.toString))
  protected val client = new BackendClient(new Routing(partition.getBeAddress, settings), settings)
  protected var offset = 0
  protected var eos: AtomicBoolean = new AtomicBoolean(false)
  protected var rowBatch: RowBatch = _
TestRouting.java

@@ -21,6 +21,9 @@

import static org.hamcrest.core.StringStartsWith.startsWith;

import com.starrocks.connector.spark.cfg.ConfigurationOptions;
import com.starrocks.connector.spark.cfg.PropertiesSettings;
import com.starrocks.connector.spark.cfg.Settings;
import com.starrocks.connector.spark.exception.IllegalArgumentException;

import org.junit.Assert;
@@ -32,18 +35,36 @@ public class TestRouting {
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Test
    public void testRoutingNoBeMappingList() throws Exception {
        Settings settings = new PropertiesSettings();
        Routing r1 = new Routing("10.11.12.13:1234", settings);
        Assert.assertEquals("10.11.12.13", r1.getHost());
        Assert.assertEquals(1234, r1.getPort());

        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage(startsWith("argument "));
        new Routing("10.11.12.13:wxyz", settings);

        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage(startsWith("Parse "));
        new Routing("10.11.12.13", settings);
    }

    @Test
    public void testRoutingBeMappingList() throws Exception {
        Settings settings = new PropertiesSettings();
        String mappingList = "20.11.12.13:6666,10.11.12.13:1234;21.11.12.13:5555,11.11.12.13:1234";
        settings.setProperty(ConfigurationOptions.STARROCKS_BE_HOST_MAPPING_LIST, mappingList);

        Routing r1 = new Routing("10.11.12.13:1234", settings);
        Assert.assertEquals("20.11.12.13", r1.getHost());
        Assert.assertEquals(6666, r1.getPort());

        Routing r2 = new Routing("11.11.12.13:1234", settings);
        Assert.assertEquals("21.11.12.13", r2.getHost());
        Assert.assertEquals(5555, r2.getPort());
    }
}
}