Skip to content

Commit

Permalink
Cleanup code by applying inspections
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Apr 7, 2024
1 parent 86d0dd2 commit 505ca92
Show file tree
Hide file tree
Showing 440 changed files with 2,614 additions and 2,949 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,10 @@ final class AmqpDetailsConnectionProvider private (
if (sslConfiguration.trustManager.isDefined)
factory.useSslProtocol(sslConfiguration.protocol.get, sslConfiguration.trustManager.get)
else factory.useSslProtocol(sslConfiguration.protocol.get)
} else if (sslConfiguration.context.isDefined) {
} else if (sslConfiguration.context.isDefined)
factory.useSslProtocol(sslConfiguration.context.get)
} else {
else
factory.useSslProtocol()
}
})
requestedHeartbeat.foreach(factory.setRequestedHeartbeat)
connectionTimeout.foreach(factory.setConnectionTimeout)
Expand Down Expand Up @@ -244,9 +243,8 @@ object AmqpCredentials {
final class AmqpSSLConfiguration private (val protocol: Option[String] = None,
val trustManager: Option[TrustManager] = None,
val context: Option[SSLContext] = None) {
if (protocol.isDefined && context.isDefined) {
if (protocol.isDefined && context.isDefined)
throw new IllegalArgumentException("Protocol and context can't be defined in the same AmqpSSLConfiguration.")
}

def withProtocol(protocol: String): AmqpSSLConfiguration =
copy(protocol = Some(protocol))
Expand Down Expand Up @@ -419,10 +417,8 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr
throw new ConcurrentModificationException(
"Unexpected concurrent modification while closing the connection.")
}
} else {
if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
releaseRecursive(amqpConnectionProvider, connection)
}
} else if (!state.compareAndSet(c, Connected(cachedConnection, clients - 1)))
releaseRecursive(amqpConnectionProvider, connection)
case Closing => releaseRecursive(amqpConnectionProvider, connection)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ final class TemporaryQueueSourceSettings private (
}

object TemporaryQueueSourceSettings {
def apply(connectionProvider: AmqpConnectionProvider, exchange: String) =
def apply(connectionProvider: AmqpConnectionProvider, exchange: String): TemporaryQueueSourceSettings =
new TemporaryQueueSourceSettings(connectionProvider, exchange)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ import scala.concurrent.Promise
val callback = getAsyncCallback[(DeliveryTag, Boolean)] {
case (tag: DeliveryTag, multiple: Boolean) => confirmCallback(tag, multiple)
}
new ConfirmCallback { // cant use function literal because it doesn't work with 2.11
override def handle(tag: DeliveryTag, multiple: Boolean): Unit = callback.invoke((tag, multiple))
}
(tag: DeliveryTag, multiple: Boolean) => callback.invoke((tag, multiple))
}

private def onConfirmation(tag: DeliveryTag, multiple: Boolean): Unit = {
Expand Down Expand Up @@ -155,9 +153,8 @@ import scala.concurrent.Promise
if (noAwaitingMessages && exitQueue.isEmpty) {
streamCompletion.success(Done)
super.onUpstreamFinish()
} else {
} else
log.debug("Received upstream finish signal - stage will be closed when all buffered messages are processed")
}

