Skip to content

Commit

Permalink
in-memory implementation //TODO needs clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
kenricky-bitquill committed Nov 15, 2024
1 parent 4cf1cdc commit cfb32a4
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 71 deletions.
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
"com.github.seancfoley" % "ipaddress" % "5.5.1",
"org.apache.commons" % "commons-lang3" % "3.17.0",
"org.apache.commons" % "commons-csv" % "1.12.0",
"org.apache.commons" % "commons-csv" % "1.12.0",
"com.fasterxml.jackson.core" % "jackson-annotations" % "2.14.2",
),
libraryDependencies ++= deps(sparkVersion),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.common.geospatial;

import inet.ipaddr.IPAddress;
import inet.ipaddr.IPAddressString;
import org.apache.commons.lang3.tuple.Pair;

import java.net.InetAddress;
Expand All @@ -15,13 +17,19 @@
public interface DatasourceDao extends AutoCloseable {
Stream<Pair<BitSet, GeoIpData>> getGeoIps();

static BitSet createCidrBitSet(String cidr) throws UnknownHostException {
static BitSet cidrToBitSet(String cidr) {
String[] parts = cidr.split("/");

InetAddress inetAddress = InetAddress.getByName(parts[0]);
byte[] bytes = inetAddress.getAddress();
BitSet cidrKey = BitSet.valueOf(bytes);
int prefixLength = Integer.parseInt(parts[1]);
IPAddressString cidrString = new IPAddressString(cidr);
IPAddress cidrIpAddress = cidrString.getAddress();

if (cidrIpAddress == null || cidrIpAddress.getNetworkPrefixLength() == null) {
throw new IllegalArgumentException("Invalid CIDR notation: " + cidr);
}

int prefixLength = cidrIpAddress.getNetworkPrefixLength();
byte[] cidrBytes = cidrIpAddress.getBytes();
BitSet cidrKey = BitSet.valueOf(cidrBytes);

return cidrKey.get(0, prefixLength - 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,14 @@
*/

package org.opensearch.sql.common.geospatial;

import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;

public class DatasourceDaoFactory {
public static DatasourceDao GetDatasourceDao(String className, String datasource) {
public static DatasourceDao GetDatasourceDao(String datasource) {
try {
Class<?> dataSourceClass = Class.forName(className);

if (!DatasourceDao.class.isAssignableFrom(dataSourceClass)) {
throw new RuntimeException(className + " does not implement DatasourceDao");
}

return (DatasourceDao) dataSourceClass.getDeclaredConstructor().newInstance(datasource);
} catch (Exception e){
throw new RuntimeException("Could not instantiate datasource DAO: " + datasource, e);
return new ManifestDao(datasource);
} catch (MalformedURLException e) {
throw new RuntimeException("Invalid URL provided: " + datasource, e);
}
}

private static DatasourceType getDatasourceType(String datasource) {
return DatasourceType.MANIFEST;
}

private enum DatasourceType {
MANIFEST,
API,
INDEX
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,35 @@
import java.net.URL;
import java.net.URLConnection;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

import static org.opensearch.sql.common.geospatial.ManifestDao.USER_AGENT_KEY;
import static org.opensearch.sql.common.geospatial.ManifestDao.USER_AGENT_VALUE;

/**
* Ip2Geo datasource manifest file object
*
* Manifest file is stored in an external endpoint. OpenSearch read the file and store values it in this object.
*/
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
public class DatasourceManifest {
/**
* @param url URL of a ZIP file conFtaining a database
* @param url URL of a ZIP file containing a database
* @return URL of a ZIP file containing a database
*/
public String url;
/**
* @param dbName A database file name inside the ZIP file
* @return A database file name inside the ZIP file
*/
@JsonProperty("db_name")
public String dbName;
/**
* @param sha256Hash SHA256 hash value of a database file
Expand Down Expand Up @@ -83,7 +87,7 @@ public static DatasourceManifest build(final URL url) {
}

protected static DatasourceManifest internalBuild(final URLConnection connection) throws IOException {
// connection.addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE);
connection.addRequestProperty(USER_AGENT_KEY, USER_AGENT_VALUE);
final ObjectMapper mapper = new ObjectMapper();
InputStreamReader inputStreamReader = new InputStreamReader(connection.getInputStream());
try (BufferedReader reader = new BufferedReader(inputStreamReader)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@

package org.opensearch.sql.common.geospatial;

import static org.opensearch.sql.common.geospatial.DatasourceDao.createCidrBitSet;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.zip.ZipEntry;
Expand All @@ -24,9 +25,14 @@
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.tuple.Pair;
import org.glassfish.jersey.server.model.Suspendable;

public class ManifestDao implements DatasourceDao {

public static final String USER_AGENT_KEY = "User-Agent";

public static final String USER_AGENT_VALUE = String.format(Locale.ROOT, "OpenSearchSpark/%s vanilla", System.getProperty("sparkVersion"));

/**
* Default endpoint to be used in GeoIP datasource creation API
*/
Expand All @@ -37,42 +43,61 @@ public class ManifestDao implements DatasourceDao {
private final DatasourceManifest manifest;
private CSVParser manifestCsv;

public ManifestDao() throws MalformedURLException {
manifest = DatasourceManifest.Builder.build(new URL(DATASOURCE_ENDPOINT));
public ManifestDao(String datasource) throws MalformedURLException {
manifest = DatasourceManifest.Builder.build(new URL(datasource));
}

@Override
public Stream<Pair<BitSet, GeoIpData>> getGeoIps() {
manifestCsv = getDatabaseReader(manifest);

Map<String, Integer> headerMap = manifestCsv.getHeaderMap();
int cidr_index = headerMap.get("cidr");
Spliterator<CSVRecord> spliterator = manifestCsv.spliterator();
Map<String, Integer> headerMap = new HashMap<>();

spliterator.tryAdvance(headerRecord -> {
for (int i = 0; i < headerRecord.size(); i++) {
headerMap.put(headerRecord.get(i), i);
}
});

int cidr_index = headerMap.get("cidr");
int country_iso_code_index = headerMap.get("country_iso_code");
int country_name_index = headerMap.get("country_name");
int continent_name_index = headerMap.get("continent_name");
int region_iso_code_index = headerMap.get("region_iso_code");
int region_name_index = headerMap.get("region_name");
int city_name_index = headerMap.get("city_name");
int time_zone_index = headerMap.get("time_zone");
int lat_index = headerMap.get("lat");
int lon_index = headerMap.get("lon");

return StreamSupport.stream(manifestCsv.spliterator(), false)
.map(record -> {
return Pair.of(
createCidrBitSet(record.get(cidr_index)),
GeoIpData.builder()
.country_iso_code(record.get(country_iso_code_index))
.country_name(record.get(country_name_index))
.continent_name(record.get(continent_name_index))
.region_iso_code(record.get(region_iso_code_index))
.region_name(record.get(region_name_index))
.city_name(record.get(city_name_index))
.time_zone(record.get(time_zone_index))
.lat(record.get(lat_index))
.lon(record.get(lon_index))
.build());
});
int country_name_index = headerMap.get("country_name");
int continent_name_index = headerMap.get("continent_name");
int region_iso_code_index = headerMap.get("region_iso_code");
int region_name_index = headerMap.get("region_name");
int city_name_index = headerMap.get("city_name");
int time_zone_index = headerMap.get("time_zone");
int location_index = headerMap.get("location");


return StreamSupport.stream(spliterator, false)
.map(record -> {
String location = record.get(location_index);
String[] latLon;
if (location == null || !location.contains(",")) {
latLon = new String[]{null, null};
} else {
latLon = location.split(",", 2);
}

String lat = latLon[0];
String lon = latLon[1];

return Pair.of(
DatasourceDao.cidrToBitSet(record.get(cidr_index)),
GeoIpData.builder()
.country_iso_code(record.get(country_iso_code_index))
.country_name(record.get(country_name_index))
.continent_name(record.get(continent_name_index))
.region_iso_code(record.get(region_iso_code_index))
.region_name(record.get(region_name_index))
.city_name(record.get(city_name_index))
.time_zone(record.get(time_zone_index))
.lat(lat)
.lon(lon)
.build());
});
}

@Override
Expand All @@ -99,7 +124,7 @@ public CSVParser getDatabaseReader(final DatasourceManifest manifest) {
}

protected CSVParser internalGetDatabaseReader(final DatasourceManifest manifest, final URLConnection connection) throws IOException {
// connection.addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE);
connection.addRequestProperty(USER_AGENT_KEY, USER_AGENT_VALUE);
ZipInputStream zipIn = new ZipInputStream(connection.getInputStream());
ZipEntry zipEntry = zipIn.getNextEntry();
while (zipEntry != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,8 @@ public Stream<Pair<BitSet, GeoIpData>> getGeoIps() {
BitSet bitSet1 = null;
BitSet bitSet2 = null;

try {
bitSet1 = DatasourceDao.createCidrBitSet("192.168.0.0/24"); // Example CIDR mask
bitSet2 = DatasourceDao.createCidrBitSet("10.0.0.0/8"); // Example CIDR mask
} catch (UnknownHostException e) {

}
bitSet1 = DatasourceDao.cidrToBitSet("192.168.0.0/24"); // Example CIDR mask
bitSet2 = DatasourceDao.cidrToBitSet("10.0.0.0/8"); // Example CIDR mask

return Stream.of(
Pair.of(bitSet1, geoIp1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public Row apply(String datasource, String ipAddress, String properties) {
CidrGeoMap cidrGeoMap = geoIpCache.getIfPresent(datasource);

if (cidrGeoMap == null) {
DatasourceDao datasourceDao = DatasourceDaoFactory.GetDatasourceDao("TestDatasourceDao", datasource);
DatasourceDao datasourceDao = DatasourceDaoFactory.GetDatasourceDao(datasource);
cidrGeoMap = new CidrGeoMap(datasourceDao);
geoIpCache.put(datasource, cidrGeoMap);
}
Expand Down

0 comments on commit cfb32a4

Please sign in to comment.