
[Doubts] Custom spark physical operator Just like AlreadySortedExec #5

Open
dragno99 opened this issue Sep 20, 2024 · 2 comments

@dragno99

Hello Vladimir Prus, I read your blog on Medium; it was very interesting and I learned a lot from it. Following it, I tried to write a custom physical operator that derives an extra column (i.e. appends an extra UTF8 string at the end of each InternalRow), and after that I apply a groupBy aggregation. Everything seems to work, but I see that only 1 element takes part in the groupBy aggregation, whereas when I simply use mapPartitions to derive that column, many elements take part in the shuffle stage and the output is correct.

I need your help and suggestions to resolve this issue.
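For reference, a minimal sketch of the mapPartitions-based variant described above, which does produce the correct output (the In/Out schema and the queryInBulk stub here are hypothetical stand-ins, not the real code):

  import org.apache.spark.sql.{Dataset, SparkSession}

  case class In(key: String)
  case class Out(key: String, derived: String)

  // Stand-in for the real bulk lookup performed once per partition.
  def queryInBulk(keys: Seq[String]): Map[String, String] =
    keys.map(k => k -> s"derived-$k").toMap

  def withDerivedColumn(spark: SparkSession, ds: Dataset[In]): Dataset[Out] = {
    import spark.implicits._
    ds.mapPartitions { it =>
      val rows = it.toSeq                        // buffer the partition
      val lookup = queryInBulk(rows.map(_.key))  // one bulk query per partition
      rows.iterator.map(r => Out(r.key, lookup.getOrElse(r.key, null)))
    }
  }

  // A downstream aggregation over the derived column then shuffles all rows:
  // withDerivedColumn(spark, input).groupBy("derived").count()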

@vprus
Collaborator

vprus commented Sep 23, 2024

Can you put together a minimal example to reproduce this problem? E.g. as a gist at gist.github.com?

@dragno99
Author

Hi, apologies for my late response.

So when I wrote my doExecute() method like this, only 1 record took part in the shuffle stage (it seems HashAggregate was getting the same hash value for every row):

  override protected def doExecute(): RDD[InternalRow] = {
    val func = (partitionIndex: Int, it: Iterator[InternalRow]) => {

      val inputs = UnsafeProjection.create(child.output, output)
      inputs.initialize(partitionIndex)

      val retRows: ArrayBuffer[UnsafeRow] = ArrayBuffer()

      val queryKeys: util.List[String] = new util.ArrayList[String]()

      val keyIdx = child.schema.fieldIndex(keyFields.head.name)
      
      // here I am gathering all keys for a bulk query
      it.foreach(row => {
        val r =  inputs(row)
        if(!r.isNullAt(keyIdx)) {
          val key = r.getUTF8String(keyIdx).toString
          if(key != "") queryKeys.add(key)
        }
        retRows += r
      })

      var map: util.Map[String, Array[String]] = new util.HashMap[String, Array[String]]

      if (!queryKeys.isEmpty) {
        map = queryInBulk(queryKeys)
      }

      val rowWriter: UnsafeRowWriter = new UnsafeRowWriter(newAttributeReference.size)
      
      val joiner = GenerateUnsafeRowJoiner.create(child.schema, newAttributeReference.map(_.toAttribute).toStructType)

      retRows.map(row => {
        var res: UnsafeRow = null
        var queryRes = new Array[String](qualifiers.length)
        if(!row.isNullAt(keyIdx)) {
          val key = row.getUTF8String(keyIdx).toString
          queryRes = map.getOrDefault(key, new Array[String](qualifiers.length))
        }
        res = joiner.join(row, buildUnsafeRow(rowWriter, queryRes))
        res
      }).iterator
    }
    child.execute().mapPartitionsWithIndex(func, preservesPartitioning = true)
  }
   
 private def buildUnsafeRow(rowWriter: UnsafeRowWriter, values: Array[String]): UnsafeRow = {
    rowWriter.reset()
    for (i <- values.indices) {
      if (values(i) == null) {
        rowWriter.setNullAt(i)
      } else {
        rowWriter.write(i, UTF8String.fromString(values(i)))
      }
    }
    rowWriter.getRow
  }
  

But when I changed my code to use the .copy() method, it started working correctly. Below are the two changes that made the above code work:

  1. val r = inputs(row).copy()

  2. res = joiner.join(row, buildUnsafeRow(rowWriter, queryRes)).copy()
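For context, here is a minimal standalone sketch of the row-reuse behaviour that the .copy() calls work around (it uses Spark's internal catalyst APIs directly; the values are only for illustration):

  import scala.collection.mutable.ArrayBuffer
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
  import org.apache.spark.sql.types.{DataType, StringType}
  import org.apache.spark.unsafe.types.UTF8String

  // UnsafeProjection writes every result into the same backing UnsafeRow, so
  // buffering the results without copy() leaves all entries pointing at the
  // last projected value -- which is why the aggregation saw a single key.
  val proj = UnsafeProjection.create(Array[DataType](StringType))
  val buffered = ArrayBuffer[UnsafeRow]()

  Seq("a", "b", "c").foreach { s =>
    val row = InternalRow(UTF8String.fromString(s))
    buffered += proj(row)            // without copy(): all entries end up as "c"
    // buffered += proj(row).copy()  // with copy(): entries stay "a", "b", "c"
  }

  buffered.foreach(r => println(r.getUTF8String(0)))

The UnsafeRow returned by the generated UnsafeRowJoiner is reused across calls in the same way, which would explain why the second .copy() is also needed.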
