Skip to content

Commit

Permalink
8.0.0: Update to quasar-80.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jsantos17 committed Oct 26, 2018
2 parents 50efe1a + a5d4ffb commit 67d4c3f
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 98 deletions.
37 changes: 18 additions & 19 deletions datasource/src/main/scala/quasar/physical/s3/S3Datasource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

package quasar.physical.s3

import quasar.api.QueryEvaluator
import quasar.api.datasource.DatasourceType
import quasar.api.resource.ResourcePath.{Leaf, Root}
import quasar.api.resource.{ResourceName, ResourcePath, ResourcePathType}
import quasar.connector.{MonadResourceErr, ResourceError}
import quasar.connector.{MonadResourceErr, ParsableType, QueryResult, ResourceError}
import quasar.connector.datasource.LightweightDatasource
import quasar.contrib.pathy.APath
import quasar.contrib.scalaz.MonadError_
Expand All @@ -36,35 +35,35 @@ import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.option._
import fs2.Stream
import jawn.Facade
import org.http4s.{Request, Header, Headers}
import org.http4s.client.Client
import pathy.Path
import pathy.Path.{DirName, FileName}
import qdata.QDataEncode
import qdata.json.QDataFacade
import scalaz.{\/-, -\/}
import shims._

final class S3Datasource[F[_]: Effect: MonadResourceErr](
client: Client[F],
config: S3Config)
extends LightweightDatasource[F, Stream[F, ?]] {
def kind: DatasourceType = s3.datasourceKind
client: Client[F],
config: S3Config)
extends LightweightDatasource[F, Stream[F, ?], QueryResult[F]] {

import ParsableType.JsonVariant

def evaluator[R: QDataEncode]: QueryEvaluator[F, ResourcePath, Stream[F, R]] =
new QueryEvaluator[F, ResourcePath, Stream[F, R]] {
implicit val facade: Facade[R] = QDataFacade.qdata[R]
def kind: DatasourceType = s3.datasourceKind

val MR = MonadError_[F, ResourceError]
def evaluate(path: ResourcePath): F[QueryResult[F]] =
path match {
case Root =>
MonadError_[F, ResourceError].raiseError(ResourceError.notAResource(path))

def evaluate(path: ResourcePath): F[Stream[F, R]] =
path match {
case Root =>
MR.raiseError(ResourceError.notAResource(path))
case Leaf(file) =>
impl.evaluate[F, R](config.parsing, client, config.bucket, file, signRequest(config))
case Leaf(file) =>
val jvar = config.parsing match {
case S3JsonParsing.JsonArray => JsonVariant.ArrayWrapped
case S3JsonParsing.LineDelimited => JsonVariant.LineDelimited
}

impl.evaluate[F](client, config.bucket, file, signRequest(config))
.map(QueryResult.typed(ParsableType.json(jvar, false), _))
}

def prefixedChildPaths(path: ResourcePath): F[Option[Stream[F, (ResourceName, ResourcePathType)]]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@
package quasar.physical.s3

import quasar.Disposable
import quasar.api.datasource.DatasourceError
import quasar.api.datasource.{DatasourceError, DatasourceType}
import quasar.api.datasource.DatasourceError.InitializationError
import quasar.api.datasource.DatasourceType
import quasar.api.resource.ResourcePath
import quasar.connector.Datasource
import quasar.connector.LightweightDatasourceModule
import quasar.connector.MonadResourceErr
import quasar.connector.{Datasource, LightweightDatasourceModule, MonadResourceErr, QueryResult}

import scala.concurrent.ExecutionContext

Expand All @@ -44,15 +41,15 @@ object S3DatasourceModule extends LightweightDatasourceModule {

def lightweightDatasource[F[_]: ConcurrentEffect: ContextShift: MonadResourceErr: Timer](
config: Json)(implicit ec: ExecutionContext)
: F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath]]] =
: F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, QueryResult[F]]]] =
config.as[S3Config].result match {
case Right(s3Config) =>
val clientResource = BlazeClientBuilder[F](ec).resource
val c = s3.resourceToDisposable(clientResource)

c.flatMap { client =>
val s3Ds = new S3Datasource[F](client.unsafeValue, s3Config)
val ds: Datasource[F, Stream[F, ?], ResourcePath] = s3Ds
val ds: Datasource[F, Stream[F, ?], ResourcePath, QueryResult[F]] = s3Ds

s3Ds.isLive.ifM({
Disposable(ds, client.dispose).right.pure[F]
Expand Down
44 changes: 4 additions & 40 deletions datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,74 +21,38 @@ import slamdata.Predef._
import quasar.api.resource.ResourcePath
import quasar.connector.{MonadResourceErr, ResourceError}
import quasar.contrib.pathy._
import quasar.physical.s3.S3JsonParsing

import cats.effect.{Effect, Sync}
import cats.syntax.applicative._
import cats.syntax.functor._
import cats.syntax.flatMap._
import cats.ApplicativeError

import fs2.{Pipe, Stream}
import jawn.{Facade, ParseException}
import jawnfs2._
import fs2.Stream
import org.http4s.client._
import org.http4s.{Request, Response, Status, Uri}
import pathy.Path
import shims._

object evaluate {

def apply[F[_]: Effect, R: Facade](
jsonParsing: S3JsonParsing,
def apply[F[_]: Effect](
client: Client[F],
uri: Uri,
file: AFile,
sign: Request[F] => F[Request[F]])
(implicit MR: MonadResourceErr[F])
: F[Stream[F, R]] = {
: F[Stream[F, Byte]] = {
// Convert the pathy Path to a POSIX path, dropping
// the first slash, which is what S3 expects for object paths
val objectPath = Path.posixCodec.printPath(file).drop(1)
// Put the object path after the bucket URI
val queryUri = appendPathS3Encoded(uri, objectPath)
val request = Request[F](uri = queryUri)

sign(request) >>= { req =>
streamRequest[F, R](client, req, file) { resp =>
resp.body.chunks.map(_.toByteBuffer).through(parse(jsonParsing))
}.map(_.handleErrorWith {
case ParseException(message, _, _, _) =>
Stream.eval(MR.raiseError(parseError(file, jsonParsing, message)))
})
}
sign(request) >>= (streamRequest[F, Byte](client, _, file)(_.body))
}

////

private def parse[F[_]: ApplicativeError[?[_], Throwable], R: Facade](jsonParsing: S3JsonParsing)
: Pipe[F, ByteBuffer, R] =
jsonParsing match {
case S3JsonParsing.JsonArray => unwrapJsonArray[F, ByteBuffer, R]
case S3JsonParsing.LineDelimited => parseJsonStream[F, ByteBuffer, R]
}

private def parseError(path: AFile, parsing: S3JsonParsing, message: String)
: ResourceError = {
val msg: String =
s"Could not parse the file as JSON. Ensure you've configured the correct jsonParsing option for this bucket: $message"

val expectedFormat: String = parsing match {
case S3JsonParsing.LineDelimited => "Newline-delimited JSON"
case S3JsonParsing.JsonArray => "Array-wrapped JSON"
}

ResourceError.malformedResource(
ResourcePath.Leaf(path),
expectedFormat,
msg)
}

private def streamRequest[F[_]: Sync: MonadResourceErr, A](
client: Client[F], req: Request[F], file: AFile)(
f: Response[F] => Stream[F, A])
Expand Down
38 changes: 24 additions & 14 deletions datasource/src/test/scala/quasar/physical/s3/S3DatasourceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import slamdata.Predef._

import quasar.api.resource.{ResourceName, ResourcePath, ResourcePathType}
import quasar.common.data.Data
import quasar.connector.{Datasource, DatasourceSpec, MonadResourceErr, ResourceError}
import quasar.connector.{Datasource, DatasourceSpec, MonadResourceErr, QueryResult, ResourceError}
import quasar.connector.ResourceError
import quasar.contrib.scalaz.MonadError_

import java.nio.charset.Charset
import scala.concurrent.ExecutionContext

import cats.data.{EitherT, OptionT}
Expand Down Expand Up @@ -132,49 +133,58 @@ class S3DatasourceSpec extends DatasourceSpec[IO, Stream[IO, ?]] {

"evaluate" >> {
"read line-delimited JSON" >>* {
assertEvaluate(
assertResultBytes(
datasourceLD,
ResourcePath.root() / ResourceName("testData") / ResourceName("lines.json"),
data_12_34)
"[1, 2]\n[3, 4]\n".getBytes(Charset.forName("UTF-8")))
}

"read array JSON" >>* {
assertEvaluate(
assertResultBytes(
datasource,
ResourcePath.root() / ResourceName("testData") / ResourceName("array.json"),
data_12_34)
"[[1, 2], [3, 4]]\n".getBytes(Charset.forName("UTF-8")))
}

"read array JSON of resource with special chars in path" >>* {
assertEvaluate(
assertResultBytes(
datasource,
ResourcePath.root() / ResourceName("testData") / ResourceName("á") / ResourceName("βç.json"),
data_12_34)
"[[1, 2], [3, 4]]\n".getBytes(Charset.forName("UTF-8")))
}

"read line-delimited JSON with special chars of resource with special chars in path" >>* {
assertEvaluate(
val esStr = "\"El veloz murciélago hindú comía feliz cardillo y kiwi. La cigüeña tocaba el saxofón detrás del palenque de paja.\"\n"

assertResultBytes(
datasourceLD,
spanishResource,
List(Data.Str("El veloz murciélago hindú comía feliz cardillo y kiwi. La cigüeña tocaba el saxofón detrás del palenque de paja.")))
esStr.getBytes(Charset.forName("UTF-8")))
}

"reading a non-existent file raises ResourceError.PathNotFound" >> {
val creds = EitherT.right[Throwable](credentials)
val ds = creds.flatMap(mkDatasource[G](S3JsonParsing.JsonArray, testBucket, _))

val path = ResourcePath.root() / ResourceName("does-not-exist")
val read: G[Stream[G, Data]] = ds.flatMap(_.evaluator[Data].evaluate(path))
val read: G[QueryResult[G]] = ds.flatMap(_.evaluate(path))

run(read.value) must beLeft.like {
case ResourceError.throwableP(ResourceError.PathNotFound(_)) => ok
}
}
}

def assertEvaluate(ds: Datasource[IO, Stream[IO,?], ResourcePath], path: ResourcePath, expected: List[Data]) =
ds.evaluator[Data].evaluate(path).flatMap { res =>
gatherMultiple(res).map { _ must_== expected }
def assertResultBytes(
ds: Datasource[IO, Stream[IO, ?], ResourcePath, QueryResult[IO]],
path: ResourcePath,
expected: Array[Byte]) =
ds.evaluate(path) flatMap {
case QueryResult.Typed(_, data) =>
data.compile.to[Array].map(_ must_=== expected)

case _ =>
IO(ko("Unexpected QueryResult"))
}

def assertPrefixedChildPaths(path: ResourcePath, expected: List[(ResourceName, ResourcePathType)]) =
Expand All @@ -194,7 +204,7 @@ class S3DatasourceSpec extends DatasourceSpec[IO, Stream[IO, ?]] {
parsing: S3JsonParsing,
bucket: Uri,
creds: Option[S3Credentials])
: F[Datasource[F, Stream[F, ?], ResourcePath]] = {
: F[Datasource[F, Stream[F, ?], ResourcePath, QueryResult[F]]] = {

val ec = ExecutionContext.Implicits.global
val builder = BlazeClientBuilder[F](ec)
Expand Down
17 changes: 1 addition & 16 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,23 @@ object Dependencies {
// the S3 API's responses.
private val scalaXmlVersion = "1.1.0"

// used for parsing JSON out of the stream of S3 data
// in an object.
// we need to be compatible with Quasar's version of both
// fs2 and jawn
private val argonautVersion = "6.2.2"
private val catsEffectVersion = "1.0.0"
private val circeFs2Version = "0.9.0"
private val circeJawnVersion = "0.9.3"
private val fs2Version = "1.0.0"
private val quasarVersion = IO.read(file("./quasar-version")).trim
private val qdataVersion = IO.read(file("./qdata-version")).trim
private val jawnFs2Version = "0.13.0"
private val shimsVersion = "1.2.1"
private val specsVersion = "4.1.2"

// http4s-blaze-client's version has to be in sync with
// quasar's http4s version. The same goes for any
// dependencies, transitive or otherwise.
def datasourceCore = Seq(
"org.http4s" %% "jawn-fs2" % jawnFs2Version,
"com.slamdata" %% "qdata-json" % qdataVersion,
"org.http4s" %% "http4s-scala-xml" % http4sVersion,
"org.http4s" %% "http4s-blaze-client" % http4sVersion,
"org.scala-lang.modules" %% "scala-xml" % scalaXmlVersion,
"io.circe" %% "circe-jawn" % circeJawnVersion,
"com.codecommit" %% "shims" % shimsVersion,
"org.typelevel" %% "cats-effect" % catsEffectVersion,
"org.specs2" %% "specs2-core" % specsVersion % Test,
"org.specs2" %% "specs2-scalaz" % specsVersion % Test,
"org.specs2" %% "specs2-scalacheck" % specsVersion % Test,
"io.argonaut" %% "argonaut" % argonautVersion,
"io.circe" %% "circe-fs2" % circeFs2Version
"org.specs2" %% "specs2-scalacheck" % specsVersion % Test
)

// we need to separate quasar out from the datasource dependencies,
Expand Down
2 changes: 1 addition & 1 deletion quasar-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
79.1.1
80.0.0
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "7.0.1"
version in ThisBuild := "8.0.0"

0 comments on commit 67d4c3f

Please sign in to comment.