Tuesday 25 July 2023

SQL & PySpark Comparison

Concept

SQL

PySpark

Select

SELECT column(s) FROM table

SELECT * FROM table;

df.select("column(s)")

df.select("*")

DISTINCT

SELECT DISTINCT column(s) FROM table

df.select("column(s)").distinct()

WHERE

SELECT column(s) FROM table WHERE condition

df.filter(condition).select("column(s)")

ORDER BY

SELECT column(s) FROM table ORDER BY column(s)

df.sort("column(s)").select("column(s)")

LIMIT

SELECT column(s) FROM table LIMIT n

df.limit(n).select("column(s)")
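
Putting the clauses above together, here is a minimal, hedged sketch against a hypothetical employees table (the table and column names are assumptions for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("sql_vs_pyspark").getOrCreate()
df = spark.read.table("employees")  # hypothetical table

# SELECT DISTINCT department FROM employees WHERE salary > 50000 ORDER BY department LIMIT 10
(df.filter(col("salary") > 50000)
   .select("department")
   .distinct()
   .orderBy("department")
   .limit(10)
   .show())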

COUNT

SELECT COUNT(*) FROM table

df.count()

SUM

SELECT SUM(column) FROM table

from pyspark.sql.functions import sum;

df.agg(sum("column"))

AVG

SELECT AVG(column) FROM table

from pyspark.sql.functions import avg;

df.agg(avg("column"))

MAX / MIN

SELECT MAX(column) FROM table

from pyspark.sql.functions import max;

df.agg(max("column"))
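
As a hedged sketch, the aggregate functions above can be combined in a single agg() call; the salary column is an assumption:

from pyspark.sql.functions import count, sum, avg, max, min  # note: sum/max/min shadow Python built-ins in this scope

# SELECT COUNT(*), SUM(salary), AVG(salary), MAX(salary), MIN(salary) FROM employees
df.agg(count("*").alias("cnt"),
       sum("salary").alias("total_salary"),
       avg("salary").alias("avg_salary"),
       max("salary").alias("max_salary"),
       min("salary").alias("min_salary")).show()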

String Length

SELECT LEN(string) FROM table

from pyspark.sql.functions import length, col;

df.select(length(col("string")))

Convert to Uppercase

SELECT UPPER(string) FROM table

from pyspark.sql.functions import upper, col;

df.select(upper(col("string")))

Convert to Lowercase

SELECT LOWER(string) FROM table

from pyspark.sql.functions import lower, col;

df.select(lower(col("string")))

Concatenate Strings

SELECT CONCAT(string1,string2) FROM table

from pyspark.sql.functions import concat, col;

df.select(concat(col("string1"), col("string2")))

Trim String

SELECT TRIM(string) FROM table

from pyspark.sql.functions import trim, col;

df.select(trim(col("string")))

Substring

SELECT SUBSTRING(string, start, length) FROM table

from pyspark.sql.functions import substring, col;

df.select(substring(col("string"),start,length))
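
A small illustrative sketch combining the string functions above; first_name and last_name are assumed columns:

from pyspark.sql.functions import col, length, upper, lower, concat, trim, substring

df.select(
    upper(col("first_name")).alias("upper_name"),                     # UPPER
    lower(col("last_name")).alias("lower_name"),                      # LOWER
    length(trim(col("first_name"))).alias("name_length"),             # TRIM + LENGTH
    concat(col("first_name"), col("last_name")).alias("full_name"),   # CONCAT
    substring(col("first_name"), 1, 3).alias("prefix")                # SUBSTRING(first_name, 1, 3)
).show()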

CURDATE, NOW, CURTIME

SELECT CURDATE() FROM table

from pyspark.sql.functions import current_date;

df.select(current_date())

CAST, CONVERT

SELECT CAST(column AS datatype) FROM table

from pyspark.sql.functions import col;

df.select(col("column").cast("datatype"))

IF

SELECT IF(condition, value1, value2) FROM table

from pyspark.sql.functions import when;

df.select(when(condition, value1).otherwise(value2))

COALESCE

SELECT COALESCE(column1, column2, column3)

FROM table

from pyspark.sql.functions import coalesce;

df.select(coalesce("column1", "column2", "column3"))
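
The conditional and conversion helpers above compose in a single projection; a hedged sketch assuming salary, nickname and first_name columns:

from pyspark.sql.functions import col, when, coalesce, current_date, lit

df.select(
    when(col("salary") > 50000, "high").otherwise("low").alias("salary_band"),           # IF(...)
    coalesce(col("nickname"), col("first_name"), lit("unknown")).alias("display_name"),  # COALESCE
    col("salary").cast("double").alias("salary_double"),                                 # CAST
    current_date().alias("load_date")                                                    # CURDATE()
).show()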

JOIN

SELECT * FROM table1 JOIN table2 ON table1.column = table2.column

df1.join(df2, "column")

GROUP BY

GROUP BY column(s)

df.groupBy("column(s)")

PIVOT

PIVOT (agg_function(column) FOR pivot_column IN (values))

df.groupBy("pivot_column")\

.pivot("column").agg(agg_function)

Logical Operations

SELECT column FROM table

WHERE column1 = value AND column2 > value

df.filter((col("column1") == value) & (col("column2") > value)).select("column")

IS NULL, IS NOT NULL

SELECT column FROM table WHERE column IS NULL

df.filter(col("column1").isNull())\

.select("column")

IN

SELECT column FROM table

WHERE column IN (value1, value2, value3)

df.filter(col("column1")\

.isin(value1, value2, value3))\

.select("column")

LIKE

SELECT column FROM table WHERE column LIKE 'value%'

df.filter(col("column").like("value%"))

BETWEEN

SELECT column FROM table

WHERE column BETWEEN value1 AND value2

df.filter(col("column") >= valye1) & (col("column") <= value2))\

.select("column")

UNION, UNION ALL

SELECT column FROM table1

UNION SELECT column FROM table2

df1.union(df2).distinct().select("column") for UNION, or

df1.union(df2).select("column") for UNION ALL (union() keeps duplicates; unionAll() is deprecated)

RANK, DENSE_RANK, ROW_NUMBER

SELECT column, RANK() OVER (ORDER BY column) AS rank FROM table

from pyspark.sql import Window;

from pyspark.sql.functions import rank;

df.select("column", rank().over(Window.orderBy("column"))\

.alias("rank"))

CTE

WITH cte1 AS (SELECT * FROM table1)

SELECT * FROM cte1 WHERE condition

df.createOrReplaceTempView("cte1");

df_cte1 = spark.sql("SELECT * FROM cte1 WHERE condition");

df_cte1.show() or

df.filter(condition1).filter(condition2)
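
To mirror the CTE more literally, the DataFrame can be registered as a temporary view and the WITH query run through spark.sql; table and column names here are assumptions:

df.createOrReplaceTempView("employees")

result = spark.sql("""
    WITH high_paid AS (SELECT * FROM employees WHERE salary > 50000)
    SELECT dept_id, COUNT(*) AS cnt
    FROM high_paid
    GROUP BY dept_id
""")
result.show()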




DDL Operations



Datatypes

INT: for integer values

BIGINT: for large integer values

FLOAT: for floating point values

DOUBLE: for double precision floating point values

CHAR: for fixed-length character strings

VARCHAR: for variable-length character strings

DATE: for date values

TIMESTAMP: for timestamp values

In PySpark, the data types are similar but are represented differently.

IntegerType: for integer values

LongType: for long integer values

FloatType: for floating point values

DoubleType: for double precision floating point values

StringType: for character strings

TimestampType: for timestamp values

DateType: for date values

Create Table

CREATE TABLE table_name

(column_name data_type constraint);

df.write.format("parquet")\

.saveAsTable("table_name")

Create Table with Column Definitions

CREATE TABLE table_name

(column_name data_type [constraint],

column_name data_type [constraint], .....);

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DecimalType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), True),
    StructField("salary", DecimalType(), True)])

df = spark.createDataFrame([], schema)
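
For illustration, the same schema can be filled with a couple of in-memory rows and saved as a managed table (the row values and table name are assumptions):

from decimal import Decimal
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DecimalType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), True),
    StructField("salary", DecimalType(10, 2), True)])

rows = [(1, "Alice", 30, Decimal("55000.00")),
        (2, "Bob", 41, Decimal("48000.00"))]
df = spark.createDataFrame(rows, schema)
df.write.format("parquet").mode("overwrite").saveAsTable("employees_demo")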

Create Table with Primary Key

CREATE TABLE table_name(

column_name data_type

PRIMARY KEY, .....)


If table already exists:

ALTER TABLE table_name

ADD PRIMARY KEY

(column_name);

In PySpark or HiveQL, primary key constraints are not

enforced directly. However, you can use the

dropDuplicates() method to remove

duplicate rows based on one or more columns.


df = df.dropDuplicates(["id"])

Create Table with Auto Increment Constraints

CREATE TABLE table_name(

id INT AUTO_INCREMENT,

name VARCHAR(255),

PRIMARY KEY(id));

Auto-increment columns are not natively supported by the DataFrame API, but there are several ways to achieve similar functionality.


from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("id", monotonically_increasing_id() + start_value)

Adding a column

ALTER TABLE table_name

ADD column_name datatype;

from pyspark.sql.functions import lit

df = df.withColumn("column_name", lit(None).cast("datatype"))

Modifying a column

ALTER TABLE table_name

MODIFY column_name datatype;

df = df.withColumn("column_name", df["column_name"].cast("datatype"))

Dropping a column

ALTER TABLE table_name

DROP COLUMN column_name;

df = df.drop("column_name")

Rename a column

ALTER TABLE table_name

RENAME COLUMN old_column_name TO new_column_name;


In MySQL,

ALTER TABLE employees

CHANGE COLUMN first_name first_name_new VARCHAR(255);

df = df.withColumnRenamed("existing_column", "new_column")
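
A short sketch chaining the DataFrame equivalents of the ALTER TABLE operations above; the column names are assumptions:

from pyspark.sql.functions import col, lit

df = (df.withColumn("bonus", lit(None).cast("double"))       # ADD COLUMN bonus DOUBLE
        .withColumn("age", col("age").cast("long"))          # MODIFY age BIGINT
        .drop("temp_col")                                     # DROP COLUMN temp_col
        .withColumnRenamed("first_name", "given_name"))       # RENAME COLUMN first_name TO given_name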

