Skip to content

Commit

Permalink
2.0.0: Bump quasar dependency to 55.0.0
Browse files Browse the repository at this point in the history
(Merge branch 'jsantos17-bump-quasar-55')

Depends on #76
  • Loading branch information
rintcius committed Jul 25, 2018
2 parents 192e759 + 9a37f72 commit afdf48f
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 99 deletions.
24 changes: 14 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ in `.targets/datasource/scala-2.12/quasar-s3-<version>-explode.tar.gz`

## Configuration

You can create a new S3 datasource after you've loaded the plugin into
You can create a new S3 datasource after you've loaded this plugin into
Quasar. Refer to the previous section for instructions on how to do
that. In order to create a datasource, you will need to send a PUT
request to `/datasource/<your datasource name>` including a JSON
document specifiying the datasource's configuration. An example of a
JSON configuration to create a datasource that parses line-delimited JSON:
that. In order to create a datasource, you will need to send a POST
request to `/datasource` including a JSON
document specifiying the datasource's configuration. The format of the
JSON document can be found in [`slamdata-backend`'s
documentation.](https://github.com/slamdata/slamdata-backend#applicationvndslamdatadatasource).

The connector-specific configuration needs to specify at least a
bucket URI and the JSON parsing to use when decoding JSON files stored
in S3. An example of a JSON configuration to create a datasource that
parses line-delimited JSON:

```json
{
Expand All @@ -29,7 +35,7 @@ JSON configuration to create a datasource that parses line-delimited JSON:
}
```

As another example, this is the JSON configuration to parse array
As another example, this is a JSON configuration to parse array
JSON:

```json
Expand All @@ -39,12 +45,10 @@ JSON:
}
```

You also need to specify a `Content-Type` header with
information regarding this datasource. For example, to use version 1
of this datasource you may specify:
Along with the request, you also need to specify a `Content-Type` header:

```
Content-Type: application/vnd.slamdata.datasource.s3; version="1"
Content-Type: application/vnd.slamdata.datasource"
```

### Secure buckets
Expand Down
57 changes: 24 additions & 33 deletions datasource/src/main/scala/quasar/physical/s3/S3DataSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,73 +16,68 @@

package quasar.physical.s3

import quasar.Data
import quasar.api.ResourceError
import quasar.api.ResourceError.{CommonError, ReadError}
import quasar.api.ResourcePath.{Leaf, Root}
import quasar.api.datasource.DatasourceType
import quasar.api.{ResourceName, ResourcePath, ResourcePathType}
import quasar.common.data.Data
import quasar.common.resource.MonadResourceErr
import quasar.common.resource.ResourcePath.{Leaf, Root}
import quasar.common.resource.{ResourceName, ResourcePath, ResourcePathType}
import quasar.connector.datasource.LightweightDatasource
import quasar.contrib.cats.effect._
import quasar.contrib.pathy.APath

import scala.concurrent.duration.SECONDS
import slamdata.Predef.{Stream => _, _}

import java.time.{ZoneOffset, LocalDateTime}

import cats.arrow.FunctionK
import cats.effect.{Effect, Async, Timer}
import cats.effect.{Effect, Timer}
import cats.syntax.flatMap._
import cats.syntax.option._
import fs2.Stream
import org.http4s.{Request, Header, Headers}
import org.http4s.client.Client
import pathy.Path
import pathy.Path.{DirName, FileName}
import scalaz.syntax.applicative._
import scalaz.syntax.either._
import scalaz.{\/, \/-, -\/}
import scalaz.{\/-, -\/}
import shims._

final class S3DataSource[F[_]: Effect: Timer, G[_]: Async](
final class S3DataSource[F[_]: Effect: Timer: MonadResourceErr](
client: Client[F],
config: S3Config)
extends LightweightDatasource[F, Stream[G, ?], Stream[G, Data]] {
extends LightweightDatasource[F, Stream[F, ?], Stream[F, Data]] {
def kind: DatasourceType = s3.datasourceKind

val shutdown: F[Unit] = client.shutdown

def evaluate(path: ResourcePath): F[ReadError \/ Stream[G, Data]] =
def evaluate(path: ResourcePath): F[Stream[F, Data]] =
path match {
case Root =>
ResourceError.notAResource(path).left.point[F]
Stream.empty.covaryAll[F, Data].pure[F]
case Leaf(file) =>
impl.evaluate[F](config.parsing, client, config.bucket, file, S3DataSource.signRequest(config)) map {
case None => (ResourceError.pathNotFound(Leaf(file)): ReadError).left
/* In http4s, the type of streaming results is the same as
every other effectful operation. However,
LightweightDatasourceModule forces us to separate the types,
so we need to translate */
case Some(s) => s.translate[G](FToG).right[ReadError]
}
case None => Stream.empty
/* In http4s, the type of streaming results is the same as
every other effectful operation. However,
LightweightDatasourceModule forces us to separate the types,
so we need to translate */
case Some(s) => s
}
}

def children(path: ResourcePath): F[CommonError \/ Stream[G, (ResourceName, ResourcePathType)]] =
def prefixedChildPaths(path: ResourcePath): F[Option[Stream[F, (ResourceName, ResourcePathType)]]] =
impl.children(
client,
config.bucket,
dropEmpty(path.toPath),
S3DataSource.signRequest(config)) map {
case None =>
ResourceError.pathNotFound(path).left[Stream[G, (ResourceName, ResourcePathType)]]
none[Stream[F, (ResourceName, ResourcePathType)]]
case Some(paths) =>
paths.map {
case -\/(Path.DirName(dn)) => (ResourceName(dn), ResourcePathType.ResourcePrefix)
case \/-(Path.FileName(fn)) => (ResourceName(fn), ResourcePathType.Resource)
}.translate[G](FToG).right[CommonError]
case -\/(Path.DirName(dn)) => (ResourceName(dn), ResourcePathType.prefix)
case \/-(Path.FileName(fn)) => (ResourceName(fn), ResourcePathType.leafResource)
}.some
}

def isResource(path: ResourcePath): F[Boolean] = path match {
def pathIsResource(path: ResourcePath): F[Boolean] = path match {
case Root => false.pure[F]
case Leaf(file) => Path.refineType(dropEmpty(file)) match {
case -\/(_) => false.pure[F]
Expand All @@ -96,10 +91,6 @@ final class S3DataSource[F[_]: Effect: Timer, G[_]: Async](
case Some((d, -\/(DirName(dn)))) if dn.isEmpty => d
case _ => path
}

private val FToG: FunctionK[F, G] = new FunctionK[F, G] {
def apply[A](fa: F[A]): G[A] = fa.to[G]
}
}

object S3DataSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package quasar.physical.s3


import quasar.Data
import quasar.common.data.Data
import quasar.common.resource.MonadResourceErr
import quasar.common.resource.ResourcePath
import quasar.api.datasource.DatasourceError.{InitializationError, MalformedConfiguration}
import quasar.api.ResourcePath
import quasar.api.datasource.DatasourceType
import quasar.connector.Datasource
import quasar.connector.LightweightDatasourceModule
import quasar.Disposable

import argonaut.Json
import cats.effect.{Timer, ConcurrentEffect}
Expand All @@ -37,24 +39,23 @@ import shims._
object S3DataSourceModule extends LightweightDatasourceModule {
def kind: DatasourceType = s3.datasourceKind

def lightweightDatasource[
F[_]: ConcurrentEffect: Timer,
G[_]: ConcurrentEffect: Timer](
config: Json)
: F[InitializationError[Json] \/ Datasource[F, Stream[G, ?], ResourcePath, Stream[G, Data]]] = {
def lightweightDatasource[F[_]: ConcurrentEffect: MonadResourceErr: Timer](config: Json)
: F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]]]] = {
config.as[S3Config].result match {
case Right(s3Config) => {
Http1Client[F]() map { client =>
val ds: Datasource[F, Stream[G, ?], ResourcePath, Stream[G, Data]] =
new S3DataSource[F, G](client, s3Config)
val ds: Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]] =
new S3DataSource[F](client, s3Config)

ds.right[InitializationError[Json]]
val disposable = Disposable(ds, client.shutdown)

disposable.right[InitializationError[Json]]
}
}

case Left((msg, _)) =>
(MalformedConfiguration(kind, config, msg): InitializationError[Json])
.left[Datasource[F, Stream[G, ?], ResourcePath, Stream[G, Data]]].point[F]
.left[Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]]]].point[F]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ object children {
bucket: Uri,
dir: APath,
ct: Option[ContinuationToken]): Request[F] = {
// Converts a pathy Path to an S3 object prefix.
val objectPrefix = aPathToObjectPrefix(dir)

// Start with the bucket URI; add an extra `/` on the end
// so that S3 understands us.
Expand All @@ -135,12 +133,13 @@ object children {
val listingQuery = (bucket / "")

val listType = ("list-type", "2").some
// Converts a pathy Path to an S3 object prefix.
val objectPrefix = aPathToObjectPrefix(dir)
val prefix = objectPrefix.map(("prefix", _))
val ct0 = ct.map(_.value).map(("continuation-token", _))

val params = List(listType, prefix, ct0).unite
val ct0 = ct.map(_.value).map(("continuation-token", _))

val queryUri = params.foldLeft(listingQuery) {
val queryUri = List(listType, prefix, ct0).unite.foldLeft(listingQuery) {
case (uri0, (param, value)) => uri0.withQueryParam(param, value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package quasar.physical.s3
package impl

import quasar.Data
import quasar.common.data.Data
import quasar.contrib.pathy._
import quasar.physical.s3.S3JsonParsing

Expand Down
54 changes: 26 additions & 28 deletions datasource/src/test/scala/quasar/physical/s3/S3DataSourceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ package quasar.physical.s3

import slamdata.Predef._

import quasar.api.ResourceDiscoverySpec
import quasar.api.{ResourceName, ResourcePath, ResourcePathType}
import quasar.connector.DatasourceSpec
import quasar.common.resource.{ResourceName, ResourcePath, ResourcePathType, ResourceError}
import quasar.contrib.scalaz.MonadError_

import scala.concurrent.ExecutionContext.Implicits.global

Expand All @@ -28,72 +29,70 @@ import fs2.Stream
import org.http4s.Uri
import org.http4s.client.blaze.Http1Client
import scalaz.syntax.applicative._
import scalaz.{Foldable, Monoid, Id, ~>}, Id.Id
import scalaz.{Id, ~>}, Id.Id
import shims._

import S3DataSourceSpec._

class S3DataSourceSpec extends ResourceDiscoverySpec[IO, Stream[IO, ?]] {
class S3DataSourceSpec extends DatasourceSpec[IO, Stream[IO, ?]] {
"the root of a bucket with a trailing slash is not a resource" >>* {
val root = ResourcePath.root() / ResourceName("")
discovery.isResource(root).map(_ must beFalse)
datasource.pathIsResource(root).map(_ must beFalse)
}

"the root of a bucket is not a resource" >>* {
val root = ResourcePath.root()
discovery.isResource(root).map(_ must beFalse)
datasource.pathIsResource(root).map(_ must beFalse)
}

"a prefix without contents is not a resource" >>* {
val path = ResourcePath.root() / ResourceName("prefix3") / ResourceName("subprefix5")
discovery.isResource(path).map(_ must beFalse)
datasource.pathIsResource(path).map(_ must beFalse)
}

"list nested children" >>* {
val path = ResourcePath.root() / ResourceName("dir1") / ResourceName("dir2") / ResourceName("dir3")

val listing = discovery.children(path)
val listing = datasource.prefixedChildPaths(path)

listing.flatMap { list =>
list.map(_.compile.toList)
.getOrElse(IO.raiseError(new Exception("Could not list nested children under dir1/dir2/dir3")))
.map {
case List((resource, resourceType)) =>
resource must_= ResourceName("flattenable.data")
resourceType must_= ResourcePathType.resource
resourceType must_= ResourcePathType.leafResource
}
}
}

"list children at the root of the bucket" >>* {
discovery.children(ResourcePath.root()).flatMap { list =>
datasource.prefixedChildPaths(ResourcePath.root()).flatMap { list =>
list.map(_.compile.toList).getOrElse(IO.raiseError(new Exception("Could not list children under the root")))
.map(resources => {
resources.length must_== 4
resources(0) must_== (ResourceName("dir1") -> ResourcePathType.resourcePrefix)
resources(1) must_== (ResourceName("extraSmallZips.data") -> ResourcePathType.resource)
resources(2) must_== (ResourceName("prefix3") -> ResourcePathType.resourcePrefix)
resources(3) must_== (ResourceName("testData") -> ResourcePathType.resourcePrefix)
resources(0) must_== (ResourceName("dir1") -> ResourcePathType.prefix)
resources(1) must_== (ResourceName("extraSmallZips.data") -> ResourcePathType.leafResource)
resources(2) must_== (ResourceName("prefix3") -> ResourcePathType.prefix)
resources(3) must_== (ResourceName("testData") -> ResourcePathType.prefix)
})
}
}

"an actual file is a resource" >>* {
val res = ResourcePath.root() / ResourceName("testData") / ResourceName("array.json")

discovery.isResource(res) map (_ must beTrue)
datasource.pathIsResource(res) map (_ must beTrue)
}

"read line-delimited and array JSON" >>* {
val ld = discoveryLD.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("lines.json"))
val array = discovery.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("array.json"))
val ld = datasourceLD.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("lines.json"))
val array = datasource.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("array.json"))

(ld |@| array).tupled.flatMap {
case (readLD, readArray) => {
val rd = readLD.map(_.compile.toList)
.toOption.getOrElse(IO.raiseError(new Exception("Could not read lines.json")))
val ra = readArray.map(_.compile.toList)
.toOption.getOrElse(IO.raiseError(new Exception("Could not read array.json")))
val rd = readLD.compile.toList
val ra = readArray.compile.toList

(rd |@| ra) {
case (lines, array) => {
Expand All @@ -105,14 +104,16 @@ class S3DataSourceSpec extends ResourceDiscoverySpec[IO, Stream[IO, ?]] {
}
}

val discoveryLD = new S3DataSource[IO, IO](
def gatherMultiple[A](g: Stream[IO, A]) = g.compile.toList

val datasourceLD = new S3DataSource[IO](
Http1Client[IO]().unsafeRunSync,
S3Config(
Uri.uri("https://s3.amazonaws.com/slamdata-public-test"),
S3JsonParsing.LineDelimited,
None))

val discovery = new S3DataSource[IO, IO](
val datasource = new S3DataSource[IO](
Http1Client[IO]().unsafeRunSync,
S3Config(
Uri.uri("https://s3.amazonaws.com/slamdata-public-test"),
Expand All @@ -126,9 +127,6 @@ class S3DataSourceSpec extends ResourceDiscoverySpec[IO, Stream[IO, ?]] {
}

object S3DataSourceSpec {
implicit val unsafeStreamFoldable: Foldable[Stream[IO, ?]] =
new Foldable[Stream[IO, ?]] with Foldable.FromFoldMap[Stream[IO, ?]] {
def foldMap[A, M](fa: Stream[IO, A])(f: A => M)(implicit M: Monoid[M]) =
fa.compile.fold(M.zero)((m, a) => M.append(m, f(a))).unsafeRunSync
}
implicit val ioMonadResourceErr: MonadError_[IO, ResourceError] =
MonadError_.facet[IO](ResourceError.throwableP)
}
Loading

0 comments on commit afdf48f

Please sign in to comment.