Skip to content

Commit

Permalink
Make compressionAlgo and region split policy type configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
shydefoo committed Sep 26, 2024
1 parent fbc5c3f commit 536deb9
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ object BasePipeline {
conf
.set("spark.bigtable.projectId", projectId)
.set("spark.bigtable.instanceId", instanceId)
case HBaseConfig(zookeeperQuorum, zookeeperPort) =>
case HBaseConfig(zookeeperQuorum, zookeeperPort, hbaseProperties) =>
conf
.set("spark.hbase.zookeeper.quorum", zookeeperQuorum)
.set("spark.hbase.zookeeper.port", zookeeperPort.toString)
.set("spark.hbase.properties.regionSplitPolicyClassName", hbaseProperties.regionSplitPolicy)
.set("spark.hbase.properties.compressionAlgorithm", hbaseProperties.compressionAlgorithm)
}

jobConfig.metrics match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ case class RedisWriteProperties(
ratePerSecondLimit: Int = 50000
)
// Connection settings for a Google Cloud Bigtable online store: the GCP project and Bigtable instance
// (wired into Spark as "spark.bigtable.projectId" / "spark.bigtable.instanceId" by BasePipeline).
case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int) extends StoreConfig
// Connection settings for an HBase online store. `zookeeperQuorum`/`zookeeperPort` locate the cluster;
// `hbaseProperties` carries table-creation tuning and defaults to HBaseProperties() so existing callers
// that only pass quorum and port keep working unchanged.
case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int, hbaseProperties: HBaseProperties = HBaseProperties()) extends StoreConfig
// Table-level HBase tuning knobs, surfaced to the sink via the
// "spark.hbase.properties.regionSplitPolicyClassName" / "spark.hbase.properties.compressionAlgorithm" confs:
// `regionSplitPolicy` is a fully-qualified RegionSplitPolicy class name passed to
// setRegionSplitPolicyClassName; `compressionAlgorithm` is matched against "ZSTD"/"GZ"/"LZ4"/"SNAPPY"
// when building the column family (any other value falls back to no compression in HbaseSinkRelation).
case class HBaseProperties(regionSplitPolicy: String = "org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy", compressionAlgorithm: String = "ZSTD")

sealed trait MetricConfig

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class BigTableSinkRelation(
featuresCFBuilder.setMaxVersions(1)
val featuresCF = featuresCFBuilder.build()

// TODO: Set compression type for column family
val tdb = TableDescriptorBuilder.newBuilder(table)

if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,18 @@ class HbaseSinkRelation(
featuresCFBuilder.setTimeToLive(config.maxAge.toInt)
}
featuresCFBuilder.setMaxVersions(1)
featuresCFBuilder.setCompressionType(Compression.Algorithm.ZSTD)
sqlContext.getConf("spark.hbase.properties.compressionAlgorithm") match {
case "ZSTD" => featuresCFBuilder.setCompressionType(Compression.Algorithm.ZSTD)
case "GZ" => featuresCFBuilder.setCompressionType(Compression.Algorithm.GZ)
case "LZ4" => featuresCFBuilder.setCompressionType(Compression.Algorithm.LZ4)
case "SNAPPY" => featuresCFBuilder.setCompressionType(Compression.Algorithm.SNAPPY)
case _ => featuresCFBuilder.setCompressionType(Compression.Algorithm.NONE)
}
val featuresCF = featuresCFBuilder.build()

val tdb = TableDescriptorBuilder.newBuilder(table)
// TODO: make this configurable
tdb.setRegionSplitPolicyClassName(
"org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy"
sqlContext.getConf("spark.hbase.properties.regionSplitPolicyClassName")
)

if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) {
Expand All @@ -66,6 +71,7 @@ class HbaseSinkRelation(
tdb.modifyColumnFamily(featuresCF)
admin.modifyTable(tdb.build())
}

} finally {
hbaseConn.close()
}
Expand Down

0 comments on commit 536deb9

Please sign in to comment.