# Skip to content
# Snippets Groups Projects
# Code owners
# Assign users and groups as approvers for specific file changes. Learn more.
# delay_vs_distance.py 4.13 KiB
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, abs, lit, min, max, desc, concat_ws
from pyspark.sql.types import IntegerType, StringType

# Reuse the active Spark session if one exists; otherwise create one.
spark = SparkSession.builder.getOrCreate()

# Source flight records; only Distance, ArrDelay and DepDelay are read below.
df = spark.read.parquet("/user/s2878755/airline.parquet")

# A row is discarded when its distance is zero, or when any of the three
# fields is null or contains the text 'NA'.
_unusable_row = (
    (col('Distance') == 0)
    | col('Distance').contains('NA')
    | col('ArrDelay').contains('NA')
    | col('DepDelay').contains('NA')
    | col('Distance').isNull()
    | col('ArrDelay').isNull()
    | col('DepDelay').isNull()
)

# Filter first, then convert to numeric types so 'NA' markers never reach
# the casts.
# The delay columns are cast explicitly as well as Distance: the 'NA'
# substring checks above suggest the source columns are stored as text
# (assumption -- confirm against the parquet schema), and min()/max() over
# text compare lexicographically rather than numerically. Double is used
# for the delays so that any fractional values are preserved.
df = (
    df.select(col('Distance'), col('ArrDelay'), col('DepDelay'))
    .filter(~_unusable_row)
    .select(
        col('Distance').cast(IntegerType()).alias('Distance'),
        col('ArrDelay').cast('double').alias('ArrDelay'),
        col('DepDelay').cast('double').alias('DepDelay'),
    )
)

def _delay_spread(delay):
    """Return the aggregate expression ``max(delay) - abs(min(delay))``.

    ``max``, ``min`` and ``abs`` here are the pyspark.sql.functions versions
    imported at the top of this file, not the Python builtins.

    NOTE(review): the result is written to columns named 'Variance...', but
    this formula is neither a statistical variance nor the range
    (max - min) when the minimum delay is negative. It is kept unchanged
    here; confirm the intended statistic.

    :param delay: Column holding the delay values to aggregate.
    :return: Column expression usable inside ``agg``.
    """
    return max(delay) - abs(min(delay))


def _delay_by_distance_bucket(flights, bucket_width, statistic, prefix):
    """Summarise arrival and departure delay per distance bucket.

    Each row is assigned to bucket ``Distance / bucket_width`` cast to an
    integer. For every bucket the result holds the bucket number, a
    'min-max' label built from the smallest and largest distance actually
    present in that bucket, and ``statistic`` applied to ArrDelay and
    DepDelay. Rows are ordered by bucket number.

    :param flights: DataFrame with Distance, ArrDelay and DepDelay columns.
    :param bucket_width: Width of a distance bucket.
    :param statistic: Callable mapping a Column to an aggregate Column
        (for example ``mean`` or ``_delay_spread``).
    :param prefix: Text prepended to 'ArrDelay' / 'DepDelay' to name the
        two output columns.
    :return: DataFrame with columns Group, Distance, <prefix>ArrDelay and
        <prefix>DepDelay.
    """
    arr_name = prefix + 'ArrDelay'
    dep_name = prefix + 'DepDelay'

    bucketed = flights.select(
        col('Distance'),
        (col('Distance') / bucket_width).cast(IntegerType()).alias('Group'),
        col('ArrDelay'),
        col('DepDelay'),
    )
    summary = bucketed.groupBy(col('Group')).agg(
        min(col('Distance')).cast(StringType()).alias('min_dist'),
        max(col('Distance')).cast(StringType()).alias('max_dist'),
        statistic(col('ArrDelay')).alias(arr_name),
        statistic(col('DepDelay')).alias(dep_name),
    )
    # 'Group' is spelled exactly as aliased above so that the lookup does
    # not depend on case-insensitive column resolution.
    return summary.select(
        col('Group'),
        concat_ws('-', col('min_dist'), col('max_dist')).alias('Distance'),
        col(arr_name),
        col(dep_name),
    ).orderBy(col('Group'))


# Mean delay per 100-wide and 200-wide distance bucket.
df_group100_avg = _delay_by_distance_bucket(df, 100, mean, 'Average')
df_group100_avg.write.csv("/user/s2878755/project/delay_vs_distance/avg_delay_vs_distance_grouped100.csv")

df_group200_avg = _delay_by_distance_bucket(df, 200, mean, 'Average')
df_group200_avg.write.csv("/user/s2878755/project/delay_vs_distance/avg_delay_vs_distance_grouped200.csv")

# Delay spread (see _delay_spread) per 100-wide and 200-wide distance bucket.
df_group100_var = _delay_by_distance_bucket(df, 100, _delay_spread, 'Variance')
df_group100_var.write.csv("/user/s2878755/project/delay_vs_distance/var_delay_vs_distance_grouped100.csv")

df_group200_var = _delay_by_distance_bucket(df, 200, _delay_spread, 'Variance')
df_group200_var.write.csv("/user/s2878755/project/delay_vs_distance/var_delay_vs_distance_grouped200.csv")

# Per-distance statistics: one output row for every distinct Distance value.
_by_distance = df.groupBy(col('Distance'))

# Mean arrival and departure delay for each distance, ordered by distance.
df1 = _by_distance.agg(
    mean(col('ArrDelay')).alias('AverageArrDelay'),
    mean(col('DepDelay')).alias('AverageDepDelay'),
).orderBy(col('Distance'))

# max(delay) - abs(min(delay)) for each distance, ordered by distance.
# max/min/abs are the pyspark.sql.functions imported at the top of the file.
# NOTE(review): the output columns are named 'Variance...', but this formula
# is not a statistical variance -- confirm the intended statistic.
_arr_spread = max(col('ArrDelay')) - abs(min(col('ArrDelay')))
_dep_spread = max(col('DepDelay')) - abs(min(col('DepDelay')))
df2 = _by_distance.agg(
    _arr_spread.alias('VarianceArrDelay'),
    _dep_spread.alias('VarianceDepDelay'),
).orderBy(col('Distance'))

# Write the averages first, then the spreads.
df1.write.csv("/user/s2878755/project/delay_vs_distance/avg_delay_vs_distance.csv")

df2.write.csv("/user/s2878755/project/delay_vs_distance/variance_delay_vs_distance.csv")