
Commit 770d42a
updated article code (#895)
l-trotta authored Nov 18, 2024
1 parent 1fa5d65 commit 770d42a
Showing 2 changed files with 114 additions and 100 deletions.
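In short: configuration is now read from environment variables instead of the deleted application.conf; the books.csv dataset is downloaded over HTTP rather than loaded from a local path; the RestClient and the ES|QL ResultSet are wrapped in try-with-resources blocks; and main declares InterruptedException, which the blocking HttpClient.send call can throw.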
EsqlArticle.java (114 additions & 97 deletions)

@@ -39,8 +39,13 @@
 import org.elasticsearch.client.RestClient;

 import java.io.FileInputStream;
-import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.ResultSet;
@@ -53,125 +58,137 @@

 public class EsqlArticle {

-    public static void main(String[] args) throws IOException, SQLException {
-        String dir = System.getProperty("user.dir");
-        Properties prop = new Properties();
-        Path path = Paths.get(dir, "examples", "esql-article", "src", "main", "resources", "application" +
-            ".conf");
-        prop.load(new FileInputStream(path.toString()));
-
-        String serverUrl = prop.getProperty("server-url");
-        String apiKey = prop.getProperty("api-key");
-        String csvPath = prop.getProperty("csv-file");
+    public static void main(String[] args) throws IOException, SQLException, InterruptedException {
+        String serverUrl = System.getenv("server-url");
+        String apiKey = System.getenv("api-key");
+        String booksUrl = "https://raw.githubusercontent" +
+            ".com/elastic/elasticsearch-php-examples/main/examples/ESQL/data/books.csv";

-        RestClient restClient = RestClient
+        try (RestClient restClient = RestClient
             .builder(HttpHost.create(serverUrl))
             .setDefaultHeaders(new Header[]{
                 new BasicHeader("Authorization", "ApiKey " + apiKey)
             })
-            .build();
+            .build()) {

-        ObjectMapper mapper = JsonMapper.builder()
-            .build();
+            ObjectMapper mapper = JsonMapper.builder()
+                .build();

-        JacksonJsonpMapper jsonpMapper = new JacksonJsonpMapper(mapper);
+            JacksonJsonpMapper jsonpMapper = new JacksonJsonpMapper(mapper);

-        ElasticsearchTransport transport = new RestClientTransport(
-            restClient, jsonpMapper);
+            ElasticsearchTransport transport = new RestClientTransport(
+                restClient, jsonpMapper);

-        ElasticsearchClient client = new ElasticsearchClient(transport);
+            ElasticsearchClient client = new ElasticsearchClient(transport);

-        if (!client.indices().exists(ex -> ex.index("books")).value()) {
-            client.indices()
-                .create(c -> c
-                    .index("books")
-                    .mappings(mp -> mp
-                        .properties("title", p -> p.text(t -> t))
-                        .properties("description", p -> p.text(t -> t))
-                        .properties("author", p -> p.text(t -> t))
-                        .properties("year", p -> p.short_(s -> s))
-                        .properties("publisher", p -> p.text(t -> t))
-                        .properties("ratings", p -> p.halfFloat(hf -> hf))
-                    ));
-        }
+            if (!client.indices().exists(ex -> ex.index("books")).value()) {
+                client.indices()
+                    .create(c -> c
+                        .index("books")
+                        .mappings(mp -> mp
+                            .properties("title", p -> p.text(t -> t))
+                            .properties("description", p -> p.text(t -> t))
+                            .properties("author", p -> p.text(t -> t))
+                            .properties("year", p -> p.short_(s -> s))
+                            .properties("publisher", p -> p.text(t -> t))
+                            .properties("ratings", p -> p.halfFloat(hf -> hf))
+                        ));
+            }
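The Book class itself is not part of this diff. The Book::title method references in the print statements further down suggest a record; judging from the index mapping and the CSV columns, a compatible definition would look roughly like this (hypothetical reconstruction, the real class lives elsewhere in the example project):

    // Hypothetical reconstruction of Book; component order matches the CSV schema.
    public record Book(
        String title,
        String description,
        String author,
        Integer year,     // stored as "short" in the index mapping
        String publisher,
        Float ratings     // stored as "half_float" in the index mapping
    ) {}

Note that Jackson needs version 2.12 or later (or an explicit @JsonCreator constructor) to deserialize into records.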

-        Instant start = Instant.now();
-        System.out.println("Starting BulkIndexer... \n");
+            Instant start = Instant.now();
+            System.out.println("Starting BulkIndexer... \n");

-        CsvMapper csvMapper = new CsvMapper();
-        CsvSchema schema = CsvSchema.builder()
-            .addColumn("title") // same order as in the csv
-            .addColumn("description")
-            .addColumn("author")
-            .addColumn("year")
-            .addColumn("publisher")
-            .addColumn("ratings")
-            .setColumnSeparator(';')
-            .setSkipFirstDataRow(true)
-            .build();
+            CsvMapper csvMapper = new CsvMapper();
+            CsvSchema schema = CsvSchema.builder()
+                .addColumn("title") // same order as in the csv
+                .addColumn("description")
+                .addColumn("author")
+                .addColumn("year")
+                .addColumn("publisher")
+                .addColumn("ratings")
+                .setColumnSeparator(';')
+                .setSkipFirstDataRow(true)
+                .build();
+
+            HttpClient httpClient = HttpClient.newHttpClient();
+            HttpRequest request = HttpRequest.newBuilder()
+                .uri(URI.create(booksUrl))
+                .build();
+
+            HttpResponse<InputStream> response = httpClient.send(request,
+                HttpResponse.BodyHandlers.ofInputStream());
+            InputStream csvContentStream = response.body();

-        MappingIterator<Book> it = csvMapper
-            .readerFor(Book.class)
-            .with(schema)
-            .readValues(new FileReader(csvPath));
+            MappingIterator<Book> it = csvMapper
+                .readerFor(Book.class)
+                .with(schema)
+                .readValues(new InputStreamReader(csvContentStream));

-        BulkIngester ingester = BulkIngester.of(bi -> bi
-            .client(client)
-            .maxConcurrentRequests(20)
-            .maxOperations(5000));
+            BulkIngester ingester = BulkIngester.of(bi -> bi
+                .client(client)
+                .maxConcurrentRequests(20)
+                .maxOperations(5000));

-        boolean hasNext = true;
+            boolean hasNext = true;

-        while (hasNext) {
-            try {
-                Book book = it.nextValue();
-                ingester.add(BulkOperation.of(b -> b
-                    .index(i -> i
-                        .index("books")
-                        .document(book))));
-                hasNext = it.hasNextValue();
-            } catch (JsonParseException | InvalidFormatException e) {
-                // ignore malformed data
-            }
-        }
+            while (hasNext) {
+                try {
+                    Book book = it.nextValue();
+                    ingester.add(BulkOperation.of(b -> b
+                        .index(i -> i
+                            .index("books")
+                            .document(book))));
+                    hasNext = it.hasNextValue();
+                } catch (JsonParseException | InvalidFormatException e) {
+                    // ignore malformed data
+                }
+            }
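The ingestion loop deliberately skips malformed CSV rows, and the BulkIngester batches up to 5,000 operations across up to 20 concurrent bulk requests. Failures inside those bulk requests are otherwise invisible; for visibility, the ingester accepts a listener (a sketch assuming the BulkListener interface from the same _helpers package as BulkIngester, with BulkRequest/BulkResponse from the core package; check the client's docs for exact signatures):

    // Sketch: surface bulk failures instead of dropping them silently.
    BulkListener<Void> listener = new BulkListener<>() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request, List<Void> contexts) {
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, List<Void> contexts,
                              BulkResponse response) {
            if (response.errors()) {
                System.err.println("bulk #" + executionId + " had item-level failures");
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, List<Void> contexts,
                              Throwable failure) {
            System.err.println("bulk #" + executionId + " failed: " + failure.getMessage());
        }
    };

    BulkIngester<Void> ingester = BulkIngester.of(bi -> bi
        .client(client)
        .listener(listener)
        .maxConcurrentRequests(20)
        .maxOperations(5000));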

-        ingester.close();
+            ingester.close();

-        client.indices().refresh();
+            client.indices().refresh();

-        Instant end = Instant.now();
+            Instant end = Instant.now();

-        System.out.println("Finished in: " + Duration.between(start, end).toMillis() + "\n");
+            System.out.println("Finished in: " + Duration.between(start, end).toMillis() + "\n");

-        String queryAuthor =
-            """
-            from books
-            | where author == "Isaac Asimov"
-            | sort year desc
-            | limit 10
-            """;
+            String queryAuthor =
+                """
+                from books
+                | where author == "Isaac Asimov"
+                | sort year desc
+                | limit 10
+                """;

-        List<Book> queryRes = (List<Book>) client.esql().query(ObjectsEsqlAdapter.of(Book.class), queryAuthor);
+            List<Book> queryRes = (List<Book>) client.esql().query(ObjectsEsqlAdapter.of(Book.class),
+                queryAuthor);

-        System.out.println("~~~\nObject result author:\n" + queryRes.stream().map(Book::title).collect(Collectors.joining("\n")));
+            System.out.println("~~~\nObject result author:\n" + queryRes.stream().map(Book::title).collect(Collectors.joining("\n")));

-        ResultSet resultSet = client.esql().query(ResultSetEsqlAdapter.INSTANCE, queryAuthor);
-
-        System.out.println("~~~\nResultSet result author:");
-        while (resultSet.next()) {
-            System.out.println(resultSet.getString("title"));
-        }
+            try (ResultSet resultSet = client.esql().query(ResultSetEsqlAdapter.INSTANCE, queryAuthor)) {
+                System.out.println("~~~\nResultSet result author:");
+                while (resultSet.next()) {
+                    System.out.println(resultSet.getString("title"));
+                }
+            }
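Both adapters run the same ES|QL string: ObjectsEsqlAdapter.of(Book.class) maps each result row onto a Book instance, while ResultSetEsqlAdapter.INSTANCE exposes the rows through a read-only JDBC-style ResultSet, now properly closed by try-with-resources. Since only the title is printed, the projection could also be trimmed server-side with a keep clause, for example (a sketch, paired naturally with the ResultSet adapter since it needs no complete Book):

    // Sketch: fetch only the columns that are actually printed.
    String queryTitles = """
        from books
        | where author == "Isaac Asimov"
        | keep title, year
        | sort year desc
        | limit 10
        """;

The publisher query below uses sorting twice: sort ratings desc | limit 10 first picks the ten best-rated Penguin books, and the second sort title asc then orders that small result set alphabetically for display.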

-        String queryPublisher =
-            """
-            from books
-            | where publisher == "Penguin"
-            | sort ratings desc
-            | limit 10
-            | sort title asc
-            """;
-
-        queryRes = (List<Book>) client.esql().query(ObjectsEsqlAdapter.of(Book.class), queryPublisher);
-        System.out.println("~~~\nObject result publisher:\n" + queryRes.stream().map(Book::title).collect(Collectors.joining("\n")));
+            String queryPublisher =
+                """
+                from books
+                | where publisher == "Penguin"
+                | sort ratings desc
+                | limit 10
+                | sort title asc
+                """;
+
+            queryRes = (List<Book>) client.esql().query(ObjectsEsqlAdapter.of(Book.class), queryPublisher);
+            System.out.println("~~~\nObject result publisher:\n" + queryRes.stream().map(Book::title).collect(Collectors.joining("\n")));
+        }
     }
 }
examples/esql-article/src/main/resources/application.conf (3 changes: 0 additions & 3 deletions)

This file was deleted.
