
Issues with serialization on persistence #227

Open
bzz opened this issue May 7, 2016 · 2 comments


bzz commented May 7, 2016

Warcbase is awesome, and I'm trying to use the latest version on a local Spark instance to peek into .warc files.

AFAIK that should work after #160, but I'm having a few issues, especially when results are saved using Spark RDD persistence facilities like persist() with serialization, on both disk and memory.

Do you guys have any trouble using persistence/serialization with Spark 1.6, or is this something relevant only to my local environment? Please advise.
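For reference, by persistence facilities I mean calls along these lines (a minimal sketch; the path is a placeholder):

    import org.apache.spark.storage.StorageLevel

    val r = RecordLoader.loadArchives("/path/to/local.warc.gz", sc)
    r.persist(StorageLevel.MEMORY_ONLY_SER) // serialized in-memory cache
    // or, for the on-disk case:
    // r.persist(StorageLevel.DISK_ONLY)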

  1. no kryo, no cache

    After Propagate Spark serializers to within data loading API #186, I was under the impression that Kryo is no longer mandatory, but

    val r = RecordLoader.loadArchives(localPath, sc)
    r.take(1) foreach println

    for me results in

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.archive.io.warc.WARCRecord
    
  2. kryo, no cache

    If I turn Kryo on, the same code results in

    org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
    Serialization trace:
    ISO8601 (org.warcbase.spark.archive.io.GenericArchiveRecord)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    

    This is quite easy to fix: it works for me if the non-serializable SimpleDateFormat fields are removed. Please let me know if you are interested in a PR fixing it.

  3. kryo, cache

    Adding r.persist(StorageLevel.DISK_ONLY) to the code above results in something cryptic, but in a way similar to (1):

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
    Serialization trace:
    classes (sun.misc.Launcher$AppClassLoader)
    classloader (java.security.ProtectionDomain)
    context (java.security.AccessControlContext)
    acc (org.apache.spark.sql.hive.client.NonClosableMutableURLClassLoader)
    classLoader (org.apache.hadoop.hive.conf.HiveConf)
    conf (org.apache.hadoop.fs.LocalFileSystem)
    fs (org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream)
    in (java.io.BufferedInputStream)
    in (com.google.common.io.CountingInputStream)
    in (org.archive.io.warc.WARCRecord)
    warcRecord (org.warcbase.spark.archive.io.GenericArchiveRecord)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    

    It looks similar to Some Extractors fail with ConcurrentModificationException (dbpedia/distributed-extraction-framework#9) and presumably has something to do with registering serializable classes...
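    For what it's worth, the registration I have in mind would look something like this (a sketch only; the class names are taken from the traces above):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(
        classOf[org.warcbase.spark.archive.io.GenericArchiveRecord],
        classOf[org.archive.io.warc.WARCRecord]))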

jrwiebe (Collaborator) commented May 9, 2016

Thanks for this. I don't believe any of us active contributors have been very focused on performance tuning, and thus we haven't really spent time ensuring that capabilities like persistence work properly.

Re (2), you're welcome to submit a pull request that replaces the SimpleDateFormat calls with a serializable equivalent. Your other two use scenarios will require some more examination.
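One way to do that without swapping in another library might be to keep the formatters out of serialization entirely, e.g. (a sketch only; the field name ISO8601 comes from your trace, and the surrounding class layout is assumed):

class GenericArchiveRecord( /* ... */ ) extends Serializable {
  // @transient excludes the formatter from the serialized form; lazy val
  // recreates it on first use after deserialization on each executor.
  @transient lazy val ISO8601 =
    new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
}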

I probably won't have time to look into your other use scenarios for a while. I'm tagging @aliceranzhou here, since she worked on this before (realizing she's probably busy with other things these days). Also @yb1.

lintool (Owner) commented May 11, 2016

Thanks for your positive feedback.
I think it'll work if you do this:

val r = RecordLoader.loadArchives("src/test/resources/arc/example.arc.gz", sc)
  .keepValidPages()
  .map(r => ExtractDomain(r.getUrl))
  .take(1)

I.e., extract the fields that you want out of the WARC records... Will that do it for you?
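For example, something along these lines should also cache cleanly, since after the map the RDD holds plain strings rather than WARCRecord objects (an untested sketch building on the snippet above):

import org.apache.spark.storage.StorageLevel

val domains = RecordLoader.loadArchives("src/test/resources/arc/example.arc.gz", sc)
  .keepValidPages()
  .map(r => ExtractDomain(r.getUrl))
  .persist(StorageLevel.DISK_ONLY)

domains.take(1) foreach println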
