Problem:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 12.0 failed 4 times, most recent failure: Lost task 3.3 in stage 12.0 (TID 112) (emr-5bywke3r0d3nqg4cmeb0-core-2 executor 15): java.lang.IllegalArgumentException: Unexpected type of BlockId, shuffle_4_45_33_44
at org.apache.spark.storage.ShuffleBlockFetcherIterator.diagnoseCorruption(ShuffleBlockFetcherIterator.scala:1079)
at org.apache.spark.storage.BufferReleasingInputStream.$anonfun$tryOrFetchFailedException$1(ShuffleBlockFetcherIterator.scala:1314)
at scala.Option.map(Option.scala:230)
at org.apache.spark.storage.BufferReleasingInputStream.tryOrFetchFailedException(ShuffleBlockFetcherIterator.scala:1313)
at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1299)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899)
at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:496)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage13.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.IllegalArgumentException: Unexpected type of BlockId, shuffle_4_45_33_44
... (same stack frames as the executor stack trace above)

Cause:
The codec used to compress RDD cache and shuffle data is set by spark.io.compression.codec, and the default was once LZF; if spark.io.compression.codec and spark.debug.maxToStringFields have been changed, the error above can appear.
Reading the Spark source, the block handed to ShuffleBlockFetcherIterator.diagnoseCorruption here is a ShuffleBlockBatchId, which none of the handled match cases covers, so execution falls into the catch-all case and throws the IllegalArgumentException.
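
To make the failure mode concrete, here is a short Scala sketch. The match below only paraphrases the shape of diagnoseCorruption in Spark 3.2.x rather than quoting its exact source; BlockId, ShuffleBlockId, and BlockId.apply are Spark's real developer-API types.

import org.apache.spark.storage.{BlockId, ShuffleBlockId}

// Paraphrased shape of ShuffleBlockFetcherIterator.diagnoseCorruption
// (Spark 3.2.x), not the exact source: only plain ShuffleBlockId values
// are handled, so any other block type falls into the catch-all arm.
def diagnose(blockId: BlockId): String = blockId match {
  case b: ShuffleBlockId =>
    s"would checksum $b and ask the shuffle server for the corruption cause"
  case unexpected =>
    throw new IllegalArgumentException(s"Unexpected type of BlockId, $unexpected")
}

// "shuffle_4_45_33_44" parses to ShuffleBlockBatchId(4, 45, 33, 44):
// shuffleId=4, mapId=45, reduce IDs 33 through 44, i.e. a batch-fetched
// range of shuffle partitions. It is not a ShuffleBlockId, so diagnose()
// throws exactly the IllegalArgumentException seen in the log above.
diagnose(BlockId("shuffle_4_45_33_44"))

In other words, the IllegalArgumentException is the corruption diagnostics themselves failing on a batch-fetched block; it masks the underlying corrupt shuffle read rather than being the root cause.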

Separately, Spark cannot write to a table that the same query reads from, i.e. a command like:

insert overwrite table aa select * from aa

For ORC or Parquet tables, this fails with:

Cannot overwrite a path that is also being read from

Workarounds for this problem:

  • Set spark.sql.hive.convertMetastoreParquet=false or spark.sql.hive.convertMetastoreOrc=false.
  • Break the lineage with a checkpoint: set a checkpoint directory and checkpoint the dataset, and let Spark clean the checkpoint directory via spark.cleaner.referenceTracking.cleanCheckpoints=true.
  • Insert into a temporary table first, then move the data into the target table.

Any one of these resolves the problem; a minimal sketch of the workarounds follows.
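
A minimal Scala sketch of the workarounds, assuming a Hive-backed table named aa as in the example above; the checkpoint directory and the staging table name aa_tmp are illustrative assumptions, not from the original post.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("self-overwrite-workarounds")
  // let Spark clean checkpoint data once it is no longer referenced
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .enableHiveSupport()
  .getOrCreate()

// Workaround 1: go through the Hive SerDe instead of the native
// Parquet file source (use convertMetastoreOrc for ORC tables).
spark.sql("set spark.sql.hive.convertMetastoreParquet=false")

// Workaround 2: checkpoint to cut the lineage back to aa's files, so the
// insert no longer reads the same path it overwrites.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // assumed path
val snapshot = spark.table("aa").checkpoint() // eagerly materializes the rows
snapshot.createOrReplaceTempView("aa_snapshot")
spark.sql("insert overwrite table aa select * from aa_snapshot")

// Workaround 3: stage into a temporary table, then overwrite the target.
spark.sql("drop table if exists aa_tmp")
spark.sql("create table aa_tmp as select * from aa")
spark.sql("insert overwrite table aa select * from aa_tmp")
spark.sql("drop table aa_tmp")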

