From da6c9118b3b14a5f4c05197930d5e736066a636e Mon Sep 17 00:00:00 2001
From: Norman Jordan
Date: Wed, 27 Nov 2024 13:34:58 -0800
Subject: [PATCH 1/3] Add a script for importing GeoIP data

* Splits subnets as necessary to remove any overlap
* Is a Scala script to run in Spark CLI
* Can optionally import the data into a Spark table
* Adds 3 extra columns that will be used to find the subnet for an IP address

Signed-off-by: Norman Jordan
---
 docs/load_geoip_data.scala | 402 +++++++++++++++++++++++++++++++++++++
 docs/opensearch-geoip.md   |  57 ++++++
 2 files changed, 459 insertions(+)
 create mode 100644 docs/load_geoip_data.scala
 create mode 100644 docs/opensearch-geoip.md

diff --git a/docs/load_geoip_data.scala b/docs/load_geoip_data.scala
new file mode 100644
index 000000000..8cef9d669
--- /dev/null
+++ b/docs/load_geoip_data.scala
@@ -0,0 +1,402 @@
+import java.io.BufferedReader
+import java.io.FileReader
+import java.io.PrintStream
+import java.math.BigInteger
+import scala.collection.mutable.ListBuffer
+
+/* Create a binary tree based on the bits of the start IP address of the subnets. Only use the
+   first bits needed for the netmask. For example with a subnet of "192.168.2.0/24", only use the
+   first 24 bits.
+
+   If a node for a subnet has children, then there is an overlap that must be corrected. To correct
+   an overlap, make sure that both children of the node exist and remove the subnet for the current
+   node. Finally check the child nodes for overlapping subnets and continue.
+ */
+class TreeNode(var ipAddressBytes: Array[Byte], var netmask: Int, var isIPv4: Boolean, var lineRemainder: String) {
+  var falseChild: TreeNode = null
+  var trueChild: TreeNode = null
+
+  def maxNetmask: Integer = if (isIPv4) 32 else 128
+
+  // Add a new node to the tree in the correct position
+  def addNode(nodeToAdd: TreeNode): Unit = {
+    if (netmask >= nodeToAdd.netmask || netmask == maxNetmask) {
+      return
+    }
+
+    var byteIndex = netmask / 8
+    var bitValue = (nodeToAdd.ipAddressBytes(byteIndex) & (1 << (7 - (netmask % 8)))) > 0
+
+    if (netmask + 1 == nodeToAdd.netmask) {
+      if (bitValue) {
+        trueChild = nodeToAdd
+      } else {
+        falseChild = nodeToAdd
+      }
+    } else {
+      var nextChild: TreeNode = null
+      if (bitValue) {
+        nextChild = trueChild
+        if (trueChild == null) {
+          nextChild = new TreeNode(null, netmask + 1, isIPv4, null)
+          trueChild = nextChild
+        }
+      } else {
+        nextChild = falseChild
+        if (falseChild == null) {
+          nextChild = new TreeNode(null, netmask + 1, isIPv4, null)
+          falseChild = nextChild
+        }
+      }
+
+      nextChild.addNode(nodeToAdd)
+    }
+
+    return
+  }
+
+  def haveOverlap(): Boolean = falseChild != null || trueChild != null
+
+  // Convert the IP address to a string. For IPv6, this is more complicated, since it may
+  // need to be reduced.
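+  // For example, 2001:db8:0:0:0:0:2:1 is reduced to 2001:db8::2:1 by collapsing the
+  // longest run of zero groups into "::".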
+  def ipAddressString(): String = {
+    if (isIPv4) {
+      return ipAddressBytes.map(v => 255 & v).mkString(".")
+    } else {
+      var allZeroes = true
+      for (b <- ipAddressBytes) {
+        if (b != 0) {
+          allZeroes = false
+        }
+      }
+
+      if (allZeroes) {
+        return "::"
+      }
+
+      var zeroes: ListBuffer[(Int, Int)] = ListBuffer()
+      var zeroesStart = -1
+      var zeroesStartIndex = -1
+      for (i <- 0 to 7) {
+        if (ipAddressBytes(i * 2) == 0 && ipAddressBytes(i * 2 + 1) == 0) {
+          if (zeroesStart == -1) {
+            zeroesStart = i
+            zeroesStartIndex = zeroes.length
+            zeroes = zeroes :+ (i, 1)
+          } else {
+            var existingTuple = zeroes(zeroesStartIndex)
+            zeroes.update(zeroesStartIndex, (existingTuple._1, 1 + existingTuple._2))
+          }
+        } else {
+          zeroesStart = -1
+          zeroesStartIndex = -1
+        }
+      }
+
+      var longestZeroesIndex = -1
+      var longestZeroesLength = 0
+      for (v <- zeroes) {
+        if (v._2 >= longestZeroesLength) {
+          longestZeroesLength = v._2
+          longestZeroesIndex = v._1
+        }
+      }
+
+      var fullIpAddress: Array[String] = Array.fill(8){null}
+      for (i <- 0 to 7) {
+        var strValue = (((255 & ipAddressBytes(i * 2)) << 8) + (255 & ipAddressBytes(i * 2 + 1))).toHexString
+        fullIpAddress(i) = strValue
+      }
+
+      if (longestZeroesIndex == -1) {
+        return fullIpAddress.mkString(":")
+      } else {
+        var ipPartsStart = fullIpAddress.slice(0, longestZeroesIndex)
+        var ipPartsEnd = fullIpAddress.slice(longestZeroesIndex + longestZeroesLength, 8)
+        return ipPartsStart.mkString(":") + "::" + ipPartsEnd.mkString(":")
+      }
+    }
+  }
+
+  def getStart(): BigInteger = new BigInteger(ipAddressBytes)
+
+  def getEnd(): BigInteger = {
+    var valueToAdd = new BigInteger(Array.fill(maxNetmask / 8){0.toByte})
+    if (netmask < maxNetmask) {
+      valueToAdd = valueToAdd.flipBit(maxNetmask - netmask)
+      valueToAdd = valueToAdd.subtract(new BigInteger("1"))
+    }
+    return getStart().add(valueToAdd)
+  }
+
+  def valueToByteArray(value: BigInteger): Array[Byte] = {
+    var fullArray = Array.fill(maxNetmask / 8){0.toByte}
+    var valueArray = value.toByteArray()
+    valueArray.copyToArray(fullArray, (maxNetmask / 8) - valueArray.length, valueArray.length)
+    return fullArray
+  }
+
+  // Split a node. Make sure that both children exist and remove the subnet for the current node.
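+  // For example, splitting 10.0.0.0/8 produces children 10.0.0.0/9 (falseChild) and
+  // 10.128.0.0/9 (trueChild); both inherit this node's location data (lineRemainder).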
+  def split(): Unit = {
+    if (ipAddressBytes == null) {
+      return
+    }
+
+    var ipAddressStr = ipAddressString()
+    println(s">>> Splitting IP: $ipAddressStr")
+
+    if (falseChild == null) {
+      falseChild = new TreeNode(ipAddressBytes, netmask + 1, isIPv4, lineRemainder)
+    } else if (falseChild.ipAddressBytes == null) {
+      falseChild.ipAddressBytes = ipAddressBytes
+      falseChild.lineRemainder = lineRemainder
+    }
+
+    if (trueChild == null) {
+      var valueStart = falseChild.getEnd().add(new BigInteger("1"))
+      var startArray = valueToByteArray(valueStart)
+      trueChild = new TreeNode(startArray, netmask + 1, isIPv4, lineRemainder)
+    } else if (trueChild.ipAddressBytes == null) {
+      var valueStart = falseChild.getEnd().add(new BigInteger("1"))
+      var startArray = valueToByteArray(valueStart)
+      trueChild.ipAddressBytes = startArray
+      trueChild.lineRemainder = lineRemainder
+    }
+
+    ipAddressBytes = null
+    lineRemainder = null
+
+    return
+  }
+
+  def fixTree(): Unit = {
+    if (haveOverlap()) {
+      split()
+    }
+
+    if (falseChild != null) {
+      falseChild.fixTree()
+    }
+
+    if (trueChild != null) {
+      trueChild.fixTree()
+    }
+  }
+
+  def printTree(outStream: PrintStream): Unit = {
+    if (ipAddressBytes != null) {
+      outStream.print(ipAddressString())
+      outStream.print("/")
+      outStream.print(netmask.toString)
+      outStream.print(",")
+      outStream.print(lineRemainder)
+      outStream.print(",")
+      outStream.print(getStart().toString())
+      outStream.print(",")
+      outStream.print(getEnd().toString())
+      outStream.print(",")
+      outStream.println(isIPv4.toString)
+    }
+
+    if (falseChild != null) {
+      falseChild.printTree(outStream)
+    }
+    if (trueChild != null) {
+      trueChild.printTree(outStream)
+    }
+  }
+}
+
+// Create a node for an IPv4 entry
+def createIPv4TreeNode(fullLine: String): TreeNode = {
+  var charIndex = fullLine.indexOf(",")
+  var subnet = fullLine.substring(0, charIndex)
+  var lineRemainder = fullLine.substring(charIndex + 1)
+
+  charIndex = subnet.indexOf("/")
+  var ipAddressStr = subnet.substring(0, charIndex)
+  var netmask = subnet.substring(charIndex + 1).toInt
+
+  var addrParts = ipAddressStr.split("\\.")
+  var bytes = Array[Byte](
+    addrParts(0).toInt.toByte,
+    addrParts(1).toInt.toByte,
+    addrParts(2).toInt.toByte,
+    addrParts(3).toInt.toByte
+  )
+
+  return new TreeNode(bytes, netmask, true, lineRemainder)
+}
+
+// Create a node for an IPv6 entry
+def createIPv6TreeNode(fullLine: String): TreeNode = {
+  var charIndex = fullLine.indexOf(",")
+  var subnet = fullLine.substring(0, charIndex)
+  var lineRemainder = fullLine.substring(charIndex + 1)
+
+  charIndex = subnet.indexOf("/")
+  var ipAddressStr = subnet.substring(0, charIndex)
+  var netmask = subnet.substring(charIndex + 1).toInt
+
+  var bytes: Array[Byte] = null
+  charIndex = ipAddressStr.indexOf("::")
+
+  if (charIndex == -1) {
+    var values = ipAddressStr.split(":").map(x => Integer.parseInt(x, 16))
+    bytes = Array.fill(16){0.toByte}
+    for (i <- 0 to 7) {
+      bytes(i * 2) = (values(i) >> 8).toByte
+      bytes(i * 2 + 1) = (values(i) & 255).toByte
+    }
+  } else if ("::" == ipAddressStr) {
+    bytes = Array.fill(16){0.toByte}
+  } else {
+    if (charIndex == 0) {
+      var values = ipAddressStr.substring(2).split(":").map(x => Integer.parseInt(x, 16))
+      bytes = Array.fill(16){0.toByte}
+      for (i <- 8 - values.length to 7) {
+        var valuesIndex = i - 8 + values.length
+        bytes(i * 2) = (values(valuesIndex) >> 8).toByte
+        bytes(i * 2 + 1) = (values(valuesIndex) & 255).toByte
+      }
+    } else if (charIndex == ipAddressStr.length - 2) {
+      var values = ipAddressStr.substring(0, ipAddressStr.length - 2).split(":").map(x => Integer.parseInt(x, 16))
+      bytes = Array.fill(16){0.toByte}
+      for (i <- 0 to values.length - 1) {
+        bytes(i * 2) = (values(i) >> 8).toByte
+        bytes(i * 2 + 1) = (values(i) & 255).toByte
+      }
+    } else {
+      var startValues = ipAddressStr.substring(0, charIndex).split(":").map(x => Integer.parseInt(x, 16))
+      var endValues = ipAddressStr.substring(charIndex + 2).split(":").map(x => Integer.parseInt(x, 16))
+      bytes = Array.fill(16){0.toByte}
+      for (i <- 0 to startValues.length - 1) {
+        bytes(i * 2) = (startValues(i) >> 8).toByte
+        bytes(i * 2 + 1) = (startValues(i) & 255).toByte
+      }
+      for (i <- 8 - endValues.length to 7) {
+        var valuesIndex = i - 8 + endValues.length
+        bytes(i * 2) = (endValues(valuesIndex) >> 8).toByte
+        bytes(i * 2 + 1) = (endValues(valuesIndex) & 255).toByte
+      }
+    }
+  }
+
+  return new TreeNode(bytes, netmask, false, lineRemainder)
+}
+
+def createTreeNode(fullLine: String): TreeNode = {
+  var charIndex = fullLine.indexOf(",")
+  var subnet = fullLine.substring(0, charIndex)
+  if (subnet.indexOf(':') > -1) {
+    return createIPv6TreeNode(fullLine)
+  } else {
+    return createIPv4TreeNode(fullLine)
+  }
+}
+
+var header: String = null
+def readSubnets(fileName: String, ipv4Root: TreeNode, ipv6Root: TreeNode): Unit = {
+  var reader = new BufferedReader(new FileReader(fileName))
+  header = reader.readLine()
+
+  var line = reader.readLine()
+  while (line != null) {
+    var newNode = createTreeNode(line)
+    if (newNode.isIPv4) {
+      ipv4Root.addNode(newNode)
+    } else {
+      ipv6Root.addNode(newNode)
+    }
+
+    line = reader.readLine()
+  }
+
+  reader.close()
+}
+
+def writeSubnets(fileName: String, ipv4Root: TreeNode, ipv6Root: TreeNode): Unit = {
+  var outStream = new PrintStream(fileName)
+  outStream.print(header)
+  outStream.print(",start,end,ipv4")
+  outStream.print("\r\n")
+
+  ipv4Root.printTree(outStream)
+  ipv6Root.printTree(outStream)
+
+  outStream.close()
+}
+
+// Create the table in Spark
+def createTable(fileName: String, tableName: String): Unit = {
+  try {
+    var sparkSessionClass = Class.forName("org.apache.spark.sql.SparkSession")
+    var activeSessionMethod = sparkSessionClass.getMethod("active")
+    var sparkSession = activeSessionMethod.invoke(sparkSessionClass)
+
+    var readMethod = sparkSessionClass.getMethod("read")
+    var dataFrameReader = readMethod.invoke(sparkSession)
+
+    var dataFrameReaderClass = Class.forName("org.apache.spark.sql.DataFrameReader")
+    var formatMethod = dataFrameReaderClass.getMethod("format", classOf[java.lang.String])
+    dataFrameReader = formatMethod.invoke(dataFrameReader, "csv")
+
+    var optionMethod = dataFrameReaderClass.getMethod("option", classOf[java.lang.String], classOf[java.lang.String])
+    dataFrameReader = optionMethod.invoke(dataFrameReader, "inferSchema", "true")
+    dataFrameReader = optionMethod.invoke(dataFrameReader, "header", "true")
+
+    var loadMethod = dataFrameReaderClass.getMethod("load", classOf[java.lang.String])
+    var dataset = loadMethod.invoke(dataFrameReader, fileName)
+
+    var datasetClass = Class.forName("org.apache.spark.sql.Dataset")
+    var writeMethod = datasetClass.getMethod("write")
+    var dataFrameWriter = writeMethod.invoke(dataset)
+
+    var dataFrameWriterClass = Class.forName("org.apache.spark.sql.DataFrameWriter")
+    var saveAsTableMethod = dataFrameWriterClass.getMethod("saveAsTable", classOf[java.lang.String])
+    saveAsTableMethod.invoke(dataFrameWriter, tableName)
+  } catch {
+    case e: Exception => {
+      println("Unable to load data into table")
+      e.printStackTrace()
+    }
+  }
+}
+
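+// Note: createTable resolves the Spark classes via reflection, presumably so this script
+// can still be parsed when Spark is not on the classpath. In spark-shell, the reflection
+// calls above are equivalent to:
+//   SparkSession.active.read.format("csv").option("inferSchema", "true")
+//     .option("header", "true").load(fileName).write.saveAsTable(tableName)
+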
+// Sanitize the data and import it into a Spark table
+def cleanAndImport(inputFile: String, outputFile: String, tableName: String): Unit = {
+  if (tableName != null) {
+    try {
+      Class.forName("org.apache.spark.sql.SparkSession")
+    } catch {
+      case e: ClassNotFoundException => {
+        println("Must run in Spark CLI to create the Spark table")
+        return
+      }
+    }
+  }
+
+  println("Loading data")
+  var ipv4Root = new TreeNode(null, 0, true, null)
+  var ipv6Root = new TreeNode(null, 0, false, null)
+  readSubnets(inputFile, ipv4Root, ipv6Root)
+
+  println("Fixing overlapping subnets")
+  ipv4Root.fixTree()
+  ipv6Root.fixTree()
+
+  println("Writing data to file")
+  writeSubnets(outputFile, ipv4Root, ipv6Root)
+
+  if (tableName != null) {
+    println("Creating and populating Spark table")
+    createTable(outputFile, tableName)
+  }
+
+  println("Done")
+}
+
+var INPUT_FILE: String = "/Users/normanj/Documents/geoip/geolite2-City.csv"
+var OUTPUT_FILE: String = "/Users/normanj/Documents/geoip/test.csv"
+var TABLE_NAME: String = "geoip_data"
+var result = cleanAndImport(INPUT_FILE, OUTPUT_FILE, TABLE_NAME)
diff --git a/docs/opensearch-geoip.md b/docs/opensearch-geoip.md
new file mode 100644
index 000000000..e7e288034
--- /dev/null
+++ b/docs/opensearch-geoip.md
@@ -0,0 +1,57 @@
+# OpenSearch Geographic IP Location Data
+
+## Overview
+
+OpenSearch has PPL functions for looking up the geographic location of IP addresses. In order
+to use these functions, a table needs to be created containing the geographic location
+information.
+
+## How to Create Geographic Location Index
+
+A script has been created that can clean up and augment a CSV file that contains geographic
+location information for IP address ranges. The CSV file is expected to have the following
+columns:
+
+| Column Name      | Description                                                                                             |
+|------------------|---------------------------------------------------------------------------------------------------------|
+| cidr             | IP address subnet in format `IP_ADDRESS/NETMASK` (ex. `192.168.0.0/24`). IP address can be IPv4 or IPv6 |
+| country_iso_code | ISO code of the country where the IP address subnet is located                                          |
+| country_name     | Name of the country where the IP address subnet is located                                              |
+| continent_name   | Name of the continent where the IP address subnet is located                                            |
+| region_iso_code  | ISO code of the region where the IP address subnet is located                                           |
+| region_name      | Name of the region where the IP address subnet is located                                               |
+| city_name        | Name of the city where the IP address subnet is located                                                 |
+| time_zone        | Time zone where the IP address subnet is located                                                        |
+| location         | Latitude and longitude where the IP address subnet is located                                           |
+
+The script will clean up the data by splitting IP address subnets so that an IP address is in at most one subnet.
+
+The data is augmented by adding 3 fields.
+
+| Column Name | Description                                                         |
+|-------------|---------------------------------------------------------------------|
+| start       | An integer value used to determine if an IP address is in a subnet  |
+| end         | An integer value used to determine if an IP address is in a subnet  |
+| ipv4        | A boolean value, `true` if the IP address subnet is in IPv4 format  |
+
+## Run the Script
+
+1. Create a copy of the scala file `load_geoip_data.scala`
+2. Edit the file
+3. There are three variables that need to be updated.
+   1. `INPUT_FILE` - the full path to the CSV file to load
+   2. `OUTPUT_FILE` - the full path of the CSV file to write the sanitized data to
+   3. `TABLE_NAME` - name of the index to create in OpenSearch. No table is created if this is null
+4. Save the file
+5. Run the Apache Spark CLI and connect to the database
+6. Load the Scala script
+   ```scala
+   :load FILENAME
+   ```
+   Replace `FILENAME` with the full path to the Scala script.
+
+## Notes for EMR
+
+With EMR it is necessary to load the data from an S3 object. Follow the instructions for
+**Run the Script**, but make sure that `TABLE_NAME` is set to `null`. Upload the `OUTPUT_FILE`
+to S3.
\ No newline at end of file

From 0b2e23074c4040582b4716aa13d4f770c74687f9 Mon Sep 17 00:00:00 2001
From: Norman Jordan
Date: Mon, 2 Dec 2024 13:11:03 -0800
Subject: [PATCH 2/3] Updated based on PR feedback

Signed-off-by: Norman Jordan
---
 docs/load_geoip_data.scala | 58 +++++++++++++++++++++++++++++++-------
 docs/opensearch-geoip.md   | 53 +++++++++++++++++++++++++++-------
 2 files changed, 91 insertions(+), 20 deletions(-)

diff --git a/docs/load_geoip_data.scala b/docs/load_geoip_data.scala
index 8cef9d669..1540dbfb1 100644
--- a/docs/load_geoip_data.scala
+++ b/docs/load_geoip_data.scala
@@ -1,9 +1,19 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
 import java.io.BufferedReader
 import java.io.FileReader
 import java.io.PrintStream
 import java.math.BigInteger
 import scala.collection.mutable.ListBuffer
 
+var ipv4NodeCount = 0
+var ipv6NodeCount = 0
+var ipv4NodeOutputCount = 0
+var ipv6NodeOutputCount = 0
+
 /* Create a binary tree based on the bits of the start IP address of the subnets. Only use the
    first bits needed for the netmask. For example with a subnet of "192.168.2.0/24", only use the
    first 24 bits.
 
    If a node for a subnet has children, then there is an overlap that must be corrected. To correct
    an overlap, make sure that both children of the node exist and remove the subnet for the current
    node. Finally check the child nodes for overlapping subnets and continue.
  */
 class TreeNode(var ipAddressBytes: Array[Byte], var netmask: Int, var isIPv4: Boolean, var lineRemainder: String) {
@@ -136,6 +146,14 @@ class TreeNode(var ipAddressBytes: Array[Byte], var netmask: Int, var isIPv4: Bo
     return fullArray
   }
 
+  def incrementNodeCount(): Unit = {
+    if (isIPv4) {
+      ipv4NodeCount += 1
+    } else {
+      ipv6NodeCount += 1
+    }
+  }
+
   // Split a node. Make sure that both children exist and remove the subnet for the current node.
   def split(): Unit = {
     if (ipAddressBytes == null) {
       return
@@ -183,7 +201,7 @@ class TreeNode(var ipAddressBytes: Array[Byte], var netmask: Int, var isIPv4: Bo
     }
   }
 
-  def printTree(outStream: PrintStream): Unit = {
+  def printTree(outStream: PrintStream, tenPercentCount: Int): Unit = {
     if (ipAddressBytes != null) {
       outStream.print(ipAddressString())
       outStream.print("/")
       outStream.print(netmask.toString)
@@ -196,13 +214,24 @@ class TreeNode(var ipAddressBytes: Array[Byte], var netmask: Int, var isIPv4: Bo
       outStream.print(getEnd().toString())
       outStream.print(",")
       outStream.println(isIPv4.toString)
+
+      var currentNodeCount = if (isIPv4) ipv4NodeOutputCount else ipv6NodeOutputCount
+      if (currentNodeCount % tenPercentCount == 0) {
+        print((currentNodeCount * 10 / tenPercentCount).toString + "%..")
+      }
+
+      if (isIPv4) {
+        ipv4NodeOutputCount += 1
+      } else {
+        ipv6NodeOutputCount += 1
+      }
     }
 
     if (falseChild != null) {
-      falseChild.printTree(outStream)
+      falseChild.printTree(outStream, tenPercentCount)
     }
     if (trueChild != null) {
-      trueChild.printTree(outStream)
+      trueChild.printTree(outStream, tenPercentCount)
     }
   }
 }
@@ -305,8 +334,10 @@ def readSubnets(fileName: String, ipv4Root: TreeNode, ipv6Root: TreeNode): Unit
     var newNode = createTreeNode(line)
     if (newNode.isIPv4) {
       ipv4Root.addNode(newNode)
+      ipv4NodeCount += 1
     } else {
       ipv6Root.addNode(newNode)
+      ipv6NodeCount += 1
     }
 
     line = reader.readLine()
@@ -318,11 +349,18 @@ def readSubnets(fileName: String, ipv4Root: TreeNode, ipv6Root: TreeNode): Unit
 def writeSubnets(fileName: String, ipv4Root: TreeNode, ipv6Root: TreeNode): Unit = {
   var outStream = new PrintStream(fileName)
   outStream.print(header)
-  outStream.print(",start,end,ipv4")
+  outStream.print(",ip_range_start,ip_range_end,ipv4")
   outStream.print("\r\n")
 
-  ipv4Root.printTree(outStream)
-  ipv6Root.printTree(outStream)
+  // Progress is reported in 10% steps; use an interval of at least 1 to avoid a
+  // division by zero when there are fewer than 10 subnets of a family.
+  println("Writing IPv4 data")
+  ipv4NodeOutputCount = 0
+  ipv4Root.printTree(outStream, (ipv4NodeCount / 10).max(1))
+  println()
+
+  println("Writing IPv6 data")
+  ipv6NodeOutputCount = 0
+  ipv6Root.printTree(outStream, (ipv6NodeCount / 10).max(1))
+  println()
 
   outStream.close()
 }
@@ -396,7 +434,7 @@ def cleanAndImport(inputFile: String, outputFile: String, tableName: String): Un
   println("Done")
 }
 
-var INPUT_FILE: String = "/Users/normanj/Documents/geoip/geolite2-City.csv"
-var OUTPUT_FILE: String = "/Users/normanj/Documents/geoip/test.csv"
-var TABLE_NAME: String = "geoip_data"
-var result = cleanAndImport(INPUT_FILE, OUTPUT_FILE, TABLE_NAME)
+var FILE_PATH_TO_INPUT_CSV: String = "/replace/this/value"
+var FILE_PATH_TO_OUTPUT_CSV: String = "/replace/this/value"
+var TABLE_NAME: String = null
+var result = cleanAndImport(FILE_PATH_TO_INPUT_CSV, FILE_PATH_TO_OUTPUT_CSV, TABLE_NAME)
diff --git a/docs/opensearch-geoip.md b/docs/opensearch-geoip.md
index e7e288034..60ce9f6aa 100644
--- a/docs/opensearch-geoip.md
+++ b/docs/opensearch-geoip.md
@@ -28,19 +28,19 @@ The script will clean up the data by splitting IP address subnets so that an IP
 The data is augmented by adding 3 fields.
 
-| Column Name | Description                                                         |
-|-------------|---------------------------------------------------------------------|
-| start       | An integer value used to determine if an IP address is in a subnet  |
-| end         | An integer value used to determine if an IP address is in a subnet  |
-| ipv4        | A boolean value, `true` if the IP address subnet is in IPv4 format  |
+| Column Name    | Description                                                         |
+|----------------|---------------------------------------------------------------------|
+| ip_range_start | An integer value used to determine if an IP address is in a subnet  |
+| ip_range_end   | An integer value used to determine if an IP address is in a subnet  |
+| ipv4           | A boolean value, `true` if the IP address subnet is in IPv4 format  |
 
 ## Run the Script
 
 1. Create a copy of the scala file `load_geoip_data.scala`
-2. Edit the file
+2. Edit the copy of the file `load_geoip_data.scala`
 3. There are three variables that need to be updated.
-   1. `INPUT_FILE` - the full path to the CSV file to load
-   2. `OUTPUT_FILE` - the full path of the CSV file to write the sanitized data to
+   1. `FILE_PATH_TO_INPUT_CSV` - the full path to the CSV file to load
+   2. `FILE_PATH_TO_OUTPUT_CSV` - the full path of the CSV file to write the sanitized data to
    3. `TABLE_NAME` - name of the index to create in OpenSearch. No table is created if this is null
 4. Save the file
 5. Run the Apache Spark CLI and connect to the database
 6. Load the Scala script
@@ -53,5 +53,38 @@ The data is augmented by adding 3 fields.
 ## Notes for EMR
 
 With EMR it is necessary to load the data from an S3 object. Follow the instructions for
-**Run the Script**, but make sure that `TABLE_NAME` is set to `null`. Upload the `OUTPUT_FILE`
-to S3.
\ No newline at end of file
+**Run the Script**, but make sure that `TABLE_NAME` is set to `null`. Upload the
+`FILE_PATH_TO_OUTPUT_CSV` to S3.
+
+## End-to-End
+
+How to download a sample GeoIP location data set, clean it up, and import it into a
+Spark table.
+
+1. Use a web browser to download the [data set Zip file](https://geoip.maps.opensearch.org/v1/geolite2-city/data/geolite2-city_1732905911000.zip)
+2. Extract the Zip file
+3. Copy the file `geolite2-City.csv` to the computer where you run `spark-shell`
+4. Copy the file `load_geoip_data.scala` to the computer where you run `spark-shell`
+5. Connect to the computer where you run `spark-shell`
+6. Change to the directory containing `geolite2-City.csv` and `load_geoip_data.scala`
+7. Update the `load_geoip_data.scala` file to specify the CSV files to read and write. Also update
+   it to specify the Spark table to create (`geo_ip_data` in this case).
+   ```
+   sed -i \
+     -e "s#^var FILE_PATH_TO_INPUT_CSV: String =.*#var FILE_PATH_TO_INPUT_CSV: String = \"${PWD}/geolite2-City.csv\"#" \
+     load_geoip_data.scala
+   sed -i \
+     -e "s#^var FILE_PATH_TO_OUTPUT_CSV: String = .*#var FILE_PATH_TO_OUTPUT_CSV: String = \"${PWD}/geolite2-City-fixed.csv\"#" \
+     load_geoip_data.scala
+   sed -i \
+     -e 's#^var TABLE_NAME: String = .*#var TABLE_NAME: String = "geo_ip_data"#' \
+     load_geoip_data.scala
+   ```
+8. Run `spark-shell`
+   ```
+   spark-shell
+   ```
+9. Load and run the `load_geoip_data.scala` script
+   ```
+   :load load_geoip_data.scala
+   ```

From ede075ce7a5773b83be716984d15f808f6a24e36 Mon Sep 17 00:00:00 2001
From: Norman Jordan
Date: Mon, 2 Dec 2024 13:14:00 -0800
Subject: [PATCH 3/3] Fixed formatting

Signed-off-by: Norman Jordan
---
 docs/opensearch-geoip.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/opensearch-geoip.md b/docs/opensearch-geoip.md
index 60ce9f6aa..cd262e187 100644
--- a/docs/opensearch-geoip.md
+++ b/docs/opensearch-geoip.md
@@ -38,7 +38,7 @@ The data is augmented by adding 3 fields.
 
 1. Create a copy of the scala file `load_geoip_data.scala`
 2. Edit the copy of the file `load_geoip_data.scala`
-3. There are three variables that need to be updated.
+   There are three variables that need to be updated.
    1. `FILE_PATH_TO_INPUT_CSV` - the full path to the CSV file to load
    2. `FILE_PATH_TO_OUTPUT_CSV` - the full path of the CSV file to write the sanitized data to
    3. `TABLE_NAME` - name of the index to create in OpenSearch. No table is created if this is null