Spark java.lang.NoClassDefFoundError: org/apache/spark/sql/exécution/datasources/v2/FileDataSourceV2


J'essaie actuellement de spark-soumettre un pot de graisse à un cluster local, que j'ai développé en utilisant Spark 2.4.6; Scala 2.11.12. Lors de la soumission au cluster, je reçois cette erreur:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2

Ma commande spark submit (exécutée dans l'invite cmd): spark-submit --class main.app --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6 my_app_name-1.0-SNAPSHOT-jar-with-dependencies.jar

Autres détails:

  • Scala version: 2.11.12
  • Étincelle 2.4.6
  • Lorsque je soumets en utilisant Spark 3.0.0 (c'est-à-dire en pointant mon SPARK_HOME vers le répertoire Spark 3.0.0 et en soumettant), cela fonctionne bien, mais lorsque je soumets en utilisant Spark 2.4.6 (c'est-à-dire en pointant mon SPARK_HOME vers le répertoire Spark 2.4.6 et en soumettant), j'obtiens cette erreur
  • Je dois utiliser 2.4.6 (cela ne peut pas être modifié)

Mon pom fichier

[....headers and stuff]
<groupId>org.example</groupId>
<artifactId>my_app_name</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <scala.version>2.11.12</scala.version>
</properties>

<repositories>
    <repository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </repository>
</repositories>

<pluginRepositories>
    <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
</pluginRepositories>

<dependencies>
        <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>5.6.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.specs</groupId>
        <artifactId>specs</artifactId>
        <version>1.2.5</version>
        <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.3</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-tools -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-tools</artifactId>
        <version>2.4.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.databricks/spark-csv -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.11</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.4</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.3.3</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <!-- see http://davidb.github.com/scala-maven-plugin -->
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.3.2</version>
            <configuration>
                <recompileMode>incremental</recompileMode>   <!-- NOTE: incremental compilation although faster requires passing to MAVEN_OPTS="-XX:MaxPermSize=128m" -->
                <!-- addScalacArgs>-feature</addScalacArgs -->
                <args>
                    <arg>-Yresolve-term-conflict:object</arg>   <!-- required for package/object name conflict in Jenkins jar -->
                </args>
                <javacArgs>
                    <javacArg>-Xlint:unchecked</javacArg>
                    <javacArg>-Xlint:deprecation</javacArg>
                </javacArgs>
            </configuration>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>
                                    ingest_package.object_ingest
                                </mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

[....footers and stuff]

Mon fichier d'application principal

package main

import java.nio.file.{Files, Paths}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.functions.{date_format, struct}

object app {

def main(args: Array[String]): Unit = {

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("parquet_ingest_engine")
  .getOrCreate()

Logger.getLogger("org").setLevel(Level.ERROR)
val accessKeyId = System.getenv("AWS_ACCESS_KEY_ID")
val secretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY")


val person_df = spark.read.format("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat").load("s3_parquet_path_here")
val person_df_reformatted = person_df.withColumn("registration_dttm_string", date_format(person_df("registration_dttm"), "MM/dd/yyyy hh:mm"))
val person_df_final = person_df_reformatted.select("registration_dttm_string", "id", "first_name", "last_name", "email", "gender", "ip_address", "cc", "country", "birthdate", "salary", "title", "comments")

person_df_final.printSchema()
person_df_final.show(5)

val person_avro_schema = new String(Files.readAllBytes(Paths.get("input\\person_schema.avsc")))
print(person_avro_schema)

person_df_final.write.format("avro").mode("overwrite").option("avroSchema", person_avro_schema).save("output/person.avro")
print("\n" + "=====================successfully wrote avro to local path=====================" + "\n")


person_df_final.select(to_avro(struct("registration_dttm_string", "id", "first_name", "last_name", "email", "gender", "ip_address", "cc", "country", "birthdate", "salary", "title", "comments")) as "value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "spark_topic_test")
  .save()

print("\n" + "========================Successfully wrote to avro consumer on localhost kafka consumer========================" + "\n"+ "\n")


  }
 }
Author: kndoan211A, 2020-09-10

1 answers

Tout d'abord, vous avez des problèmes avec les dépendances:

  • vous n'avez pas besoin de com.databricks:spark-csv_2.11 - Le support CSV est dans le Spark lui - même depuis longtemps
  • vous n'avez pas besoin de dépendances Kafka sauf org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6
  • spark-sql et spark-core doivent être déclarées avec <scope>provided</scope> comme ici
  • il est préférable d'utiliser la même version des dépendances Spark que celle que vous utilisez pour la soumission

2ème, le problème pourrait provenir de la version incorrecte de Scala (par exemple, vous n'avez pas fait mvn clean si vous avez dit que le code fonctionne avec Spark 3.0, il devrait être compilé avec Scala 2.12, tandis que 2.4.6 ne fonctionne qu'avec 2.11

Je recommande fortement de se débarrasser des dépendances non nécessaires, d'utiliser fourni, de faire mvn clean, etc.

 0
Author: Alex Ott, 2020-09-10 17:14:55