Commit f175d95f authored by a.b.wahlgren@student.utwente.nl

canc_thresh

parent 7db6858b
@@ -2,8 +2,9 @@ from pyspark.sql import SparkSession
 from pyspark.sql.functions import col, explode
 spark = SparkSession.builder.getOrCreate()
-df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header="true")
+#df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header="true")
+df = spark.read.parquet("/user/s2878755/airline.parquet")
 '''
 QUESTION: What is the average delay time when it is caused by security reasons vs when it is caused by weather conditions?
 Local run: 6m50.311s
@@ -43,10 +44,10 @@ print("The average arrival delay is "+str(avg_arr)+" minutes.")
 '''
 --- Average Departure Delay ---
 '''
-df_dep = df.select(col("DepDelay")).where(col("DepDelay") > 0)
-sum_dep = df_dep.rdd.map(lambda x: (1, x[0])).reduceByKey(lambda x, y: int(x) + int(y)).collect()[0][1]
-avg_dep = sum_dep / df_dep.count()
-print("The average departure delay is "+str(avg_dep)+" minutes.")
+df_dep = df.select(col("DepDelay")).where(col("DepDelay") > 0)
+sum_dep = df_dep.rdd.map(lambda x: (1, x[0])).reduceByKey(lambda x, y: int(x) + int(y)).collect()[0][1]
+avg_dep = sum_dep / df_dep.count()
+print("The average departure delay is "+str(avg_dep)+" minutes.")
 '''
 --- Average Carrier Delay ---
@@ -56,3 +57,19 @@ sum_car = df_car.rdd.map(lambda x: (1, x[0])).reduceByKey(lambda x, y: int(x) +
 avg_car = sum_car / df_car.count()
 print("The average carrier delay caused is "+str(avg_car)+" minutes.")
 '''
+--- Average NAS Delay ---
+'''
+df_nas = df.select(col("NASDelay")).where(col("NASDelay") > 0)
+sum_nas = df_nas.rdd.map(lambda x: (1, x[0])).reduceByKey(lambda x, y: int(x) + int(y)).collect()[0][1]
+avg_nas = sum_nas / df_nas.count()
+print("The average NAS delay is "+str(avg_nas)+" minutes.")
+'''
+--- Average Late Aircraft Delay ---
+'''
+df_late = df.select(col("LateAircraftDelay")).where(col("LateAircraftDelay") > 0)
+sum_late = df_late.rdd.map(lambda x: (1, x[0])).reduceByKey(lambda x, y: int(x) + int(y)).collect()[0][1]
+avg_late = sum_late / df_late.count()
+print("The average late aircraft delay is "+str(avg_late)+" minutes.")
@@ -1,9 +1,12 @@
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col, max as pmax
+from pyspark.sql.functions import col
+#from pyspark.ml.classification import LogisticRegression
+from pyspark.ml.feature import VectorAssembler
 spark = SparkSession.builder.getOrCreate()
-df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header="true")
+#df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header="true")
+df = spark.read.parquet("/user/s2878755/airline.parquet")
 '''
 QUESTION: Is there a certain threshold where the total delay causes flights to be cancelled?
@@ -10,36 +13,57 @@ Local run:
 Cluster run:
 Approach:
-1. Get all flights which are cancelled and have been delayed
-2. Sort them on departure delay
-3. Look for patterns
+1. Get all flights which don't have an NA value for departure delay, keeping the DepDelay and Cancelled columns
+2. Fit a logistic regression model to do a binary classification (cancelled/not cancelled) based on the total delay (DepDelay)
+3. Analyze the intercept of the model and see if there is some correlation.
 Output:
 '''
-df2 = df.select(col("DepDelay"), col("Cancelled"))\
-    .where(col("DepDelay") >= 0)\
-    .sort(col("DepDelay"))
+df2 = df.select(col("DepDelay").cast("int"), col("Cancelled").cast("int"))\
+    .where(col("DepDelay") >= 0)
+# .sort(col("DepDelay"))
+assembler = VectorAssembler(inputCols=["DepDelay"], outputCol="features")
+transformed_df = assembler.transform(df2)
+data = transformed_df.select("features", "Cancelled")
+train_data, test_data = data.randomSplit([0.7, 0.3])
-tot = df2.count()
+from pyspark.ml.regression import LinearRegression
-df3 = df2.limit(tot//4).select(pmax("DepDelay").alias("max"))
-df3.show()
+lr = LinearRegression(labelCol="Cancelled")
+lrModel = lr.fit(train_data)
+test_stats = lrModel.evaluate(test_data)
-df4 = df2.limit(2*tot//4).select(pmax("DepDelay").alias("max"))
-df4.show()
+print("R2 error:", test_stats.r2)
+print("Coefficients: ", lrModel.coefficients)
+print("Intercept: ", lrModel.intercept)
-df5 = df2.limit(3*tot//4).select(pmax("DepDelay").alias("max"))
-df5.show()
-df6 = df2.select(pmax("DepDelay").alias("max"))
-df6.show()
+#tot = df2.count()
+#lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
+#lrModel = lr.fit(df2)
+#print("Coefficients: "+str(lrModel.coefficients))
+#print("Intercept: "+str(lrModel.intercept))
+#df3 = df2.limit(tot//4).select(pmax("DepDelay").alias("max"))
+#df3.show()
+#df4 = df2.limit(2*tot//4).select(pmax("DepDelay").alias("max"))
+#df4.show()
+#df5 = df2.limit(3*tot//4).select(pmax("DepDelay").alias("max"))
+#df5.show()
+#df6 = df2.select(pmax("DepDelay").alias("max"))
+#df6.show()
 # .where((col("Cancelled") == 1) & (col("DepDelay") > 0))
 df2.show(10)
-print(df2.count())
+#print(df2.count())
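
The approach above calls for a logistic regression, while the live code fits a LinearRegression on a 0/1 label. A sketch of how the commented-out LogisticRegression route could be wired to the same assembled features; the labelCol/featuresCol values follow the code above, and reading a threshold off the 50% probability point is an assumption, not something this commit does:

from pyspark.ml.classification import LogisticRegression

# Reuses the train_data split produced by the VectorAssembler above.
logr = LogisticRegression(labelCol="Cancelled", featuresCol="features", maxIter=10)
logrModel = logr.fit(train_data)
print("Coefficients: ", logrModel.coefficients)
print("Intercept: ", logrModel.intercept)

# With a single feature the log-odds are intercept + coef * DepDelay,
# so the delay at which the predicted cancellation probability crosses
# 50% (log-odds = 0) is one candidate threshold:
coef = logrModel.coefficients[0]
if coef != 0:
    print("50% cancellation point:", -logrModel.intercept / coef, "minutes")

Since cancellations are rare, that crossing point may land far beyond any observed delay, which would itself suggest there is no hard threshold.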
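
The commented-out df3/df4/df5/df6 block estimates quartile boundaries by taking the max of each limit() prefix, which only yields quartiles while df2 is sorted on DepDelay (the .sort is commented out above). A sketch of the same boundaries via approxQuantile, assuming the int-cast df2 from above; the 0.01 relative error is an arbitrary choice:

# One pass over df2 instead of four sorted limit()/max() scans.
quartiles = df2.approxQuantile("DepDelay", [0.25, 0.5, 0.75, 1.0], 0.01)
print("DepDelay quartile boundaries: " + str(quartiles))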