diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala
index 9fd6322..2c29e3f 100644
--- a/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala
+++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala
@@ -33,10 +33,12 @@ object BasePipeline {
         conf
           .set("spark.bigtable.projectId", projectId)
           .set("spark.bigtable.instanceId", instanceId)
-      case HBaseConfig(zookeeperQuorum, zookeeperPort) =>
+      case HBaseConfig(zookeeperQuorum, zookeeperPort, hbaseProperties) =>
         conf
           .set("spark.hbase.zookeeper.quorum", zookeeperQuorum)
           .set("spark.hbase.zookeeper.port", zookeeperPort.toString)
+          .set("spark.hbase.properties.regionSplitPolicyClassName", hbaseProperties.regionSplitPolicy)
+          .set("spark.hbase.properties.compressionAlgorithm", hbaseProperties.compressionAlgorithm)
     }
 
     jobConfig.metrics match {
diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala
index cae6053..f4eb803 100644
--- a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala
+++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala
@@ -27,7 +27,8 @@ case class RedisWriteProperties(
   ratePerSecondLimit: Int = 50000
 )
 case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
-case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int) extends StoreConfig
+case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int, hbaseProperties: HBaseProperties = HBaseProperties()) extends StoreConfig
+case class HBaseProperties(regionSplitPolicy: String = "org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy", compressionAlgorithm: String = "ZSTD")
 
 sealed trait MetricConfig
 
diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala
index a428a5d..a5049d3 100644
--- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala
+++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala
@@ -62,7 +62,6 @@ class BigTableSinkRelation(
     featuresCFBuilder.setMaxVersions(1)
     val featuresCF = featuresCFBuilder.build()
 
-    // TODO: Set compression type for column family
     val tdb = TableDescriptorBuilder.newBuilder(table)
 
     if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) {
diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala
index 4d0a186..82e3d44 100644
--- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala
+++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala
@@ -41,13 +41,18 @@ class HbaseSinkRelation(
       featuresCFBuilder.setTimeToLive(config.maxAge.toInt)
     }
     featuresCFBuilder.setMaxVersions(1)
-    featuresCFBuilder.setCompressionType(Compression.Algorithm.ZSTD)
+    sqlContext.getConf("spark.hbase.properties.compressionAlgorithm") match {
+      case "ZSTD"   => featuresCFBuilder.setCompressionType(Compression.Algorithm.ZSTD)
+      case "GZ"     => featuresCFBuilder.setCompressionType(Compression.Algorithm.GZ)
+      case "LZ4"    => featuresCFBuilder.setCompressionType(Compression.Algorithm.LZ4)
+      case "SNAPPY" => featuresCFBuilder.setCompressionType(Compression.Algorithm.SNAPPY)
+      case _        => featuresCFBuilder.setCompressionType(Compression.Algorithm.NONE)
+    }
     val featuresCF = featuresCFBuilder.build()
 
     val tdb = TableDescriptorBuilder.newBuilder(table)
-    // TODO: make this configurable
     tdb.setRegionSplitPolicyClassName(
-      "org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy"
+      sqlContext.getConf("spark.hbase.properties.regionSplitPolicyClassName")
     )
 
     if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) {
@@ -66,6 +71,7 @@ class HbaseSinkRelation(
         tdb.modifyColumnFamily(featuresCF)
         admin.modifyTable(tdb.build())
       }
+
     } finally {
       hbaseConn.close()
     }
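
Usage note: a minimal sketch, not part of the patch, of how the new config surface fits together. The case classes are duplicated from IngestionJobConfig.scala so the snippet is self-contained; the ZooKeeper host and the SNAPPY override are illustrative assumptions only.

// Sketch only: HBaseProperties/HBaseConfig mirror the case classes added above.
import org.apache.spark.SparkConf

case class HBaseProperties(
    regionSplitPolicy: String =
      "org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy",
    compressionAlgorithm: String = "ZSTD"
)
case class HBaseConfig(
    zookeeperQuorum: String,
    zookeeperPort: Int,
    hbaseProperties: HBaseProperties = HBaseProperties()
)

object HBaseConfigSketch extends App {
  // Override only the compression algorithm; the split policy keeps its default.
  val store = HBaseConfig(
    zookeeperQuorum = "zk-0.example.internal", // illustrative host
    zookeeperPort = 2181,
    hbaseProperties = HBaseProperties(compressionAlgorithm = "SNAPPY")
  )

  // Mirrors the new match arm in BasePipeline: the two extra conf keys are
  // what HbaseSinkRelation later reads back via sqlContext.getConf.
  val conf = new SparkConf()
    .set("spark.hbase.zookeeper.quorum", store.zookeeperQuorum)
    .set("spark.hbase.zookeeper.port", store.zookeeperPort.toString)
    .set(
      "spark.hbase.properties.regionSplitPolicyClassName",
      store.hbaseProperties.regionSplitPolicy
    )
    .set(
      "spark.hbase.properties.compressionAlgorithm",
      store.hbaseProperties.compressionAlgorithm
    )

  conf.getAll.sorted.foreach { case (k, v) => println(s"$k=$v") }
}

One consequence of the catch-all arm in HbaseSinkRelation: any value other than ZSTD, GZ, LZ4, or SNAPPY falls through to Compression.Algorithm.NONE, so a typo in compressionAlgorithm silently disables compression rather than failing the job.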