diff --git a/datasource/src/main/scala/quasar/physical/s3/S3Datasource.scala b/datasource/src/main/scala/quasar/physical/s3/S3Datasource.scala index 1c9e9315..525e66f9 100644 --- a/datasource/src/main/scala/quasar/physical/s3/S3Datasource.scala +++ b/datasource/src/main/scala/quasar/physical/s3/S3Datasource.scala @@ -16,11 +16,10 @@ package quasar.physical.s3 -import quasar.api.QueryEvaluator import quasar.api.datasource.DatasourceType import quasar.api.resource.ResourcePath.{Leaf, Root} import quasar.api.resource.{ResourceName, ResourcePath, ResourcePathType} -import quasar.connector.{MonadResourceErr, ResourceError} +import quasar.connector.{MonadResourceErr, ParsableType, QueryResult, ResourceError} import quasar.connector.datasource.LightweightDatasource import quasar.contrib.pathy.APath import quasar.contrib.scalaz.MonadError_ @@ -36,35 +35,35 @@ import cats.syntax.flatMap._ import cats.syntax.functor._ import cats.syntax.option._ import fs2.Stream -import jawn.Facade import org.http4s.{Request, Header, Headers} import org.http4s.client.Client import pathy.Path import pathy.Path.{DirName, FileName} -import qdata.QDataEncode -import qdata.json.QDataFacade import scalaz.{\/-, -\/} import shims._ final class S3Datasource[F[_]: Effect: MonadResourceErr]( - client: Client[F], - config: S3Config) - extends LightweightDatasource[F, Stream[F, ?]] { - def kind: DatasourceType = s3.datasourceKind + client: Client[F], + config: S3Config) + extends LightweightDatasource[F, Stream[F, ?], QueryResult[F]] { + + import ParsableType.JsonVariant - def evaluator[R: QDataEncode]: QueryEvaluator[F, ResourcePath, Stream[F, R]] = - new QueryEvaluator[F, ResourcePath, Stream[F, R]] { - implicit val facade: Facade[R] = QDataFacade.qdata[R] + def kind: DatasourceType = s3.datasourceKind - val MR = MonadError_[F, ResourceError] + def evaluate(path: ResourcePath): F[QueryResult[F]] = + path match { + case Root => + MonadError_[F, ResourceError].raiseError(ResourceError.notAResource(path)) - def evaluate(path: ResourcePath): F[Stream[F, R]] = - path match { - case Root => - MR.raiseError(ResourceError.notAResource(path)) - case Leaf(file) => - impl.evaluate[F, R](config.parsing, client, config.bucket, file, signRequest(config)) + case Leaf(file) => + val jvar = config.parsing match { + case S3JsonParsing.JsonArray => JsonVariant.ArrayWrapped + case S3JsonParsing.LineDelimited => JsonVariant.LineDelimited } + + impl.evaluate[F](client, config.bucket, file, signRequest(config)) + .map(QueryResult.typed(ParsableType.json(jvar, false), _)) } def prefixedChildPaths(path: ResourcePath): F[Option[Stream[F, (ResourceName, ResourcePathType)]]] = diff --git a/datasource/src/main/scala/quasar/physical/s3/S3DatasourceModule.scala b/datasource/src/main/scala/quasar/physical/s3/S3DatasourceModule.scala index af10b7b1..42ae49a4 100644 --- a/datasource/src/main/scala/quasar/physical/s3/S3DatasourceModule.scala +++ b/datasource/src/main/scala/quasar/physical/s3/S3DatasourceModule.scala @@ -17,13 +17,10 @@ package quasar.physical.s3 import quasar.Disposable -import quasar.api.datasource.DatasourceError +import quasar.api.datasource.{DatasourceError, DatasourceType} import quasar.api.datasource.DatasourceError.InitializationError -import quasar.api.datasource.DatasourceType import quasar.api.resource.ResourcePath -import quasar.connector.Datasource -import quasar.connector.LightweightDatasourceModule -import quasar.connector.MonadResourceErr +import quasar.connector.{Datasource, LightweightDatasourceModule, MonadResourceErr, QueryResult} import scala.concurrent.ExecutionContext @@ -44,7 +41,7 @@ object S3DatasourceModule extends LightweightDatasourceModule { def lightweightDatasource[F[_]: ConcurrentEffect: ContextShift: MonadResourceErr: Timer]( config: Json)(implicit ec: ExecutionContext) - : F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath]]] = + : F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, QueryResult[F]]]] = config.as[S3Config].result match { case Right(s3Config) => val clientResource = BlazeClientBuilder[F](ec).resource @@ -52,7 +49,7 @@ object S3DatasourceModule extends LightweightDatasourceModule { c.flatMap { client => val s3Ds = new S3Datasource[F](client.unsafeValue, s3Config) - val ds: Datasource[F, Stream[F, ?], ResourcePath] = s3Ds + val ds: Datasource[F, Stream[F, ?], ResourcePath, QueryResult[F]] = s3Ds s3Ds.isLive.ifM({ Disposable(ds, client.dispose).right.pure[F] diff --git a/datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala b/datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala index 041de177..d0574a63 100644 --- a/datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala +++ b/datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala @@ -21,17 +21,12 @@ import slamdata.Predef._ import quasar.api.resource.ResourcePath import quasar.connector.{MonadResourceErr, ResourceError} import quasar.contrib.pathy._ -import quasar.physical.s3.S3JsonParsing import cats.effect.{Effect, Sync} import cats.syntax.applicative._ -import cats.syntax.functor._ import cats.syntax.flatMap._ -import cats.ApplicativeError -import fs2.{Pipe, Stream} -import jawn.{Facade, ParseException} -import jawnfs2._ +import fs2.Stream import org.http4s.client._ import org.http4s.{Request, Response, Status, Uri} import pathy.Path @@ -39,14 +34,13 @@ import shims._ object evaluate { - def apply[F[_]: Effect, R: Facade]( - jsonParsing: S3JsonParsing, + def apply[F[_]: Effect]( client: Client[F], uri: Uri, file: AFile, sign: Request[F] => F[Request[F]]) (implicit MR: MonadResourceErr[F]) - : F[Stream[F, R]] = { + : F[Stream[F, Byte]] = { // Convert the pathy Path to a POSIX path, dropping // the first slash, which is what S3 expects for object paths val objectPath = Path.posixCodec.printPath(file).drop(1) @@ -54,41 +48,11 @@ object evaluate { val queryUri = appendPathS3Encoded(uri, objectPath) val request = Request[F](uri = queryUri) - sign(request) >>= { req => - streamRequest[F, R](client, req, file) { resp => - resp.body.chunks.map(_.toByteBuffer).through(parse(jsonParsing)) - }.map(_.handleErrorWith { - case ParseException(message, _, _, _) => - Stream.eval(MR.raiseError(parseError(file, jsonParsing, message))) - }) - } + sign(request) >>= (streamRequest[F, Byte](client, _, file)(_.body)) } //// - private def parse[F[_]: ApplicativeError[?[_], Throwable], R: Facade](jsonParsing: S3JsonParsing) - : Pipe[F, ByteBuffer, R] = - jsonParsing match { - case S3JsonParsing.JsonArray => unwrapJsonArray[F, ByteBuffer, R] - case S3JsonParsing.LineDelimited => parseJsonStream[F, ByteBuffer, R] - } - - private def parseError(path: AFile, parsing: S3JsonParsing, message: String) - : ResourceError = { - val msg: String = - s"Could not parse the file as JSON. Ensure you've configured the correct jsonParsing option for this bucket: $message" - - val expectedFormat: String = parsing match { - case S3JsonParsing.LineDelimited => "Newline-delimited JSON" - case S3JsonParsing.JsonArray => "Array-wrapped JSON" - } - - ResourceError.malformedResource( - ResourcePath.Leaf(path), - expectedFormat, - msg) - } - private def streamRequest[F[_]: Sync: MonadResourceErr, A]( client: Client[F], req: Request[F], file: AFile)( f: Response[F] => Stream[F, A]) diff --git a/datasource/src/test/scala/quasar/physical/s3/S3DatasourceSpec.scala b/datasource/src/test/scala/quasar/physical/s3/S3DatasourceSpec.scala index 55d26585..b16448ad 100644 --- a/datasource/src/test/scala/quasar/physical/s3/S3DatasourceSpec.scala +++ b/datasource/src/test/scala/quasar/physical/s3/S3DatasourceSpec.scala @@ -20,10 +20,11 @@ import slamdata.Predef._ import quasar.api.resource.{ResourceName, ResourcePath, ResourcePathType} import quasar.common.data.Data -import quasar.connector.{Datasource, DatasourceSpec, MonadResourceErr, ResourceError} +import quasar.connector.{Datasource, DatasourceSpec, MonadResourceErr, QueryResult, ResourceError} import quasar.connector.ResourceError import quasar.contrib.scalaz.MonadError_ +import java.nio.charset.Charset import scala.concurrent.ExecutionContext import cats.data.{EitherT, OptionT} @@ -132,31 +133,33 @@ class S3DatasourceSpec extends DatasourceSpec[IO, Stream[IO, ?]] { "evaluate" >> { "read line-delimited JSON" >>* { - assertEvaluate( + assertResultBytes( datasourceLD, ResourcePath.root() / ResourceName("testData") / ResourceName("lines.json"), - data_12_34) + "[1, 2]\n[3, 4]\n".getBytes(Charset.forName("UTF-8"))) } "read array JSON" >>* { - assertEvaluate( + assertResultBytes( datasource, ResourcePath.root() / ResourceName("testData") / ResourceName("array.json"), - data_12_34) + "[[1, 2], [3, 4]]\n".getBytes(Charset.forName("UTF-8"))) } "read array JSON of resource with special chars in path" >>* { - assertEvaluate( + assertResultBytes( datasource, ResourcePath.root() / ResourceName("testData") / ResourceName("á") / ResourceName("βç.json"), - data_12_34) + "[[1, 2], [3, 4]]\n".getBytes(Charset.forName("UTF-8"))) } "read line-delimited JSON with special chars of resource with special chars in path" >>* { - assertEvaluate( + val esStr = "\"El veloz murciélago hindú comía feliz cardillo y kiwi. La cigüeña tocaba el saxofón detrás del palenque de paja.\"\n" + + assertResultBytes( datasourceLD, spanishResource, - List(Data.Str("El veloz murciélago hindú comía feliz cardillo y kiwi. La cigüeña tocaba el saxofón detrás del palenque de paja."))) + esStr.getBytes(Charset.forName("UTF-8"))) } "reading a non-existent file raises ResourceError.PathNotFound" >> { @@ -164,7 +167,7 @@ class S3DatasourceSpec extends DatasourceSpec[IO, Stream[IO, ?]] { val ds = creds.flatMap(mkDatasource[G](S3JsonParsing.JsonArray, testBucket, _)) val path = ResourcePath.root() / ResourceName("does-not-exist") - val read: G[Stream[G, Data]] = ds.flatMap(_.evaluator[Data].evaluate(path)) + val read: G[QueryResult[G]] = ds.flatMap(_.evaluate(path)) run(read.value) must beLeft.like { case ResourceError.throwableP(ResourceError.PathNotFound(_)) => ok @@ -172,9 +175,16 @@ class S3DatasourceSpec extends DatasourceSpec[IO, Stream[IO, ?]] { } } - def assertEvaluate(ds: Datasource[IO, Stream[IO,?], ResourcePath], path: ResourcePath, expected: List[Data]) = - ds.evaluator[Data].evaluate(path).flatMap { res => - gatherMultiple(res).map { _ must_== expected } + def assertResultBytes( + ds: Datasource[IO, Stream[IO, ?], ResourcePath, QueryResult[IO]], + path: ResourcePath, + expected: Array[Byte]) = + ds.evaluate(path) flatMap { + case QueryResult.Typed(_, data) => + data.compile.to[Array].map(_ must_=== expected) + + case _ => + IO(ko("Unexpected QueryResult")) } def assertPrefixedChildPaths(path: ResourcePath, expected: List[(ResourceName, ResourcePathType)]) = @@ -194,7 +204,7 @@ class S3DatasourceSpec extends DatasourceSpec[IO, Stream[IO, ?]] { parsing: S3JsonParsing, bucket: Uri, creds: Option[S3Credentials]) - : F[Datasource[F, Stream[F, ?], ResourcePath]] = { + : F[Datasource[F, Stream[F, ?], ResourcePath, QueryResult[F]]] = { val ec = ExecutionContext.Implicits.global val builder = BlazeClientBuilder[F](ec) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 53821aa8..3ab0b7b3 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -12,18 +12,8 @@ object Dependencies { // the S3 API's responses. private val scalaXmlVersion = "1.1.0" - // used for parsing JSON out of the stream of S3 data - // in an object. - // we need to be compatible with Quasar's version of both - // fs2 and jawn - private val argonautVersion = "6.2.2" private val catsEffectVersion = "1.0.0" - private val circeFs2Version = "0.9.0" - private val circeJawnVersion = "0.9.3" - private val fs2Version = "1.0.0" private val quasarVersion = IO.read(file("./quasar-version")).trim - private val qdataVersion = IO.read(file("./qdata-version")).trim - private val jawnFs2Version = "0.13.0" private val shimsVersion = "1.2.1" private val specsVersion = "4.1.2" @@ -31,19 +21,14 @@ object Dependencies { // quasar's http4s version. The same goes for any // dependencies, transitive or otherwise. def datasourceCore = Seq( - "org.http4s" %% "jawn-fs2" % jawnFs2Version, - "com.slamdata" %% "qdata-json" % qdataVersion, "org.http4s" %% "http4s-scala-xml" % http4sVersion, "org.http4s" %% "http4s-blaze-client" % http4sVersion, "org.scala-lang.modules" %% "scala-xml" % scalaXmlVersion, - "io.circe" %% "circe-jawn" % circeJawnVersion, "com.codecommit" %% "shims" % shimsVersion, "org.typelevel" %% "cats-effect" % catsEffectVersion, "org.specs2" %% "specs2-core" % specsVersion % Test, "org.specs2" %% "specs2-scalaz" % specsVersion % Test, - "org.specs2" %% "specs2-scalacheck" % specsVersion % Test, - "io.argonaut" %% "argonaut" % argonautVersion, - "io.circe" %% "circe-fs2" % circeFs2Version + "org.specs2" %% "specs2-scalacheck" % specsVersion % Test ) // we need to separate quasar out from the datasource dependencies, diff --git a/quasar-version b/quasar-version index c127e585..13379305 100644 --- a/quasar-version +++ b/quasar-version @@ -1 +1 @@ -79.1.1 +80.0.0 diff --git a/version.sbt b/version.sbt index 252a2deb..4559f826 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "7.0.1" \ No newline at end of file +version in ThisBuild := "8.0.0" \ No newline at end of file