Configuring Spark with MongoDB
Two packaged connectors are available to integrate MongoDB and Spark:
- Spark MongoDB connector developed by Stratio: Spark-MongoDB
- MongoDB connector for Spark: mongo-spark
- MongoDB Connector for Spark: mongo-spark-connector_2.10
- MongoDB Java Driver
- Spark dependencies
Sample Code
// MongoConfig.scala
/** Connection settings for the sample; mix into any object that needs them. */
trait MongoConfig {
  /** Name of the MongoDB database. */
  val database: String = "test"
  /** Name of the collection inside `database`. */
  val collection: String = "characters"
  /** Host name used when building the connection URI. */
  val host: String = "localhost"
  /** Port used when building the connection URI. */
  val port: Int = 27017
}
DataModeler.scala
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.SQLContext
import com.mongodb.spark.config._
import com.mongodb.spark.api.java.MongoSpark
import com.mongodb.spark._
import com.mongodb.spark.sql._
/**
 * Demo entry point: inserts sample documents through Spark, then reads them back
 * directly, through a Spark SQL query, and through an aggregation pipeline.
 */
object DataModeler extends MongoConfig {

  /**
   * Runs the whole demo against the database described by [[MongoConfig]].
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    println("Initializing Streaming Spark Context...")
    // Hard-coded local Hadoop directory; presumably needed so Spark can locate
    // winutils on Windows -- adjust or remove on other setups.
    System.setProperty("hadoop.home.dir", """C:\hadoop-2.3.0""")

    // Reads and writes target the same collection, so one URI serves both settings.
    val mongoUri = s"mongodb://$host:$port/$database.$collection"
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("MongoConnector")
      .set("spark.mongodb.input.uri", mongoUri)
      .set("spark.mongodb.output.uri", mongoUri)
    println("SparkConf is set...")

    val sc = new SparkContext(sparkConf)
    // Stop the context even when a step below throws, so it is not leaked.
    try {
      val dataHandler = new DataHandler()

      //Data Insertion
      dataHandler.insertData(sc)

      // Helper Functions
      val sqlContext = SQLContext.getOrCreate(sc)
      val df = MongoSpark.load(sqlContext)
      val rows = df.count()
      println("Is collection empty- " + df.isEmpty())
      println("Number of rows- " + rows)
      println("First row- " + df.first())

      //Data Selection
      println("The elements are:")
      // NOTE(review): rows is a Long; toInt would truncate for very large collections.
      sc.loadFromMongoDB().take(rows.toInt).foreach(println)

      //Querying collection using SQLcontext
      val characterdf = sqlContext.read.mongo()
      characterdf.registerTempTable("characters")
      val centenarians = sqlContext.sql("SELECT name, age FROM characters WHERE age >= 100")
      println("Centenarians are:")
      centenarians.show()

      //Applying aggregation pipeline
      println("Sorted values are:")
      sqlContext.read.option("pipeline", "[{ $sort: { age : -1 } }]").mongo().show()

      // Drop database
      //MongoConnector(sc).withDatabaseDo(ReadConfig(sc), db =>db.drop())
      println("Completed.....")
    } finally {
      sc.stop()
    }
  }
}
DataHandler.scala
import org.apache.spark.SparkContext
import com.mongodb.spark.api.java.MongoSpark
import org.bson.Document
/** Seeds MongoDB with a fixed set of sample character documents. */
class DataHandler {

  /**
   * Parses the embedded JSON documents (one per line) and saves them via the connector.
   *
   * @param sc Spark context used to parallelize the documents; presumably the
   *           connector takes the write target from this context's configuration
   */
  def insertData(sc: SparkContext): Unit = {
    // trim drops the leading newline so the first line also starts at the margin marker.
    val docs = """
      |{"name": "Bilbo Baggins", "age": 50}
      |{"name": "Gandalf", "age": 1000}
      |{"name": "Thorin", "age": 195}
      |{"name": "Balin", "age": 178}
      |{"name": "Kíli", "age": 77}
      |{"name": "Dwalin", "age": 169}
      |{"name": "Óin", "age": 167}
      |{"name": "Glóin", "age": 158}
      |{"name": "Fíli", "age": 82}
      |{"name": "Bombur"}""".trim.stripMargin.split("[\\r\\n]+").toSeq
    MongoSpark.save(sc.parallelize(docs.map(Document.parse)))
  }
}
Dependencies
spark-assembly-1.6.1-hadoop2.6.0.jar
mongo-java-driver-3.2.2.jar
mongo-spark-connector_2.10-0.1.jar
References
- https://docs.mongodb.com/manual/
- https://docs.mongodb.com/spark-connector
- https://github.com/mongodb/mongo-spark
- https://github.com/Stratio/Spark-MongoDB
- https://www.mongodb.com/mongodb-architecture
- https://jira.mongodb.org/secure/attachment/112939/MongoDB_Architecture_Guide.pdf
Be self-motivated; we are the creators of our own destiny.