Thursday, 21 July 2022

RDD

 


Resilient Distributed Datasets (RDDs)

are collections of immutable JVM objects that are distributed across an Apache Spark cluster.


Any action on a Spark DataFrame eventually gets translated into a highly optimized execution of transformations and actions on RDDs.


Data in an RDD is split into chunks based on a key and then dispersed across the executor nodes.


RDDs are resilient, that is, they are able to recover quickly from failures because the same data chunks are replicated across multiple executor nodes.

If one executor fails, another can still process the data.


./bin/pyspark --master local


There are two ways to create RDDs: the parallelize() method (on a collection such as a list or array of elements),

or by referencing a file or other external data source.



# =============== Create RDD =======================

myRDD = sc.parallelize([
    ("mike", "19"),
    ("prashant", "18"),
    ("shivani", "17"),
    ("baibhab", "16"),
    ("Rob", "17"),
    ("Scott", "16")
])


Focusing first on the argument to the sc.parallelize() method, we created a Python list composed of tuples (that is, ('mike', '19'), ..., ('Scott', '16')).

The sc.parallelize() method is the SparkContext's parallelize method for creating a parallelized collection.

This allows Spark to distribute the data across multiple nodes, instead of depending on a single node to process the data.


# =================== View =====================


We will now execute an RDD action, take(n), to return values to the console.

Note that a common approach in PySpark is to use collect(), which returns all values in your RDD from the Spark worker nodes to the driver.

There are performance implications when working with a large amount of data, as this translates to large volumes of data being transferred from the Spark worker nodes to the driver.


You should pretty much always use the take(n) method instead; it returns the first n elements of the RDD instead of the whole dataset


It is a more efficient method because it first scans one partition and uses those statistics to determine the number of partitions required to return the results

 

myRDD.take(5)
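
For comparison, collect() looks like this; it is safe on our tiny RDD but risky on large ones:

# collect() ships every element from the workers back to the driver
myRDD.collect()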



# ============ Reading data from files =====================


Local files:   sc.textFile('/local folder/filename.csv')

Hadoop HDFS:   sc.textFile('hdfs://folder/filename.csv')



# for a tab-separated file


myRDD = (

    sc

    .textFile(

        '<file.csv>'

        , minPartitions=4

        , use_unicode=True

    ).map(lambda element: element.split("\t"))

)


Only the first parameter is required, which indicates the location of the text file


minPartitions: Indicates the minimum number of partitions that make up the RDD. 

The Spark engine can often determine the best number of partitions based on the file size, 

but you may want to change the number of partitions for performance reasons


use_unicode: Engage this parameter if you are processing Unicode data.


Note that if you were to execute this statement without the subsequent map() function, the resulting RDD would not split on the tab delimiter; it would simply be a list of strings, one per row of the file:


myRDD = sc.textFile('<file.csv>')

myRDD.take(5)


# The .map(...) function transforms the data from a list of strings to a list of lists; with the map applied (as in the earlier snippet), take(5) instead returns the split rows:


myRDD.take(5)



# let's determine the number of rows in this RDD.

 

 myRDD.count()

 

# find out the number of partitions that support this RDD:

  myRDD.getNumPartitions()

  

  

# =============================================================


The transformations include the following common tasks:


Removing the header line from your text file: zipWithIndex()

Selecting columns from your RDD: map()

Running a WHERE (filter) clause: filter()

Getting the distinct values: distinct()

Getting the number of partitions: getNumPartitions()

Determining the size of your partitions (that is, the number of elements within each partition): mapPartitionsWithIndex() (a short sketch follows this list)
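
As a small sketch of the last item (assuming the myRDD created above), mapPartitionsWithIndex() takes a function of (partition index, iterator over the partition's elements) and can be used to count the elements in each partition:

# yield one (partition index, element count) pair per partition
def partition_sizes(idx, iterator):
    yield (idx, sum(1 for _ in iterator))

myRDD.mapPartitionsWithIndex(partition_sizes).collect()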



# map


The map(f) transformation returns a new RDD formed by passing each element through a function, f.


myRDD.map(lambda c: (c[0], c[1])).take(5)




# .filter(...) transformation

The filter(f)  transformation returns a new RDD based on selecting elements for which the f function returns true. 

Therefore, look at the following code snippet:


(
    myRDD
    .map(lambda c: (c[0], c[1]))
    .filter(lambda c: c[1] == "16")
    .take(5)
)

# .flatMap(...) transformation

The flatMap(f) transformation is similar to map, but the new RDD flattens out all of the elements.

Note: map returns a list of lists/tuples, whereas flatMap returns a single flattened list.

   

(
    myRDD
    .filter(lambda c: c[1] == "16")
    .map(lambda c: (c[0], c[1]))
    .flatMap(lambda x: x)
    .take(10)
)




# .distinct() transformation


The distinct() transformation returns a new RDD containing the distinct elements of the source RDD.


(
    myRDD
    .map(lambda c: c[1])
    .distinct()
    .take(5)
)

# .join(...) transformation

The join(rdd) transformation, when called on an RDD of (key, val_left) pairs with an RDD of (key, val_right) pairs, returns an RDD of (key, (val_left, val_right)) pairs for the matching keys.

Outer joins are supported through leftOuterJoin(), rightOuterJoin(), and fullOuterJoin(); an example follows the inner join below.



# For a join, both RDDs must consist of (key, value) pairs; key both by name here

rdd1 = myRDD.map(lambda c: (c[0], c[1]))

rdd2 = myRDD.map(lambda c: (c[0], c[1]))


# Execute inner join between RDDs: matching on the name key yields (name, (age, age))
rdd2.join(rdd1).take(5)
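
A left outer join keeps every key from the left RDD and fills in None where there is no match; a minimal sketch using a made-up rdd3 that only covers some of the names:

rdd3 = sc.parallelize([("mike", "NY"), ("Scott", "CA")])

# unmatched names get None as the right-hand value
rdd2.leftOuterJoin(rdd3).take(5)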




# .zipWithIndex() transformation

The zipWithIndex() transformation appends (or ZIPs) the RDD with the element indices. 

This is very handy when wanting to remove the header row (first row) of a file.



ac = myRDD.map(lambda c: (c[0], c[1]))

ac.zipWithIndex().take(5)


# To remove the header from your data


# Using zipWithIndex to skip header row

# - filter out row 0

# - extract only row info


(

    myRDD

    .zipWithIndex()

    .filter(lambda row_idx: row_idx[1] > 0)

    .map(lambda row_idx: row_idx[0])

    .take(5)

)




#  .reduceByKey(...) transformation

The reduceByKey(f) transformation reduces the elements of the RDD using f by the key. 

The f function should be commutative and associative so that it can be computed correctly in parallel.


Look at the following code snippet:


 

(

    myRDD

    .zipWithIndex()

    .filter(lambda row_idx: row_idx[1] > 0)

    .map(lambda row_idx: row_idx[0])

    .map(lambda c: (c[0], int(c[1])))

    .reduceByKey(lambda x, y: x + y)

    .take(5)

)




# .sortByKey(...) transformation

The sortByKey(asc) transformation orders (key, value) RDD by key and returns an RDD in ascending or descending order. 

(

    myRDD

    .zipWithIndex()

    .filter(lambda row_idx: row_idx[1] > 0)

    .map(lambda row_idx: row_idx[0])

    .map(lambda c: (c[0], int(c[1])))

    .reduceByKey(lambda x, y: x + y)

    .sortByKey()

    .take(5)

)
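
To sort in descending order instead, pass ascending=False (the first parameter of sortByKey); a short example on myRDD directly:

myRDD.map(lambda c: (c[0], int(c[1]))).reduceByKey(lambda x, y: x + y).sortByKey(ascending=False).take(5)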



# .union(...) transformation

The union(RDD) transformation returns a new RDD that is the union of the source and argument RDDs. Look at the following code snippet:


# Create `a` RDD of rows where the age is 16

a = (

    myRDD

    .zipWithIndex()

    .filter(lambda row_idx: row_idx[1] > 0)

    .map(lambda row_idx: row_idx[0])

    .filter(lambda c: c[1] == "16")

)


# Create `b` RDD of rows where the age is 17

b = (

    myRDD

    .zipWithIndex()

    .filter(lambda row_idx: row_idx[1] > 0)

    .map(lambda row_idx: row_idx[0])

    .filter(lambda c: c[1] == "17")

)


# Union the two RDDs

a.union(b).collect()



# =================


myRdd = sc.textFile("<file>")

myRdd = myRdd.filter(lambda line: len(line) > 0)  # remove empty lines

myRdd = myRdd.map(lambda line: line.split(","))


# count rows per distinct value of the first column
myRdd = myRdd.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)




# Actions


.take

.collect

.count

.reduce(...) action

The reduce(f) action aggregates the elements of an RDD by f. The f function should be commutative and associative so that it can be computed correctly in parallel. Look at the following code:


# Using the flights RDD (created from the departure delays dataset
# in the DataFrames section below): calculate the total delays of flights
# between SEA (origin) and SFO (dest),
# convert the delay column to int,
# and sum them

flights\

 .filter(lambda c: c[3] == 'SEA' and c[4] == 'SFO')\

 .map(lambda c: int(c[1]))\

 .reduce(lambda x, y: x + y)





# .saveAsTextFile(...) action

The saveAsTextFile() action saves your RDD into a text file; 

note that each partition is a separate file.


# saveAsTextFile

myRdd.saveAsTextFile("abc.txt")




Pitfalls of using RDDs

The key concern associated with using RDDs is that they can take a lot of time to master. The flexibility of running functional operators such as map, reduce, and shuffle allows you to perform a wide variety of transformations against your data. But with this power comes great responsibility, and it is easy to write code that is inefficient.



The reason RDDs can be slow, especially within the context of PySpark, is that whenever a PySpark program is executed using RDDs, there is a potentially large overhead to execute the job.


In the PySpark driver, the Spark Context uses Py4j to launch a JVM using JavaSparkContext. 

Any RDD transformations are initially mapped to PythonRDD objects in Java.


Once these tasks are pushed out to the Spark worker(s), 

PythonRDD objects launch Python subprocesses using pipes to send both code and data to be processed in Python:


While this approach allows PySpark to distribute the processing of the data to multiple Python subprocesses on multiple workers,

there is a lot of context switching and communication overhead between Python and the JVM.


Note - This is even more apparent when using Python UDFs, as the performance is significantly slower because the data needs to be moved between the JVM and the Python worker processes so the UDF can run in Python. Note that vectorized (pandas) UDFs were introduced as part of Spark 2.3 and significantly improve PySpark UDF performance.
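
As a rough, self-contained sketch of the difference (the tiny DataFrame and column names below are made up purely for illustration, and Spark 2.3+ with pandas and pyarrow installed is assumed):

import pandas as pd
from pyspark.sql.functions import col, pandas_udf, udf

df = spark.createDataFrame([(1, 10.0), (2, 25.0), (3, 7.5)], ['id', 'delay'])

# row-at-a-time Python UDF: every value crosses the JVM/Python boundary one by one
slow_double = udf(lambda d: d * 2.0, 'double')

# vectorized (pandas) UDF: whole columnar batches arrive as pandas Series
@pandas_udf('double')
def fast_double(delay: pd.Series) -> pd.Series:
    return delay * 2.0

df.select(col('delay'), slow_double('delay'), fast_double('delay')).show()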






# ===============================================

# Data frames



As in the previous sections, let's make use of the flights dataset and create an RDD and a DataFrame against this dataset:


## Create flights RDD

flights = sc.textFile('/databricks-datasets/flights/departuredelays.csv')\

  .map(lambda line: line.split(","))\

  .zipWithIndex()\

  .filter(lambda row_idx: row_idx[1] > 0)\
  .map(lambda row_idx: row_idx[0])


# Create flightsDF DataFrame

flightsDF = spark.read\

  .options(header='true', inferSchema='true')\
  .csv('/databricks-datasets/flights/departuredelays.csv')

flightsDF.createOrReplaceTempView("flightsDF")




In this section, we will run the same group by statement—one via an RDD using reduceByKey(), and one via a DataFrame using Spark SQL GROUP BY. For this query, we will sum the time delays grouped by originating city and sort according to the originating city:


# RDD: Sum delays, group by and order by originating city

flights.map(lambda c: (c[3], int(c[1]))).reduceByKey(lambda x, y: x + y).sortByKey().take(50)


# Output (truncated)

# Duration: 11.08 seconds

[(u'ABE', 5113),  

 (u'ABI', 5128),  

 (u'ABQ', 64422),  

 (u'ABY', 1554),  

 (u'ACT', 392),

 ... ]

For this particular configuration, it took 11.08 seconds to extract the columns, execute reduceByKey() to summarize the data, execute sortByKey() to order it, and then return the values to the driver:


# DataFrame (Spark SQL): Sum delays, group by and order by originating city

spark.sql("select origin, sum(delay) as TotalDelay from flightsDF group by origin order by origin").show(50)


# Output (truncated)

# Duration: 4.76s

+------+----------+ 

|origin|TotalDelay| 

+------+----------+ 

| ABE  |      5113| 

| ABI  |      5128|

| ABQ  |     64422| 

| ABY  |      1554| 

| ACT  |       392|

...

+------+----------+ 


There are many advantages of Spark DataFrames, including, but not limited to the following:


You can execute Spark SQL statements (not just through the Spark DataFrame API)

There is a schema associated with your data, so you can specify the column name instead of the position (see the short example below)
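
For example, using the flights RDD and flightsDF DataFrame defined above (the column names come from the CSV header):

# DataFrame: refer to columns by name
flightsDF.select('origin', 'delay').show(5)

# RDD: refer to fields by position
flights.map(lambda c: (c[3], c[1])).take(5)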




# =============================================================


DataFrames take advantage of the developments in Project Tungsten and the Catalyst Optimizer.

These two improvements bring the performance of PySpark on par with that of either Scala or Java.


Project Tungsten is a set of improvements to the Spark engine aimed at bringing its execution process closer to the bare metal. The main deliverables include the following:


Code generation at runtime: This aims at leveraging the optimizations implemented in modern compilers

Taking advantage of the memory hierarchy: The algorithms and data structures exploit memory hierarchy for fast execution

Direct-memory management: Removes the overhead associated with Java garbage collection and JVM object creation and management

Low-level programming: Speeds up memory access by loading immediate data to CPU registers

Elimination of virtual function dispatches: This avoids the overhead of multiple CPU calls




The Catalyst Optimizer sits at the core of Spark SQL and powers both the SQL queries executed against the data and the DataFrame API. The process starts with a query being issued to the engine. The logical plan of execution is first optimized. Based on the optimized logical plan, multiple physical plans are derived and pushed through a cost optimizer. The selected, most cost-efficient plan is then translated (using the code generation optimizations implemented as part of Project Tungsten) into optimized RDD-based execution code.
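
You can peek at what Catalyst produces for a given query with .explain(); a small example on the flightsDF DataFrame from above:

# extended=True prints the logical plans as well as the selected physical plan
flightsDF.groupBy('origin').sum('delay').explain(True)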




# ======================= dataframe ==========================


A Spark DataFrame is an immutable collection of data distributed within a cluster. 

The data inside a DataFrame is organized into named columns that can be compared to tables in a relational database.



# ============= Create Dataframe =========================


There are many ways to create a DataFrame.


The simplest way is to create an RDD and convert it into a DataFrame:


# create RDD

sample_data = sc.parallelize([

      (1, 'MacBook Pro', 2015, '15"', '16GB', '512GB SSD'

        , 13.75, 9.48, 0.61, 4.02)

    , (2, 'MacBook', 2016, '12"', '8GB', '256GB SSD'

        , 11.04, 7.74, 0.52, 2.03)

    , (3, 'MacBook Air', 2016, '13.3"', '8GB', '128GB SSD'

        , 12.8, 8.94, 0.68, 2.96)

    , (4, 'iMac', 2017, '27"', '64GB', '1TB SSD'

        , 25.6, 8.0, 20.3, 20.8)

])


As with all RDDs, it is hard to figure out what each element of a tuple stands for, since RDDs are schema-less structures.


Therefore, when using the .createDataFrame(...) method of SparkSession, we pass a list of column names as the second argument; the first argument is the RDD we wish to transform into a DataFrame


# create dataframe

sample_data_df = spark.createDataFrame(

    sample_data

    , [

        'Id'

        , 'Model'

        , 'Year'

        , 'ScreenSize'

        , 'RAM'

        , 'HDD'

        , 'W'

        , 'D'

        , 'H'

        , 'Weight'

    ]

)



sample_data_df.take(1)


A DataFrame is a collection of Row(...) objects. A Row(...) object consists of named data, unlike a bare RDD record.


The preceding Row(...) object looks similar to a dictionary; in fact, any Row(...) object can be converted into a dictionary using the .asDict() method.
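
For example:

# convert the first Row of the DataFrame into a plain Python dictionary
sample_data_df.take(1)[0].asDict()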

 

 

 sample_data_df.show()

 

 sample_data_df.printSchema()

 

 

 

 When you use the .read attribute of SparkSession, it returns a DataFrameReader object. DataFrameReader is an interface to read data into a DataFrame.

 

 

 # ================  From JSON ==================

To read data from a JSON-formatted file, you can simply do the following:


sample_data_json_df = (

    spark

    .read

    .json('../Data/DataFrames_sample.json')

)



# =================== From CSV ====================

Reading from a CSV file is equally simple:


sample_data_csv = (

    spark

    .read

    .csv(

        '../Data/DataFrames_sample.csv'

        , header=True

        , inferSchema=True)

)




# ===  Accessing underlying RDDs


Switching to using DataFrames does not mean we need to completely abandon RDDs. 

Under the hood, DataFrames still use RDDs, but of Row(...) objects, as explained earlier. 

In this recipe, we will learn how to interact with the underlying RDD of a DataFrame.




import pyspark.sql as sql

import pyspark.sql.functions as f


sample_data_transformed = (

    sample_data_df

    .rdd

    .map(lambda row: sql.Row(

        **row.asDict()

        , HDD_size=row.HDD.split(' ')[0]

        )

    )

    .map(lambda row: sql.Row(

        **row.asDict()

        , HDD_type=row.HDD.split(' ')[1]

        )

    )

    .map(lambda row: sql.Row(

        **row.asDict()

        , Volume=row.H * row.D * row.W

        )

    )

    .toDF()

    .select(

        sample_data_df.columns + 

        [

              'HDD_size'

            , 'HDD_type'

            , f.round(

                f.col('Volume')

            ).alias('Volume_cuIn')

        ]

    )

)




# =====================

# we will first read our CSV sample data into an RDD and then create a DataFrame from it


import pyspark.sql as sql


sample_data_rdd = sc.textFile('../Data/DataFrames_sample.csv')


header = sample_data_rdd.first()


sample_data_rdd_row = (

    sample_data_rdd

    .filter(lambda row: row != header)

    .map(lambda row: row.split(','))

    .map(lambda row:

        sql.Row(

            Id=int(row[0])

            , Model=row[1]

            , Year=int(row[2])

            , ScreenSize=row[3]

            , RAM=row[4]

            , HDD=row[5]

            , W=float(row[6])

            , D=float(row[7])

            , H=float(row[8])

            , Weight=float(row[9])

        )

    )

)


sample_data_schema = spark.createDataFrame(sample_data_rdd_row)


The RDD read from the CSV file still contains the row with the column names.

In order to get rid of it, we first extract the header using the .first() method and then use the .filter(...) transformation to remove any row that is equal to the header.


Next, we split each row with a comma and create a Row(...) object for each observation. 

Note here that we convert all of the fields to the proper datatypes. For example, the Id column should be an integer, the Model name is a string, and W (width) is a float. 


Finally, we simply call the .createDataFrame(...) method of SparkSession to convert our RDD of Row(...) objects into a DataFrame



# ====================

How to specify the schema programmatically:



import pyspark.sql.types as typ


sch = typ.StructType([

      typ.StructField('Id', typ.LongType(), False)

    , typ.StructField('Model', typ.StringType(), True)

    , typ.StructField('Year', typ.IntegerType(), True)

    , typ.StructField('ScreenSize', typ.StringType(), True)

    , typ.StructField('RAM', typ.StringType(), True)

    , typ.StructField('HDD', typ.StringType(), True)

    , typ.StructField('W', typ.DoubleType(), True)

    , typ.StructField('D', typ.DoubleType(), True)

    , typ.StructField('H', typ.DoubleType(), True)

    , typ.StructField('Weight', typ.DoubleType(), True)

])


sample_data_rdd = sc.textFile('../Data/DataFrames_sample.csv')


header = sample_data_rdd.first()


sample_data_rdd = (

    sample_data_rdd

    .filter(lambda row: row != header)

    .map(lambda row: row.split(','))

    .map(lambda row: (

                int(row[0])

                , row[1]

                , int(row[2])

                , row[3]

                , row[4]

                , row[5]

                , float(row[6])

                , float(row[7])

                , float(row[8])

                , float(row[9])

        )

    )

)


sample_data_schema = spark.createDataFrame(sample_data_rdd, schema=sch)

sample_data_schema.show()



First, we create a list of .StructField(...) objects. .StructField(...) is a programmatic way of adding a field to a schema in PySpark.

The first parameter is the name of the column we want to add.

The second parameter is the datatype of the data we want to store in the column; some of the types available include .LongType(), .StringType(), .DoubleType(), .BooleanType(), .DateType(), and .BinaryType().

The last parameter of .StructField(...) indicates whether the column can contain null values; if set to True, it can.


Next, we read in the DataFrames_sample.csv file using the .textFile(...) method of SparkContext. We filter out the header, as we will specify the schema explicitly and do not need the column names that are stored in the first row. Next, we split each row with a comma and impose the right datatypes on each element so it conforms to the schema we just specified.


Finally, we call the .createDataFrame(...) method, but this time, along with the RDD, we also pass the schema.



# ========================================

# Creating a temporary table

DataFrames can easily be manipulated with SQL queries in Spark.


We simply use the .createTempView(...) method of a DataFrame:


sample_data_schema.createTempView('sample_data_view')


Now the temporary view can be used to extract data:


spark.sql('''

    SELECT Model

        , Year

        , RAM

        , HDD

    FROM sample_data_view

''').show()


We simply use the .sql(...) method of SparkSession, which allows us to write ANSI-SQL code to manipulate data within a DataFrame



# ==============================

Once you have created a temporary view, you cannot create another view with the same name. However, Spark provides another method that allows us to either create or update a view: .createOrReplaceTempView(...). As the name suggests, by calling this method, we either create a new view if one does not exist, or we replace an already existing one with the new one:



sample_data_schema.createOrReplaceTempView('sample_data_view')




# ===================================


Using SQL to interact with DataFrames



models_df = sc.parallelize([

      ('MacBook Pro', 'Laptop')

    , ('MacBook', 'Laptop')

    , ('MacBook Air', 'Laptop')

    , ('iMac', 'Desktop')

]).toDF(['Model', 'FormFactor'])


models_df.createOrReplaceTempView('models')


sample_data_schema.createOrReplaceTempView('sample_data_view')


spark.sql('''

    SELECT a.*

        , b.FormFactor

    FROM sample_data_view AS a

    LEFT JOIN models AS b

        ON a.Model == b.Model

    ORDER BY Weight DESC

''').show()



#============================

Aggregation

The SQL queries are not limited to extracting data only. We can also run some aggregations:


spark.sql('''

    SELECT b.FormFactor

        , COUNT(*) AS ComputerCnt

    FROM sample_data_view AS a

    LEFT JOIN models AS b

        ON a.Model == b.Model

    GROUP BY FormFactor

''').show()




# ============== Dataframe Transformation


The .select(...) transformation


The .select(...) transformation allows us to extract a column or columns from a DataFrame. It works the same way as SELECT in SQL.


sample_data_schema.select('Model', 'ScreenSize').show()



#---------------------------------------

The .filter(...) transformation


The .filter(...) transformation, in contrast to .select(...), selects only the rows that pass the specified condition. It can be compared with the WHERE clause in SQL.


Look at the following code snippet:


# extract only machines from 2015 onwards


(

    sample_data_schema

    .filter(sample_data_schema.Year > 2015)

    .show()

)



# ----------------------

The .groupBy(...) transformation

The .groupBy(...) transformation performs data aggregation based on the value (or values) from a column (or multiple columns). In SQL syntax, this equates to GROUP BY.


Look at the following code:


(

    sample_data_schema

    .groupBy('RAM')

    .count()

    .show()

)


# --------------------------------------------

The .orderBy(...) transformation

The .orderBy(...) transformation sorts the results given the columns specified. An equivalent from the SQL world would also be ORDER BY.


Look at the following code snippet:


# sort by width (W)


sample_data_schema.orderBy('W').show()





# ----------------------------------------

The .withColumn(...) transformation


The .withColumn(...) transformation applies a function to other columns and/or literals (using the .lit(...) method) and stores the result as a new column.

In SQL, this could be any expression that transforms a column, with AS used to assign the new column name.

This transformation extends the original DataFrame.



# split the HDD into size and type


(

    sample_data_schema

    .withColumn('HDDSplit', f.split(f.col('HDD'), ' '))

    .show()

)


You could achieve the same result with the .select(...) transformation. The following code will produce the same result:


# do the same as withColumn


(

    sample_data_schema

    .select(

        f.col('*')

        , f.split(f.col('HDD'), ' ').alias('HDD_Array')

    ).show()

)



# The Spark SQL equivalent (run against the sample_data_view temporary view) would be:


SELECT *
    , SPLIT(HDD, ' ') AS HDD_Array
FROM sample_data_view



# ========================================


The .join(...) transformation

The .join(...) transformation allows us to join two DataFrames. The first parameter is the other DataFrame we want to join with, the second parameter specifies the columns on which to join, and the final parameter specifies the nature of the join.

Available types are inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti. In SQL, the equivalent is the JOIN statement.


models_df = sc.parallelize([

      ('MacBook Pro', 'Laptop')

    , ('MacBook', 'Laptop')

    , ('MacBook Air', 'Laptop')

    , ('iMac', 'Desktop')

]).toDF(['Model', 'FormFactor'])


(

    sample_data_schema

    .join(

        models_df

        , sample_data_schema.Model == models_df.Model

        , 'left'

    ).show()

)



If we had a DataFrame that did not list every Model (note that the MacBook is missing below), then the left join would produce null FormFactor values for the missing model:


models_df = sc.parallelize([

      ('MacBook Pro', 'Laptop')

    , ('MacBook Air', 'Laptop')

    , ('iMac', 'Desktop')

]).toDF(['Model', 'FormFactor'])


(

    sample_data_schema

    .join(

        models_df

        , sample_data_schema.Model == models_df.Model

        , 'left'

    ).show()
)



# ===============================


The .unionAll(...) transformation

The .unionAll(...) transformation appends values from another DataFrame. An equivalent in SQL syntax is UNION ALL.


Look at the following code:


another_macBookPro = sc.parallelize([

    (5, 'MacBook Pro', 2018, '15"', '16GB', '256GB SSD', 13.75, 9.48, 0.61, 4.02)

]).toDF(sample_data_schema.columns)


sample_data_schema.unionAll(another_macBookPro).show()
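
Note that since Spark 2.0, .unionAll(...) is deprecated in favor of .union(...), which behaves the same way (like SQL UNION ALL, it does not de-duplicate):

sample_data_schema.union(another_macBookPro).show()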



# ==============================


The .distinct(...) transformation

The .distinct() transformation returns a new DataFrame with only the distinct rows; selecting a single column first, as below, yields the distinct values of that column. An equivalent in SQL would be DISTINCT.


Look at the following code:


# select the distinct values from the RAM column


sample_data_schema.select('RAM').distinct().show()



# ==========================


The .repartition(...) transformation

The .repartition(...) transformation shuffles the data around the cluster and combines it into a specified number of partitions. You can also specify the column or columns you want to use to perform the partitioning on. There is no direct equivalent in the SQL world.


Look at the following code:


sample_data_schema_rep = (

    sample_data_schema

    .repartition(2, 'Year')

)


sample_data_schema_rep.rdd.getNumPartitions()



# =====================================================

The .fillna(...) transformation

The .fillna(...) transformation fills in the missing values in a DataFrame. You can either specify a single value, in which case all the missing values are filled in with it, or you can pass a dictionary where each key is a column name and the corresponding value is used to fill the missing values in that column. No direct equivalent exists in the SQL world.


Look at the following code:


missing_df = sc.parallelize([

    (None, 36.3, 24.2)

    , (1.6, 32.1, 27.9)

    , (3.2, 38.7, 24.7)

    , (2.8, None, 23.9)

    , (3.9, 34.1, 27.9)

    , (9.2, None, None)

]).toDF(['A', 'B', 'C'])


missing_df.fillna(21.4).show()
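
To fill each column with a different value, pass a dictionary instead (the fill values below are arbitrary, chosen only for illustration):

missing_df.fillna({'A': 0.0, 'B': 36.0, 'C': 25.0}).show()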



# =================================

The .dropna(...) transformation

The .dropna(...) transformation removes records that have missing values. You can also pass a thresh parameter, which is the minimum number of non-null values a record must contain in order to be kept; records with fewer non-null values are dropped. As with .fillna(...), there is no direct equivalent in the SQL world.


Look at the following code:


missing_df.dropna().show()
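
A couple of variations using the thresh and subset parameters:

# keep only rows that have at least 2 non-null values
missing_df.dropna(thresh=2).show()

# drop a row only when column B is null
missing_df.dropna(subset=['B']).show()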


# ========================================================

The .dropDuplicates(...) transformation

The .dropDuplicates(...) transformation, as the name suggests, removes duplicated records. You can also specify a subset parameter as a list of column names; the method will remove duplicated records based on the values found in those columns.


Look at the following code:


dupes_df = sc.parallelize([

      (1.6, 32.1, 27.9)

    , (3.2, 38.7, 24.7)

    , (3.9, 34.1, 27.9)

    , (3.2, 38.7, 24.7)

]).toDF(['A', 'B', 'C'])


dupes_df.dropDuplicates().show()
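
With the subset parameter, de-duplication can be restricted to selected columns; here, rows are treated as duplicates if they share the same value in column C:

dupes_df.dropDuplicates(subset=['C']).show()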




# ============================================


Overview of DataFrame actions

Transformations listed in the previous recipe transform one DataFrame into another. However, they only get executed once an action is called on a DataFrame.



# --------------------------------------

The .show(...) action

The .show(...) action, by default, displays the top 20 rows in tabular form. You can specify how many records to retrieve by passing an integer as a parameter.
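
For example:

# show only the first two rows
sample_data_schema.show(2)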



# --------------------------------------

The .collect() action

The .collect() action, as the name suggests, collects all the results from all the worker nodes, and returns them to the driver. Beware of using this method on a big dataset as your driver will most likely break if you try to return the whole DataFrame of billions of records; use this method only to return small, aggregated data.


Look at the following code:


sample_data_schema.groupBy('Year').count().collect()


# --------------------------------------------


The .take(...) action

The .take(n) action works the same way as it does for RDDs: it returns the specified number of records to the driver node.


Look at the following code:


sample_data_schema.take(2)
