Commit 3c3af90

adaptive partition sizes

dszakallas committed Apr 11, 2021
1 parent 5b3de7e commit 3c3af90

Showing 9 changed files with 80 additions and 58 deletions.
@@ -76,7 +76,7 @@ public void run() {
         for (int i = 0; i < numPartitions; ++i) {
             staticSerializer[i] = new StaticSerializer();
             staticSerializer[i].initialize(
-                fs, conf.getOutputDir(), i,
+                fs, conf.getOutputDir(), i, 1.0,
                 false
             );
         }
@@ -48,8 +48,8 @@ public class HdfsCsvWriter extends HdfsWriter {
     private String separator;
     private StringBuffer buffer;

-    public HdfsCsvWriter(FileSystem fs, String outputDir, String prefix, int numPartitions, boolean compressed, String separator) throws IOException {
-        super(fs, outputDir, prefix, numPartitions, compressed, "csv");
+    public HdfsCsvWriter(FileSystem fs, String outputDir, String prefix, int numFiles, boolean compressed, String separator) throws IOException {
+        super(fs, outputDir, prefix, numFiles, compressed, "csv");
         this.separator = separator;
         this.buffer = new StringBuffer(2048);
     }
66 changes: 26 additions & 40 deletions src/main/java/ldbc/snb/datagen/serializer/FileName.java
@@ -3,54 +3,40 @@
 public enum FileName {

     // static
-    TAG("Tag"),
-    TAG_HASTYPE_TAGCLASS("Tag_hasType_TagClass"),
-    TAGCLASS("TagClass"),
-    TAGCLASS_ISSUBCLASSOF_TAGCLASS("TagClass_isSubclassOf_TagClass"),
-    PLACE("Place"),
-    PLACE_ISPARTOF_PLACE("Place_isPartOf_Place"),
-    ORGANISATION("Organisation"),
-    ORGANISATION_ISLOCATEDIN_PLACE("Organisation_isLocatedIn_Place"),
+    TAG("Tag", 1.0),
+    TAG_HASTYPE_TAGCLASS("Tag_hasType_TagClass", 1.0),
+    TAGCLASS("TagClass", 1.0),
+    TAGCLASS_ISSUBCLASSOF_TAGCLASS("TagClass_isSubclassOf_TagClass", 1.0),
+    PLACE("Place", 1.0),
+    PLACE_ISPARTOF_PLACE("Place_isPartOf_Place", 1.0),
+    ORGANISATION("Organisation", 1.0),
+    ORGANISATION_ISLOCATEDIN_PLACE("Organisation_isLocatedIn_Place", 1.0),

     // dynamic activity
-    FORUM("Forum"),
-    FORUM_CONTAINEROF_POST("Forum_containerOf_Post"),
-    FORUM_HASMEMBER_PERSON("Forum_hasMember_Person"),
-    FORUM_HASMODERATOR_PERSON("Forum_hasModerator_Person"),
-    FORUM_HASTAG_TAG("Forum_hasTag_Tag"),
-    PERSON_LIKES_POST("Person_likes_Post"),
-    PERSON_LIKES_COMMENT("Person_likes_Comment"),
-    POST("Post"),
-    POST_HASCREATOR_PERSON("Post_hasCreator_Person"),
-    POST_HASTAG_TAG("Post_hasTag_Tag"),
-    POST_ISLOCATEDIN_COUNTRY("Post_isLocatedIn_Country"),
-    COMMENT("Comment"),
-    COMMENT_HASCREATOR_PERSON("Comment_hasCreator_Person"),
-    COMMENT_HASTAG_TAG("Comment_hasTag_Tag"),
-    COMMENT_ISLOCATEDIN_COUNTRY("Comment_isLocatedIn_Country"),
-    COMMENT_REPLYOF_POST("Comment_replyOf_Post"),
-    COMMENT_REPLYOF_COMMENT("Comment_replyOf_Comment"),
+    FORUM("Forum", 5.13),
+    FORUM_HASMEMBER_PERSON("Forum_hasMember_Person", 384.06),
+    FORUM_HASTAG_TAG("Forum_hasTag_Tag", 11.10),
+    PERSON_LIKES_POST("Person_likes_Post", 141.12),
+    PERSON_LIKES_COMMENT("Person_likes_Comment", 325.31),
+    POST("Post", 138.61),
+    POST_HASTAG_TAG("Post_hasTag_Tag", 77.34),
+    COMMENT("Comment", 503.70),
+    COMMENT_HASTAG_TAG("Comment_hasTag_Tag", 295.20),

     // dynamic person
-    PERSON("Person"),
-    PERSON_SPEAKS_LANGUAGE("Person_speaks_language"),
-    PERSON_EMAIL_EMAILADDRESS("Person_email_emailaddress"),
-    PERSON_ISLOCATEDIN_CITY("Person_isLocatedIn_City"),
-    PERSON_HASINTEREST_TAG("Person_hasInterest_Tag"),
-    PERSON_WORKAT_COMPANY("Person_workAt_Company"),
-    PERSON_STUDYAT_UNIVERSITY("Person_studyAt_University"),
-    PERSON_KNOWS_PERSON("Person_knows_Person"),
+    PERSON("Person", 1.0),
+    PERSON_HASINTEREST_TAG("Person_hasInterest_Tag", 7.89),
+    PERSON_WORKAT_COMPANY("Person_workAt_Company", 0.77),
+    PERSON_STUDYAT_UNIVERSITY("Person_studyAt_University", 0.28),
+    PERSON_KNOWS_PERSON("Person_knows_Person", 26.11),

     ;

-    private final String name;
+    public final String name;
+    public final double size;

-    FileName(String name) {
+    FileName(String name, double size) {
         this.name = name;
+        this.size = size;
     }

     public String toString() {
         return name;
     }

 }
10 changes: 5 additions & 5 deletions src/main/java/ldbc/snb/datagen/serializer/LdbcSerializer.java
@@ -24,18 +24,18 @@ public Map<FileName, HdfsCsvWriter> initialize(
            FileSystem fs,
            String outputDir,
            int reducerId,
+           double oversizeFactor,
            boolean isCompressed,
            boolean dynamic,
            List<FileName> fileNames
    ) throws IOException {

        Map<FileName, HdfsCsvWriter> writers = new HashMap<>();
        for (FileName f : fileNames) {
            writers.put(f, new HdfsCsvWriter(
                fs,
-               outputDir + "/csv/raw/composite-merged-fk" + (dynamic ? "/dynamic/" : "/static/") + f.toString() + "/",
+               outputDir + "/csv/raw/composite-merged-fk" + (dynamic ? "/dynamic/" : "/static/") + f.name + "/",
                String.valueOf(reducerId),
-               DatagenParams.numUpdateStreams,
+               (int)Math.ceil(f.size / oversizeFactor),
                isCompressed,
                "|"
            )
@@ -44,8 +44,8 @@ public Map<FileName, HdfsCsvWriter> initialize(
        return writers;
    }

-   public void initialize(FileSystem fs, String outputDir, int reducerId, boolean isCompressed) throws IOException {
-       writers = initialize(fs, outputDir, reducerId, isCompressed, isDynamic(), getFileNames());
+   public void initialize(FileSystem fs, String outputDir, int reducerId, double oversizeFactor, boolean isCompressed) throws IOException {
+       writers = initialize(fs, outputDir, reducerId, oversizeFactor, isCompressed, isDynamic(), getFileNames());
        writeFileHeaders();
        this.dateFormatter = new DateFormatter();
    }
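A minimal sketch of the file-count arithmetic above, assuming a hypothetical oversize factor of 10 (the weights are the FileName sizes; everything else is illustrative):

    // Each writer is created with ceil(size / oversizeFactor) files, so heavier
    // entities are split into proportionally more files.
    val oversizeFactor = 10.0                               // assumed value, for illustration only
    val sizes = Map("Forum" -> 5.13, "Comment" -> 503.70)   // relative sizes from FileName
    val numFiles = sizes.map { case (k, s) => k -> math.ceil(s / oversizeFactor).toInt }
    // numFiles == Map("Forum" -> 1, "Comment" -> 51)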
1 change: 1 addition & 0 deletions src/main/java/ldbc/snb/datagen/serializer/Serializer.java
@@ -13,6 +13,7 @@ Map<FileName, THDFSWriter> initialize(
            FileSystem fs,
            String outputDir,
            int reducerId,
+           double oversizeFactor,
            boolean isCompressed,
            boolean dynamic,
            List<FileName> fileNames
@@ -14,6 +14,8 @@ import java.net.URI
 object GenerationStage extends SparkApp with Logging {
   override def appName: String = "LDBC SNB Datagen for Spark: Generation Stage"

+  val optimalPersonsPerFile = 500000
+
   case class Args(
     scaleFactor: String = "1",
     numThreads: Option[Int] = None,
@@ -24,6 +26,9 @@ object GenerationStage extends SparkApp with Logging {

   def run(config: GeneratorConfiguration)(implicit spark: SparkSession) = {
     val numPartitions = config.getInt("hadoop.numThreads", spark.sparkContext.defaultParallelism)
+    val idealPartitions = DatagenParams.numPersons.toDouble / optimalPersonsPerFile
+
+    val oversizeFactor = Math.max(numPartitions / idealPartitions, 1.0)

     val persons = SparkPersonGenerator(config)

@@ -43,11 +48,11 @@ object GenerationStage extends SparkApp with Logging {
     val merged = SparkKnowsMerger(uniKnows, interestKnows, randomKnows).cache()

     SparkUI.job(simpleNameOf[SparkActivitySerializer.type], "serialize person activities") {
-      SparkActivitySerializer(merged, randomRanker, config, Some(numPartitions))
+      SparkActivitySerializer(merged, randomRanker, config, Some(numPartitions), oversizeFactor)
     }

     SparkUI.job(simpleNameOf[SparkPersonSerializer.type ], "serialize persons") {
-      SparkPersonSerializer(merged, config, Some(numPartitions))
+      SparkPersonSerializer(merged, config, Some(numPartitions), oversizeFactor)
     }

     SparkUI.job(simpleNameOf[SparkStaticGraphSerializer.type], "serialize static graph") {
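A rough worked example of how the oversize factor above plays out, with assumed inputs (neither number comes from the commit):

    // Illustrative numbers only.
    val numPersons            = 10000000L  // hypothetical person count for the chosen scale factor
    val numPartitions         = 100        // hadoop.numThreads or the Spark default parallelism
    val optimalPersonsPerFile = 500000

    val idealPartitions = numPersons.toDouble / optimalPersonsPerFile     // 20.0
    val oversizeFactor  = Math.max(numPartitions / idealPartitions, 1.0)  // 5.0
    // Each serializer then writes ceil(size / 5.0) files per partition,
    // e.g. Person_knows_Person: ceil(26.11 / 5.0) = 6 instead of 27.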
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._

 object SparkActivitySerializer {

-  def apply(persons: RDD[Person], ranker: SparkRanker, conf: GeneratorConfiguration, partitions: Option[Int] = None)(implicit spark: SparkSession) = {
+  def apply(persons: RDD[Person], ranker: SparkRanker, conf: GeneratorConfiguration, partitions: Option[Int] = None, oversizeFactor: Double = 1.0)(implicit spark: SparkSession) = {

     val blockSize = DatagenParams.blockSize
     val blocks = ranker(persons)
@@ -41,7 +41,7 @@ object SparkActivitySerializer {

       val dynamicActivitySerializer = new DynamicActivitySerializer()

-      dynamicActivitySerializer.initialize(fs, conf.getOutputDir, partitionId, false)
+      dynamicActivitySerializer.initialize(fs, conf.getOutputDir, partitionId, oversizeFactor, false)

       val generator = new PersonActivityGenerator
       val exporter = new PersonActivityExporter(dynamicActivitySerializer, generator.getFactorTable)
@@ -15,7 +15,8 @@ object SparkPersonSerializer {
   def apply(
     persons: RDD[Person],
     conf: GeneratorConfiguration,
-    partitions: Option[Int] = None
+    partitions: Option[Int] = None,
+    oversizeFactor: Double = 1.0
   )(implicit spark: SparkSession): Unit = {
     val serializableHadoopConf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)

@@ -34,6 +35,7 @@ object SparkPersonSerializer {
         fs,
         conf.getOutputDir,
         partitionId,
+        oversizeFactor,
         false
       )
38 changes: 33 additions & 5 deletions tools/emr/submit_datagen_job.py
@@ -21,8 +21,8 @@
 defaults = {
     'bucket': 'ldbc-snb-datagen-store',
     'use_spot': False,
-    'master_instance_type': 'm5ad.2xlarge',
-    'instance_type': 'r5ad.4xlarge',
+    'master_instance_type': 'm5a.2xlarge',
+    'instance_type': 'r5a.2xlarge',
     'version': lib.version,
     'az': 'us-west-2c',
     'is_interactive': False,
@@ -75,7 +75,8 @@ def submit_datagen_job(name, sf,
                        emr_release=defaults['emr_release'],
                        is_interactive=defaults['is_interactive'],
                        ec2_key=defaults['ec2_key'],
-                       passthrough_args=None
+                       passthrough_args=None,
+                       conf=None
                        ):

    exec_info = get_instance_info(instance_type)
@@ -96,7 +97,8 @@ def submit_datagen_job(name, sf,

    spark_config = {
        'maximizeResourceAllocation': 'true',
-       'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'
+       'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
+       **(conf if conf else {})
    }

    hdfs_prefix = '/ldbc_snb_datagen'
@@ -184,6 +186,23 @@ def submit_datagen_job(name, sf,

    emr.run_job_flow(**job_flow_args)

+def parse_var(s):
+    items = s.split('=')
+    key = items[0].strip()  # we remove blanks around keys, as is logical
+    if len(items) > 1:
+        # rejoin the rest:
+        value = '='.join(items[1:])
+    return (key, value)
+
+
+def parse_vars(items):
+    d = {}
+    if items:
+        for item in items:
+            key, value = parse_var(item)
+            d[key] = value
+    return d
+

 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description='Submit a Datagen job to EMR')
@@ -216,12 +235,20 @@ def submit_datagen_job(name, sf,
     parser.add_argument('-y',
                         action='store_true',
                         help='Assume \'yes\' for prompts')
+    parser.add_argument("--conf",
+                        metavar="KEY=VALUE",
+                        nargs='+',
+                        help="SparkConf as key=value pairs")
+
     parser.add_argument('--', nargs='*', help='Arguments passed to LDBC SNB Datagen', dest="arg")

     self_args, child_args = split_passthrough_args()

     args = parser.parse_args(self_args)

+    conf = parse_vars(args.conf)
+
     is_interactive = hasattr(__main__, '__file__')

     submit_datagen_job(args.name, args.sf,
@@ -231,5 +258,6 @@ def submit_datagen_job(name, sf,
                        emr_release=args.emr_release,
                        ec2_key=args.ec2_key,
                        version=args.version,
-                       passthrough_args=child_args
+                       passthrough_args=child_args,
+                       conf=conf
                        )
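The new --conf option accepts one or more KEY=VALUE pairs, for example --conf spark.driver.memory=8g spark.executor.memory=16g (values here are purely illustrative); parse_vars turns the pairs into a dict that is merged into spark_config above.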
