Thursday, 21 July 2022

Spark Interview Questions

 

You can do it using the split and size functions of the PySpark API (example below):


from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

my_spark = SparkSession \
    .builder \
    .appName("Python Spark SQL example") \
    .enableHiveSupport() \
    .getOrCreate()

sqlContext = SQLContext(my_spark.sparkContext)

sqlContext.createDataFrame([['this is a sample address'], ['another address']]) \
    .select(F.size(F.split(F.col("_1"), " "))).show()


Below is Output:-

+------------------+

|size(split(_1,  ))|

+------------------+

|                 5|

|                 2|

+------------------+







import sys

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    # create Spark context with the necessary configuration
    sc = SparkContext("local", "PySpark Word Count Example")

    # read data from a text file and split each line into words
    words = sc.textFile("D:/workspace/spark/input.txt").flatMap(lambda line: line.split(" "))

    # count the occurrences of each word
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

    # save the counts to output
    wordCounts.saveAsTextFile("D:/workspace/spark/output/")







Apache Spark is an open-source cluster computing framework for real-time processing.

YARN is one of the key features of the Hadoop ecosystem that Spark can use, providing a central resource management platform to deliver scalable operations across the cluster.
YARN is a distributed container manager, like Mesos for example, whereas Spark is a data processing tool.
Spark can run on YARN, the same way Hadoop MapReduce can run on YARN.
Running Spark on YARN requires a binary distribution of Spark built with YARN support.



Explain the concept of Resilient Distributed Dataset (RDD).

Resilient Distributed Datasets are the fundamental data structure of Apache Spark.
They are embedded in Spark Core.
RDDs are immutable, fault-tolerant, distributed collections of objects that can be operated on in parallel. RDDs are split into partitions and can be executed on different nodes of a cluster.

RDDs are created either by transforming existing RDDs or by loading an external dataset from stable storage like HDFS or HBase.

RDD stands for Resilient Distributed Dataset. An RDD is a fault-tolerant collection of operational elements that run in parallel. The partitioned data in an RDD is immutable and distributed in nature. There are primarily two types of RDD:

Parallelized Collections: existing collections from the driver program that are parallelized so the resulting RDDs run in parallel with one another.
Hadoop Datasets: they perform functions on each file record in HDFS or other storage systems.

RDDs are basically parts of data that are stored in memory distributed across many nodes. RDDs are lazily evaluated in Spark. This lazy evaluation is what contributes to Spark's speed.
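A minimal PySpark sketch of the two ways to create an RDD described above (the file path and values are placeholders for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd-example").getOrCreate()
sc = spark.sparkContext

# 1) Parallelized collection: distribute an in-memory collection across the cluster
rdd_from_list = sc.parallelize([("a", 1), ("b", 2), ("c", 3)], numSlices=3)

# 2) External dataset: load a file from stable storage (HDFS, S3, local disk, ...)
rdd_from_file = sc.textFile("hdfs:///data/sample.txt")   # placeholder path

# Transformations are lazy; nothing is computed until an action such as take() is called
lengths = rdd_from_file.map(lambda line: len(line))
print(rdd_from_list.getNumPartitions())   # 3
print(lengths.take(5))                    # take() is an action and triggers execution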



The executor memory is basically a measure of how much memory of the worker node the application will utilize.

Spark manages data using partitions that help parallelize distributed data processing with minimal network traffic for sending data between executors.


Spark tries to read data into an RDD from the nodes that are close to it. Since Spark usually accesses distributed partitioned data, to optimize transformation operations it creates partitions to hold the data chunks. Everything in Spark is a partitioned RDD.



An RDD is a distributed collection of objects. Distributed means each RDD is divided into multiple partitions. Each of these partitions can reside in memory or be stored on the disk of different machines in a cluster. RDDs are immutable (read-only) data structures. You can't change the original RDD, but you can always transform it into a different RDD with all the changes you want.















13. How can you calculate the executor memory?
Consider the following cluster information:

cluster
   nodes = 10
   each node:
     cores = 16 (1 reserved for the OS)
     RAM = 61 GB (1 GB reserved for the OS)


Here is how to identify the number of cores:
  The number of cores is the number of concurrent tasks an executor can run in parallel.
  Rule of thumb: 5 concurrent tasks per executor.


To calculate the number of executors:

  executors per node = usable cores per node / concurrent tasks per executor
                     = 15 / 5 = 3

  total executors = number of nodes * executors per node
                  = 10 * 3 = 30 executors
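To finish the memory side of the calculation, a rough sketch in plain Python (the 1 GB reserved per node and the ~7% overhead factor are rule-of-thumb assumptions, not fixed values):

nodes = 10
ram_per_node_gb = 61 - 1          # 1 GB assumed reserved for the OS
executors_per_node = 3            # from the core calculation above

# Memory available to each executor container
mem_per_executor_gb = ram_per_node_gb / executors_per_node        # 20 GB

# Subtract an assumed ~7% off-heap/YARN overhead (spark.executor.memoryOverhead)
executor_memory_gb = mem_per_executor_gb * (1 - 0.07)             # roughly 18-19 GB

print(round(executor_memory_gb, 1), "GB for spark.executor.memory")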






Top 40 Apache Spark Interview Questions and Answers


Apache Spark is a unified analytics engine for processing large volumes of data. 


It can run workloads 100 times faster and offers over 80 high-level operators 


that make it easy to build parallel apps. Spark can run on Hadoop, Apache Mesos, 


Kubernetes, standalone, or in the cloud, and can access data from multiple 


sources.


This article covers the most important Apache Spark interview questions that you might face in a Spark interview. The questions have been segregated into different sections based on the various components of Apache Spark, and after going through this article you will be able to answer most of the questions asked in your next Spark interview.


Apache Spark Interview Questions

The Apache Spark interview questions have been divided into two parts:


Apache Spark Interview Questions for Beginners

Apache Spark Interview Questions for Experienced 

Let us begin with a few basic Apache Spark interview questions!


Apache Spark Interview Questions for Beginners

1. How is Apache Spark different from MapReduce?

Apache Spark vs MapReduce:

Spark processes data in batches as well as in real-time; MapReduce processes data in batches only.

Spark runs almost 100 times faster than Hadoop MapReduce; Hadoop MapReduce is slower when it comes to large-scale data processing.

Spark stores data in RAM (in-memory), so it is easier to retrieve; Hadoop MapReduce data is stored in HDFS and hence takes a long time to retrieve.

Spark provides caching and in-memory data storage; Hadoop is highly disk-dependent.

2. What are the important components of the Spark ecosystem?



Apache Spark has 3 main categories that comprise its ecosystem. Those are:

Language support: Spark can integrate with different languages to build applications and perform analytics. These languages are Java, Python, Scala, and R.

Core Components: Spark supports 5 main core components. These are Spark Core, Spark SQL, Spark Streaming, Spark MLlib, and GraphX.

Cluster Management: Spark can be run in 3 environments. Those are the Standalone cluster, Apache Mesos, and YARN.

3. Explain how Spark runs applications with the help of its architecture.

This is one of the most frequently asked spark interview questions, and the 


interviewer will expect you to give a thorough answer to it.




Spark applications run as independent processes that are coordinated by the 


SparkSession object in the driver program. The resource manager or cluster 


manager assigns tasks to the worker nodes with one task per partition. Iterative 


algorithms apply operations repeatedly to the data so they can benefit from 


caching datasets across iterations. A task applies its unit of work to the 


dataset in its partition and outputs a new partition dataset. Finally, the 


results are sent back to the driver application or can be saved to the disk.


4. What are the different cluster managers available in Apache Spark?

Standalone Mode: By default, applications submitted to the standalone mode 


cluster will run in FIFO order, and each application will try to use all 


available nodes. You can launch a standalone cluster either manually, by 


starting a master and workers by hand, or use our provided launch scripts. It is 


also possible to run these daemons on a single machine for testing.

Apache Mesos: Apache Mesos is an open-source project to manage computer 


clusters, and can also run Hadoop applications. The advantages of deploying 


Spark with Mesos include dynamic partitioning between Spark and other frameworks 


as well as scalable partitioning between multiple instances of Spark.

Hadoop YARN: Apache YARN is the cluster resource manager of Hadoop 2. Spark can 


be run on YARN as well.

Kubernetes: Kubernetes is an open-source system for automating deployment, 


scaling, and management of containerized applications.

5. What is the significance of Resilient Distributed Datasets in Spark?

Resilient Distributed Datasets are the fundamental data structure of Apache Spark. They are embedded in Spark Core. RDDs are immutable, fault-tolerant, distributed collections of objects that can be operated on in parallel. RDDs are split into partitions and can be executed on different nodes of a cluster.


RDDs are created by either transformation of existing RDDs or by loading an 


external dataset from stable storage like HDFS or HBase.




6. What is a lazy evaluation in Spark?

When Spark operates on any dataset, it remembers the instructions. When a transformation such as map() is called on an RDD, the operation is not performed immediately. Transformations in Spark are not evaluated until you perform an action; this behaviour is known as lazy evaluation, and it aids in optimizing the overall data processing workflow.
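A small sketch of lazy evaluation, assuming a SparkContext sc is available as in the earlier examples:

rdd = sc.parallelize(range(1, 11))

# No job runs here: map() and filter() only record the transformation lineage
squared = rdd.map(lambda x: x * x)
even_squares = squared.filter(lambda x: x % 2 == 0)

# Execution is triggered only by the action
print(even_squares.collect())   # [4, 16, 36, 64, 100]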


7. What makes Spark good at low latency workloads like graph processing and 


Machine Learning?

Apache Spark stores data in-memory for faster processing and building machine 


learning models. Machine Learning algorithms require multiple iterations and 


different conceptual steps to create an optimal model. Graph algorithms traverse 


through all the nodes and edges to generate a graph. Because the data is kept in memory, these low-latency workloads that need multiple iterations see increased performance.


8. How can you trigger automatic clean-ups in Spark to handle accumulated 


metadata?

To trigger the clean-ups, you need to set the parameter spark.cleaner.ttl.


9. How can you connect Spark to Apache Mesos?

There are a total of 4 steps that can help you connect Spark to Apache Mesos.


Configure the Spark Driver program to connect with Apache Mesos

Put the Spark binary package in a location accessible by Mesos

Install Spark in the same location as that of the Apache Mesos

Configure the spark.mesos.executor.home property for pointing to the location 


where Spark is installed

10. What is a Parquet file and what are its advantages?

Parquet is a columnar format that is supported by several data processing 


systems. With the Parquet file, Spark can perform both read and write 


operations. 


Some of the advantages of having a Parquet file are:


It enables you to fetch only the specific columns you need.
It consumes less space.
It uses type-specific encoding.
It limits I/O operations.
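A minimal PySpark sketch of writing and reading Parquet (the path is a placeholder):

df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])

# Write the DataFrame as Parquet
df.write.mode("overwrite").parquet("/tmp/people.parquet")

# Read it back; selecting one column only scans that column (column pruning)
spark.read.parquet("/tmp/people.parquet").select("name").show()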


11. What is shuffling in Spark? When does it occur?

Shuffling is the process of redistributing data across partitions that may lead 


to data movement across the executors. The shuffle operation is implemented 


differently in Spark compared to Hadoop. 


Shuffling has 2 important compression parameters:

spark.shuffle.compress - decides whether the engine will compress shuffle outputs or not
spark.shuffle.spill.compress - decides whether to compress intermediate shuffle spill files or not

It occurs while joining two tables or while performing byKey operations such as groupByKey or reduceByKey.
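An illustrative sketch of an operation that triggers a shuffle, with the two compression settings mentioned above (the values shown are the defaults):

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf() \
    .set("spark.shuffle.compress", "true") \
    .set("spark.shuffle.spill.compress", "true")

spark = SparkSession.builder.config(conf=conf).appName("shuffle-demo").getOrCreate()
sc = spark.sparkContext

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4)])

# reduceByKey redistributes data by key across executors, which causes a shuffle
print(pairs.reduceByKey(lambda x, y: x + y).collect())   # [('a', 4), ('b', 6)] (order may vary)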


12. What is the use of coalesce in Spark?

Spark uses the coalesce method to reduce the number of partitions in a DataFrame or RDD.

Suppose you read data from a CSV file into an RDD with four partitions, and a filter operation then removes all the multiples of 10 from the data. The resulting RDD has some nearly empty partitions, so it makes sense to reduce the number of partitions, which can be achieved by using coalesce; a sketch of this flow follows below.
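A minimal sketch of the filter-then-coalesce flow described above, assuming a SparkContext sc:

rdd = sc.parallelize(range(1, 41), 4)          # 4 partitions

filtered = rdd.filter(lambda x: x % 10 != 0)   # drop the multiples of 10

# Merge into fewer partitions without a full shuffle
coalesced = filtered.coalesce(2)

print(rdd.getNumPartitions(), filtered.getNumPartitions(), coalesced.getNumPartitions())   # 4 4 2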


13. How can you calculate the executor memory?
Consider the cluster information from the worked example earlier in these notes (10 nodes, 16 cores and 61 GB RAM per node, with 1 core and 1 GB reserved for the OS): with a rule of thumb of 5 concurrent tasks per executor, 15 usable cores per node give 3 executors per node, or 30 executors across the cluster.


14. What are the various functionalities supported by Spark Core?

Spark Core is the engine for parallel and distributed processing of large data 


sets. The various functionalities supported by Spark Core include:


Scheduling and monitoring jobs

Memory management

Fault recovery

Task dispatching

15. How do you convert a Spark RDD into a DataFrame?

There are 2 ways to convert a Spark RDD into a DataFrame:


Using the helper function - toDF

import com.mapr.db.spark.sql._

val df = sc.loadFromMapRDB(<table-name>)
  .where(field("first_name") === "Peter")
  .select("_id", "first_name").toDF()


Using SparkSession.createDataFrame

You can convert an RDD[Row] to a DataFrame by


calling createDataFrame on a SparkSession object


def createDataFrame(RDD, schema:StructType)
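The snippets above use the Scala API (and a MapR-specific loader); a rough PySpark sketch of both approaches, with illustrative column names:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

rdd = sc.parallelize([("Peter", 35), ("Mary", 29)])

# 1) Helper function: toDF() on an RDD of tuples (requires an active SparkSession)
df1 = rdd.toDF(["first_name", "age"])

# 2) SparkSession.createDataFrame with an explicit schema
schema = StructType([
    StructField("first_name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df2 = spark.createDataFrame(rdd, schema)

df1.show()
df2.show()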


16. Explain the types of operations supported by RDDs.

RDDs support 2 types of operation:


Transformations: Transformations are operations that are performed on an RDD to 


create a new RDD containing the results (Example: map, filter, join, union)


Actions: Actions are operations that return a value after running a computation 


on an RDD (Example: reduce, first, count) 
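A short sketch contrasting the two operation types, assuming a SparkContext sc:

nums = sc.parallelize([1, 2, 3, 4, 5])

# Transformations: return a new RDD and are evaluated lazily
doubled = nums.map(lambda x: x * 2)
big = doubled.filter(lambda x: x > 4)

# Actions: return a value to the driver and trigger execution
print(big.count())                      # 3
print(big.first())                      # 6
print(big.reduce(lambda a, b: a + b))   # 6 + 8 + 10 = 24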


17. What is a Lineage Graph?

This is another frequently asked Spark interview question. A lineage graph is the dependency graph between an existing RDD and a new RDD. It means that all the dependencies between the RDDs are recorded in a graph, rather than in the original data.


The need for an RDD lineage graph happens when we want to compute a new RDD or 


if we want to recover the lost data from the lost persisted RDD. Spark does not 


support data replication in memory. So, if any data is lost, it can be rebuilt 


using RDD lineage. It is also called an RDD operator graph or RDD dependency 


graph.


18. What do you understand about DStreams in Spark?

Discretized Streams is the basic abstraction provided by Spark Streaming. 


It represents a continuous stream of data that is either in the form of an input 


source or processed data stream generated by transforming the input stream.




19. Explain Caching in Spark Streaming.

Caching also known as Persistence is an optimization technique for Spark 


computations. Similar to RDDs, DStreams also allow developers to persist the 


stream’s data in memory. That is, using the persist() method on a DStream will 


automatically persist every RDD of that DStream in memory. It helps to save 


interim partial results so they can be reused in subsequent stages.


For input streams that receive data over the network (such as Kafka), the default persistence level is set to replicate the data to two nodes for fault-tolerance.


20. What is the need for broadcast variables in Spark?

Broadcast variables allow the programmer to keep a read-only variable cached on 


each machine rather than shipping a copy of it with tasks. They can be used to 


give every node a copy of a large input dataset in an efficient manner. Spark 


distributes broadcast variables using efficient broadcast algorithms to reduce 


communication costs.


scala


scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))


broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)


scala> broadcastVar.value


res0: Array[Int] = Array(1, 2, 3)
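The snippet above uses the Scala shell; an equivalent PySpark sketch:

broadcastVar = sc.broadcast([1, 2, 3])

# Every task reads the same cached copy instead of shipping the data with each task
print(broadcastVar.value)                                   # [1, 2, 3]

rdd = sc.parallelize([0, 1, 2])
print(rdd.map(lambda i: broadcastVar.value[i]).collect())   # [1, 2, 3]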



Apache Spark Interview Questions for Experienced

21. How to programmatically specify a schema for DataFrame?

DataFrame can be created programmatically with three steps:


Create an RDD of Rows from the original RDD;

Create the schema represented by a StructType matching the structure of Rows in 


the RDD created in Step 1.

Apply the schema to the RDD of Rows via createDataFrame method provided by 


SparkSession.



22. Which transformation returns a new DStream by selecting only those records 


of the source DStream for which the function returns true?

1. map(func)


2. transform(func)


3. filter(func)


4. count()


The correct answer is option 3, filter(func).


23. Does Apache Spark provide checkpoints?

This is one of the most frequently asked spark interview questions where the 


interviewer expects a detailed answer (and not just a yes or no!). Give as 


detailed an answer as possible here.


Yes, Apache Spark provides an API for adding and managing checkpoints. 


Checkpointing is the process of making streaming applications resilient to 


failures. It allows you to save the data and metadata into a checkpointing 


directory. In case of a failure, Spark can recover this data and start from wherever it stopped.


There are 2 types of data for which we can use checkpointing in Spark.


Metadata Checkpointing: Metadata means the data about data. It refers to saving 


the metadata to fault-tolerant storage like HDFS. Metadata includes 


configurations, DStream operations, and incomplete batches.


Data Checkpointing: Here, we save the RDD to reliable storage because its need 


arises in some of the stateful transformations. In this case, the upcoming RDD 


depends on the RDDs of previous batches. 


24. What do you mean by sliding window operation?

Controlling the transmission of data packets between multiple computer networks 


is done by the sliding window. Spark Streaming library provides windowed 


computations where the transformations on RDDs are applied over a sliding window 


of data.




25. What are the different levels of persistence in Spark?

DISK_ONLY - Stores the RDD partitions only on the disk


MEMORY_ONLY_SER - Stores the RDD as serialized Java objects, with one byte array per partition


MEMORY_ONLY - Stores the RDD as deserialized Java objects in the JVM. If the RDD 


is not able to fit in the memory available, some partitions won’t be cached


OFF_HEAP - Works like MEMORY_ONLY_SER but stores the data in off-heap memory


MEMORY_AND_DISK - Stores RDD as deserialized Java objects in the JVM. In case 


the RDD is not able to fit in the memory, additional partitions are stored on 


the disk


MEMORY_AND_DISK_SER - Identical to MEMORY_ONLY_SER with the exception of storing 


partitions not able to fit in the memory to the disk
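A short sketch of setting a persistence level explicitly in PySpark:

from pyspark import StorageLevel

rdd = sc.parallelize(range(1000))

# Keep deserialized objects in memory, spilling what does not fit to disk
rdd.persist(StorageLevel.MEMORY_AND_DISK)

rdd.count()        # the first action materializes and caches the RDD
rdd.unpersist()    # release the storage when it is no longer needed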


26. What is the difference between map and flatMap transformation in Spark 


Streaming?

map():
A map function returns a new DStream by passing each element of the source DStream through a function func. It takes one element as input, processes it according to custom code (specified by the developer), and returns exactly one element at a time.

flatMap():
flatMap is similar to map; it applies a function to each element of the RDD/DStream and returns the result as a new RDD/DStream, but it can return 0, 1, or more elements for each input element, and the results are flattened.
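A quick sketch of the difference, assuming a SparkContext sc:

lines = sc.parallelize(["hello world", "spark streaming"])

# map: exactly one output element per input element (here, a list of words per line)
print(lines.map(lambda line: line.split(" ")).collect())
# [['hello', 'world'], ['spark', 'streaming']]

# flatMap: the per-element results are flattened into a single collection
print(lines.flatMap(lambda line: line.split(" ")).collect())
# ['hello', 'world', 'spark', 'streaming']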


27. How would you compute the total count of unique words in Spark?

1. Load the text file as an RDD:

lines = sc.textFile("hdfs://Hadoop/user/test_file.txt")

2. Function that breaks each line into words:

def toWords(line):
    return line.split()

3. Run the toWords function on each element of the RDD as a flatMap transformation:

words = lines.flatMap(toWords)

4. Convert each word into a (key, value) pair:

def toTuple(word):
    return (word, 1)

wordsTuple = words.map(toTuple)

5. Perform reduceByKey():

def add(x, y):
    return x + y

counts = wordsTuple.reduceByKey(add)

6. Print:

counts.collect()


=========================================================



1)

paired RDD


2) 

coalesce and repartition
repartition can both increase and decrease the number of partitions and does a full shuffle.
coalesce avoids a full shuffle and is better for reducing the number of partitions.
Spark works best when partition sizes are even; coalesce might give unevenly sized partitions, which can impact the performance of the job.


3)

Different level of persistence

memory only

disk only

memory and disk


4)

difference between static and dynamic partitioning in Hive



5)

Parquet -
columnar store
good for compression
good for analytical queries


6)

Bucketing with dynamic partitioning - is it possible?




7)

second highest salary



8)

Can we use hive as metadata DB instead of mysql



9)

what is sqoop




10)

explain the MapReduce flow
mapper => key-value pairs => shuffle => reducer

Assume 10 GB of data: how many mappers and reducers will be created?
10 GB / 128 MB = number of mappers (80)
By default there is 1 reducer; we can specify 2 or 3 reducers.

How does data get transferred from mapper to reducer?
The shuffle phase handles it automatically.

If you have more than 1 reducer, how does data get into each reducer?

In the shuffle phase the key-value pairs go through a partitioner;
the partitioner decides where each key-value pair should go. The partitioner is a hash function:
for each key it generates a hash which maps to a fixed reducer.
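A tiny plain-Python sketch of the hash-partitioning rule (hash(key) % number of reducers), just to illustrate the idea:

def partition_for(key, num_reducers):
    # every occurrence of a key hashes to the same reducer, so all its values meet in one place
    return hash(key) % num_reducers

pairs = [("apple", 1), ("banana", 1), ("apple", 1), ("cherry", 1)]
for key, value in pairs:
    print(key, "-> reducer", partition_for(key, num_reducers=3))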



11)


when to use Hive and when to use Spark

Hive is a client tool: any query will be executed as MapReduce jobs on the Hadoop cluster; the query gets translated into MR.

Spark is an execution framework: it creates a lineage, forms logical and physical plans, and executes them.




12)

does Hive support CRUD and ACID?

CRUD (update/delete) and ACID (transactions) are experimental features that are only available for the ORC (Optimized RC) file format,
available from Hive 0.13.



13) 

what is the difference between RDD and DataFrame
In Spark, when we write instructions, those instructions are transformed into RDDs,
and Spark forms the associations between RDDs based on the instructions;
those instructions are then converted into logical and physical plans.

DataFrame is an improvement over RDD:
a DataFrame gives the facility to view an RDD with a schema,
as if we were looking at it as a table;

pluggable memory management (like Project Tungsten)
and a better optimizer (the Catalyst optimizer).



14)

Oozie


a scheduler tool
it helps to form workflows
and helps to maintain dependencies between jobs.


15) 

Flume and Kafka

Both are data ingestion tools.
Kafka is a cluster in itself; it is used for streaming.
Kafka is distributed: it can take streams from multiple producers and give data to multiple consumers.
The data can then be pushed to HDFS.


16)

Sentry

It is an authorization tool.



17)

https://www.youtube.com/watch?v=haPwh4m_jq0



Spark is a memory-intensive distributed system with access to several GB of memory,
yet jobs can still fail with out-of-memory (OOM) issues.


The driver and executors are JVM processes; they each have a fixed amount of memory allocated.




When a Spark job fails, identify what is failing.

A Spark job is made up of a driver and a set of executors. They both use a large amount of memory,
and they are both allocated a certain finite amount of memory.

When they need more than what is allocated, we get an OOM error.




Check whether the driver failed, an executor failed, or a task inside an executor failed.

A Spark job consists of a number of tasks.
Tasks run inside executor containers, and several tasks can run inside one container (executor).
Executors are containers with CPU and memory allocated, and multiple executors (containers) can exist for a job.

Errors like OOM or "container killed" will show up when a memory issue happens.




The driver orchestrates the execution across the executors.

Both memory sizes can be controlled by:

spark.executor.memory

spark.driver.memory




If 4 GB is allocated to an executor:

            300 MB is reserved

            60% of (4 GB - 300 MB) goes to Spark = Spark memory

            the rest goes to user memory


Out of the 60% of (4 GB - 300 MB) (Spark memory), some is used for storage (cached objects) and the rest for execution.


Spark memory is dynamically managed (available since Spark 1.6).

Initially storage and execution each get 50% of Spark memory (60% of 4 GB - 300 MB):


4 GB = 300 MB    reserved

       ~1.5 GB   user memory (40% of 4 GB - 300 MB)

       ~2.2 GB   Spark memory (60% of 4 GB - 300 MB)

          ~1.1 GB   storage

          ~1.1 GB   execution


When memory for execution is not enough, Spark will take memory from storage; if storage holds objects, they will be moved to disk.

Storage memory inside Spark memory is managed by spark.memory.storageFraction, which defaults to 50%.
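A hedged sketch of setting these knobs when building a session (the values are only illustrative, not recommendations; spark.driver.memory normally has to be set via spark-submit before the driver JVM starts):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("memory-config-demo")
    .config("spark.executor.memory", "4g")          # executor JVM heap
    .config("spark.memory.fraction", "0.6")         # share of (heap - 300 MB) used as Spark memory
    .config("spark.memory.storageFraction", "0.5")  # portion of Spark memory protected for storage
    .getOrCreate()
)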






How many cores are allocated to the executor?

The number of cores means the number of tasks we can run in parallel.
If we have 2 CPU cores, then we can run 2 tasks in parallel.

# CPU cores = # tasks in parallel

Rule of thumb: 2 or 3 tasks per CPU core.

The number of partitions decides the number of tasks.






So if we have a big dataset which has 100 partitions,
then Spark will create 100 tasks.
If each task has to deal with a lot of data, there is a potential problem of getting OOM.

Too few partitions means each task is overloaded.


Q)

How to optimise Spark


Choose the file format - JSON and XML are slow (no optimization);
                     Parquet or RC will give better performance




Partition Technique
 import org.apache.spark.HashPartitioner

 val rdd1 = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5), ("c", 5)), 3)
   .partitionBy(new HashPartitioner(2))
 val rdd2 = sc.parallelize(Array(("c", 5), ("d", 1), ("b", 5), ("d", 5)), 2)
   .partitionBy(new HashPartitioner(2))



Broadcast :- broadcast certain data to all executors

Kryo serializer :-
Spark serializes data for all operations;
the default Java serializers are bulky, Kryo is more compact and faster.

 val jrdd = rdd1.join(rdd2)
 jrdd.collect().foreach(println)
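A small PySpark sketch of enabling the Kryo serializer (the class name is the standard Spark one; this mainly affects shuffles and serialized caching):

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

spark = SparkSession.builder.config(conf=conf).appName("kryo-demo").getOrCreate()
print(spark.sparkContext.getConf().get("spark.serializer"))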





Q)


The convergence of SQL and NoSQL

Both SQL and NoSQL databases have their pros and cons. As such, there has been a 


movement to take the best characteristics of both types of databases and 


integrate them so users can realize the best of both worlds.


For instance, MySQL, the most popular open-source relational database, offers 


MySQL Document Store. This provides the structure of a MySQL database combined 


with the flexibility and high availability of NoSQL without having to implement 


a separate NoSQL database.


MongoDB, one of the most popular NoSQL databases, offers multi-document ACID 


transactions.


AWS’ managed NoSQL database, DynamoDB, also provides ACID-compliant transaction 


functionality.


And with the easy database setup that cloud service providers offer, you have 


the ability to use both SQL and NoSQL databases in your cloud data architecture 


to meet your data storage needs.


Now you have much more flexibility regardless of whether you choose a SQL or 


NoSQL database, and there are sure to be more flexible options in the future.


Database options

Regardless of whether you go with a SQL or NoSQL database (or both!), there are 


plenty of options to choose from.


On-premise SQL database offerings include:


MySQL – as mentioned prior, the most popular open-source relational database

Microsoft SQL server – Microsoft’s enterprise version of SQL

PostgreSQL – an enterprise-level, open-source database focused on extensibility

Oracle – full-service (and expensive) SQL option

MariaDB – an enhanced version of MySQL, built by MySQL’s original developers

And many more

The major cloud service platforms have their own SQL options:


AWS has:

RDS, their standard cloud SQL database

Aurora, which focuses on increased throughput and scalability

Microsoft Azure has:

Azure SQL Database, their managed database-as-a-service

Azure Database for MySQL, PostgreSQL, and MariaDB

Google Cloud Platform (GCP) has:

Cloud SQL, which you can use for MySQL and PostgreSQL

Cloud Spanner, which combines elements of SQL and NoSQL

On-premise NoSQL database options include:


MongoDB – by far the most popular NoSQL database

Redis – an open source, distributed, in-memory key-value database that is super 


fast

Cassandra – free, open-source NoSQL database created by Facebook that focuses on 


scalability and high availability

Many others

Cloud service providers offer plenty of NoSQL options as well:


AWS has:

DynamoDB, its managed NoSQL database

DocumentDB, a fast, scalable, highly-available MongoDB-compatible database

Microsoft Azure offers:

CosmosDB, its globally distributed, multi-model database

Google Cloud has:

Bigtable, its NoSQL wide-column database service

Cloud Datastore, its NoSQL document database service

Cloud Firestore, a cloud-native NoSQL document database that helps store and 


query app data

There is no shortage of database options to choose from!



Q)

nodes = 54

vcores = 32
memory = 32
CDH 5.16
Impala 2.12

Impala number of daemons = 250
overall memory = 1 TB

RDD

 


Resilient Distributed Datasets (RDDs)
are collections of immutable JVM objects that are distributed across an Apache Spark cluster.

Any action on a Spark DataFrame eventually gets translated into a highly optimized execution of transformations and actions on RDDs.

Data in an RDD is split into chunks based on a key and then dispersed across all the executor nodes.

RDDs are resilient, that is, they are able to recover quickly from any issues, as the same data chunks are replicated across multiple executor nodes.
If one executor fails, another will still process the data.


./bin/pyspark --master local


There are 2 ways to create RDDs: the parallelize() method (a collection, list, or array of some elements),
or referencing a file or an external source.



# =============== Create RDD =======================
myRDD = sc.parallelize([
    ("mike", "19"),
    ("prashant", "18"),
    ("shivani", "17"),
    ("baibhab", "16"),
    ("Rob", "17"),
    ("Scott", "16")
])


Focusing first on the statement in the sc.parallelize() method,
we first created a Python list composed of tuples (that is, ('mike', '19'), ..., ('Scott', '16')).
The sc.parallelize() method is the SparkContext's parallelize method to create a parallelized collection.
This allows Spark to distribute the data across multiple nodes, instead of depending on a single node to process the data.


# = =================== View =====================


we will use the take() method to return the values to the console. 

We will now execute an RDD action take(). 

Note that a common approach in PySpark is to use collect(), which returns all values in your RDD from the Spark worker nodes to the driver.

There are performance implications when working with a large amount of data as this translates to large volumes of data being transferred from the Spark worker nodes to the driver


You should pretty much always use the take(n) method instead; it returns the first n elements of the RDD instead of the whole dataset


It is a more efficient method because it first scans one partition and uses those statistics to determine the number of partitions required to return the results

 

myRDD.take(5)



# ============ Reading data from files =====================


Local files sc.textFile('/local folder/filename.csv')

Hadoop HDFS sc.textFile('hdfs://folder/filename.csv')



# for tab seperated file


myRDD = (

    sc

    .textFile(

        '<file.csv>'

        , minPartitions=4

        , use_unicode=True

    ).map(lambda element: element.split("\t"))

)


Only the first parameter is required, which indicates the location of the text file


minPartitions: Indicates the minimum number of partitions that make up the RDD. 

The Spark engine can often determine the best number of partitions based on the file size, 

but you may want to change the number of partitions for performance reasons


use_unicode: Engage this parameter if you are processing Unicode data.


Note that if you were to execute this statement without the subsequent map() function,
the resulting RDD would not split on the tab delimiter; it would basically be a list of strings, that is,
a list of raw rows from the file:


myRDD = sc.textFile('<file.csv>')

myRDD.take(5)


# .map(...) function  transform the data from a list of strings to a list of lists:


myRDD.take(5)



# let's determine the number of rows in this RDD.

 

 myRDD.count()

 

# find out the number of partitions that support this RDD:

  myRDD.getNumPartitions()

  

  

# =============================================================


The transformations include the following common tasks:


Removing the header line from your text file: zipWithIndex()

Selecting columns from your RDD: map()

Running a WHERE (filter) clause: filter()

Getting the distinct values: distinct()

Getting the number of partitions: getNumPartitions()

Determining the size of your partitions (that is, the number of elements within each partition): mapPartitionsWithIndex()



# map


The map(f) transformation returns a new RDD formed by passing each element through a function, f.


myRDD.map(lambda c: (c[0], c[1])).take(5)




# .filter(...) transformation

The filter(f)  transformation returns a new RDD based on selecting elements for which the f function returns true. 

Therefore, look at the following code snippet:


(
    myRDD
    .map(lambda c: (c[0], c[1]))
    .filter(lambda c: c[1] == "16")
    .take(5)
)

# .flatMap(...) transformation

The flatMap(f) transformation is similar to map, but the new RDD flattens out all of the elements 

note - map return list of list / Tuple

   flatmap will return list by flatting list of list

   

(
    myRDD
    .filter(lambda c: c[1] == "16")
    .map(lambda c: (c[0], c[1]))
    .flatMap(lambda x: x)
    .take(10)
)




# .distinct() transformation


The distinct() transformation returns a new RDD containing the distinct elements of the source RDD.


(
    myRDD
    .map(lambda c: c[1])
    .distinct()
    .take(5)
)

# .join(...) transformation

The join(RDD') transformation returns an RDD of (key, (val_left, val_right)) when calling RDD (key, val_left) and RDD (key, val_right). 

Outer joins are supported through left outer join, right outer join, and full outer join. 



rdd1 = myRDD.map(lambda c: (c[0], c[1]))

rdd2 = myRDD.map(lambda c: (c[0], 1))

# Execute inner join between the two pair RDDs (both are keyed by c[0])
rdd2.join(rdd1).take(5)




# .zipWithIndex() transformation

The zipWithIndex() transformation appends (or ZIPs) the RDD with the element indices. 

This is very handy when wanting to remove the header row (first row) of a file.



ac = myRDD.map(lambda c: (c[0], c[1]))

ac.zipWithIndex().take(5)


# To remove the header from your data


# Using zipWithIndex to skip header row

# - filter out row 0

# - extract only row info


(
    myRDD
    .zipWithIndex()
    .filter(lambda row_idx: row_idx[1] > 0)   # Python 3: lambdas cannot unpack (row, idx) tuples
    .map(lambda row_idx: row_idx[0])
    .take(5)
)




#  .reduceByKey(...) transformation

The reduceByKey(f) transformation reduces the elements of the RDD using f by the key. 

The f function should be commutative and associative so that it can be computed correctly in parallel.


Look at the following code snippet:


 

(
    myRDD
    .zipWithIndex()
    .filter(lambda row_idx: row_idx[1] > 0)
    .map(lambda row_idx: row_idx[0])
    .map(lambda c: (c[0], int(c[1])))
    .reduceByKey(lambda x, y: x + y)
    .take(5)
)




# .sortByKey(...) transformation

The sortByKey(asc) transformation orders (key, value) RDD by key and returns an RDD in ascending or descending order. 

(
    myRDD
    .zipWithIndex()
    .filter(lambda row_idx: row_idx[1] > 0)
    .map(lambda row_idx: row_idx[0])
    .map(lambda c: (c[0], int(c[1])))
    .reduceByKey(lambda x, y: x + y)
    .sortByKey()
    .take(5)
)



# .union(...) transformation

The union(RDD) transformation returns a new RDD that is the union of the source and argument RDDs. Look at the following code snippet:


# Create `a` RDD of records where the second field is "16"
a = (
    myRDD
    .zipWithIndex()
    .filter(lambda row_idx: row_idx[1] > 0)
    .map(lambda row_idx: row_idx[0])
    .filter(lambda c: c[1] == "16")
)

# Create `b` RDD of records where the second field is "17"
b = (
    myRDD
    .zipWithIndex()
    .filter(lambda row_idx: row_idx[1] > 0)
    .map(lambda row_idx: row_idx[0])
    .filter(lambda c: c[1] == "17")
)

# Union of the two RDDs
a.union(b).collect()



# =================


myRdd = sc.textFile("<file>")
myRdd = myRdd.map(lambda x: x.split(","))
myRdd = myRdd.filter(lambda fields: len(fields) > 0 and fields[0] != "")   # remove empty lines
myRdd = myRdd.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)     # count rows by first column




# Actions


.take

.collect

.count

.reduce(...) action

The reduce(f) action aggregates the elements of an RDD by f. The f function should be commutative and associative so that it can be computed correctly in parallel. Look at the following code:


# Calculate the total delays of flights

# between SEA (origin) and SFO (dest),

# convert delays column to int 

# and summarize

flights\

 .filter(lambda c: c[3] == 'SEA' and c[4] == 'SFO')\

 .map(lambda c: int(c[1]))\

 .reduce(lambda x, y: x + y)





# .saveAsTextFile(...) action

The saveAsTextFile() action saves your RDD into a text file; 

note that each partition is a separate file.


# saveAsTextFile

myRdd.saveAsTextFile("abc.txt")




Pitfalls of using RDDs

The key concern associated with using RDDs is that they can take a lot of time to master. The flexibility of running functional operators such as map, reduce, and shuffle allows you to perform a wide variety of transformations against your data. But with this power comes great responsibility, and it is potentially possible to write code that is inefficient



The reason RDDs are slow—especially within the context of PySpark—is because whenever a PySpark program is executed using RDDs, there is a potentially large overhead to execute the job. 


In the PySpark driver, the Spark Context uses Py4j to launch a JVM using JavaSparkContext. 

Any RDD transformations are initially mapped to PythonRDD objects in Java.


Once these tasks are pushed out to the Spark worker(s), 

PythonRDD objects launch Python subprocesses using pipes to send both code and data to be processed in Python:


While this approach allows PySpark to distribute the processing of the data to multiple Python subprocesses on multiple workers,

There is a lot of context switching and communications overhead between Python and the JVM.


Note - This is even more apparent when using Python UDFs, as the performance is significantly slower because all of the data will need to be transferred to the driver prior to using a Python UDF. Note that vectorized UDFs were introduced as part of Spark 2.3 and will improve PySpark UDF performance. 
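As a rough sketch of the vectorized approach mentioned above (this assumes Spark 3.x with PyArrow installed; the function names are just illustrative):

import pandas as pd
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import LongType

df = spark.range(0, 1000)

# Plain Python UDF: rows are serialized to a Python worker one at a time
plain_square = udf(lambda x: x * x, LongType())

# Vectorized (pandas) UDF: whole Arrow batches are passed as pandas Series
@pandas_udf(LongType())
def fast_square(s: pd.Series) -> pd.Series:
    return s * s

df.select(plain_square("id"), fast_square("id")).show(5)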






# ===============================================

# Data frames



As in the previous sections, let's make use of the flights dataset and create an RDD and a DataFrame against this dataset:


## Create flights RDD
flights = sc.textFile('/databricks-datasets/flights/departuredelays.csv')\
  .map(lambda line: line.split(","))\
  .zipWithIndex()\
  .filter(lambda row_idx: row_idx[1] > 0)\
  .map(lambda row_idx: row_idx[0])

# Create flightsDF DataFrame
flightsDF = spark.read\
  .options(header='true', inferSchema='true')\
  .csv('~/data/flights/departuredelays.csv')
flightsDF.createOrReplaceTempView("flightsDF")




In this section, we will run the same group by statement—one via an RDD using reduceByKey(), and one via a DataFrame using Spark SQL GROUP BY. For this query, we will sum the time delays grouped by originating city and sort according to the originating city:


# RDD: Sum delays, group by and order by originating city

flights.map(lambda c: (c[3], int(c[1]))).reduceByKey(lambda x, y: x + y).sortByKey().take(50)


# Output (truncated)

# Duration: 11.08 seconds

[(u'ABE', 5113),  

 (u'ABI', 5128),  

 (u'ABQ', 64422),  

 (u'ABY', 1554),  

 (u'ACT', 392),

 ... ]

For this particular configuration, it took 11.08 seconds to extract the columns, execute reduceByKey() to summarize the data, execute sortByKey() to order it, and then return the values to the driver:


# DataFrame (Spark SQL): Sum delays, group by and order by originating city

spark.sql("select origin, sum(delay) as TotalDelay from flightsDF group by origin order by origin").show(50)


# Output (truncated)

# Duration: 4.76s

+------+----------+ 

|origin|TotalDelay| 

+------+----------+ 

| ABE  |      5113| 

| ABI  |      5128|

| ABQ  |     64422| 

| ABY  |      1554| 

| ACT  |       392|

...

+------+----------+ 


There are many advantages of Spark DataFrames, including, but not limited to the following:


You can execute Spark SQL statements (not just through the Spark DataFrame API)

There is a schema associated with your data so you can specify the column name instead of position




==


DataFrames take advantage of the developments in the tungsten project and the Catalyst Optimizer. 

These two improvements bring the performance of PySpark on par with that of either Scala or Java.


Project tungsten is a set of improvements to Spark Engine aimed at bringing its execution process closer to the bare metal. The main deliverables include:


Code generation at runtime: This aims at leveraging the optimizations implemented in modern compilers

Taking advantage of the memory hierarchy: The algorithms and data structures exploit memory hierarchy for fast execution

Direct-memory management: Removes the overhead associated with Java garbage collection and JVM object creation and management

Low-level programming: Speeds up memory access by loading immediate data to CPU registers

Virtual function dispatches elimination: This eliminates the necessity of multiple CPU calls




The Catalyst Optimizer sits at the core of Spark SQL and powers both the SQL queries executed against the data and DataFrames. The process starts with the query being issued to the engine. The logical plan of execution is first being optimized. Based on the optimized logical plan, multiple physical plans are derived and pushed through a cost optimizer. The selected, most cost-efficient plan is then translated (using code generation optimizations implemented as part of the tungsten project) into an optimized RDD-based execution code.
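A quick way to see Catalyst's plans for yourself (a small sketch; the exact plan text varies by Spark version):

df = spark.range(0, 100).withColumnRenamed("id", "x")
aggregated = df.filter("x > 10").groupBy().sum("x")

# Prints the parsed, analyzed, and optimized logical plans plus the physical plan
aggregated.explain(True)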




# ======================= dataframe ==========================


A Spark DataFrame is an immutable collection of data distributed within a cluster. 

The data inside a DataFrame is organized into named columns that can be compared to tables in a relational database.



# ============= Create Dataframe =========================


There are many ways to create a DataFrame.


The simplest way is to create an RDD and convert it into a DataFrame:


# create RDD

sample_data = sc.parallelize([

      (1, 'MacBook Pro', 2015, '15"', '16GB', '512GB SSD'

        , 13.75, 9.48, 0.61, 4.02)

    , (2, 'MacBook', 2016, '12"', '8GB', '256GB SSD'

        , 11.04, 7.74, 0.52, 2.03)

    , (3, 'MacBook Air', 2016, '13.3"', '8GB', '128GB SSD'

        , 12.8, 8.94, 0.68, 2.96)

    , (4, 'iMac', 2017, '27"', '64GB', '1TB SSD'

        , 25.6, 8.0, 20.3, 20.8)

])


as with all RDDs, it is hard to figure out what each element of the tuple stands for since RDDs are schema-less structures.


Therefore, when using the .createDataFrame(...) method of SparkSession, we pass a list of column names as the second argument; the first argument is the RDD we wish to transform into a DataFrame


# create datframe

sample_data_df = spark.createDataFrame(

    sample_data

    , [

        'Id'

        , 'Model'

        , 'Year'

        , 'ScreenSize'

        , 'RAM'

        , 'HDD'

        , 'W'

        , 'D'

        , 'H'

        , 'Weight'

    ]

)



sample_data_df.take(1)


 a DataFrame is a collection of Row(...) objects. A Row(...) object consists of data that is named, unlike an RDD. 

 

The preceding Row(...) object looks similar to a dictionary, and indeed any Row(...) object can be converted into a dictionary using the .asDict() method.

 

 

 sample_data_df.show()

 

 sample_data_df.printSchema()

 

 

 

 When you use the .read attribute of SparkSession, it returns a DataFrameReader object. DataFrameReader is an interface to read data into a DataFrame.

 

 

 # ================  From JSON ==================

To read data from a JSON-formatted file, you can simply do the following:


sample_data_json_df = (

    spark

    .read

    .json('../Data/DataFrames_sample.json')

)



# =================== From CSV ====================

Reading from a CSV file is equally simple:


sample_data_csv = (

    spark

    .read

    .csv(

        '../Data/DataFrames_sample.csv'

        , header=True

        , inferSchema=True)

)




# ===  Accessing underlying RDDs


Switching to using DataFrames does not mean we need to completely abandon RDDs. 

Under the hood, DataFrames still use RDDs, but of Row(...) objects, as explained earlier. 

In this recipe, we will learn how to interact with the underlying RDD of a DataFrame.




import pyspark.sql as sql

import pyspark.sql.functions as f


sample_data_transformed = (

    sample_data_df

    .rdd

    .map(lambda row: sql.Row(

        **row.asDict()

        , HDD_size=row.HDD.split(' ')[0]

        )

    )

    .map(lambda row: sql.Row(

        **row.asDict()

        , HDD_type=row.HDD.split(' ')[1]

        )

    )

    .map(lambda row: sql.Row(

        **row.asDict()

        , Volume=row.H * row.D * row.W

        )

    )

    .toDF()

    .select(

        sample_data_df.columns + 

        [

              'HDD_size'

            , 'HDD_type'

            , f.round(

                f.col('Volume')

            ).alias('Volume_cuIn')

        ]

    )

)




# =====================

# we will first read our CSV sample data into an RDD and then create a DataFrame from it


import pyspark.sql as sql


sample_data_rdd = sc.textFile('../Data/DataFrames_sample.csv')


header = sample_data_rdd.first()


sample_data_rdd_row = (

    sample_data_rdd

    .filter(lambda row: row != header)

    .map(lambda row: row.split(','))

    .map(lambda row:

        sql.Row(

            Id=int(row[0])

            , Model=row[1]

            , Year=int(row[2])

            , ScreenSize=row[3]

            , RAM=row[4]

            , HDD=row[5]

            , W=float(row[6])

            , D=float(row[7])

            , H=float(row[8])

            , Weight=float(row[9])

        )

    )

)


sample_data_schema = spark.createDataFrame(sample_data_rdd_row)


the RDD still contains the row with column names. 

In order to get rid of it, we first extract it using the .first() method and then later using the .filter(...) transformation to remove any row that is equal to the header.


Next, we split each row with a comma and create a Row(...) object for each observation. 

Note here that we convert all of the fields to the proper datatypes. For example, the Id column should be an integer, the Model name is a string, and W (width) is a float. 


Finally, we simply call the .createDataFrame(...) method of SparkSession to convert our RDD of Row(...) objects into a DataFrame



# ====================

how to specify the schema programmatically



import pyspark.sql.types as typ


sch = typ.StructType([

      typ.StructField('Id', typ.LongType(), False)

    , typ.StructField('Model', typ.StringType(), True)

    , typ.StructField('Year', typ.IntegerType(), True)

    , typ.StructField('ScreenSize', typ.StringType(), True)

    , typ.StructField('RAM', typ.StringType(), True)

    , typ.StructField('HDD', typ.StringType(), True)

    , typ.StructField('W', typ.DoubleType(), True)

    , typ.StructField('D', typ.DoubleType(), True)

    , typ.StructField('H', typ.DoubleType(), True)

    , typ.StructField('Weight', typ.DoubleType(), True)

])


sample_data_rdd = sc.textFile('../Data/DataFrames_sample.csv')


header = sample_data_rdd.first()


sample_data_rdd = (

    sample_data_rdd

    .filter(lambda row: row != header)

    .map(lambda row: row.split(','))

    .map(lambda row: (

                int(row[0])

                , row[1]

                , int(row[2])

                , row[3]

                , row[4]

                , row[5]

                , float(row[6])

                , float(row[7])

                , float(row[8])

                , float(row[9])

        )

    )

)


sample_data_schema = spark.createDataFrame(sample_data_rdd, schema=sch)

sample_data_schema.show()



First, we create a list of .StructField(...) objects. .StructField(...) is a programmatic way of adding a field to a schema in PySpark. 

The first parameter is the name of the column we want to add.

The last parameter of .StructField(...) indicates whether the column can contain null values or not; if set to True, it means it can.


Next, we read in the DataFrames_sample.csv file using the .textFile(...) method of SparkContext. We filter out the header, as we will specify the schema explicitly and we do not need the name columns that are stored in the first row. Next, we split each row with a comma and impose the right datatypes on each element so it conforms to the schema we just specified.


Finally, we call the .createDataFrame(...) method but this time, along with the RDD, we also pass schema.

(As for .StructField(...) above, its second parameter is the datatype of the data we want to store in the column; some of the types available include .LongType(), .StringType(), .DoubleType(), .BooleanType(), .DateType(), and .BinaryType().)



# ========================================

# Creating a temporary table

DataFrames can easily be manipulated with SQL queries in Spark.


We simply use the .createTempView(...) method of a DataFrame:


sample_data_schema.createTempView('sample_data_view')


Now temporary view can  be used to extract data:


spark.sql('''

    SELECT Model

        , Year

        , RAM

        , HDD

    FROM sample_data_view

''').show()


We simply use the .sql(...) method of SparkSession, which allows us to write ANSI-SQL code to manipulate data within a DataFrame



# ==============================

Once you have created a temporary view, you cannot create another view with the same name. However, Spark provides another method that allows us to either create or update a view: .createOrReplaceTempView(...). As the name suggests, by calling this method, we either create a new view if one does not exist, or we replace an already existing one with the new one:



sample_data_schema.createOrReplaceTempView('sample_data_view')




# ===================================


Using SQL to interact with DataFrames



models_df = sc.parallelize([

      ('MacBook Pro', 'Laptop')

    , ('MacBook', 'Laptop')

    , ('MacBook Air', 'Laptop')

    , ('iMac', 'Desktop')

]).toDF(['Model', 'FormFactor'])


models_df.createOrReplaceTempView('models')


sample_data_schema.createOrReplaceTempView('sample_data_view')


spark.sql('''

    SELECT a.*

        , b.FormFactor

    FROM sample_data_view AS a

    LEFT JOIN models AS b

        ON a.Model == b.Model

    ORDER BY Weight DESC

''').show()



#============================

Aggregation

The SQL queries are not limited to extracting data only. We can also run some aggregations:


spark.sql('''

    SELECT b.FormFactor

        , COUNT(*) AS ComputerCnt

    FROM sample_data_view AS a

    LEFT JOIN models AS b

        ON a.Model == b.Model

    GROUP BY FormFactor

''').show()




# ============== Dataframe Transformation


The .select(...) transformation


The .select(...) transformation allows us to extract column or columns from a DataFrame. It works the same way as SELECT found in SQL.


sample_data_schema.select('Model', 'ScreenSize').show()



#---------------------------------------

The .filter(...) transformation


in contrast to .select(...), selects only rows that pass the condition specified. It can be compared with the WHERE statement from SQL.


Look at the following code snippet:


# extract only machines from 2015 onwards


(

    sample_data_schema

    .filter(sample_data_schema.Year > 2015)

    .show()

)



# ----------------------

The .groupBy(...) transformation

The .groupBy(...) transformation performs data aggregation based on the value (or values) from a column (or multiple columns). In SQL syntax, this equates to GROUP BY.


Look at the following code:


(

    sample_data_schema

    .groupBy('RAM')

    .count()

    .show()

)


# --------------------------------------------

The .orderBy(...) transformation

The .orderBy(...) transformation sorts the results given the columns specified. An equivalent from the SQL world would also be ORDER BY.


Look at the following code snippet:


# sort by width (W)


sample_data_schema.orderBy('W').show()





# ----------------------------------------

The .withColumn(...) transformation


The .withColumn(...) transformation applies a function to other columns and/or literals (using the .lit(...) method) and stores the result as a new column. In SQL, this could be any expression that applies a transformation to any of the columns and uses AS to assign a new column name. This transformation extends the original DataFrame.



# split the HDD into size and type


(

    sample_data_schema

    .withColumn('HDDSplit', f.split(f.col('HDD'), ' '))

    .show()

)


You could achieve the same result with the .select(...) transformation. The following code will produce the same result:


# do the same as withColumn


(

    sample_data_schema

    .select(

        f.col('*')

        , f.split(f.col('HDD'), ' ').alias('HDD_Array')

    ).show()

)



# The SQL (T-SQL) equivalent would be:


SELECT *

    , STRING_SPLIT(HDD, ' ') AS HDD_Array

FROM sample_data_schema



# ========================================


The .join(...) transformation

The .join(...) transformation allow us to join two DataFrames. The first parameter is the other DataFrame we want to join with, while the second parameter specifies the columns on which to join, and the final parameter specifies the nature of the join. 

Available types are inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti. In SQL, the equivalent is the JOIN statement.


models_df = sc.parallelize([

      ('MacBook Pro', 'Laptop')

    , ('MacBook', 'Laptop')

    , ('MacBook Air', 'Laptop')

    , ('iMac', 'Desktop')

]).toDF(['Model', 'FormFactor'])


(

    sample_data_schema

    .join(

        models_df

        , sample_data_schema.Model == models_df.Model

        , 'left'

    ).show()

)



If we had a DataFrame that did not list every Model (note that the MacBook is missing), then the left join would return null for the missing model's FormFactor:


models_df = sc.parallelize([

      ('MacBook Pro', 'Laptop')

    , ('MacBook Air', 'Laptop')

    , ('iMac', 'Desktop')

]).toDF(['Model', 'FormFactor'])


(

    sample_data_schema

    .join(

        models_df

        , sample_data_schema.Model == models_df.Model

        , 'left'

    ).show()
)



# ===============================


The .unionAll(...) transformation

The .unionAll(...) transformation appends values from another DataFrame. An equivalent in SQL syntax is UNION ALL.


Look at the following code:


another_macBookPro = sc.parallelize([
      (5, 'MacBook Pro', 2018, '15"', '16GB', '256GB SSD', 13.75, 9.48, 0.61, 4.02)
]).toDF(sample_data_schema.columns)


sample_data_schema.unionAll(another_macBookPro).show()



# ==============================


The .distinct(...) transformation

The .distinct(...) transformation returns a list of distinct values from a column. An equivalent in SQL would be DISTINCT.


Look at the following code:


# select the distinct values from the RAM column


sample_data_schema.select('RAM').distinct().show()



# ==========================


The .repartition(...) transformation

The .repartition(...) transformation shuffles the data around the cluster and combines it into a specified number of partitions. You can also specify the column or columns you want to use to perform the partitioning on. There is no direct equivalent in the SQL world.


Look at the following code:


sample_data_schema_rep = (

    sample_data_schema

    .repartition(2, 'Year')

)


sample_data_schema_rep.rdd.getNumPartitions()



# =====================================================

The .fillna(...) transformation

The .fillna(...) transformation fills in the missing values in a DataFrame. You can either specify a single value and all the missing values will be filled in with it, or you can pass a dictionary where each key is the name of the column, and the values are to fill the missing values in the corresponding column. No direct equivalent exists in the SQL world.


Look at the following code:


missing_df = sc.parallelize([

    (None, 36.3, 24.2)

    , (1.6, 32.1, 27.9)

    , (3.2, 38.7, 24.7)

    , (2.8, None, 23.9)

    , (3.9, 34.1, 27.9)

    , (9.2, None, None)

]).toDF(['A', 'B', 'C'])


missing_df.fillna(21.4).show()



# =================================

The .dropna(...) transformation

The .dropna(...) transformation removes records that have missing values. You can specify the threshold that translates to a minimum number of missing observations in the record that qualifies it to be removed. As with .fillna(...), there is no direct equivalent in the SQL world.


Look at the following code:


missing_df.dropna().show()


# ========================================================

The .dropDuplicates(...) transformation

The .dropDuplicates(...) transformation, as the name suggests, removes duplicated records. You can also specify a subset parameter as a list of column names; the method will remove duplicated records based on the values found in those columns.


Look at the following code:


dupes_df = sc.parallelize([

      (1.6, 32.1, 27.9)

    , (3.2, 38.7, 24.7)

    , (3.9, 34.1, 27.9)

    , (3.2, 38.7, 24.7)

]).toDF(['A', 'B', 'C'])


dupes_df.dropDuplicates().show()




# ============================================


Overview of DataFrame actions

Transformations listed in the previous recipe transform one DataFrame into another. However, they only get executed once an action is called on a DataFrame.



# --------------------------------------

The .show(...) action

The .show(...) action, by default, shows the top 20 rows in tabular form. You can specify how many records to retrieve by passing an integer as a parameter.



# --------------------------------------

The .collect() action

The .collect() action, as the name suggests, collects all the results from all the worker nodes, and returns them to the driver. Beware of using this method on a big dataset as your driver will most likely break if you try to return the whole DataFrame of billions of records; use this method only to return small, aggregated data.


Look at the following code:


sample_data_schema.groupBy('Year').count().collect()


# --------------------------------------------


The .take(...) action

The .take(...) action works the same way as in RDDs - it returns the specified number of records to the driver node.

Look at the following code:

sample_data_schema.take(2)