Sunday, 29 October 2017

Spark on MongoDB: Part 4

Configuring Spark with MongoDB

Two packaged connectors are available for integrating MongoDB with Spark:
  • Spark-MongoDB connector developed by Stratio: Spark-MongoDB
  • MongoDB Connector for Spark: mongo-spark
The dependencies required for this example are listed below; a sample sbt declaration follows the list:
  • MongoDB Connector for Spark: mongo-spark-connector_2.10
  • MongoDB Java driver
  • Spark dependencies
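
These dependencies can also be pulled in through sbt instead of adding jars manually. The snippet below is a minimal build.sbt sketch, assuming Scala 2.10 and the Spark 1.6.1 / connector 0.1 / Java driver 3.2.2 versions listed under Dependencies at the end of this post; adjust the versions to match your environment.

build.sbt
name := "spark-mongodb-sample"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // Spark core and SQL (typically provided by the cluster at runtime)
  "org.apache.spark"  %% "spark-core"            % "1.6.1" % "provided",
  "org.apache.spark"  %% "spark-sql"             % "1.6.1" % "provided",
  // MongoDB Connector for Spark and the underlying Java driver
  "org.mongodb.spark" %% "mongo-spark-connector" % "0.1",
  "org.mongodb"       %  "mongo-java-driver"     % "3.2.2"
)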

Sample Code

MongoConfig.scala
trait MongoConfig {
  // Connection settings shared by the sample programs
  val database = "test"
  val collection = "characters"
  val host = "localhost"
  val port = 27017
}
DataModeler.scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.mongodb.spark._
import com.mongodb.spark.config._
import com.mongodb.spark.sql._

object DataModeler extends MongoConfig {

  def main(args: Array[String]): Unit = {

    println("Initializing Spark context...")

    // Required on Windows so Spark can locate winutils.exe
    System.setProperty("hadoop.home.dir", """C:\hadoop-2.3.0""")

    // Input and output point to the same collection for this example
    val inputUri = "mongodb://" + host + ":" + port + "/" + database + "." + collection
    val outputUri = "mongodb://" + host + ":" + port + "/" + database + "." + collection

    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("MongoConnector")
      .set("spark.mongodb.input.uri", inputUri)
      .set("spark.mongodb.output.uri", outputUri)
    println("SparkConf is set...")

    val sc = new SparkContext(sparkConf)

    val dataHandler = new DataHandler()

    // Data insertion
    dataHandler.insertData(sc)

    // Load the collection as a DataFrame and inspect it
    val sqlContext = SQLContext.getOrCreate(sc)
    val df = MongoSpark.load(sqlContext)
    val rows = df.count()

    println("Is collection empty- " + (rows == 0))
    println("Number of rows- " + rows)
    println("First row- " + df.first())

    // Data selection: read the collection as an RDD of Documents
    println("The elements are:")
    sc.loadFromMongoDB().take(rows.toInt).foreach(println)

    // Querying the collection using SQLContext
    val characterdf = sqlContext.read.mongo()
    characterdf.registerTempTable("characters")

    val centenarians = sqlContext.sql("SELECT name, age FROM characters WHERE age >= 100")
    println("Centenarians are:")
    centenarians.show()

    // Applying an aggregation pipeline before loading
    println("Sorted values are:")
    sqlContext.read.option("pipeline", "[{ $sort: { age : -1 } }]").mongo().show()

    // Drop the database (uncomment to enable)
    // MongoConnector(sc).withDatabaseDo(ReadConfig(sc), db => db.drop())

    println("Completed.....")
  }
}
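
If the collection's schema is known up front, the connector can also build the DataFrame from a case class instead of inferring the schema by sampling documents. The fragment below is a small sketch of that approach, reusing the sqlContext created above and assuming the typed MongoSpark.load[T] overload of the connector; the Character case class is introduced here purely for illustration and should be defined at the top level rather than inside main.

case class Character(name: String, age: Int)

// Load the collection with an explicit schema derived from the case class
val explicitDF = MongoSpark.load[Character](sqlContext)
explicitDF.printSchema()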
DataHandler.scala
import org.apache.spark.SparkContext
import com.mongodb.spark.MongoSpark
import org.bson.Document

class DataHandler {

  // Parses one JSON document per line and writes them to the configured collection
  def insertData(sc: SparkContext): Unit = {
    val docs = """
      |{"name": "Bilbo Baggins", "age": 50}
      |{"name": "Gandalf", "age": 1000}
      |{"name": "Thorin", "age": 195}
      |{"name": "Balin", "age": 178}
      |{"name": "Kíli", "age": 77}
      |{"name": "Dwalin", "age": 169}
      |{"name": "Óin", "age": 167}
      |{"name": "Glóin", "age": 158}
      |{"name": "Fíli", "age": 82}
      |{"name": "Bombur"}""".trim.stripMargin.split("[\\r\\n]+").toSeq
    MongoSpark.save(sc.parallelize(docs.map(Document.parse)))
  }
}
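
The example above relies on the global spark.mongodb.input.uri and spark.mongodb.output.uri settings from the SparkConf. The connector also supports per-operation overrides through ReadConfig and WriteConfig; the snippet below is a minimal sketch of that pattern, reusing the sc created in DataModeler.scala and assuming the ReadConfig/WriteConfig builders of the mongo-spark connector. The charactersCopy collection name is only illustrative.

import com.mongodb.spark._
import com.mongodb.spark.config._

// Read the characters collection, overriding only the collection name
val readConfig = ReadConfig(Map("collection" -> "characters"), Some(ReadConfig(sc)))
val charactersRdd = MongoSpark.load(sc, readConfig)

// Write the same documents to a second collection without touching spark.mongodb.output.uri
val writeConfig = WriteConfig(Map("collection" -> "charactersCopy"), Some(WriteConfig(sc)))
MongoSpark.save(charactersRdd, writeConfig)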

Dependencies

spark-assembly-1.6.1-hadoop2.6.0.jar
mongo-java-driver-3.2.2.jar
mongo-spark-connector_2.10-0.1.jar

References

  • https://docs.mongodb.com/manual/ 
  • https://docs.mongodb.com/spark-connector 
  • https://github.com/mongodb/mongo-spark 
  • https://github.com/Stratio/Spark-MongoDB 
  • https://www.mongodb.com/mongodb-architecture 
  • https://jira.mongodb.org/secure/attachment/112939/MongoDB_Architecture_Guide.pdf 


