Spark java.lang.NoClassDefFoundError: org/apache/spark/sql/exécution/datasources/v2/FileDataSourceV2
J'essaie actuellement de spark-soumettre un pot de graisse à un cluster local, que j'ai développé en utilisant Spark 2.4.6; Scala 2.11.12. Lors de la soumission au cluster, je reçois cette erreur:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2
Ma commande spark submit (exécutée dans l'invite cmd):
spark-submit --class main.app --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6 my_app_name-1.0-SNAPSHOT-jar-with-dependencies.jar
Autres détails:
- Scala version: 2.11.12
- Étincelle 2.4.6
- Lorsque je soumets en utilisant Spark 3.0.0 (c'est-à-dire en pointant mon SPARK_HOME vers le répertoire Spark 3.0.0 et en soumettant), cela fonctionne bien, mais lorsque je soumets en utilisant Spark 2.4.6 (c'est-à-dire en pointant mon SPARK_HOME vers le répertoire Spark 2.4.6 et en soumettant), j'obtiens cette erreur
- Je dois utiliser 2.4.6 (cela ne peut pas être modifié)
Mon pom fichier
[....headers and stuff]
<groupId>org.example</groupId>
<artifactId>my_app_name</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11.12</scala.version>
</properties>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.4.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-tools -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-tools</artifactId>
<version>2.4.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.databricks/spark-csv -->
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.11</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.3</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<!-- see http://davidb.github.com/scala-maven-plugin -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<configuration>
<recompileMode>incremental</recompileMode> <!-- NOTE: incremental compilation although faster requires passing to MAVEN_OPTS="-XX:MaxPermSize=128m" -->
<!-- addScalacArgs>-feature</addScalacArgs -->
<args>
<arg>-Yresolve-term-conflict:object</arg> <!-- required for package/object name conflict in Jenkins jar -->
</args>
<javacArgs>
<javacArg>-Xlint:unchecked</javacArg>
<javacArg>-Xlint:deprecation</javacArg>
</javacArgs>
</configuration>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archive>
<manifest>
<mainClass>
ingest_package.object_ingest
</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
[....footers and stuff]
Mon fichier d'application principal
package main
import java.nio.file.{Files, Paths}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.functions.{date_format, struct}
object app {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[*]")
.appName("parquet_ingest_engine")
.getOrCreate()
Logger.getLogger("org").setLevel(Level.ERROR)
val accessKeyId = System.getenv("AWS_ACCESS_KEY_ID")
val secretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
val person_df = spark.read.format("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat").load("s3_parquet_path_here")
val person_df_reformatted = person_df.withColumn("registration_dttm_string", date_format(person_df("registration_dttm"), "MM/dd/yyyy hh:mm"))
val person_df_final = person_df_reformatted.select("registration_dttm_string", "id", "first_name", "last_name", "email", "gender", "ip_address", "cc", "country", "birthdate", "salary", "title", "comments")
person_df_final.printSchema()
person_df_final.show(5)
val person_avro_schema = new String(Files.readAllBytes(Paths.get("input\\person_schema.avsc")))
print(person_avro_schema)
person_df_final.write.format("avro").mode("overwrite").option("avroSchema", person_avro_schema).save("output/person.avro")
print("\n" + "=====================successfully wrote avro to local path=====================" + "\n")
person_df_final.select(to_avro(struct("registration_dttm_string", "id", "first_name", "last_name", "email", "gender", "ip_address", "cc", "country", "birthdate", "salary", "title", "comments")) as "value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "spark_topic_test")
.save()
print("\n" + "========================Successfully wrote to avro consumer on localhost kafka consumer========================" + "\n"+ "\n")
}
}
1 answers
Tout d'abord, vous avez des problèmes avec les dépendances:
- vous n'avez pas besoin de
com.databricks:spark-csv_2.11
- Le support CSV est dans le Spark lui - même depuis longtemps - vous n'avez pas besoin de dépendances Kafka sauf
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6
-
spark-sql
etspark-core
doivent être déclarées avec<scope>provided</scope>
comme ici - il est préférable d'utiliser la même version des dépendances Spark que celle que vous utilisez pour la soumission
2ème, le problème pourrait provenir de la version incorrecte de Scala (par exemple, vous n'avez pas fait mvn clean
si vous avez dit que le code fonctionne avec Spark 3.0, il devrait être compilé avec Scala 2.12, tandis que 2.4.6 ne fonctionne qu'avec 2.11
Je recommande fortement de se débarrasser des dépendances non nécessaires, d'utiliser fourni, de faire mvn clean
, etc.