private def publish(message: WriteMessage): DeliveryTag = {
val tag: DeliveryTag = channel.getNextPublishSeqNo
Expand Down Expand Up @@ -193,10 +190,9 @@ import scala.concurrent.Promise

override protected def onTimer(timerKey: Any): Unit =
timerKey match {
case tag: DeliveryTag => {
case tag: DeliveryTag =>
log.debug("Received timeout for deliveryTag {}.", tag)
onRejection(tag, multiple = false)
}
case _ => ()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,14 @@ import scala.concurrent.{ Future, Promise }
buffer += (tag -> AwaitingMessage(tag, passThrough))

override def dequeueAwaitingMessages(tag: DeliveryTag, multiple: Boolean): Iterable[AwaitingMessage[T]] =
if (multiple) {
if (multiple)
dequeueWhile((t, _) => t <= tag)
} else {
else {
setReady(tag)
if (isAtHead(tag)) {
if (isAtHead(tag))
dequeueWhile((_, message) => message.ready)
} else {
else
Seq.empty
}
}

private def dequeueWhile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import scala.concurrent.{ Future, Promise }
private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToSinkSettings)
extends GraphStageWithMaterializedValue[SinkShape[WriteMessage], Future[Done]] { stage =>

val in = Inlet[WriteMessage]("AmqpReplyToSink.in")
val in: Inlet[WriteMessage] = Inlet[WriteMessage]("AmqpReplyToSink.in")

override def shape: SinkShape[WriteMessage] = SinkShape.of(in)

Expand Down Expand Up @@ -82,9 +82,8 @@ private[amqp] final class AmqpReplyToSinkStage(replyToSinkSettings: AmqpReplyToS
elem.immediate,
elem.properties.orNull,
elem.bytes.toArray)
} else if (settings.failIfReplyToMissing) {
} else if (settings.failIfReplyToMissing)
onFailure(new RuntimeException("Reply-to header was not set"))
}

tryPull(in)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
extends GraphStageWithMaterializedValue[FlowShape[WriteMessage, CommittableReadResult], Future[String]] {
stage =>

val in = Inlet[WriteMessage]("AmqpRpcFlow.in")
val out = Outlet[CommittableReadResult]("AmqpRpcFlow.out")
val in: Inlet[WriteMessage] = Inlet[WriteMessage]("AmqpRpcFlow.in")
val out: Outlet[CommittableReadResult] = Outlet[CommittableReadResult]("AmqpRpcFlow.out")

override def shape: FlowShape[WriteMessage, CommittableReadResult] = FlowShape.of(in, out)

Expand All @@ -70,7 +70,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
val consumerCallback = getAsyncCallback(handleDelivery)

val commitCallback = getAsyncCallback[AckArguments] {
case AckArguments(deliveryTag, multiple, promise) => {
case AckArguments(deliveryTag, multiple, promise) =>
try {
channel.basicAck(deliveryTag, multiple)
unackedMessages -= 1
Expand All @@ -81,10 +81,9 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
} catch {
case e: Throwable => promise.failure(e)
}
}
}
val nackCallback = getAsyncCallback[NackArguments] {
case NackArguments(deliveryTag, multiple, requeue, promise) => {
case NackArguments(deliveryTag, multiple, requeue, promise) =>
try {
channel.basicNack(deliveryTag, multiple, requeue)
unackedMessages -= 1
Expand All @@ -95,7 +94,6 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
} catch {
case e: Throwable => promise.failure(e)
}
}
}

val amqpSourceConsumer = new DefaultConsumer(channel) {
Expand All @@ -105,7 +103,7 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
body: Array[Byte]): Unit =
consumerCallback.invoke(
new CommittableReadResult {
override val message = ReadResult(ByteString(body), envelope, properties)
override val message: ReadResult = ReadResult(ByteString(body), envelope, properties)

override def ack(multiple: Boolean): Future[Done] = {
val promise = Promise[Done]()
Expand Down Expand Up @@ -148,21 +146,19 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf
}

def handleDelivery(message: CommittableReadResult): Unit =
if (isAvailable(out)) {
if (isAvailable(out))
pushMessage(message)
} else if (queue.size + 1 > bufferSize) {
else if (queue.size + 1 > bufferSize)
onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
} else {
else
queue.enqueue(message)
}

setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (queue.nonEmpty) {
if (queue.nonEmpty)
pushMessage(queue.dequeue())
}

override def onDownstreamFinish(cause: Throwable): Unit = {
setKeepGoing(true)
Expand Down Expand Up @@ -207,15 +203,14 @@ private[amqp] final class AmqpRpcFlowStage(writeSettings: AmqpWriteSettings, buf

val expectedResponses: Int = {
val headers = props.getHeaders
if (headers == null) {
if (headers == null)
responsesPerMessage
} else {
else {
val r = headers.get("expectedReplies")
if (r != null) {
if (r != null)
r.asInstanceOf[Int]
} else {
else
responsesPerMessage
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
properties: BasicProperties,
body: Array[Byte]): Unit = {
val message = if (ackRequired) {

new CommittableReadResult {
override val message = ReadResult(ByteString(body), envelope, properties)
override val message: ReadResult = ReadResult(ByteString(body), envelope, properties)

override def ack(multiple: Boolean): Future[Done] = {
val promise = Promise[Done]()
Expand Down Expand Up @@ -155,21 +154,19 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
}

def handleDelivery(message: CommittableReadResult): Unit =
if (isAvailable(out)) {
if (isAvailable(out))
pushMessage(message)
} else if (queue.size + 1 > bufferSize) {
else if (queue.size + 1 > bufferSize)
onFailure(new RuntimeException(s"Reached maximum buffer size $bufferSize"))
} else {
else
queue.enqueue(message)
}

setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (queue.nonEmpty) {
if (queue.nonEmpty)
pushMessage(queue.dequeue())
}

override def onDownstreamFinish(cause: Throwable): Unit =
if (unackedMessages == 0) super.onDownstreamFinish(cause)
Expand Down
4 changes: 2 additions & 2 deletions amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class AmqpDocsSpec extends AmqpSpec {

override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)

val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful(_)
val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful

"The AMQP Connectors" should {

Expand Down Expand Up @@ -158,7 +158,7 @@ class AmqpDocsSpec extends AmqpSpec {
val mergingFlow = mergedSources
.viaMat(KillSwitches.single)(Keep.right)
.to(Sink.fold(Set.empty[Int]) {
case (seen, (branch, element)) =>
case (seen, (branch, _)) =>
if (seen.size == fanoutSize) completion.trySuccess(Done)
seen + branch
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@ import com.rabbitmq.client._
* otherwise undefined
*/
class AmqpProxyConnection(protected val delegate: Connection) extends Connection {
override def getAddress: InetAddress = delegate.getAddress()
override def getAddress: InetAddress = delegate.getAddress

override def getPort: Int = delegate.getPort()
override def getPort: Int = delegate.getPort

override def getChannelMax: Int = delegate.getChannelMax()
override def getChannelMax: Int = delegate.getChannelMax

override def getFrameMax: Int = delegate.getFrameMax()
override def getFrameMax: Int = delegate.getFrameMax

override def getHeartbeat: Int = delegate.getHeartbeat()
override def getHeartbeat: Int = delegate.getHeartbeat

override def getClientProperties: util.Map[String, AnyRef] = delegate.getClientProperties()
override def getClientProperties: util.Map[String, AnyRef] = delegate.getClientProperties

override def getClientProvidedName: String = delegate.getClientProvidedName()
override def getClientProvidedName: String = delegate.getClientProvidedName

override def getServerProperties: util.Map[String, AnyRef] = delegate.getServerProperties()
override def getServerProperties: util.Map[String, AnyRef] = delegate.getServerProperties

override def createChannel(): Channel = delegate.createChannel()

Expand Down Expand Up @@ -76,9 +76,9 @@ class AmqpProxyConnection(protected val delegate: Connection) extends Connection

override def clearBlockedListeners(): Unit = delegate.clearBlockedListeners()

override def getExceptionHandler: ExceptionHandler = delegate.getExceptionHandler()
override def getExceptionHandler: ExceptionHandler = delegate.getExceptionHandler

override def getId: String = delegate.getId()
override def getId: String = delegate.getId

override def setId(s: String): Unit = delegate.setId(s)

Expand All @@ -88,9 +88,9 @@ class AmqpProxyConnection(protected val delegate: Connection) extends Connection
override def removeShutdownListener(shutdownListener: ShutdownListener): Unit =
delegate.removeShutdownListener(shutdownListener)

override def getCloseReason: ShutdownSignalException = delegate.getCloseReason()
override def getCloseReason: ShutdownSignalException = delegate.getCloseReason

override def notifyListeners(): Unit = delegate.notifyListeners()

override def isOpen: Boolean = delegate.isOpen()
override def isOpen: Boolean = delegate.isOpen
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class AmqpGraphStageLogicConnectionShutdownSpec
val shutdownsAdded = new AtomicInteger()
val shutdownsRemoved = new AtomicInteger()

override def beforeEach() = {
override def beforeEach(): Unit = {
shutdownsAdded.set(0)
shutdownsRemoved.set(0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ private[avroparquet] class AvroParquetFlow[T <: GenericRecord](writer: ParquetWr
new InHandler {

override def onUpstreamFinish(): Unit =
// super.onUpstreamFinish()
completeStage()

override def onUpstreamFailure(ex: Throwable): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class AvroParquetFlowSpec
val n: Int = 2
val file: String = genFinalFile.sample.get
val documents: List[Document] = genDocuments(n).sample.get
val avroDocuments: List[Record] = documents.map(format.to(_))
val avroDocuments: List[Record] = documents.map(format.to)
val writer: ParquetWriter[Record] = parquetWriter[Record](file, conf, schema)

// when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class AvroParquetSinkSpec
val documents: List[Document] = genDocuments(n).sample.get
val writer: ParquetWriter[Record] = parquetWriter[Record](file, conf, schema)
// #init-sink
val records: List[Record] = documents.map(format.to(_))
val records: List[Record] = documents.map(format.to)
val source: Source[Record, NotUsed] = Source(records)
val result: Future[Done] = source
.runWith(AvroParquetSink(writer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class AvroParquetSourceSpec
val n: Int = 4
val file: String = genFinalFile.sample.get
val documents: List[Document] = genDocuments(n).sample.get
val avroDocuments: List[Record] = documents.map(format.to(_))
val avroDocuments: List[Record] = documents.map(format.to)
Source(avroDocuments)
.toMat(AvroParquetSink(parquetWriter(file, conf, schema)))(Keep.right)
.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ final class EventBridgePublishSettings private (val concurrency: Int) {

def withConcurrency(concurrency: Int): EventBridgePublishSettings = copy(concurrency = concurrency)

def copy(concurrency: Int) = new EventBridgePublishSettings(concurrency)
def copy(concurrency: Int): EventBridgePublishSettings = new EventBridgePublishSettings(concurrency)

override def toString: String =
"EventBridgePublishSettings(" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with DefaultTestContext wit

private def entryDetail(detail: String, eventBusName: Option[String] = None): PutEventsRequestEntry = {
val entry = PutEventsRequestEntry.builder().detail(detail)
eventBusName.map(entry.eventBusName(_))
eventBusName.map(entry.eventBusName)
entry.build()
}

Expand Down Expand Up @@ -129,7 +129,7 @@ class EventBridgePublishMockSpec extends AnyFlatSpec with DefaultTestContext wit
}

it should "fail stage if upstream failure occurs" in {
case class MyCustomException(message: String) extends Exception(message)
final case class MyCustomException(message: String) extends Exception(message)

val (probe, future) =
TestSource.probe[Seq[PutEventsRequestEntry]].via(EventBridgePublisher.flowSeq()).toMat(Sink.seq)(Keep.both).run()
Expand Down
Loading

0 comments on commit 505ca92

Please sign in to comment.