Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
delay_vs_distance.py 1.25 KiB
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, abs, lit, min, max, desc
from pyspark.sql.types import IntegerType

# Attach to the already-running Spark session, or start one if none exists.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

# Alternative CSV source, left disabled:
# df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')

# Load the airline data from its parquet copy.
df = spark.read.format("parquet").load("/user/s2878755/airline.parquet")

# Print the first rows to the console as a quick look at what was loaded.
df.show()

# Reduce the data to the three columns used below and discard unusable rows:
#   * Distance equal to 0,
#   * any of the three values containing the text 'NA',
#   * any of the three values being SQL NULL.
# The explicit isNull() checks matter: comparisons against NULL evaluate to
# NULL rather than True, so without them such rows would not be matched by
# the negated condition in a predictable way.
#
# After filtering, all three columns are converted to numbers. The 'NA'
# checks above treat the columns as text (presumably that is how they are
# stored -- confirm against the parquet schema), and Spark's max()/min()
# order text lexicographically ("99" > "1000"), which would corrupt the
# per-distance extremes computed later. If the columns are already numeric
# the casts merely widen the delays to double.
df = (
    df.select(col('Distance'), col('ArrDelay'), col('DepDelay'))
    .filter(~(
        (col('Distance') == 0)
        | col('Distance').contains('NA')
        | col('ArrDelay').contains('NA')
        | col('DepDelay').contains('NA')
        | col('Distance').isNull()
        | col('ArrDelay').isNull()
        | col('DepDelay').isNull()
    ))
    .select(
        col('Distance').cast(IntegerType()).alias('Distance'),
        col('ArrDelay').cast('double').alias('ArrDelay'),
        col('DepDelay').cast('double').alias('DepDelay'),
    )
)

# Mean arrival delay and mean departure delay for every distinct distance,
# ordered by distance.
rows_by_distance = df.groupBy('Distance')
mean_arr_delay = mean(col('ArrDelay')).alias('AverageArrDelay')
mean_dep_delay = mean(col('DepDelay')).alias('AverageDepDelay')
df1 = rows_by_distance.agg(mean_arr_delay, mean_dep_delay).orderBy('Distance')

# Spread of each delay per distance: largest observed value minus the
# smallest. NOTE: despite the historical 'Variance...' column names this is a
# range, not a statistical variance.
#
# max - min is used instead of max + abs(min): the two agree whenever the
# minimum is <= 0, but for a distance whose delays are all positive the
# latter adds the minimum instead of subtracting it and overstates the spread.
#
# `max` and `min` here are the Spark aggregate functions imported from
# pyspark.sql.functions, not the Python builtins. The delays are cast to
# double first so the extremes are compared numerically even if the columns
# are stored as text (text would be ordered lexicographically).
arr_delay = col('ArrDelay').cast('double')
dep_delay = col('DepDelay').cast('double')

df2 = (
    df.groupBy(col('Distance'))
    .agg(
        (max(arr_delay) - min(arr_delay)).alias('VarianceArrDelay'),
        (max(dep_delay) - min(dep_delay)).alias('VarianceDepDelay'),
    )
    .orderBy(col('Distance'))
)

# Persist both result tables as CSV, averages first and spreads second.
# Each write uses the writer's default options.
for result_frame, output_path in (
    (df1, "/user/s2878755/avg_delay_vs_distance.csv"),
    (df2, "/user/s2878755/variance_delay_vs_distance.csv"),
):
    result_frame.write.csv(output_path)