Configuring Spark with Mongo DB
Two packaged connectors are available to integrate Mongo DB and Spark:
- Spark Mongo DB connector developed by Stratio: Spark-MongoDB
- Mongo DB connector for Spark: mongo-spark
Dependencies required are:
- Mongo DB Connector for Spark: mongo-spark-connector_2.10
- Mongo DB Java Driver
- Spark dependencies
Sample Code
Mongoconfig.scala
traitMongoConfig {
val database = "test"
val collection = "characters"
val host = "localhost"
val port = 27017
}
DataModeler.scala
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.SQLContext
import com.mongodb.spark.config._
import com.mongodb.spark.api.java.MongoSpark
import com.mongodb.spark._
import com.mongodb.spark.sql._
object DataModeler extends MongoConfig {
def main(args: Array[String]): Unit = {
println("Initializing Streaming Spark Context...")
System.setProperty("hadoop.home.dir", """C:\hadoop-2.3.0""")
val inputUri = "mongodb://" + host + ":" + port + "/" + database + "." + collection
val outputUri = "mongodb://" + host + ":" + port + "/" + database + "." + collection
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("MongoConnector")
.set("spark.mongodb.input.uri", inputUri)
.set("spark.mongodb.output.uri", outputUri)
println("SparkConf is set...")
val sc = new SparkContext(sparkConf)
val dataHandler = new DataHandler()
//Data Insertion
dataHandler.insertData(sc)
// Helper Functions
val sqlContext = SQLContext.getOrCreate(sc)
var df = MongoSpark.load(sqlContext)
var rows = df.count()
println("Is collection empty- " + df.isEmpty())
println("Number of rows- " + rows)
println("First row- " + df.first())
//Data Selection
println("The elements are:")
sc.loadFromMongoDB().take(rows.toInt).foreach(println)
//Querying collection using SQLcontext
val characterdf = sqlContext.read.mongo()
characterdf.registerTempTable("characters")
val centenarians = sqlContext.sql("SELECT name, age FROM characters WHERE age >= 100")
println("Centenarians are:")
centenarians.show()
//Applying aggregation pipeline
println("Sorted values are:")
sqlContext.read.option("pipeline", "[{ $sort: { age : -1 } }]").mongo().show()
// Drop database
//MongoConnector(sc).withDatabaseDo(ReadConfig(sc), db =>db.drop())
println("Completed.....")
}
}
DataHandler.scala
import org.apache.spark.SparkContext
import com.mongodb.spark.api.java.MongoSpark
import org.bson.Document
class DataHandler {
def insertData(sc: SparkContext) {
val docs = """
|{"name": "Bilbo Baggins", "age": 50}
|{"name": "Gandalf", "age": 1000}
|{"name": "Thorin", "age": 195}
|{"name": "Balin", "age": 178}
|{"name": "Kíli", "age": 77}
|{"name": "Dwalin", "age": 169}
|{"name": "Óin", "age": 167}
|{"name": "Glóin", "age": 158}
|{"name": "Fíli", "age": 82}
|{"name": "Bombur"}""".trim.stripMargin.split("[\\r\\n]+").toSeq
MongoSpark.save(sc.parallelize(docs.map(Document.parse)))
}
}
Dependencies
Spark-assembly-1.6.1-hadoop2.6.0.jar
Mongo-java-driver-3.2.2.jar
Mongo-spark-connector_2.10-0.1.jar
References
- https://docs.mongodb.com/manual/
- https://docs.mongodb.com/spark-connector
- https://github.com/mongodb/mongo-spark
- https://github.com/Stratio/Spark-MongoDB
- https://www.mongodb.com/mongodb-architecture
- https://jira.mongodb.org/secure/attachment/112939/MongoDB_Architecture_Guide.pdf
Be self motivated, We are the creators of our destiny
Terminologies
Mongo DB Processes and Configurations
- mongod – Database instance
- mongos - Sharding processes
Analogous to a database router.
Processes all requests
Decides how many and which mongods should receive the query
Mongos collates the results, and sends it back to the client.
- mongo – an interactive shell (a client)
Fully functional JavaScript environment for use with a Mongo DB
Getting started with Mongo DB
- To install mongo DB, go to this link and click on the appropriate OS and architecture: http://www.mongodb.org/downloads
- Extract the files
- Create a data directory for mongoDB to use
- Open the mongodb/bin directory and run mongod.exe to start the database server.
- To establish a connection to the server, open another command prompt window and go to the same directory, entering in mongo.exe.
Mongo DB CRUD Operation
1. Create
• db.collection.insert( <document> )
Eg: db.users.insert(
{ name: “sally”, salary: 15000, designation: “MTS”, teams: [ “cluster-management” ] }
)
• db.collection.save( <document> )
2. Read
• db.collection.find( <query>, <projection> )
Eg: db.collection.find(
{ qty: { $gt: 4 } },
{ name: 1}
)
• db.collection.findOne( <query>, <projection> )
3. Update
db.collection.update( <Update Criteria>, <Update Action>, < Update Option> )
Eg: db.user.update(
{salary:{$gt:18000}},
{$set: {designation: “Manager”}},
{multi: true}
)
4. Delete
db.collection.remove( <query>, <justOne> )
Eg: db.user.remove(
{ “name" : “sally" },
{justOne: true}
)
When to Use Spark with MongoDB
While MongoDB natively offers rich analytics capabilities, there are situations where integrating the Spark engine can extend the real-time processing of operational data managed by MongoDB, and allow users to operationalize results generated from Spark within real-time business processes supported by MongoDB.
Spark can take advantage of MongoDB’s rich secondary indexes to extract and process only the range data it needs– for example, analyzing all customers located in a specific geography. This is very different from other databases that either do not offer, or do not recommend the use of secondary indexes. In these cases, Spark would need to extract all data based on a simple primary key, even if only a subset of that data is required for the Spark process. This means more processing overhead, more hardware, and longer time-to-insight for the analyst.
Examples of where it is useful to combine Spark and MongoDB include the following.
1. Rich Operators & Algorithms
Spark supports over 100 different operators and algorithms for processing data. Developers can use this to perform advanced computations that would otherwise require more programmatic effort combining the MongoDB aggregation framework with application code. For example, Spark offers native support for advanced machine learning algorithms including k-means clustering and Gaussian mixture models.
Consider a web analytics platform that uses the MongoDB aggregation framework to maintain a real time dashboard displaying the number of clicks on an article by country; how often the article is shared across social media; and the number of shares by platform. With this data, analysts can quickly gain insight on how content is performing, optimizing user’s experience for posts that are trending, with the ability to deliver critical feedback to the editors and ad-tech team.
Spark’s machine learning algorithms can also be applied to the log, clickstream and user data stored in MongoDB to build precisely targeted content recommendations for its readers. Multi-class classifications are run to divide articles into granular sub-categories, before applying logistic regression and decision tree methods to match readers’ interests to specific articles. The recommendations are then served back to users through MongoDB, as they browse the site.
2. Processing Paradigm
Many programming languages can use their own MongoDB drivers to execute queries against the database, returning results to the application where additional analytics can be run using standard machine learning and statistics libraries. For example, a developer could use the MongoDB Python or R drivers to query the database, loading the result sets into the application tier for additional processing.
However, this starts to become more complex when an analytical job in the application needs to be distributed across multiple threads and nodes. While MongoDB can service thousands of connections in parallel, the application would need to partition the data, distribute the processing across the cluster, and then merge results. Spark makes this kind of distributed processing easier and faster to develop. MongoDB exposes operational data to Spark’s distributed processing layer to provide fast, real-time analysis. Combining Spark queries with MongoDB indexes allows data to be filtered, avoiding full collection scans and delivering low-latency responsiveness with minimal hardware and database overhead.
3. Skills Re-Use
With libraries for SQL, machine learning and others –combined with programming in Java, Scala and Python –developers can leverage existing skills and best practices to build sophisticated analytics workflows on top of MongoDB.
Don't fear failure in the first attempt because even the successful maths started with 'zero' only
MongoDB data management
1. Sharding
MongoDB provides horizontal scale-out for databases on low cost, commodity hardware or cloud infrastructure using a technique called sharding, which is transparent to applications. Sharding distributes data across multiple physical partitions called shards. Sharding allows MongoDB deployments to address the hardware limitations of a single server, such as bottlenecks in RAM or disk I/O, without adding complexity to the application. MongoDB automatically balances the data in the sharded cluster as the data grows or the size of the cluster increases or decreases.
- Range-based Sharding: Documents are partitioned across shards according to the shard key value. Documents with shard key values “close” to one another are likely to be co-located on the same shard. This approach is well suited for applications that need to optimize range-based queries.
- Hash-based Sharding: Documents are uniformly distributed according to an MD5 hash of the shard key value. This approach guarantees a uniform distribution of writes across shards, but is less optimal for range-based queries.
- Location-aware Sharding: Documents are partitioned according to a user-specified configuration that associates shard key ranges with specific shards and hardware. Users can continuously refine the physical location of documents for application requirements such as locating data in specific data centers or on different storage types (i.e. the In-Memory engine for “hot” data, and disk-based engines running on SSDs for warm data, and HDDs for aged data).
2. Consistency
MongoDB is ACID compliant at the document level. One or more fields may be written in a single operation, including updates to multiple sub-documents and elements of an array. The ACID property provided by MongoDB ensures complete isolation as a document is updated; any errors cause the operation to roll back and clients receive a consistent view of the document. MongoDB also allows users to specify write availability in the system using an option called the write concern. The default write concern acknowledges writes from the application, allowing the client to catch network exceptions and duplicate key errors. Developers can use MongoDB's Write Concerns to configure operations to commit to the application only after specific policies have been fulfilled. Each query can specify the appropriate write concern, ranging from unacknowledged to acknowledgement that writes have been committed to all replicas.
3. Availability
Replication: MongoDB maintains multiple copies of data called replica sets using native replication. A replica set is a fully self-healing shard that helps prevent database downtime and can be used to scale read operations. Replica failover is fully automated, eliminating the need for administrators to intervene manually. A replica set consists of multiple replicas. At any given time one member acts as the primary replica set member and the other members act as secondary replica set members. MongoDB is strongly consistent by default: reads and writes are issued to a primary copy of the data. If the primary member fails for any reason (e.g., hardware failure, network partition) one of the secondary members is automatically elected to primary and begins to process all writes. Up to 50 members can be provisioned per replica set.
Applications can optionally read from secondary replicas, where data is eventually consistent by default. Reads from secondaries can be useful in scenarios where it is acceptable for data to be slightly out of date, such as some reporting applications. For data-center aware reads, applications can also read from the closest copy of the data as measured by ping distance to reduce the effects of geographic latency. Replica sets also provide operational flexibility by providing a way to upgrade hardware and software without requiring the database to go offline. MongoDB achieves replication by the use of replica set. A replica set is a group of mongod instances that host the same data set. In a replica one node is primary node that receives all write operations. All other instances, apply operations from the primary so that they have the same data set. Replica set can have only one primary node.
- Replica set is a group of two or more nodes (generally minimum 3 nodes are required).
- In a replica set one node is primary node and remaining nodes are secondary.
- All data replicates from primary to secondary node.
- At the time of automatic failover or maintenance, election establishes for primary and a new primary node is elected.
- After the recovery of failed node, it again join the replica set and works as a secondary node.
Client application always interacts with primary node and primary node then replicates the data to the secondary nodes.
ReplicasetOplog: Operations that modify a database on the primary replica set member are replicated to the secondary members using the oplog(operations log).The size of the oplog is configurable and by default is 5% of the available free disk space. For most applications, this size represents many hours of operations and defines the recovery window for a secondary, should this replica go offline for some period of time and need to catch up to the primary when it recovers. If a secondary replica set member is down for a period longer than is maintained by the oplog, it must be recovered from the primary replica using a process called initial synchronization. During this process all databases and their collections are copied from the primary or another replica to the secondary as well as the oplog and then the indexes are built.
Elections and failover: Replica sets reduce operational overhead and improve system availability. If the primary replica for a shard fails, secondary replicas together determine which replica should be elected as the new primary using an extended implementation of the Raft consensus algorithm. Once the election process has determined the new primary, the secondary members automatically start replicating from it. If the original primary comes back online, it will recognize its change in state and automatically assume the role of a secondary.
Election Priority: A new primary replica set member is promoted within several seconds of the original primary failing. During this time, queries configured with the appropriate read preference can continue to be serviced by secondary replica set members. The election algorithms evaluate a range of parameters including analysis of election identifiers and timestamps to identify that replica set members that have applied the most recent updates from the primary; heartbeat and connectivity status; and user-defined priorities assigned to replica set members. In an election, the replica set elects an eligible member with the highest priority value as primary. By default, all members have a priority of 1 and have an equal chance of becoming primary; however, it is possible to set priority values that affect the likelihood of a replica becoming primary.
4. Storage
With MongoDB’s flexible storage architecture, the database automatically manages the movement of data between storage engine technologies using nativereplication. This approach significantly reduces developer and operational complexity when compared to running multiple distinct database technologies. MongoDB 3.2 ships with four supported storage engines, all of which can coexist within a single MongoDB replica set.
- The default WiredTiger storage engine. For many applications, WiredTiger's granular concurrency control and native compression will provide the best all round performance and storage efficiency for the broadest range of applications.
- The Encrypted storage engine protecting highly sensitive data, without the performance or management overhead of separate file system encryption.(Requires MongoDB Enterprise Advanced).
- The In-Memory storage engine delivering the extreme performance coupled with real time analytics for the most demanding, latency-sensitive applications. (Requires MongoDB Enterprise Advanced).
- The MMAPv1 engine, an improved version of the storage engine used in pre-3.x MongoDB releases
- Administrators can modify the default compression settings for all collections and indexes.
5. Security
MongoDB Enterprise Advanced features extensive capabilities to defend, detect, and control access to data:
- Authentication: Simplifying access control to the database, MongoDB offers integration with external security mechanisms including LDAP, Windows Active Directory, Kerberos, and x.509 certificates.
- Authorization: User-defined roles enable administrators to configure granular permissions for a user or an application based on the privileges they need to do their job. Additionally, field-level redaction can work with trusted middleware to manage access to individual fields within a document, making it possible to co-locate data with multiple security levels in a single document.
- Auditing: For regulatory compliance, security administrators can use MongoDB's native audit log to track any operation taken against the database – whether DML, DCL or DDL.
- Encryption: MongoDB data can be encrypted on the network and on disk. With the introduction of the Encrypted storage engine, protection of data at-rest now becomes an integral feature within the database. By natively encrypting database files on disk, administrators eliminate both the management and performance overhead of external encryption mechanisms. Now, only those staff who have the appropriate database authorization credentials can access the encrypted data, providing additional levels of defence.
Chase your Dreams