
Category: Spark

How Qubole Optimizes Apache Spark Clusters

Mikhail Stolpner gives us some tips on how to optimize Apache Spark clusters:

There are four major resources: memory, compute (CPU), disk, and network. Memory and compute are by far the most expensive. Understanding how much compute and memory your application requires is crucial for optimization.

You can configure how much memory and how many CPUs each executor gets. While the number of CPUs for each task is fixed, executor memory is shared between the tasks processed by a single executor.

A few key parameters provide the most impact on how Spark is executed in terms of resources: spark.executor.memory, spark.executor.cores, spark.task.cpus, spark.executor.instances, and spark.qubole.max.executors.
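As a rough illustration of how these settings interact, here is a minimal Python sketch. The class name, the sample values, and the even split of memory across tasks are assumptions made for illustration; Spark does not hand each task a fixed slice of executor memory, so the per-task figure is only a planning estimate.

```python
from dataclasses import dataclass


@dataclass
class ExecutorConfig:
    """Hypothetical holder for three of the settings named above."""
    executor_memory_mb: int   # spark.executor.memory, expressed in MB
    executor_cores: int       # spark.executor.cores
    task_cpus: int = 1        # spark.task.cpus

    def concurrent_tasks(self) -> int:
        # Each task reserves a fixed number of CPUs, so this is how many
        # tasks one executor can run at the same time.
        return self.executor_cores // self.task_cpus

    def memory_per_task_mb(self) -> float:
        # Executor memory is shared by the concurrent tasks; an even split
        # is an assumption used here for back-of-the-envelope sizing.
        return self.executor_memory_mb / self.concurrent_tasks()


# Sample values chosen for illustration only.
config = ExecutorConfig(executor_memory_mb=8192, executor_cores=4, task_cpus=1)
print(config.concurrent_tasks())
print(config.memory_per_task_mb())
```

Raising spark.task.cpus lowers the number of concurrent tasks, which in turn leaves more of the shared executor memory for each one.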

This article gives us some idea of the levers we have available as well as when to pull them. Though the article itself is vendor-specific, a lot of the advice is general.


RStudio Integration With Databricks

Brian Dirking, et al., announce integration between RStudio and the Databricks platform:

With Databricks RStudio Integration, both popular R packages for interacting with Apache Spark, SparkR and sparklyr, can be used inside the RStudio IDE on Databricks. When multiple users use a cluster, each creates a separate SparkR Context or sparklyr connection, but they are all talking to a single Databricks-managed Spark application, allowing unique opportunities for collaboration between users. Together, RStudio can take advantage of Databricks’ cluster management and Apache Spark to perform tasks such as massive model selection, as noted in the figure below.

I like seeing this level of integration, especially from a language like R, which has historically been limited to operating on a single machine’s memory.


Tuning Spark Jobs Running On YARN

Anubhav Tarar shows us ways of optimizing YARN to run Apache Spark jobs:

1. yarn-client mode: In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN. To manage the memory, first make sure that you have your yarn-site.xml in Spark:

  • spark.yarn.am.memory: To increase the memory, set the spark.yarn.am.memory property in spark-defaults.conf, but make sure that you do not allocate more memory than the capacity of the node manager, which is defined in yarn-site.xml as yarn.nodemanager.resource.memory-mb. You can also pass it when running spark-submit with the --conf parameter.

For example: $SPARK_HOME/bin/spark-submit --conf spark.yarn.am.memory=1024m
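The capacity check described above can be sketched in Python. The helper names and the 8192 MB node manager capacity are assumptions for illustration. The parser handles only the m and g suffixes, and the command is left as short as the example above (no application jar or other arguments).

```python
def parse_memory_mb(value: str) -> int:
    """Convert a size such as '1024m' or '2g' into megabytes."""
    units = {"m": 1, "g": 1024}
    number, unit = value[:-1], value[-1].lower()
    if unit not in units:
        raise ValueError(f"unsupported memory unit in {value!r}")
    return int(number) * units[unit]


def build_submit_command(am_memory: str, node_manager_mb: int) -> str:
    """Build the spark-submit prefix, refusing oversized AM requests.

    node_manager_mb stands in for yarn.nodemanager.resource.memory-mb,
    which you would read from your own yarn-site.xml.
    """
    if parse_memory_mb(am_memory) > node_manager_mb:
        raise ValueError(
            f"spark.yarn.am.memory={am_memory} exceeds the node manager "
            f"capacity of {node_manager_mb} MB"
        )
    args = [
        "$SPARK_HOME/bin/spark-submit",
        "--conf",
        f"spark.yarn.am.memory={am_memory}",
    ]
    return " ".join(args)


# 8192 MB is a made-up capacity for the example.
print(build_submit_command("1024m", node_manager_mb=8192))
```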

Check it out for a few other configuration settings you can tweak.


Understanding A Spark Streaming Workflow

Himanshu Gupta continues a series on structured streaming using Spark Streaming:

Here we can clearly see that if new data is pushed to the source, Spark will run the “incremental” query that combines the previous running counts with the new data to compute updated counts. The “Input Table” here is the lines DataFrame which acts as a streaming input for wordCounts DataFrame.

Now, the only unknown thing in the above diagram is “Complete Mode”. It is one of the three output modes available in Structured Streaming. Since they are an important part of Structured Streaming, let’s read about them in detail:

  1. Complete Mode – This mode updates the entire Result Table which is eventually written to the sink.

  2. Append Mode – In this mode, only the new rows are appended in the Result Table and eventually sent to the sink.

  3. Update Mode – Finally, this mode updates only the rows in the Result Table that have changed since the last trigger, and only those changed rows are sent to the sink. This is how it differs from Complete Mode, which outputs the entire Result Table. If the query doesn’t contain any aggregations, it is equivalent to Append Mode.
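To make the difference between Complete and Update modes concrete, here is a small pure-Python simulation of a running word count. It does not use Spark; the function name and the sample batches are invented for illustration, and each inner list stands for the lines that arrive before one trigger. Append mode is left out because it does not apply to a running aggregation like this one in the same simple way.

```python
from collections import Counter


def run_triggers(batches, mode):
    """Yield the rows that would be sent to the sink after each trigger."""
    result_table = Counter()          # running counts across all triggers
    for lines in batches:
        changed = set()               # words whose count changed this trigger
        for line in lines:
            for word in line.split():
                result_table[word] += 1
                changed.add(word)
        if mode == "complete":
            # Complete mode: the whole Result Table goes to the sink.
            yield dict(result_table)
        elif mode == "update":
            # Update mode: only rows changed since the last trigger.
            yield {word: result_table[word] for word in changed}
        else:
            raise ValueError(f"unsupported mode: {mode}")


# Two triggers' worth of made-up input lines.
batches = [["cat dog", "dog"], ["cat owl"]]
complete_output = list(run_triggers(batches, "complete"))
update_output = list(run_triggers(batches, "update"))
print(complete_output)
print(update_output)
```

After the second trigger, Complete mode emits all three words again, while Update mode emits only the rows for "cat" and "owl", because the count for "dog" did not change.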

Check it out.


Calculating TF-IDF Using Apache Spark

Arseniy Tashoyan shows us how to calculate Term Frequency-Inverse Document Frequency using Apache Spark:

TF-IDF is used in a large variety of applications. Typical use cases include:

  • Document search.
  • Document tagging.
  • Text preprocessing and feature vector engineering for Machine Learning algorithms.

There is a vast number of resources on the web explaining the concept itself and the calculation algorithm. This article does not repeat the information in these other Internet resources; it just illustrates TF-IDF calculation with the help of Apache Spark. Emml Asimadi, in his excellent article Understanding TF-IDF, shares an approach based on the old Spark RDD and the Python language. This article, on the other hand, uses the modern Spark SQL API and Scala language.

Although Spark MLlib has an API to calculate TF-IDF, this API is not convenient for learning the concept. MLlib tools are intended to generate feature vectors for ML algorithms. There is no way to figure out the weight for a particular term in a particular document. Well, let’s make it from scratch; this will sharpen our skills.
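For readers who want to see a per-term, per-document weight before clicking through, here is a minimal pure-Python sketch. It is not the linked article's Spark SQL solution. It assumes raw term counts for TF and a smoothed IDF of log((N + 1) / (df + 1)), one common variant among several; the document ids and tokens are made up.

```python
import math
from collections import Counter


def tf_idf(documents):
    """Return {doc_id: {term: weight}} for already-tokenized documents."""
    doc_count = len(documents)
    # Term frequency: raw count of each term within each document.
    term_counts = {doc_id: Counter(tokens) for doc_id, tokens in documents.items()}
    # Document frequency: number of documents that contain each term.
    doc_freq = Counter()
    for counts in term_counts.values():
        doc_freq.update(counts.keys())
    # Smoothed inverse document frequency (an assumed variant).
    idf = {term: math.log((doc_count + 1) / (df + 1)) for term, df in doc_freq.items()}
    return {
        doc_id: {term: tf * idf[term] for term, tf in counts.items()}
        for doc_id, counts in term_counts.items()
    }


# Hypothetical tokenized documents.
docs = {"d1": ["spark", "sql", "spark"], "d2": ["spark", "rdd"]}
weights = tf_idf(docs)
print(weights["d1"])
```

With this variant, a term that appears in every document (here "spark") gets a weight of zero, while a term unique to one document keeps a positive weight.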

Read on for the solution. It seems that there tend to be better options today than TF-IDF for natural language problems, but it’s an easy algorithm to understand, so it’s useful as a first go.


Exception Handling In Scala

Shivangi Gupta shows off the Either type in Scala:

How to get values from Either?

There are many ways, and we will talk about them one by one. One way to get values is through left and right projections. We cannot perform operations such as map and filter directly on Either. Either provides left and right methods to get the left and right projections. A projection on Either allows us to apply functions like map and filter.

For example,

scala> val div = divide(14, 7)
div: scala.util.Either[String,Int] = Right(2)

scala> div.right
res1: scala.util.Either.RightProjection[String,Int] = RightProjection(Right(2))

When we applied right on Either, it returned a RightProjection. Now we can extract the value from the right projection using get, but if there is no value, calling get will throw a NoSuchElementException at runtime.

There’s more to Scala exception handling than just try-catch.


Flattening JSON Data With Databricks

Ivan Vazharov gives us a Databricks notebook to parse and flatten JSON using PySpark:

With Databricks you get:

  • An easy way to infer the JSON schema and avoid creating it manually
  • Subtle changes in the JSON schema won’t break things
  • The ability to explode nested lists into rows in a very easy way (see the Notebook below)
  • Speed!

Following is an example Databricks Notebook (Python) demonstrating the above claims. The JSON sample consists of an imaginary JSON result set, which contains a list of car models within a list of car vendors within a list of people. We want to flatten this result into a dataframe.
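The shape of that flattening can be sketched without Spark. The snippet below is a plain-Python stand-in for what exploding nested lists does in the notebook: one output row per innermost element. The field names and sample values are invented for illustration and are not taken from the linked notebook.

```python
import json

# Hypothetical result set: people -> car vendors -> car models.
sample = json.loads("""
{"people": [
  {"name": "Ann", "vendors": [
    {"vendor": "Ford", "models": ["Fiesta", "Focus"]},
    {"vendor": "Audi", "models": ["A4"]}]},
  {"name": "Bob", "vendors": [
    {"vendor": "Fiat", "models": ["Panda"]}]}
]}
""")


def flatten(result):
    """Produce one flat row per (person, vendor, model) combination."""
    rows = []
    for person in result["people"]:
        for vendor in person["vendors"]:
            for model in vendor["models"]:
                rows.append({
                    "person": person["name"],
                    "vendor": vendor["vendor"],
                    "model": model,
                })
    return rows


rows = flatten(sample)
for row in rows:
    print(row)
```

Each level of nesting multiplies out into rows, which is why the flattened result has one row per car model rather than one per person.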

Click through for the notebook.


Databricks MLflow

Matei Zaharia announces a new Databricks offering:

MLflow is inspired by existing ML platforms, but it is designed to be open in two senses:

  1. Open interface: MLflow is designed to work with any ML library, algorithm, deployment tool or language. It’s built around REST APIs and simple data formats (e.g., a model can be viewed as a lambda function) that can be used from a variety of tools, instead of only providing a small set of built-in functionality. This also makes it easy to add MLflow to your existing ML code so you can benefit from it immediately, and to share code using any ML library that others in your organization can run.
  2. Open source: We’re releasing MLflow as an open source project that users and library developers can extend. In addition, MLflow’s open format makes it very easy to share workflow steps and models across organizations if you wish to open source your code.

MLflow is still currently in alpha, but we believe that it already offers a useful framework to work with ML code, and we would love to hear your feedback. In this post, we’ll introduce MLflow in detail and explain its components.

Even in alpha, it looks nice.


Spark: DataFrame To RDD For Data Cleansing

Gilad Moscovitch walks us through a common data cleansing problem with Spark data frames:

A problem can arise when one of the inner fields of the JSON has undesired non-JSON values in some of the records.
For instance, an inner field might contain HTTP errors, which would be interpreted as a string rather than as a struct.
As a result, our schema would look like:
root
 |-- headers: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |-- requestBody: string (nullable = true)
Instead of
root
 |-- headers: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |-- requestBody: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
When trying to explode a “string” type, we will get a type mismatch error:
org.apache.spark.sql.AnalysisException: Can’t extract value from requestBody#10
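One way to think about the fix is to normalize each record before any schema is inferred, so that requestBody is either a struct or null. The pure-Python sketch below shows that per-record cleaning step; it is an illustration of the idea, not necessarily the linked article's code. The sample records and the function name are invented.

```python
import json

# Hypothetical raw lines: one well-formed record, one where requestBody
# holds an HTTP error string instead of a JSON object.
raw_records = [
    '{"requestBody": {"items": [{"id": 1}, {"id": 2}]}}',
    '{"requestBody": "HTTP 500 Internal Server Error"}',
]


def clean_request_body(line):
    """Parse one record and null out a requestBody that is not an object."""
    record = json.loads(line)
    body = record.get("requestBody")
    if not isinstance(body, dict):
        # A string (or anything else) here would force the whole column
        # to be typed as string, so replace it with null.
        record["requestBody"] = None
    return record


cleaned = [clean_request_body(line) for line in raw_records]
for record in cleaned:
    print(json.dumps(record))
```

A function like this is the kind of record-level logic that is easier to apply on an RDD of raw lines than on a data frame whose schema has already been inferred.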

Click through to see how to handle this scenario cleanly.
