Skip to content
Snippets Groups Projects
Commit bd70e43c authored by eriwe600's avatar eriwe600
Browse files

add parquet

parent a3742906
No related branches found
No related tags found
No related merge requests found
......@@ -5,7 +5,9 @@ from pyspark.sql.types import IntegerType
# Create (or reuse) the Spark session for this job.
spark = SparkSession.builder.getOrCreate()

# The CSV source was replaced by the parquet copy of the same data; the
# old eager CSV read was dead code (df was immediately reassigned below),
# so only the commented-out record of it is kept.
#df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')
df = spark.read.parquet("/user/s2878755/airline.parquet")
# check which years cancellation was registered
'''
......@@ -25,32 +27,13 @@ df1 = df.select('Year', 'CancellationCode', lit(1).alias('Cancellations')) \
# Per-year total number of cancellation records; this is the denominator
# for the percentage computed further below.
df2 = (
    df.select('Year', lit(1).alias('TotCancellations'))
    .groupBy('Year')
    .agg(sum('TotCancellations').alias('TotCancellations'))
)
df1.show()
df2.show()
''' Does not work for some reason, only run ones
def totalCancellations(year):
total = df2.filter(col('Year').contains(year))
total.show()
print(total.collect()[0].TotCancellations)
return total.collect()[0].TotCancellations
df = df1.select('Year', 'CancellationCode', ((100*col('Count')/totalCancellations(col('Year')))).alias('Percentage'))
df = df1.withColumn('Percentage', 100*df1.Count/totalCancellations(df1.Year))
'''
# Join the per-code counts (df1) with the per-year totals (df2) and express
# each cancellation code as a percentage of that year's total.
percentage = (100 * col('Cancellations') / col('TotCancellations')).alias('Percentage')
df = (
    df1.join(df2, df1.Year == df2.Year)
    .select(df1.Year, df1.CancellationCode, percentage)
)
df.show()
#df = df.select(col('Year'), col('CancellationCode'), (100*col('Cancellations')/col('TotCancellations')).alias('Percentage'))

# Emit the same result twice: once ordered by year, once by cancellation code.
df_years = df.sort(col('Year'))
df_years.show()
df_code = df.sort(col('CancellationCode'))
df_code.show()
df_years.write.csv("/user/s2878755/cancellations_year_sort.csv")
df_code.write.csv("/user/s2878755/cancellations_code_sort.csv")
......@@ -4,6 +4,6 @@ spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')

# Keep only the columns needed downstream. DataFrame.drop accepts multiple
# column names in one call, so the long .drop().drop()... chain is collapsed.
# NOTE(review): the rendered diff contained two near-identical drop lines
# (the old revision also dropped 'Diverted'); this keeps the new revision,
# which retains 'Diverted'.
df = df.drop('DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime',
             'UniqueCarrier', 'FlightNum', 'TailNum', 'AirTime',
             'Origin', 'Dest', 'taxiIn', 'taxiOut')
df.show()
df.write.parquet('/user/s2878755/airline.parquet')
......@@ -4,26 +4,24 @@ from pyspark.sql.types import IntegerType
# Create (or reuse) the Spark session for this job.
spark = SparkSession.builder.getOrCreate()

# The job now reads the pre-cleaned parquet copy of the airline data; the
# old eager CSV read was dead code (df was immediately reassigned), so only
# the commented-out record of it is kept.
#df = spark.read.csv("/user/s2875462/airline.csv.shuffle", header='true')
df = spark.read.parquet("/user/s2878755/airline.parquet")
df.show()
# Keep only the columns of interest and discard rows where Distance is
# zero/'NA'/null or either delay is 'NA'/null.
df = df.select(col('Distance'), col('ArrDelay'), col('DepDelay')) \
    .filter(~((col('Distance') == 0) | col('Distance').contains('NA') | col('ArrDelay').contains('NA') | col('DepDelay').contains('NA') | col('Distance').isNull() | col('ArrDelay').isNull() | col('DepDelay').isNull())) \
    .select(col('Distance').cast(IntegerType()).alias('Distance'),
            # BUG FIX: the delay columns originate from a header-only CSV
            # read, so they are strings; max()/min() below then compared
            # them lexicographically ('9' > '10'). Cast to integers so
            # min/max/mean are all numeric.
            col('ArrDelay').cast(IntegerType()).alias('ArrDelay'),
            col('DepDelay').cast(IntegerType()).alias('DepDelay'))
df.show()

# Average arrival/departure delay for each flight distance.
df1 = df.groupBy(col('Distance')) \
    .agg(mean(col('ArrDelay')).alias('AverageArrDelay'), mean(col('DepDelay')).alias('AverageDepDelay')) \
    .orderBy(col('Distance'))
df1.show()
print(df1.count())

# NOTE(review): max + |min| equals the spread (max - min) only when the
# minimum delay is <= 0; it is not a statistical variance. The column name
# is kept for output compatibility.
df2 = df.groupBy(col('Distance')) \
    .agg((max(col('ArrDelay'))+abs(min(col('ArrDelay')))).alias('VarianceArrDelay'), (max(col('DepDelay'))+abs(min(col('DepDelay')))).alias('VarianceDepDelay')) \
    .orderBy(col('Distance'))
df2.show()
df1.write.csv("/user/s2878755/avg_delay_vs_distance.csv")
print(df2.count())
df2.write.csv("/user/s2878755/variance_delay_vs_distance.csv")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment