вівторок, 1 квітня 2014 р.

Spark on HDP2

There is my first experience with Apache Spark, running it on Hadoop. I faced in several issues during running my piece of code.
To be honest, I started with Cloudera CDH5 distribution, they promised Spark was already added and usage will be simple. But no luck in fact, it doesn't work at all - even on local machine with their spark-cloudera jar. I didn't want to waste my time, so I just downloaded spark distro to HDP2.
First of all, let start Spark in standalone mode, according to documentation:
# start master
./sbin/start-master.sh

# pick up in the log output spark://IP:PORT
# and than run worker on each node
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://IP:PORT

# more documentation available here https://spark.apache.org/docs/0.9.0/spark-standalone.html

After that I wrote some amount of Scala code, in fact to just count hardcoded words in document:

package experiment

import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {

  def main(args: Array[String]) {
    val logFile = args(0)  
  val conf = new SparkConf()
      .setMaster("local")
      .setAppName("My Spark application")
      .set("spark.executor.memory", "1g")
  val sc = new SparkContext(conf)


  // hdfs:///user/hue/input.txt
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("London")).count()
    val numBs = logData.filter(line =>; line.contains("Lviv")).count()
    println("Lines with London: %s, Lines with Lviv: %s".format(numAs, numBs))
}





It was the easiest part! After that I spent a couple of hours making correct build with Maven, the result is:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>SparkBegining</groupId>
    <artifactId>SparkBegining</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.10.0</scala.version>
    </properties>


    <repositories>

        <repository>
            <id>Akka repository</id>
            <url>http://repo.akka.io/releases</url>
        </repository>
        <repository>
            <id>scala</id>
            <name>Scala Tools</name>
            <url>http://scala-tools.org/repo-releases/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>


    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>0.9.0-incubating</version>
            <exclusions>
                <exclusion>
                    <artifactId>com.google.protobuf</artifactId>
                    <groupId>protobuf-java</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.specs</groupId>
            <artifactId>specs</artifactId>
            <version>1.2.5</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.5</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <buildcommands>
                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <additionalProjectnatures>
                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                    </additionalProjectnatures>
                    <classpathContainers>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                    </classpathContainers>
                </configuration>
            </plugin>


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.5</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <!-- it's required to overcome icorect digest exception -->
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- it's required to overcome 'akka.version' exception (and put Akka default configuration) -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <manifestEntries>
                                        <Main-Class>experiment.SimpleApp</Main-Class>
                                    </manifestEntries>
                                </transformer>
                                <!-- and it's required to specify handler for 'hdfs' filesystem -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>
    <reporting>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </reporting>

</project>


Perhaps, you mentioned that I excluded protobuf from sprak and added next version. The reason:
I got error
Exception in thread "main" java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$AppendRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;at java.lang.ClassLoader.defineClass1(Native Method)

I checked my HDP (find / -name protobug*.jar) and found that my Hadoop uses protobuf 2.5.1 instead of 2.4.1 (it was dependency derived from spark jar! it easy discovered with maven command mvn dependency:tree -Dincludes=*protobuf*)

 After that, finally, I was able to run Spark Job! Hurray:

java -jar SparkBegining-1.0-SNAPSHOT-shaded.jar hdfs://10.25.9.155:8020/user/hue/input.txt

1 коментар:

  1. Thanks for sharing the setup information, the following line is not compiling due to ";" present in the following line
    val numBs = logData.filter(line =>; line.contains("Lviv")).count()

    Thanks,
    Vicky

    ВідповістиВидалити