Skip to content
Snippets Groups Projects
Commit ee1bc687 authored by eriwe600's avatar eriwe600
Browse files

Code cleaned

parent 5043a506
No related branches found
No related tags found
No related merge requests found
...@@ -5,8 +5,6 @@ from pyspark.sql.types import IntegerType ...@@ -5,8 +5,6 @@ from pyspark.sql.types import IntegerType
spark = SparkSession.builder.getOrCreate() spark = SparkSession.builder.getOrCreate()
#df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')
df = spark.read.parquet("/user/s2878755/airline.parquet") df = spark.read.parquet("/user/s2878755/airline.parquet")
# check which years cancellation was registered # check which years cancellation was registered
...@@ -28,12 +26,7 @@ df2 = df.select('Year', lit(1).alias('TotCancellations')) \ ...@@ -28,12 +26,7 @@ df2 = df.select('Year', lit(1).alias('TotCancellations')) \
.groupBy('Year').agg(sum('TotCancellations').alias('TotCancellations')) .groupBy('Year').agg(sum('TotCancellations').alias('TotCancellations'))
df = df1.join(df2, df1.Year == df2.Year) \ df = df1.join(df2, df1.Year == df2.Year) \
.select(df1.Year, df1.CancellationCode, (100*col('Cancellations')/col('TotCancellations')).alias('Percentage')) .select(df1.Year, df1.CancellationCode, (100*col('Cancellations')/col('TotCancellations')).alias('Percentage')) \
.sort(col('Year'))
df_years = df.sort(col('Year'))
df_code = df.sort(col('CancellationCode'))
df_years.write.csv("/user/s2878755/cancellations_year_sort.csv")
df_code.write.csv("/user/s2878755/cancellations_code_sort.csv") df.write.csv('/user/s2878755/cancellations_over_years.py')
...@@ -4,6 +4,6 @@ spark = SparkSession.builder.getOrCreate() ...@@ -4,6 +4,6 @@ spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true') df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')
df = df.drop('DepTime').drop('CRSDepTime').drop('ArrTime').drop('CRSArrTime').drop('UniqueCarrier').drop('TailNum').drop('AirTime').drop('Origin').drop('Dest').drop('taxiIn').drop('taxiOut') df = df.drop('DepTime').drop('CRSDepTime').drop('ArrTime').drop('CRSArrTime').drop('UniqueCarrier').drop('TailNum').drop('AirTime').drop('taxiIn').drop('taxiOut')
df.write.parquet('/user/s2878755/airline.parquet') df.write.parquet('/user/s2878755/airline.parquet')
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, abs, lit, min, max, desc, concat_ws from pyspark.sql.functions import col, mean, abs, lit, min, max, desc, concat_ws, collect_set
from pyspark.sql.types import IntegerType, StringType from pyspark.sql.types import IntegerType, StringType
spark = SparkSession.builder.getOrCreate() spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/user/s2878755/airline.parquet") df = spark.read.parquet("/user/s2878755/airline.parquet")
df = df.select(col('Distance'), col('ArrDelay'), col('DepDelay')) \ df = df.select(col('Origin'), col('Dest'), col('Distance'), col('ArrDelay'), col('DepDelay')) \
.filter(~((col('Distance') == 0) | col('Distance').contains('NA') | col('ArrDelay').contains('NA') | col('DepDelay').contains('NA') | col('Distance').isNull() | col('ArrDelay').isNull() | col('DepDelay').isNull())) \ .filter(~((col('Distance') == 0) | col('Distance').contains('NA') | col('ArrDelay').contains('NA') | col('DepDelay').contains('NA') | col('Origin').contains('NA') | col('Dest').contains('NA') | col('Distance').isNull() | col('ArrDelay').isNull() | col('DepDelay').isNull() | col('Origin').isNull() | col('Dest').isNull())) \
.select(col('Distance').cast(IntegerType()).alias('Distance'), col('ArrDelay'), col('DepDelay')) \ .select(col('Origin'), col('Dest'), col('Distance').cast(IntegerType()).alias('Distance'), col('ArrDelay'), col('DepDelay')) \
.orderBy(col('Distance'))
df_group100_avg = df.select(col('Distance'), (col('Distance')/100).cast(IntegerType()).alias('Group'), col('ArrDelay'), col('DepDelay')) \
.groupBy(col('Group')) \
.agg((min(col('Distance')).cast(StringType())).alias('min_dist'), (max(col('Distance')).cast(StringType())).alias('max_dist'), mean(col('ArrDelay')).alias('AverageArrDelay'), mean(col('DepDelay')).alias('AverageDepDelay')) \
.select(col('group'), concat_ws('-',col('min_dist'),col('max_dist')).alias('Distance'), col('AverageArrDelay'), col('AverageDepDelay')) \
.orderBy(col('group'))
df_group100_avg.write.csv("/user/s2878755/project/delay_vs_distance/avg_delay_vs_distance_grouped100.csv")
df_group200_avg = df.select(col('Distance'), (col('Distance')/200).cast(IntegerType()).alias('Group'), col('ArrDelay'), col('DepDelay')) \
.groupBy(col('Group')) \
.agg((min(col('Distance')).cast(StringType())).alias('min_dist'), (max(col('Distance')).cast(StringType())).alias('max_dist'), mean(col('ArrDelay')).alias('AverageArrDelay'), mean(col('DepDelay')).alias('AverageDepDelay')) \
.select(col('group'), concat_ws('-',col('min_dist'),col('max_dist')).alias('Distance'), col('AverageArrDelay'), col('AverageDepDelay')) \
.orderBy(col('group'))
df_group200_avg.write.csv("/user/s2878755/project/delay_vs_distance/avg_delay_vs_distance_grouped200.csv") df.write.csv("/user/s2878755/project/delay_vs_distance/delay_vs_distance.csv")
df_group100_var = df.select(col('Distance'), (col('Distance')/100).cast(IntegerType()).alias('Group'), col('ArrDelay'), col('DepDelay')) \ df_routes_1 = df.select(col('Origin'), col('Dest'), col('Distance'), col('ArrDelay'), col('DepDelay')) \
.groupBy(col('Group')) \ .distinct() \
.agg((min(col('Distance')).cast(StringType())).alias('min_dist'), (max(col('Distance')).cast(StringType())).alias('max_dist'), (max(col('ArrDelay'))-abs(min(col('ArrDelay')))).alias('VarianceArrDelay'), (max(col('DepDelay'))-abs(min(col('DepDelay')))).alias('VarianceDepDelay')) \ .filter(col('Distance') > 3000)
.select(col('group'), concat_ws('-',col('min_dist'),col('max_dist')).alias('Distance'), col('VarianceArrDelay'), col('VarianceDepDelay')) \
.orderBy(col('group'))
df_group100_var.write.csv("/user/s2878755/project/delay_vs_distance/var_delay_vs_distance_grouped100.csv") df_routes_2 = df.select(col('Origin'), col('Dest'), col('Distance'), col('ArrDelay'), col('DepDelay')) \
.filter(col('Distance') > 3000) \
.groupBy(col('Origin'), col('Dest'), col('Distance')) \
.agg(mean(col('ArrDelay')), mean(col('DepDelay')))
df_group200_var = df.select(col('Distance'), (col('Distance')/200).cast(IntegerType()).alias('Group'), col('ArrDelay'), col('DepDelay')) \ df_routes_1.write.csv("/user/s2878755/project/delay_vs_distance/routes_1.csv")
.groupBy(col('Group')) \ df_routes_2.write.csv("/user/s2878755/project/delay_vs_distance/routes_2.csv")
.agg((min(col('Distance')).cast(StringType())).alias('min_dist'), (max(col('Distance')).cast(StringType())).alias('max_dist'), (max(col('ArrDelay'))-abs(min(col('ArrDelay')))).alias('VarianceArrDelay'), (max(col('DepDelay'))-abs(min(col('DepDelay')))).alias('VarianceDepDelay')) \
.select(col('group'), concat_ws('-',col('min_dist'),col('max_dist')).alias('Distance'), col('VarianceArrDelay'), col('VarianceDepDelay')) \
.orderBy(col('group'))
df_group200_var.write.csv("/user/s2878755/project/delay_vs_distance/var_delay_vs_distance_grouped200.csv")
df1 = df.groupBy(col('Distance')) \ df_grouped = df.select(col('Distance'), (col('Distance')/100).cast(IntegerType()).alias('Group'), col('ArrDelay'), col('DepDelay')) \
.agg(mean(col('ArrDelay')).alias('AverageArrDelay'), mean(col('DepDelay')).alias('AverageDepDelay')) \ .groupBy(col('Group')) \
.orderBy(col('Distance')) .agg((min(col('Distance')).cast(StringType())).alias('min_dist'), (max(col('Distance')).cast(StringType())).alias('max_dist'), mean(col('ArrDelay')).alias('AverageArrDelay'), mean(col('DepDelay')).alias('AverageDepDelay')) \
.select(col('group'), concat_ws('-',col('min_dist'),col('max_dist')).alias('Distance'), col('AverageArrDelay'), col('AverageDepDelay')) \
.orderBy(col('group'))
df2 = df.groupBy(col('Distance')) \ df_grouped.write.csv("/user/s2878755/project/delay_vs_distance/delay_vs_distance_grouped.csv")
.agg((max(col('ArrDelay'))-abs(min(col('ArrDelay')))).alias('VarianceArrDelay'), (max(col('DepDelay'))-abs(min(col('DepDelay')))).alias('VarianceDepDelay')) \
.orderBy(col('Distance'))
df1.write.csv("/user/s2878755/project/delay_vs_distance/avg_delay_vs_distance.csv") df_average = df.groupBy(col('Distance')) \
.agg(mean(col('ArrDelay')).alias('AverageArrDelay'), mean(col('DepDelay')).alias('AverageDepDelay')) \
.orderBy(col('Distance'))
df2.write.csv("/user/s2878755/project/delay_vs_distance/variance_delay_vs_distance.csv") df_average.write.csv("/user/s2878755/project/delay_vs_distance/avg_delay_vs_distance.csv")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment