Commit 7db6858b authored by a.b.wahlgren@student.utwente.nl

Merge branch 'main' of https://gitlab.utwente.nl/s2875462/flight-delay-big-data-project into main
parents 42ace62e 07fb92e0
@@ -5,7 +5,9 @@ from pyspark.sql.types import IntegerType
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')
#df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')
df = spark.read.parquet("/user/s2878755/airline.parquet")
# check in which years cancellations were registered
'''
@@ -25,32 +27,13 @@ df1 = df.select('Year', 'CancellationCode', lit(1).alias('Cancellations')) \
df2 = df.select('Year', lit(1).alias('TotCancellations')) \
.groupBy('Year').agg(sum('TotCancellations').alias('TotCancellations'))
df1.show()
df2.show()
''' Does not work: totalCancellations() is a plain Python driver-side function, so it only runs once when the expression is built and cannot be applied per row to a Column
def totalCancellations(year):
total = df2.filter(col('Year').contains(year))
total.show()
print(total.collect()[0].TotCancellations)
return total.collect()[0].TotCancellations
df = df1.select('Year', 'CancellationCode', ((100*col('Count')/totalCancellations(col('Year')))).alias('Percentage'))
df = df1.withColumn('Percentage', 100*df1.Count/totalCancellations(df1.Year))
'''
df = df1.join(df2, df1.Year == df2.Year) \
.select(df1.Year, df1.CancellationCode, (100*col('Cancellations')/col('TotCancellations')).alias('Percentage'))
df.show()
#df = df.select(col('Year'), col('CancellationCode'), (100*col('Cancellations')/col('TotCancellations')).alias('Percentage'))
df_years = df.sort(col('Year'))
df_years.show()
df_code = df.sort(col('CancellationCode'))
df_code.show()
df_years.write.csv("/user/s2878755/cancellations_year_sort.csv")
df_code.write.csv("/user/s2878755/cancellations_code_sort.csv")
@@ -4,6 +4,6 @@ spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')
df = df.drop('DepTime').drop('CRSDepTime').drop('ArrTime').drop('CRSArrTime').drop('UniqueCarrier').drop('FlightNum').drop('TailNum').drop('AirTime').drop('Origin').drop('Dest').drop('taxiIn').drop('taxiOut').drop('Diverted')
df = df.drop('DepTime').drop('CRSDepTime').drop('ArrTime').drop('CRSArrTime').drop('UniqueCarrier').drop('TailNum').drop('AirTime').drop('Origin').drop('Dest').drop('taxiIn').drop('taxiOut')
df.show()
df.write.parquet('/user/s2878755/airline.parquet')
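# Note: Parquet is a columnar format, so the later jobs that select only a handful of
# columns read far less data than they would from the full CSV.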
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, abs, lit, min, max, desc
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, mean, abs, lit, min, max, desc, concat_ws
from pyspark.sql.types import IntegerType, StringType
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')
df = spark.read.parquet("/user/s2878755/airline.parquet")
df = df.select(col('Distance'), col('ArrDelay'), col('DepDelay')) \
.filter(~((col('Distance') == 0) | col('Distance').contains('NA') | col('ArrDelay').contains('NA') | col('DepDelay').contains('NA') | col('Distance').isNull() | col('ArrDelay').isNull() | col('DepDelay').isNull())) \
.select(col('Distance').cast(IntegerType()).alias('Distance'), col('ArrDelay'), col('DepDelay'))
df.show()
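# The queries below bucket flights into fixed-width distance bands: casting Distance/100
# (or /200) to an integer truncates the quotient, so Group 0 covers 0-99 miles, Group 1
# covers 100-199 miles, and so on; delays are then aggregated per band.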
df_group100_avg = df.select(col('Distance'), (col('Distance')/100).cast(IntegerType()).alias('Group'), col('ArrDelay'), col('DepDelay')) \
.groupBy(col('Group')) \
.agg((min(col('Distance')).cast(StringType())).alias('min_dist'), (max(col('Distance')).cast(StringType())).alias('max_dist'), mean(col('ArrDelay')).alias('AverageArrDelay'), mean(col('DepDelay')).alias('AverageDepDelay')) \
.select(col('Group'), concat_ws('-',col('min_dist'),col('max_dist')).alias('Distance'), col('AverageArrDelay'), col('AverageDepDelay')) \
.orderBy(col('Group'))
df_group100_avg.write.csv("/user/s2878755/project/delay_vs_distance/avg_delay_vs_distance_grouped100.csv")
df_group200_avg = df.select(col('Distance'), (col('Distance')/200).cast(IntegerType()).alias('Group'), col('ArrDelay'), col('DepDelay')) \
.groupBy(col('Group')) \
.agg((min(col('Distance')).cast(StringType())).alias('min_dist'), (max(col('Distance')).cast(StringType())).alias('max_dist'), mean(col('ArrDelay')).alias('AverageArrDelay'), mean(col('DepDelay')).alias('AverageDepDelay')) \
.select(col('Group'), concat_ws('-',col('min_dist'),col('max_dist')).alias('Distance'), col('AverageArrDelay'), col('AverageDepDelay')) \
.orderBy(col('Group'))
df_group200_avg.write.csv("/user/s2878755/project/delay_vs_distance/avg_delay_vs_distance_grouped200.csv")
df_group100_var = df.select(col('Distance'), (col('Distance')/100).cast(IntegerType()).alias('Group'), col('ArrDelay'), col('DepDelay')) \
.groupBy(col('Group')) \
.agg((min(col('Distance')).cast(StringType())).alias('min_dist'), (max(col('Distance')).cast(StringType())).alias('max_dist'), (max(col('ArrDelay'))-abs(min(col('ArrDelay')))).alias('VarianceArrDelay'), (max(col('DepDelay'))-abs(min(col('DepDelay')))).alias('VarianceDepDelay')) \
.select(col('Group'), concat_ws('-',col('min_dist'),col('max_dist')).alias('Distance'), col('VarianceArrDelay'), col('VarianceDepDelay')) \
.orderBy(col('Group'))
df_group100_var.write.csv("/user/s2878755/project/delay_vs_distance/var_delay_vs_distance_grouped100.csv")
df_group200_var = df.select(col('Distance'), (col('Distance')/200).cast(IntegerType()).alias('Group'), col('ArrDelay'), col('DepDelay')) \
.groupBy(col('Group')) \
.agg((min(col('Distance')).cast(StringType())).alias('min_dist'), (max(col('Distance')).cast(StringType())).alias('max_dist'), (max(col('ArrDelay'))-abs(min(col('ArrDelay')))).alias('VarianceArrDelay'), (max(col('DepDelay'))-abs(min(col('DepDelay')))).alias('VarianceDepDelay')) \
.select(col('Group'), concat_ws('-',col('min_dist'),col('max_dist')).alias('Distance'), col('VarianceArrDelay'), col('VarianceDepDelay')) \
.orderBy(col('Group'))
df_group200_var.write.csv("/user/s2878755/project/delay_vs_distance/var_delay_vs_distance_grouped200.csv")
df1 = df.groupBy(col('Distance')) \
.agg(mean(col('ArrDelay')).alias('AverageArrDelay'), mean(col('DepDelay')).alias('AverageDepDelay')) \
.orderBy(col('Distance'))
df1.show()
print(df1.count())
df2 = df.groupBy(col('Distance')) \
.agg((max(col('ArrDelay'))+abs(min(col('ArrDelay')))).alias('VarianceArrDelay'), (max(col('DepDelay'))+abs(min(col('DepDelay')))).alias('VarianceDepDelay')) \
.agg((max(col('ArrDelay'))-abs(min(col('ArrDelay')))).alias('VarianceArrDelay'), (max(col('DepDelay'))-abs(min(col('DepDelay')))).alias('VarianceDepDelay')) \
.orderBy(col('Distance'))
df2.show()
df1.write.csv("/user/s2878755/project/delay_vs_distance/avg_delay_vs_distance.csv")
print(df2.count())
df2.write.csv("/user/s2878755/project/delay_vs_distance/variance_delay_vs_distance.csv")
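# Note: 'VarianceArrDelay'/'VarianceDepDelay' above are max - abs(min), i.e. a range-style
# spread, not the statistical variance. A minimal sketch of the latter (an assumption about
# the intent, not part of the original script), using pyspark.sql.functions.variance:
from pyspark.sql.functions import variance
df_var = df.groupBy(col('Distance')) \
    .agg(variance(col('ArrDelay').cast('double')).alias('VarArrDelay'),
         variance(col('DepDelay').cast('double')).alias('VarDepDelay')) \
    .orderBy(col('Distance'))
df_var.show()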
#project code
#group 3
#Run time for distributed mode with 10 executors is 2m39s
from pyspark import SparkContext
import pyspark.sql.functions as F
from pyspark.sql.functions import col, round
@@ -9,7 +12,9 @@ sc = SparkContext(appName="project")
sc.setLogLevel("ERROR")
spark = SparkSession(sc)
df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')
#read the dataset
#df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')
df = spark.read.parquet("/user/s2878755/airline.parquet")
#eliminating NA values
df=df.filter(col('WeatherDelay')!= 'NA')
@@ -17,7 +22,7 @@ df=df.filter(col('CarrierDelay')!= 'NA')
df=df.filter(col('NASDelay')!= 'NA')
df=df.filter(col('SecurityDelay')!= 'NA')
df=df.filter(col('LateAircraftDelay')!= 'NA')
df=df.filter(col('CancellationCode') != 'NA')
#finding the percentage of cancelled flights
df_c=df.groupBy('Cancelled').count()
@@ -26,21 +31,22 @@ df_cancelled_vs_uncancelled=df_c\
.withColumn ('percent', F.col('count')/F.col('total')*100)
#round the percentage
df_cancelled_vs_uncancelled=df_cancelled_vs_uncancelled.select('Cancelled',round(col('percent'),2).alias('Cancelled_percentage'))
#save the output
#df_cancelled_vs_uncancelled.write.option("header",True).csv("file:/home/s2300397/Project_outputs")
#finding the proportion between different cancellation reasons
df1=df.select('FlightNum', 'Cancelled', 'CancellationCode')
df2=df1.filter(col('Cancelled')== 1)
df3=df2.filter(col('CancellationCode') != 'NA')
df4=df2.groupBy('CancellationCode').count()
df_carrier=df4.filter(col('CancellationCode')== 'A')
df4=df3.groupBy('CancellationCode').count()
#df_carrier=df4.filter(col('CancellationCode')== 'A')
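# Window.partitionBy() with no columns puts every row in a single partition, so
# F.sum('count').over(...) attaches the grand total to each row; dividing each group's
# count by that total gives the percentage share per CancellationCode.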
df5=df4\
.withColumn('total', F.sum('count').over(Window.partitionBy()))\
.withColumn ('proportions', F.col('count')/F.col('total')*100)
df5=df5.select('CancellationCode',round(col('proportions'),2).alias('proportions'))
#df5.show()
#df5.write.option("header",True).csv("file:/home/s2300397/Project_outputs_2")
#finding the percentage of diverted flights
df_d=df.select(col('FlightNum').alias('FN'), 'Cancelled', 'CancellationCode', 'Diverted')
@@ -50,6 +56,8 @@ df_d3=df_d2\
.withColumn('total', F.sum('count').over(Window.partitionBy()))\
.withColumn('percentage', F.col('count')/F.col('total')*100)
df_d3=df_d3.select('Diverted',round(col('percentage'),2).alias('Diverted_percentage'))
#df_d3.show()
#99.77% not diverted. 0.23% diverted
#how are diverted flights related to the cancellation reason?
df_diverted=df.select(col('FlightNum').alias('FN'), 'Diverted')
@@ -63,40 +71,35 @@ df_relation=df_join\
.withColumn ('proportions', F.col('count')/F.col('total')*100)
df_relation=df_relation.select('CancellationCode',round(col('proportions'),2).alias('Diverted_cc_proportions'))
#df_relation.write.option("header",True).csv("file:/home/s2300397/Project_outputs_3")
#################
#arrival delay time vs departure delay time
df_diff=df.select('FlightNum', 'ArrDelay', 'DepDelay','AirTime', 'WeatherDelay','CarrierDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay')
df_diff=df_diff.filter(col('ArrDelay')>0)
df_diff=df_diff.filter(col('DepDelay')>0)
df_arrival=df.select('FlightNum', 'ArrDelay', 'DepDelay')
#time difference
df_diff_calculation=df_diff.withColumn('timediff', col('DepDelay')-col('ArrDelay'))
df_w=df_diff_calculation.sort(col('timediff').desc())
#arrival time analysis
#arrival delay time analysis
from pyspark.mllib.stat import Statistics
df_arrival=df.select('FlightNum', 'ArrDelay', 'DepDelay','AirTime', 'WeatherDelay','CarrierDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay', 'TaxiIn', 'TaxiOut')
df_diff=df_diff.withColumn('ArrDelay', df_diff['ArrDelay'].cast(IntegerType()))
df_diff=df_diff.withColumn('DepDelay', df_diff['Delay'].cast(IntegerType()))
correlation=df_diff.stat.corr("ArrDelay", "DepDelay")
print(str(correlation)) #the correlation is 0.86 which means there is a positive strong correlation between departure delay and arrival delay
df_arrival=df_arrival.withColumn('ArrDelay', df_arrival['ArrDelay'].cast(IntegerType()))
df_arrival=df_arrival.withColumn('DepDelay', df_arrival['DepDelay'].cast(IntegerType()))
correlation=df_arrival.stat.corr("ArrDelay", "DepDelay")
print(str(correlation)) #the correlation is 0.86, which means there is a strong positive correlation between departure delay and arrival delay
df_diff=df.select('DepDelay', 'WeatherDelay','CarrierDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay')
#check the data types
df_diff.printSchema()
#convert from string to integer
#df_diff.printSchema()
#convert from string to integer
df_diff=df_diff.withColumn('WeatherDelay', df_diff['WeatherDelay'].cast(IntegerType()))
df_diff=df_diff.withColumn('CarrierDelay', df_diff['CarrierDelay'].cast(IntegerType()))
df_diff=df_diff.withColumn('SecurityDelay', df_diff['SecurityDelay'].cast(IntegerType()))
df_diff=df_diff.withColumn('NASDelay', df_diff['NASDelay'].cast(IntegerType()))
df_diff=df_diff.withColumn('LateAircraftDelay', df_diff['LateAircraftDelay'].cast(IntegerType()))
#df_diff=df_diff.withColumn('DepDelay', df_diff['DepDelay'].cast(IntegerType()))
df_diff=df_diff.withColumn('DepDelay', df_diff['DepDelay'].cast(IntegerType()))
df_diff=df_diff.filter(col('DepDelay')>0)
#linear regression
from pyspark.ml.feature import VectorAssembler
@@ -126,49 +129,9 @@ print(test_stats.meanSquaredError)
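# The hunk above collapses the feature-assembly, train/test split and regression-fit code.
# Purely as an illustrative sketch of such a pipeline (the feature columns, split ratio and
# seed below are assumptions, not the original values):
from pyspark.ml.regression import LinearRegression
assembler = VectorAssembler(
    inputCols=['WeatherDelay', 'CarrierDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay'],
    outputCol='features')
data_final = assembler.transform(df_diff).withColumn('label', col('DepDelay'))
train, test = data_final.randomSplit([0.8, 0.2], seed=42)
lrModel = LinearRegression(featuresCol='features', labelCol='label').fit(train)
test_stats = lrModel.evaluate(test)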
#------
#visualize the original and predicted 'label' data
prediction=lrModel.transform(data_final)
x_ax = range(0, data_final.count())
#the following visualization code (up to ###) doesn't work in pyspark
y_pred = prediction.select("prediction").collect()
y_orig = prediction.select("DepDelay").collect()
#visualize the original and predicted data in a plot
import matplotlib.pyplot as plt
plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Delay test and predicted data")
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.grid(True)
plt.show()
###
#How has the proportion between the different cancellation reasons changed in different years (2004-2008)?
# terrorist attack of 11 September 2001
df_date=df.select('FlightNum','DayofMonth','Month', 'Year', 'Cancelled', 'CancellationCode')
#merge day of month, month and year into 1 column
df_date=df_date.withColumn('Date',F.concat(F.col('DayofMonth'),F.lit("-"), F.col('Month'),F.lit("-"), F.col('Year')))
df_date=df_date.filter(col('CancellationCode')!= 'NA')
df_date=df_date.filter(col('Cancelled').rlike('1'))
df_date_after=df_date.filter(col('Date')> '%11-%09-%2001')
#finding the proportions
df_date_after=df_date_after.groupBy('CancellationCode').count()
df_date_after=df_date_after\
.withColumn('total', F.sum('count').over(Window.partitionBy()))\
.withColumn ('proportions', F.col('count')/F.col('total')*100)
df_date_after=df_date_after.select('CancellationCode',round(col('proportions'),2).alias('proportions'))
#df_date_after.show()
df_date_before=df_date.filter(col('Date')< '%11-%09-%2001') ## doesn't work
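# The '%...%' string comparisons above (and in the 'Date > ...' filter) don't work: 'Date'
# is a plain 'D-M-YYYY' string and '%' wildcards have no meaning in < / > comparisons.
# A minimal sketch of a working filter (illustrative names, not the original code),
# parsing the concatenated column into a real date first:
from pyspark.sql.functions import to_date
df_date_parsed = df_date.withColumn('ParsedDate', to_date(col('Date'), 'd-M-yyyy'))
df_before_911 = df_date_parsed.filter(col('ParsedDate') < F.lit('2001-09-11').cast('date'))
df_after_911 = df_date_parsed.filter(col('ParsedDate') > F.lit('2001-09-11').cast('date'))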