From 931bf3d0d730892e93f8c67c6b2afaac74f21554 Mon Sep 17 00:00:00 2001 From: Juan Pablo Santos Date: Tue, 24 Jul 2018 17:16:52 -0500 Subject: [PATCH 1/2] Bump quasar to 55.0.0. Fix API breakage --- .../quasar/physical/s3/S3DataSource.scala | 57 ++++++++----------- .../physical/s3/S3DataSourceModule.scala | 23 ++++---- .../quasar/physical/s3/impl/children.scala | 9 ++- .../quasar/physical/s3/impl/evaluate.scala | 2 +- .../quasar/physical/s3/S3DataSourceSpec.scala | 54 +++++++++--------- .../physical/s3/SecureS3DataSourceSpec.scala | 15 ++++- project/Dependencies.scala | 15 ++--- quasar-version | 2 +- 8 files changed, 89 insertions(+), 88 deletions(-) diff --git a/datasource/src/main/scala/quasar/physical/s3/S3DataSource.scala b/datasource/src/main/scala/quasar/physical/s3/S3DataSource.scala index aac92545..b99a3b57 100644 --- a/datasource/src/main/scala/quasar/physical/s3/S3DataSource.scala +++ b/datasource/src/main/scala/quasar/physical/s3/S3DataSource.scala @@ -16,14 +16,12 @@ package quasar.physical.s3 -import quasar.Data -import quasar.api.ResourceError -import quasar.api.ResourceError.{CommonError, ReadError} -import quasar.api.ResourcePath.{Leaf, Root} import quasar.api.datasource.DatasourceType -import quasar.api.{ResourceName, ResourcePath, ResourcePathType} +import quasar.common.data.Data +import quasar.common.resource.MonadResourceErr +import quasar.common.resource.ResourcePath.{Leaf, Root} +import quasar.common.resource.{ResourceName, ResourcePath, ResourcePathType} import quasar.connector.datasource.LightweightDatasource -import quasar.contrib.cats.effect._ import quasar.contrib.pathy.APath import scala.concurrent.duration.SECONDS @@ -31,58 +29,55 @@ import slamdata.Predef.{Stream => _, _} import java.time.{ZoneOffset, LocalDateTime} -import cats.arrow.FunctionK -import cats.effect.{Effect, Async, Timer} +import cats.effect.{Effect, Timer} import cats.syntax.flatMap._ +import cats.syntax.option._ import fs2.Stream import org.http4s.{Request, Header, Headers} import org.http4s.client.Client import pathy.Path import pathy.Path.{DirName, FileName} import scalaz.syntax.applicative._ -import scalaz.syntax.either._ -import scalaz.{\/, \/-, -\/} +import scalaz.{\/-, -\/} import shims._ -final class S3DataSource[F[_]: Effect: Timer, G[_]: Async]( +final class S3DataSource[F[_]: Effect: Timer: MonadResourceErr]( client: Client[F], config: S3Config) - extends LightweightDatasource[F, Stream[G, ?], Stream[G, Data]] { + extends LightweightDatasource[F, Stream[F, ?], Stream[F, Data]] { def kind: DatasourceType = s3.datasourceKind - val shutdown: F[Unit] = client.shutdown - - def evaluate(path: ResourcePath): F[ReadError \/ Stream[G, Data]] = + def evaluate(path: ResourcePath): F[Stream[F, Data]] = path match { case Root => - ResourceError.notAResource(path).left.point[F] + Stream.empty.covaryAll[F, Data].pure[F] case Leaf(file) => impl.evaluate[F](config.parsing, client, config.bucket, file, S3DataSource.signRequest(config)) map { - case None => (ResourceError.pathNotFound(Leaf(file)): ReadError).left - /* In http4s, the type of streaming results is the same as - every other effectful operation. However, - LightweightDatasourceModule forces us to separate the types, - so we need to translate */ - case Some(s) => s.translate[G](FToG).right[ReadError] - } + case None => Stream.empty + /* In http4s, the type of streaming results is the same as + every other effectful operation. 
However, + LightweightDatasourceModule forces us to separate the types, + so we need to translate */ + case Some(s) => s + } } - def children(path: ResourcePath): F[CommonError \/ Stream[G, (ResourceName, ResourcePathType)]] = + def prefixedChildPaths(path: ResourcePath): F[Option[Stream[F, (ResourceName, ResourcePathType)]]] = impl.children( client, config.bucket, dropEmpty(path.toPath), S3DataSource.signRequest(config)) map { case None => - ResourceError.pathNotFound(path).left[Stream[G, (ResourceName, ResourcePathType)]] + none[Stream[F, (ResourceName, ResourcePathType)]] case Some(paths) => paths.map { - case -\/(Path.DirName(dn)) => (ResourceName(dn), ResourcePathType.ResourcePrefix) - case \/-(Path.FileName(fn)) => (ResourceName(fn), ResourcePathType.Resource) - }.translate[G](FToG).right[CommonError] + case -\/(Path.DirName(dn)) => (ResourceName(dn), ResourcePathType.prefix) + case \/-(Path.FileName(fn)) => (ResourceName(fn), ResourcePathType.leafResource) + }.some } - def isResource(path: ResourcePath): F[Boolean] = path match { + def pathIsResource(path: ResourcePath): F[Boolean] = path match { case Root => false.pure[F] case Leaf(file) => Path.refineType(dropEmpty(file)) match { case -\/(_) => false.pure[F] @@ -96,10 +91,6 @@ final class S3DataSource[F[_]: Effect: Timer, G[_]: Async]( case Some((d, -\/(DirName(dn)))) if dn.isEmpty => d case _ => path } - - private val FToG: FunctionK[F, G] = new FunctionK[F, G] { - def apply[A](fa: F[A]): G[A] = fa.to[G] - } } object S3DataSource { diff --git a/datasource/src/main/scala/quasar/physical/s3/S3DataSourceModule.scala b/datasource/src/main/scala/quasar/physical/s3/S3DataSourceModule.scala index 7d633fc6..349f39a9 100644 --- a/datasource/src/main/scala/quasar/physical/s3/S3DataSourceModule.scala +++ b/datasource/src/main/scala/quasar/physical/s3/S3DataSourceModule.scala @@ -17,12 +17,14 @@ package quasar.physical.s3 -import quasar.Data +import quasar.common.data.Data +import quasar.common.resource.MonadResourceErr +import quasar.common.resource.ResourcePath import quasar.api.datasource.DatasourceError.{InitializationError, MalformedConfiguration} -import quasar.api.ResourcePath import quasar.api.datasource.DatasourceType import quasar.connector.Datasource import quasar.connector.LightweightDatasourceModule +import quasar.Disposable import argonaut.Json import cats.effect.{Timer, ConcurrentEffect} @@ -37,24 +39,23 @@ import shims._ object S3DataSourceModule extends LightweightDatasourceModule { def kind: DatasourceType = s3.datasourceKind - def lightweightDatasource[ - F[_]: ConcurrentEffect: Timer, - G[_]: ConcurrentEffect: Timer]( - config: Json) - : F[InitializationError[Json] \/ Datasource[F, Stream[G, ?], ResourcePath, Stream[G, Data]]] = { + def lightweightDatasource[F[_]: ConcurrentEffect: MonadResourceErr: Timer](config: Json) + : F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]]]] = { config.as[S3Config].result match { case Right(s3Config) => { Http1Client[F]() map { client => - val ds: Datasource[F, Stream[G, ?], ResourcePath, Stream[G, Data]] = - new S3DataSource[F, G](client, s3Config) + val ds: Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]] = + new S3DataSource[F](client, s3Config) - ds.right[InitializationError[Json]] + val disposable = Disposable(ds, client.shutdown) + + disposable.right[InitializationError[Json]] } } case Left((msg, _)) => (MalformedConfiguration(kind, config, msg): InitializationError[Json]) - .left[Datasource[F, Stream[G, ?], 
ResourcePath, Stream[G, Data]]].point[F] + .left[Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]]]].point[F] } } } diff --git a/datasource/src/main/scala/quasar/physical/s3/impl/children.scala b/datasource/src/main/scala/quasar/physical/s3/impl/children.scala index e176b2c4..1b4926e8 100644 --- a/datasource/src/main/scala/quasar/physical/s3/impl/children.scala +++ b/datasource/src/main/scala/quasar/physical/s3/impl/children.scala @@ -124,8 +124,6 @@ object children { bucket: Uri, dir: APath, ct: Option[ContinuationToken]): Request[F] = { - // Converts a pathy Path to an S3 object prefix. - val objectPrefix = aPathToObjectPrefix(dir) // Start with the bucket URI; add an extra `/` on the end // so that S3 understands us. @@ -135,12 +133,13 @@ object children { val listingQuery = (bucket / "") val listType = ("list-type", "2").some + // Converts a pathy Path to an S3 object prefix. + val objectPrefix = aPathToObjectPrefix(dir) val prefix = objectPrefix.map(("prefix", _)) - val ct0 = ct.map(_.value).map(("continuation-token", _)) - val params = List(listType, prefix, ct0).unite + val ct0 = ct.map(_.value).map(("continuation-token", _)) - val queryUri = params.foldLeft(listingQuery) { + val queryUri = List(listType, prefix, ct0).unite.foldLeft(listingQuery) { case (uri0, (param, value)) => uri0.withQueryParam(param, value) } diff --git a/datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala b/datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala index 6d5b4ef6..af1a84ba 100644 --- a/datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala +++ b/datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala @@ -17,7 +17,7 @@ package quasar.physical.s3 package impl -import quasar.Data +import quasar.common.data.Data import quasar.contrib.pathy._ import quasar.physical.s3.S3JsonParsing diff --git a/datasource/src/test/scala/quasar/physical/s3/S3DataSourceSpec.scala b/datasource/src/test/scala/quasar/physical/s3/S3DataSourceSpec.scala index 8c3bdd41..ce47fa86 100644 --- a/datasource/src/test/scala/quasar/physical/s3/S3DataSourceSpec.scala +++ b/datasource/src/test/scala/quasar/physical/s3/S3DataSourceSpec.scala @@ -18,8 +18,9 @@ package quasar.physical.s3 import slamdata.Predef._ -import quasar.api.ResourceDiscoverySpec -import quasar.api.{ResourceName, ResourcePath, ResourcePathType} +import quasar.connector.DatasourceSpec +import quasar.common.resource.{ResourceName, ResourcePath, ResourcePathType, ResourceError} +import quasar.contrib.scalaz.MonadError_ import scala.concurrent.ExecutionContext.Implicits.global @@ -28,31 +29,31 @@ import fs2.Stream import org.http4s.Uri import org.http4s.client.blaze.Http1Client import scalaz.syntax.applicative._ -import scalaz.{Foldable, Monoid, Id, ~>}, Id.Id +import scalaz.{Id, ~>}, Id.Id import shims._ import S3DataSourceSpec._ -class S3DataSourceSpec extends ResourceDiscoverySpec[IO, Stream[IO, ?]] { +class S3DataSourceSpec extends DatasourceSpec[IO, Stream[IO, ?]] { "the root of a bucket with a trailing slash is not a resource" >>* { val root = ResourcePath.root() / ResourceName("") - discovery.isResource(root).map(_ must beFalse) + datasource.pathIsResource(root).map(_ must beFalse) } "the root of a bucket is not a resource" >>* { val root = ResourcePath.root() - discovery.isResource(root).map(_ must beFalse) + datasource.pathIsResource(root).map(_ must beFalse) } "a prefix without contents is not a resource" >>* { val path = ResourcePath.root() / ResourceName("prefix3") / ResourceName("subprefix5") - 
discovery.isResource(path).map(_ must beFalse) + datasource.pathIsResource(path).map(_ must beFalse) } "list nested children" >>* { val path = ResourcePath.root() / ResourceName("dir1") / ResourceName("dir2") / ResourceName("dir3") - val listing = discovery.children(path) + val listing = datasource.prefixedChildPaths(path) listing.flatMap { list => list.map(_.compile.toList) @@ -60,20 +61,20 @@ class S3DataSourceSpec extends ResourceDiscoverySpec[IO, Stream[IO, ?]] { .map { case List((resource, resourceType)) => resource must_= ResourceName("flattenable.data") - resourceType must_= ResourcePathType.resource + resourceType must_= ResourcePathType.leafResource } } } "list children at the root of the bucket" >>* { - discovery.children(ResourcePath.root()).flatMap { list => + datasource.prefixedChildPaths(ResourcePath.root()).flatMap { list => list.map(_.compile.toList).getOrElse(IO.raiseError(new Exception("Could not list children under the root"))) .map(resources => { resources.length must_== 4 - resources(0) must_== (ResourceName("dir1") -> ResourcePathType.resourcePrefix) - resources(1) must_== (ResourceName("extraSmallZips.data") -> ResourcePathType.resource) - resources(2) must_== (ResourceName("prefix3") -> ResourcePathType.resourcePrefix) - resources(3) must_== (ResourceName("testData") -> ResourcePathType.resourcePrefix) + resources(0) must_== (ResourceName("dir1") -> ResourcePathType.prefix) + resources(1) must_== (ResourceName("extraSmallZips.data") -> ResourcePathType.leafResource) + resources(2) must_== (ResourceName("prefix3") -> ResourcePathType.prefix) + resources(3) must_== (ResourceName("testData") -> ResourcePathType.prefix) }) } } @@ -81,19 +82,17 @@ class S3DataSourceSpec extends ResourceDiscoverySpec[IO, Stream[IO, ?]] { "an actual file is a resource" >>* { val res = ResourcePath.root() / ResourceName("testData") / ResourceName("array.json") - discovery.isResource(res) map (_ must beTrue) + datasource.pathIsResource(res) map (_ must beTrue) } "read line-delimited and array JSON" >>* { - val ld = discoveryLD.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("lines.json")) - val array = discovery.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("array.json")) + val ld = datasourceLD.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("lines.json")) + val array = datasource.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("array.json")) (ld |@| array).tupled.flatMap { case (readLD, readArray) => { - val rd = readLD.map(_.compile.toList) - .toOption.getOrElse(IO.raiseError(new Exception("Could not read lines.json"))) - val ra = readArray.map(_.compile.toList) - .toOption.getOrElse(IO.raiseError(new Exception("Could not read array.json"))) + val rd = readLD.compile.toList + val ra = readArray.compile.toList (rd |@| ra) { case (lines, array) => { @@ -105,14 +104,16 @@ class S3DataSourceSpec extends ResourceDiscoverySpec[IO, Stream[IO, ?]] { } } - val discoveryLD = new S3DataSource[IO, IO]( + def gatherMultiple[A](g: Stream[IO, A]) = g.compile.toList + + val datasourceLD = new S3DataSource[IO]( Http1Client[IO]().unsafeRunSync, S3Config( Uri.uri("https://s3.amazonaws.com/slamdata-public-test"), S3JsonParsing.LineDelimited, None)) - val discovery = new S3DataSource[IO, IO]( + val datasource = new S3DataSource[IO]( Http1Client[IO]().unsafeRunSync, S3Config( Uri.uri("https://s3.amazonaws.com/slamdata-public-test"), @@ -126,9 +127,6 @@ class S3DataSourceSpec extends ResourceDiscoverySpec[IO, 
Stream[IO, ?]] { } object S3DataSourceSpec { - implicit val unsafeStreamFoldable: Foldable[Stream[IO, ?]] = - new Foldable[Stream[IO, ?]] with Foldable.FromFoldMap[Stream[IO, ?]] { - def foldMap[A, M](fa: Stream[IO, A])(f: A => M)(implicit M: Monoid[M]) = - fa.compile.fold(M.zero)((m, a) => M.append(m, f(a))).unsafeRunSync - } + implicit val ioMonadResourceErr: MonadError_[IO, ResourceError] = + MonadError_.facet[IO](ResourceError.throwableP) } diff --git a/datasource/src/test/scala/quasar/physical/s3/SecureS3DataSourceSpec.scala b/datasource/src/test/scala/quasar/physical/s3/SecureS3DataSourceSpec.scala index 3c936eb4..9e70b16d 100644 --- a/datasource/src/test/scala/quasar/physical/s3/SecureS3DataSourceSpec.scala +++ b/datasource/src/test/scala/quasar/physical/s3/SecureS3DataSourceSpec.scala @@ -18,6 +18,9 @@ package quasar.physical.s3 import slamdata.Predef._ +import quasar.contrib.scalaz.MonadError_ +import quasar.common.resource.ResourceError + import scala.concurrent.ExecutionContext.Implicits.global import scala.io.{Source, Codec} @@ -29,16 +32,19 @@ import cats.syntax.flatMap._ import cats.syntax.applicative._ import org.http4s.Uri import org.http4s.client.blaze.Http1Client +import shims._ + +import SecureS3DataSourceSpec._ final class SecureS3DataSourceSpec extends S3DataSourceSpec { - override val discoveryLD = new S3DataSource[IO, IO]( + override val datasourceLD = new S3DataSource[IO]( Http1Client[IO]().unsafeRunSync, S3Config( Uri.uri("https://s3.amazonaws.com/slamdata-private-test"), S3JsonParsing.LineDelimited, Some(readCredentials.unsafeRunSync))) - override val discovery = new S3DataSource[IO, IO]( + override val datasource = new S3DataSource[IO]( Http1Client[IO]().unsafeRunSync, S3Config( Uri.uri("https://s3.amazonaws.com/slamdata-private-test"), @@ -63,3 +69,8 @@ final class SecureS3DataSourceSpec extends S3DataSourceSpec { .map(_.toOption) >>= (_.fold[IO[S3Credentials]](IO.raiseError(new Exception(msg)))(_.pure[IO])) } } + +object SecureS3DataSourceSpec { + implicit val ioMonadResourceErr: MonadError_[IO, ResourceError] = + MonadError_.facet[IO](ResourceError.throwableP) +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 25a01afa..a802e3c5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,14 +16,14 @@ object Dependencies { // in an object. // we need to be compatible with Quasar's version of both // fs2 and jawn, so we use the older circe-jawn version. - private val quasarVersion = IO.read(file("./quasar-version")).trim - private val circeJawnVersion = "0.9.3" - private val fs2Version = "0.10.5" - private val specsVersion = "4.2.0" - private val shimsVersion = "1.3.0" - private val argonautVersion = "6.2" + private val argonautVersion = "6.2.2" private val catsEffectVersion = "0.10.1" private val circeFs2Version = "0.9.0" + private val circeJawnVersion = "0.9.3" + private val fs2Version = "0.10.5" + private val quasarVersion = IO.read(file("./quasar-version")).trim + private val shimsVersion = "1.2.1" + private val specsVersion = "4.1.2" // http4s-blaze-client's version has to be in sync with // quasar's http4s version. 
The same goes for any @@ -33,7 +33,7 @@ object Dependencies { "org.http4s" %% "http4s-blaze-client" % http4sVersion, "org.scala-lang.modules" %% "scala-xml" % scalaXmlVersion, "io.circe" %% "circe-jawn" % circeJawnVersion, - "com.codecommit" %% "shims-effect" % shimsVersion, + "com.codecommit" %% "shims" % shimsVersion, "org.typelevel" %% "cats-effect" % catsEffectVersion, "org.specs2" %% "specs2-core" % specsVersion % Test, "org.specs2" %% "specs2-scalaz" % specsVersion % Test, @@ -51,5 +51,6 @@ object Dependencies { "com.slamdata" %% "quasar-foundation-internal" % quasarVersion, "com.slamdata" %% "quasar-foundation-internal" % quasarVersion % Test classifier "tests", "com.slamdata" %% "quasar-connector-internal" % quasarVersion, + "com.slamdata" %% "quasar-connector-internal" % quasarVersion % Test classifier "tests", ) } diff --git a/quasar-version b/quasar-version index b0fa2fde..b406fbef 100644 --- a/quasar-version +++ b/quasar-version @@ -1 +1 @@ -53.0.0 +55.0.0 From d101ccc6c92ee3841b7001a08dcf96c92c78afbb Mon Sep 17 00:00:00 2001 From: Juan Pablo Santos Date: Tue, 24 Jul 2018 18:22:55 -0500 Subject: [PATCH 2/2] Update README.md to reflect API changes --- README.md | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 8f730774..1ca73712 100644 --- a/README.md +++ b/README.md @@ -15,12 +15,18 @@ in `.targets/datasource/scala-2.12/quasar-s3--explode.tar.gz` ## Configuration -You can create a new S3 datasource after you've loaded the plugin into +You can create a new S3 datasource after you've loaded this plugin into Quasar. Refer to the previous section for instructions on how to do -that. In order to create a datasource, you will need to send a PUT -request to `/datasource/` including a JSON -document specifiying the datasource's configuration. An example of a -JSON configuration to create a datasource that parses line-delimited JSON: +that. In order to create a datasource, you will need to send a POST +request to `/datasource` including a JSON +document specifiying the datasource's configuration. The format of the +JSON document can be found in [`slamdata-backend`'s +documentation.](https://github.com/slamdata/slamdata-backend#applicationvndslamdatadatasource). + +The connector-specific configuration needs to specify at least a +bucket URI and the JSON parsing to use when decoding JSON files stored +in S3. An example of a JSON configuration to create a datasource that +parses line-delimited JSON: ```json { @@ -29,7 +35,7 @@ JSON configuration to create a datasource that parses line-delimited JSON: } ``` -As another example, this is the JSON configuration to parse array +As another example, this is a JSON configuration to parse array JSON: ```json @@ -39,12 +45,10 @@ JSON: } ``` -You also need to specify a `Content-Type` header with -information regarding this datasource. For example, to use version 1 -of this datasource you may specify: +Along with the request, you also need to specify a `Content-Type` header: ``` -Content-Type: application/vnd.slamdata.datasource.s3; version="1" +Content-Type: application/vnd.slamdata.datasource" ``` ### Secure buckets
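
Addendum (not part of the diff above): a minimal usage sketch of the reworked single-effect API introduced by this patch, pieced together from the spec code in the first commit. It assumes the quasar 55.0.0 and http4s 0.18 types exactly as the specs use them; the object name, bucket URI, and file name below are hypothetical placeholders, not anything added by the patch.

```scala
// Hypothetical sketch only: the object name, bucket URI, and file name are
// made up. Construction and calls mirror S3DataSourceSpec / SecureS3DataSourceSpec.
package quasar.physical.s3

import slamdata.Predef._

import scala.concurrent.ExecutionContext.Implicits.global

import cats.effect.IO
import org.http4s.Uri
import org.http4s.client.blaze.Http1Client
import quasar.common.resource.{ResourceError, ResourceName, ResourcePath}
import quasar.contrib.scalaz.MonadError_
import shims._

object S3DataSourceUsageSketch {
  // As in the specs, ResourceError is surfaced through IO's Throwable channel,
  // satisfying the new MonadResourceErr constraint on S3DataSource.
  implicit val ioMonadResourceErr: MonadError_[IO, ResourceError] =
    MonadError_.facet[IO](ResourceError.throwableP)

  // Single effect parameter now: no more Stream[G, ?] / FunctionK translation.
  val datasource: S3DataSource[IO] =
    new S3DataSource[IO](
      Http1Client[IO]().unsafeRunSync,
      S3Config(
        Uri.uri("https://s3.amazonaws.com/some-test-bucket"), // hypothetical bucket
        S3JsonParsing.LineDelimited,
        None))

  // pathIsResource replaces isResource; results come back directly in F.
  val isFile: IO[Boolean] =
    datasource.pathIsResource(ResourcePath.root() / ResourceName("lines.json"))
}
```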