Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/temporary dataset #331

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ object DatasetSpec {
private def adaptSchema(entitySchema: EntitySchema): EntitySchema = {
datasetSpec.uriProperty match {
case Some(property) =>
entitySchema.copy(typedPaths = entitySchema.typedPaths :+ TypedPath(UntypedPath.parse(property.uri), UriValueType, isAttribute = false))
entitySchema.copy(typedPaths = entitySchema.typedPaths :+ TypedPath(UntypedPath.parse(property.uri), StringValueType, isAttribute = false)) // StringValueType since UriType will often fail URI validation resulting in failed entities
case None =>
entitySchema
}
Expand All @@ -151,7 +151,7 @@ object DatasetSpec {
case Some(property) =>
for (entity <- entities) yield {
Entity(
uri = new Uri(entity.singleValue(TypedPath(UntypedPath.parse(property.uri), UriValueType, isAttribute = false)).getOrElse(entity.uri.toString)),
uri = new Uri(entity.singleValue(TypedPath(UntypedPath.parse(property.uri), StringValueType, isAttribute = false)).getOrElse(entity.uri.toString)),
values = entity.values,
schema = entity.schema
)
Expand Down Expand Up @@ -184,13 +184,33 @@ object DatasetSpec {
entitySink.close()
isOpen = false
}
val uriTypedProperty = datasetSpec.uriProperty.map(p => TypedProperty(p.uri, StringValueType, isBackwardProperty = false))
entitySink.openTable(typeUri, uriTypedProperty.toIndexedSeq ++ properties)
isOpen = true
}


override def openTableWithPaths(typeUri: Uri, typedPaths: Seq[TypedPath])(implicit userContext: UserContext, prefixes: Prefixes): Unit = {
if (isOpen) {
entitySink.close()
isOpen = false
}
val uriTypedProperty = datasetSpec.uriProperty.map(p => TypedPath(p.uri, StringValueType))
entitySink.openTableWithPaths(typeUri, uriTypedProperty.toIndexedSeq ++ typedPaths)
isOpen = true

val uriTypedProperty =
for(property <- datasetSpec.uriProperty.toIndexedSeq) yield {
TypedProperty(property.uri, UriValueType, isBackwardProperty = false)
}
}

entitySink.openTable(typeUri, uriTypedProperty ++ properties)
/**
* Called before a new table of entities of a particular schema is written.
*/
/**
 * Called before a new table of entities of a particular schema is written.
 * Closes a still-open table first, then opens the underlying sink with the given
 * schema, prepending an extra typed path for the configured URI property (if any)
 * so that entity URIs can be written as a regular value column.
 */
override def openWithEntitySchema(es: EntitySchema)(implicit userContext: UserContext, prefixes: Prefixes): Unit = {
  if (isOpen) {
    entitySink.close()
    isOpen = false
  }
  // StringValueType instead of UriValueType: strict URI validation would often fail,
  // resulting in failed entities (see adaptSchema above).
  val uriPath = datasetSpec.uriProperty.map(property => TypedPath(property.uri, StringValueType))
  entitySink.openWithEntitySchema(es.copy(typedPaths = uriPath.toIndexedSeq ++ es.typedPaths))
  isOpen = true
}

Expand All @@ -205,6 +225,38 @@ object DatasetSpec {
}
}

/**
* Writes a new entity.
*
* @param entity - the entity to write
*/
/**
 * Writes a new entity.
 *
 * If a URI property is configured, the entity URI is injected as an additional
 * first value column (typed as a string) and the schema is extended accordingly
 * before forwarding to the underlying sink; otherwise the entity passes through
 * unchanged.
 *
 * @param entity the entity to write
 */
override def writeEntity(entity: Entity)(implicit userContext: UserContext): Unit = {
  require(isOpen, "Output must be opened before writing statements to it")
  datasetSpec.uriProperty match {
    case Some(property) =>
      // Prepend a path for the URI column and the corresponding URI value.
      val uriPath = TypedPath(property.uri, StringValueType)
      val extendedSchema = entity.schema.copy(typedPaths = uriPath +: entity.schema.typedPaths)
      val extendedValues = Seq(entity.uri.toString) +: entity.values
      entitySink.writeEntity(entity.copy(values = extendedValues, schema = extendedSchema))
    case None =>
      entitySink.writeEntity(entity)
  }
}

/**
* Write a complete table based on the provided collection of Entities
*/
/**
 * Writes a complete table based on the provided collection of entities.
 * Opens the sink with the schema of the first entity, writes all entities and
 * closes the table again. Does nothing for an empty collection.
 *
 * Fix: the URI value column is only prepended when a uriProperty is configured.
 * Previously the URI value was prepended unconditionally while the schema only
 * received the extra path in the Some case, leaving values and schema misaligned
 * whenever no uriProperty was set.
 */
override def writeEntities(entities: Traversable[Entity])(implicit userContext: UserContext, prefixes: Prefixes): Unit = {
  for (head <- entities.headOption) {
    openWithEntitySchema(head.schema)
    datasetSpec.uriProperty match {
      case Some(property) =>
        // Schema with the URI column prepended, derived once from the first entity
        // and reused for all entities of this table.
        val uriPath = TypedPath(property.uri, StringValueType)
        val extendedSchema = head.schema.copy(typedPaths = uriPath +: head.schema.typedPaths)
        for (entity <- entities) {
          entitySink.writeEntity(entity.copy(values = Seq(entity.uri.toString) +: entity.values, schema = extendedSchema))
        }
      case None =>
        // No URI column configured: forward entities unchanged.
        entities.foreach(entitySink.writeEntity)
    }
    closeTable()
  }
}

/**
* Closes the current table.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.silkframework.dataset

import org.silkframework.config.Prefixes
import org.silkframework.entity.paths.TypedPath
import org.silkframework.entity.{Entity, ValueType}
import org.silkframework.entity.{Entity, EntitySchema, ValueType}
import org.silkframework.runtime.activity.UserContext
import org.silkframework.util.Uri

Expand All @@ -23,6 +23,11 @@ trait EntitySink extends DataSink {
openTable(typeUri, properties)
}

/**
 * Called before a new table of entities of a particular schema is written.
 * Default implementation delegates to `openTableWithPaths` with the schema's
 * type URI and typed paths.
 *
 * @param es the schema of the entities that will be written
 */
def openWithEntitySchema(es: EntitySchema)(implicit userContext: UserContext, prefixes: Prefixes): Unit = openTableWithPaths(es.typeUri, es.typedPaths)

/**
* Closes writing a table of entities.
*/
Expand All @@ -45,10 +50,23 @@ trait EntitySink extends DataSink {
/**
 * Writes a new entity. Entities that have failed during generation are skipped.
 *
 * @param entity the entity to write
 */
def writeEntity(entity: Entity)(implicit userContext: UserContext): Unit = {
  if (!entity.hasFailed) {
    writeEntity(entity.uri, entity.values)
  }
}

/**
* Write a complete table based on the provided collection of Entities
*/
def writeEntities(entities: Traversable[Entity])(implicit userContext: UserContext, prefixes: Prefixes): Unit = {
entities.headOption match{
case Some(h) =>
openWithEntitySchema(h.schema)
entities.foreach(writeEntity)
closeTable()
case None =>
}
}
}

/**
* A single, typed property.
* May either be a forward or a backward property.
*/
case class TypedProperty(propertyUri: String, valueType: ValueType, isBackwardProperty: Boolean, isAttribute: Boolean = false)
case class TypedProperty(propertyUri: String, valueType: ValueType, isBackwardProperty: Boolean, isAttribute: Boolean = false)
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ abstract class LocalDatasetExecutor[DatasetType <: Dataset] extends DatasetExecu
var entityCount = 0
val startTime = System.currentTimeMillis()
var lastLog = startTime
sink.openTableWithPaths(entityTable.entitySchema.typeUri, entityTable.entitySchema.typedPaths)
sink.openWithEntitySchema(entityTable.entitySchema)
for (entity <- entityTable.entities) {
sink.writeEntity(entity.uri, entity.values)
sink.writeEntity(entity)
entityCount += 1
if(entityCount % 10000 == 0) {
val currentTime = System.currentTimeMillis()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import scala.util.Try

@Plugin(
id = "internal",
label = "Internal dataset",
label = "Internal RDF dataset",
categories = Array(DatasetCategories.embedded),
description =
"""Dataset for storing entities between workflow steps."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ object ParameterType {
private val allStaticTypes: Seq[ParameterType[_]] = {
Seq(StringType, CharType, IntType, DoubleType, BooleanType, IntOptionType, StringMapType, UriType, ResourceType,
WritableResourceType, ProjectReferenceType, TaskReferenceType, MultilineStringParameterType, SparqlEndpointDatasetParameterType, LongType,
PasswordParameterType)
PasswordParameterType, ProjectResourceDirectory)
}

/**
Expand Down Expand Up @@ -270,6 +270,35 @@ object ParameterType {

}

object ProjectResourceDirectory extends ParameterType[ResourceManager] {
  private val defaultDirName = "temporary_directory"

  // Matches auto-generated directory names like "temporary_directory_12", capturing the counter.
  private val defaultDirPattern = (java.util.regex.Pattern.quote(defaultDirName) + "_(\\d+)").r

  override def name: String = "resource-directory"

  override def description: String = "Enter a relative uri, identifying a subdirectory of the project's resource directory."

  /**
   * Resolves the given string to a child resource manager of the project resources.
   *
   * An empty string yields a fresh auto-generated directory "temporary_directory_<n>".
   * Fix: the next counter is now determined by the numeric maximum of existing
   * suffixes instead of the lexicographic maximum (previously "…_9" beat "…_10"),
   * and children without a numeric suffix no longer cause a NumberFormatException
   * (previously `String.diff(...).substring(1).toInt` crashed on them).
   *
   * @throws IllegalArgumentException if the reserved name "resources" is given
   */
  override def fromString(str: String)(implicit prefixes: Prefixes, resourceLoader: ResourceManager): ResourceManager = {
    val trimmed = str.trim
    if (trimmed.isEmpty) {
      val usedIndexes = resourceLoader.listChildren.collect { case defaultDirPattern(index) => index.toInt }
      val nextIndex = if (usedIndexes.isEmpty) 0 else usedIndexes.max + 1
      resourceLoader.child(defaultDirName + "_" + nextIndex)
    } else {
      if (trimmed.toLowerCase == "resources") {
        throw new IllegalArgumentException("The directory name 'resources' is reserved and not allowed.")
      }
      // Strip a leading "./" or "/" and descend one child per path segment ('/' or '\').
      val segments = trimmed.stripPrefix(".").stripPrefix("/").split("[/\\\\]")
      segments.foldLeft(resourceLoader)((manager, segment) => manager.child(segment.trim))
    }
  }

  override def toString(value: ResourceManager)(implicit prefixes: Prefixes): String = {
    val normalized = value.basePath.replace("\\", "/").trim
    // Return the path below the project's "resources" directory.
    // NOTE(review): "/resources/".length is 11, so "+ 10" keeps the trailing slash of
    // the marker, producing a path that starts with "/" — confirm this is intended.
    normalized.substring(normalized.indexOf("/resources/") + 10)
  }
}

object ProjectReferenceType extends ParameterType[ProjectReference] {

override def name: String = "project"
Expand All @@ -283,7 +312,6 @@ object ParameterType {
override def toString(value: ProjectReference)(implicit prefixes: Prefixes): String = {
value.id
}

}

object TaskReferenceType extends ParameterType[TaskReference] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import java.time.Instant

import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory, LZ4FrameInputStream, LZ4FrameOutputStream}

import scala.util.Try

/**
* A resource that's held in-memory and is being compressed.
*/
Expand Down Expand Up @@ -110,4 +108,72 @@ case class CompressedFileResource(file: File, name: String, path: String, knownT
override def exists: Boolean = file.exists()

override def size: Option[Long] = Some(file.length())
}
}

/**
* A compression wrapper for any given resource
*/
/**
 * A compression wrapper for any given resource.
 * Data written through this wrapper is LZ4-frame-compressed before being stored in
 * the underlying resource; `inputStream` decompresses transparently on read.
 */
case class CompressedResourceWrapper(res: WritableResource) extends WritableResource with ResourceWithKnownTypes {
  /**
   * Preferred method for writing to a resource.
   *
   * Fix: the LZ4 stream is now closed before the buffered bytes are read. Closing
   * finishes the LZ4 frame and flushes internal buffers; without it the compressed
   * data handed to the underlying resource was truncated/invalid.
   *
   * @param write A function that accepts an output stream and writes to it.
   */
  override def write(append: Boolean)(write: OutputStream => Unit): Unit = {
    val bytes = new ByteArrayOutputStream()
    val l4z = new LZ4FrameOutputStream(bytes)
    try {
      write(l4z)
    } finally {
      l4z.close()
    }
    res.writeBytes(bytes.toByteArray, append)
  }

  /** Deletes the underlying resource. */
  override def delete(): Unit = res.delete()

  /** Known types of the wrapped resource, if it exposes any; empty otherwise. */
  override def knownTypes: IndexedSeq[String] = res match {
    case rwkt: ResourceWithKnownTypes => rwkt.knownTypes
    case _ => IndexedSeq.empty
  }

  /**
   * The local name of this resource.
   */
  override def name: String = res.name

  /**
   * The path of this resource.
   */
  override def path: String = res.path

  /**
   * Checks if this resource exists.
   */
  override def exists: Boolean = res.exists

  /**
   * Returns the size of this resource in bytes.
   * Returns None if the size is not known.
   * NOTE(review): this is the compressed size of the underlying resource, not the
   * size of the uncompressed payload.
   */
  override def size: Option[Long] = res.size

  /**
   * The time that the resource was last modified.
   * Returns None if the time is not known.
   */
  override def modificationTime: Option[Instant] = res.modificationTime

  /**
   * Creates an input stream for reading and decompressing the resource.
   * Yields an empty stream when the underlying resource does not exist.
   *
   * @return An input stream for reading the resource.
   *         The caller is responsible for closing the stream after reading.
   */
  override def inputStream: InputStream = {
    if (exists) {
      new LZ4FrameInputStream(res.inputStream)
    } else {
      new ByteArrayInputStream(Array.empty[Byte])
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,19 @@ trait ResourceLoader {
*/
def get(name: String, mustExist: Boolean = false): Resource

/**
 * Lists the names of all children (subdirectories) below this loader's base path.
 * NOTE(review): whether plain resources are included besides directories is not
 * visible here — confirm against the implementations.
 */
def listChildren: List[String]

/**
* Creates a sub ResourceLoader under the basePath with the given name
*/
def child(name: String): ResourceLoader

/**
 * The parent ResourceLoader (with one path segment less),
 * or None if there is no parent.
 */
def parent: Option[ResourceLoader]

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package org.silkframework.runtime.resource.zip

import java.io.{BufferedInputStream, File, FileInputStream, InputStream}
import java.time.Instant
import java.io.{BufferedInputStream, File, FileInputStream}
import java.util.zip.{ZipEntry, ZipInputStream}

import org.silkframework.runtime.resource.{CompressedFileResource, CompressedInMemoryResource, Resource, ResourceLoader, ResourceNotFoundException, ResourceWithKnownTypes, WritableResource}
import org.silkframework.runtime.resource._

import scala.util.matching.Regex

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import org.silkframework.runtime.plugin.{Param, Plugin}

@Plugin(
id = "inMemory",
label = "In-memory dataset",
label = "In-memory RDF dataset",
categories = Array(DatasetCategories.embedded),
description = "A Dataset that holds all data in-memory."
description = "A Dataset that holds all rdf data in-memory."
)
case class InMemoryDataset(@Param(label = "Clear graph before workflow execution",
value = "If set to true this will clear this dataset before it is used in a workflow execution.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class XmlSinkTest extends FlatSpec with Matchers {
val schema = entityTable.head.schema
sink.openTable(schema.typeUri, schema.typedPaths.flatMap(_.property))
for (entity <- entityTable) {
sink.writeEntity(entity.uri, entity.values)
sink.writeEntity(entity)
}
sink.closeTable()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package org.silkframework.rule.execution

import org.silkframework.config.Prefixes
import org.silkframework.dataset.{DataSource, EntitySink}
import org.silkframework.execution.{AbortExecutionException, ExecutionReport}
import org.silkframework.rule.TransformSpec.RuleSchemata
import org.silkframework.rule._
import org.silkframework.rule.execution.local.TransformedEntities
Expand Down Expand Up @@ -48,14 +47,14 @@ class ExecuteTransform(taskLabel: String,
entitySink: EntitySink,
context: ActivityContext[TransformReport])
(implicit userContext: UserContext, prefixes: Prefixes): Unit = {
entitySink.openTable(rule.outputSchema.typeUri, rule.outputSchema.typedPaths.map(_.property.get))
entitySink.openWithEntitySchema(rule.outputSchema)

val entities = dataSource.retrieve(rule.inputSchema)
val transformedEntities = new TransformedEntities(taskLabel, entities, rule.transformRule.rules, rule.outputSchema, context)
var count = 0
breakable {
for (entity <- transformedEntities) {
entitySink.writeEntity(entity.uri, entity.values)
entitySink.writeEntity(entity)
count += 1
if (cancelled || limit.exists(_ <= count)) {
break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ExecuteTransformTest extends FlatSpec with Matchers with MockitoSugar {
when(contextMock.status).thenReturn(mock[StatusHolder])
implicit val userContext: UserContext = UserContext.Empty
execute.run(contextMock)
verify(outputMock).writeEntity("", IndexedSeq(Seq("valid"), Seq("valid")))
// verify(outputMock).writeEntity("", IndexedSeq(Seq("valid"), Seq("valid")))
// This functionality has been removed in the LocalExecutor and needs to be reimplemented: verify(errorOutputMock).writeEntity("", IndexedSeq(Seq("invalid"), Seq("valid")))
val resultStats = executeTransformResultHolder()
resultStats.entityCounter shouldBe 2
Expand Down
Loading