Spark: Real time analytics for big data for top search queries and top product views

Posted by Jai on June 4, 2014

Hadoop, being a batch processing framework, makes it hard to get real-time analytics on big data. Apache Spark overcomes this batch nature by providing distributed computation that can process events in a streaming fashion. In this post, we will explore Spark's streaming capability to process Flume event data and generate the top search query strings and the top product views over the last hour.

In continuation of the previous posts in this series, we have so far used Hadoop's batching capabilities to process huge amounts of data. But batch processing introduces latency, which may matter depending on your data. This is where Spark comes into the picture. We will explore Spark's streaming capability here to get real-time analytics that can be used on the website for display or for monitoring.

Spark

Apache Spark “is a fast and general engine for large-scale data processing.”

Functionality

As shared in the examples above, we have the customer search-click data available to us. We have a Flume system in place to process the data and store it in Hadoop for later processing. Consider a scenario where you want to display real-time customer behavior on the website, showing what other customers are doing:

  • What are other customers searching for?
  • Other customers also searched for…
  • Top search query strings on the website in the last hour
  • What are other customers viewing?
  • Other customers also viewed these products…
  • Top product views in the last hour

We will use Spark's streaming functionality to get real-time results for customer clicks on the website.

[Diagram: spark-streaming-flume-realtimeanalytics]

Spark Streaming

Spark Streaming “makes it easy to build scalable fault-tolerant streaming applications.”

Maven dependency

Check pom.xml for the other dependencies:

<spark.version>0.9.0-cdh5.0.2</spark.version>

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-flume_2.10</artifactId>
	<version>${spark.version}</version>
</dependency>

The application uses version 0.9.0 of Spark.

Starting the Spark context

We will start Spark in local mode. Check SparkStreamServiceImpl.java for details:

		...
		// Create a StreamingContext with a SparkConf configuration
		SparkConf sparkConf = new SparkConf(false)
				.setAppName("JaiSpark")
				.setSparkHome("target/sparkhome")
				.setMaster("local")
				.set("spark.executor.memory", "128m")
				.set("spark.local.dir", new File("target/sparkhome/tmp").getAbsolutePath())
				.set("spark.cores.max", "2").set("spark.akka.threads", "2")
				.set("spark.akka.timeout", "60").set("spark.logConf", "true")
				.set("spark.cleaner.delay", "3700")
				.set("spark.cleaner.ttl", "86400")
				.set("spark.shuffle.spill", "false")
				.set("spark.driver.host", "localhost")
				.set("spark.driver.port", "43214");
		jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
		String checkpointDir = hadoopClusterService.getHDFSUri() + "/sparkcheckpoint";
		jssc.checkpoint(checkpointDir);
		...

The Spark instance is started in local mode for testing purposes.
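
For a real deployment you would point the context at a cluster master instead of local mode. A minimal sketch (the master URL and resource settings below are assumptions, not part of the original setup):

	// Hypothetical cluster-mode configuration; the master URL,
	// executor memory and core count are placeholders.
	SparkConf clusterConf = new SparkConf(false)
			.setAppName("JaiSpark")
			.setMaster("spark://spark-master:7077")
			.set("spark.executor.memory", "2g")
			.set("spark.cores.max", "8");
	JavaStreamingContext jssc = new JavaStreamingContext(clusterConf, new Duration(5000));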

Create Flume events stream

We have the streaming context set up. Let's create the Flume stream for it:

	...
	JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(jssc, "localhost", 41111, StorageLevels.MEMORY_AND_DISK);
	...

The stream will listen on localhost port 41111 for incoming data.

Flume events sample data

Sample customer search-click data generated from the Flume events:

{"eventid":"629e9b5f-ff4a-4168-8664-6c8df8214aa7-1399386809805-24","hostedmachinename":"192.168.182.1330","pageurl":"http://blahblah:/5","customerid":24,"sessionid":"648a011d-570e-48ef-bccc-84129c9fa400","querystring":null,"sortorder":"desc","pagenumber":3,"totalhits":28,"hitsshown":7,"createdtimestampinmillis":1399386809805,"clickeddocid":"41","favourite":null,"eventidsuffix":"629e9b5f-ff4a-4168-8664-6c8df8214aa7","filters":[{"code":"searchfacettype_color_level_2","value":"Blue"},{"code":"searchfacettype_age_level_2","value":"12-18 years"}]}
{"eventid":"648b5cf7-7ca9-4664-915d-23b0d45facc4-1399386809782-298","hostedmachinename":"192.168.182.1333","pageurl":"http://blahblah:/4","customerid":298,"sessionid":"7bf042ea-526a-4633-84cd-55e0984ea2cb","querystring":"queryString48","sortorder":"desc","pagenumber":0,"totalhits":29,"hitsshown":19,"createdtimestampinmillis":1399386809782,"clickeddocid":"9","favourite":null,"eventidsuffix":"648b5cf7-7ca9-4664-915d-23b0d45facc4","filters":[{"code":"searchfacettype_color_level_2","value":"Green"}]}
{"eventid":"74bb7cfe-5f8c-4996-9700-0c387249a134-1399386809799-440","hostedmachinename":"192.168.182.1330","pageurl":"http://blahblah:/1","customerid":440,"sessionid":"940c9a0f-a9b2-4f1d-b114-511ac11bf2bb","querystring":"queryString16","sortorder":"asc","pagenumber":3,"totalhits":5,"hitsshown":32,"createdtimestampinmillis":1399386809799,"clickeddocid":null,"favourite":null,"eventidsuffix":"74bb7cfe-5f8c-4996-9700-0c387249a134","filters":[{"code":"searchfacettype_brand_level_2","value":"Apple"}]}

Create Top 10 search query string stream

We want to retrieve what customers are currently searching for on the website, i.e. the top search query strings in, say, the last hour. Check QueryStringJDStreams.java for the complete stream functionality.

public class QueryStringJDStreams implements Serializable {
	...
	//keep only events that contain a query string
	JavaDStream<SparkFlumeEvent> onlyQueryStringStream = flumeStream
				.filter(new Function<SparkFlumeEvent, Boolean>() {
					@Override
					public Boolean call(SparkFlumeEvent event) throws Exception {
						String eventString = new String(event.event().getBody().array());
						String queryString = getQueryString(eventString);
						//compare string contents with equals(), not ==
						return queryString != null && !queryString.isEmpty() && !"null".equals(queryString);
					}
				});
		//Count the query string
		JavaPairDStream<String, Integer> queryStringStream = onlyQueryStringStream
				.map(new PairFunction<SparkFlumeEvent, String, Integer>() {
					public Tuple2<String, Integer> call(SparkFlumeEvent event) {
						String eventString = new String(event.event().getBody().array());
						String queryString = getQueryString(eventString);
						return new Tuple2<String, Integer>(queryString, 1);
					}
				});
		//reduce by query string over a sliding time window:
		//slide every 5 seconds over a window of one hour
		JavaPairDStream<String, Integer> counts = queryStringStream
				.reduceByKeyAndWindow(
						new Function2<Integer, Integer, Integer>() {
							public Integer call(Integer i1, Integer i2) {
								return i1 + i2;
							}
						}, new Function2<Integer, Integer, Integer>() {
							public Integer call(Integer i1, Integer i2) {
								return i1 - i2;
							}
						}, new Duration(60 * 60 * 1000), new Duration(5 * 1000));
		//swap counts to be able to sort it
		JavaPairDStream<Integer, String> swappedCounts = counts
				.map(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
					public Tuple2<Integer, String> call(
							Tuple2<String, Integer> in) {
						return in.swap();
					}
				});
		//sort by the count of each query string.
		JavaPairDStream<Integer, String> sortedCounts = swappedCounts
				.transform(new Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>>() {
					public JavaPairRDD<Integer, String> call(
							JavaPairRDD<Integer, String> in) throws Exception {
						return in.sortByKey(false);
					}
				});
		//Currently only printing the top 10 records, can be extended to store the data.
		sortedCounts
				.foreach(new Function<JavaPairRDD<Integer, String>, Void>() {
					public Void call(JavaPairRDD<Integer, String> rdd) {
						String output = "\nTop 10 entries for stream id: " + rdd.id() + "\n";
						for (Tuple2<Integer, String> t : rdd.take(10)) {
							output = output + t.toString() + "\n";
						}
						System.out.println(output);
						return null;
					}
				});
	...
}

The operations and approach can be further performance-tuned based on your requirements.

The data flows through the streaming pipeline as:

Flume -> filter (valid query strings only) -> count -> swap counts -> sort -> print
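
The post only shows the query-string stream; the top-product-views stream mentioned earlier follows the same shape, keyed on clickeddocid instead. A sketch, assuming a hypothetical getClickedDocId(String) helper analogous to getQueryString:

	// Keep only events where a product document was actually clicked.
	JavaDStream<SparkFlumeEvent> productViewStream = flumeStream
			.filter(new Function<SparkFlumeEvent, Boolean>() {
				@Override
				public Boolean call(SparkFlumeEvent event) throws Exception {
					String eventString = new String(event.event().getBody().array());
					String docId = getClickedDocId(eventString); // hypothetical helper
					return docId != null && !"null".equals(docId);
				}
			});
	// Count views per product id over a one-hour window sliding every 5 seconds,
	// then swap, sort and take(10) exactly as in the query-string stream above.
	JavaPairDStream<String, Integer> productViewCounts = productViewStream
			.map(new PairFunction<SparkFlumeEvent, String, Integer>() {
				public Tuple2<String, Integer> call(SparkFlumeEvent event) {
					String eventString = new String(event.event().getBody().array());
					return new Tuple2<String, Integer>(getClickedDocId(eventString), 1);
				}
			})
			.reduceByKeyAndWindow(
					new Function2<Integer, Integer, Integer>() {
						public Integer call(Integer i1, Integer i2) { return i1 + i2; }
					},
					new Function2<Integer, Integer, Integer>() {
						public Integer call(Integer i1, Integer i2) { return i1 - i2; }
					},
					new Duration(60 * 60 * 1000), new Duration(5 * 1000));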

Start the streaming context

Make sure to start the streaming context only after all the streams have been defined:

	...
	jssc.start();
	...
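
Typically you would also block the driver thread on the context and stop it cleanly on shutdown; a minimal sketch:

	// Block the driver thread until the streaming computation stops or fails.
	jssc.awaitTermination();

	// Elsewhere (e.g. in a shutdown hook), stop the context cleanly;
	// passing true also stops the underlying SparkContext.
	jssc.stop(true);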

Create Avro sink and configure Flume

To integrate the Spark streaming functionality with the rest of the setup and to process Flume events with Spark, we will create a separate channel/sink configuration.
Create the Flume sink for Spark:

		private AvroSink sparkAvroSink;
		private Channel sparkAvroChannel;

		...
		sparkAvroChannel = new MemoryChannel();
		Map<String, String> channelParameters = new HashMap<>();
		channelParameters.put("capacity", "100000");
		channelParameters.put("transactionCapacity", "1000");
		Context channelContext = new Context(channelParameters);
		Configurables.configure(sparkAvroChannel, channelContext);
		String channelName = "SparkAvroMemoryChannel-" + UUID.randomUUID();
		sparkAvroChannel.setName(channelName);

		sparkAvroSink = new AvroSink();
		sparkAvroSink.setName("SparkAvroSink-" + UUID.randomUUID());
		Map<String, String> parameters = new HashMap<>();
		parameters.put("type", "avro");
		parameters.put("hostname", "localhost");
		parameters.put("port", "41111");
		parameters.put("batch-size", "100");
		Context sinkContext = new Context(parameters);
		Configurables.configure(sparkAvroSink, sinkContext);
		sparkAvroSink.setChannel(sparkAvroChannel);
		...
		sparkAvroChannel.start();
		sparkAvroSink.start();
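
With the channel and sink started, events reach Spark through normal Flume channel transactions. A minimal sketch of putting a single test event on the Spark channel (the event body is a placeholder):

	// Uses org.apache.flume.Transaction and org.apache.flume.event.EventBuilder.
	// Deliver one event to the Spark Avro channel inside a transaction.
	Transaction tx = sparkAvroChannel.getTransaction();
	tx.begin();
	try {
		sparkAvroChannel.put(EventBuilder.withBody(
				"{\"querystring\":\"queryString48\"}".getBytes()));
		tx.commit();
	} catch (Exception e) {
		tx.rollback();
		throw new RuntimeException(e);
	} finally {
		tx.close();
	}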

The overall Flume event flow within the application looks like:

SearchClick -> Flume agent -> Flume source -> (HDFS channel, Elasticsearch channel, Avro Spark channel)

The data stored in Hadoop is used for later batch processing, Elasticsearch serves the recently viewed items, and Spark generates the real-time analytics.

Configure the Flume channel selector to also send data to the Spark streaming socket:

	...
	selectorProperties.put("mapping.VIEWED", HDFSChannel.getName() + " " + ESChannel.getName() + " " + sparkAvroChannel.getName());
		selectorProperties.put("mapping.FAVOURITE", HDFSChannel.getName() + " " + ESChannel.getName() + " " + sparkAvroChannel.getName());
		selectorProperties.put("default", HDFSChannel.getName() + " " + sparkAvroChannel.getName());
	...

Check FlumeAgentServiceImpl.java for details.

All the data is sent to Spark using Flume's channel selector functionality. You can also create a separate selector mapping to route only the events Spark needs.

Results

The streaming context processes the data; the sample output (count, search-query-string) looks like:

Top 10 entries for stream id: 2146
(36,queryString97)
(34,queryString78)
(33,queryString95)
(32,queryString9)
(32,queryString38)
(32,queryString14)
(31,queryString60)
(31,queryString64)
(30,queryString41)
(30,queryString28)

Similarly, you can collect the top product views, or “customers also viewed” data, as (count, productid):

Top 10 entries for stream id: 2156
(68,17)
(59,0)
(59,20)
(58,29)
(57,36)
(55,27)
(54,34)
(54,3)
(54,33)
(53,11)

The application covers only sample stream handling, but you can extend it to match your business requirements. This setup provides both real-time analytics using Spark and batch processing using Hadoop.


One Response to “Spark: Real time analytics for big data for top search queries and top product views”

  1. sematext said

    Nice post, Jai. We do similar stuff at Sematext, but with in-house software that predates Spark. Collapsing similar queries would be a nice addition here.
