Commit 20423198 authored by s2017873

Merge branch 'new_project' into 'master'

New project

See merge request !1
parents 9e5f8a22 155d9bb3
PYSPARK_PYTHON=./environment/bin/python time spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.dynamicAllocation.maxExecutors=10 --conf spark.files.overwrite=true --archives warc_env.tar.gz#environment get_url_ip_from_way.py
import pyspark
from warcio.archiveiterator import ArchiveIterator
import esutil
import json
from pyspark.sql import SparkSession
from pyspark import SparkFiles, SparkConf
conf = SparkConf()
conf.setAppName('master-yarn')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
# m1_df = spark.read.csv('/user/s2017873/top-1m.csv.gz')
wat_files = esutil.hdfs.ls('/user/s2017873/wat_files/', False, False)
def get_records_for_path(path):
    """Return (hostname, ip) pairs for every root-page metadata record in one WAT file."""
    result = []
    with esutil.hdfs.opent('hdfs:' + path) as stream:
        for record in ArchiveIterator(stream):
            if record.rec_type == 'metadata':
                # wat_json = spark.read.json(record.content_stream().read()).asDict()
                wat_json = json.loads(record.content_stream().read())
                if "WARC-IP-Address" in wat_json["Envelope"]["WARC-Header-Metadata"]:
                    ip = wat_json["Envelope"]["WARC-Header-Metadata"]["WARC-IP-Address"]
                    url = wat_json["Envelope"]["WARC-Header-Metadata"]["WARC-Target-URI"]
                    # Keep only root URLs (no path component after the host)
                    no_https = url.split("//")[1]
                    if '/' not in no_https:
                        result.append((no_https, ip))
    return result
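# Illustrative only (values invented for this comment): each WAT 'metadata'
# record parsed by get_records_for_path is a JSON document shaped roughly like
# the example below; real records carry many more header fields than the two
# used here.
_example_wat_json = {
    "Envelope": {
        "WARC-Header-Metadata": {
            "WARC-Target-URI": "https://example.com",
            "WARC-IP-Address": "93.184.216.34",
        }
    }
}
# For such a record the function would emit ("example.com", "93.184.216.34");
# URLs whose part after '//' still contains a '/' (non-root pages) are skipped.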
rdd = sc.parallelize(wat_files)
new_rdd = rdd.flatMap(lambda path: get_records_for_path(path))
# m1_rdd = m1_df.rdd.map(lambda (rank, url) : (url, rank))
#with open("/home/s2017873/log4.txt", "w+") as f:
# f.write(str(new_rdd.take(1)))
# final_rdd = m1_rdd.leftOuterJoin(new_rdd)
# joined_rdd = m1_rdd.join(new_rdd).take(1)
#with open("/home/s2017873/log4.txt", 'w+') as f:
# first = final_rdd.take(1)
# f.write(str(first))
new_rdd.saveAsTextFile("hdfs:///user/s2017873/FINAL")
# joined_rdd.saveAsTextFile("output_folder")
# final_rdd.saveAsTextFile("url_rank_ip.csv")
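# Hypothetical follow-up (not part of the committed script): saveAsTextFile
# above writes each (host, ip) pair as its Python repr, one per line, so the
# output can later be parsed back into an RDD of pairs like this:
import ast
pairs = sc.textFile("hdfs:///user/s2017873/FINAL").map(ast.literal_eval)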
from pyspark.sql import SparkSession
from warcio.archiveiterator import ArchiveIterator
import json
import sys
import subprocess
import esutil
import multiprocessing as mp
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2
# Pass your starting and ending index into the WAT file list as command-line arguments
if len(sys.argv) < 3:
    start = 0
    end = 0
else:
    start = int(sys.argv[1])
    end = int(sys.argv[2])
print("Start has been set to {0}, end has been set to {1}".format(start, end))
# Do not touch the rest
wat_files = esutil.hdfs.ls("/user/s2017873/wat_files")
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
def get_dicts_for_path(path, queue):
    """Stream one WAT file from HDFS and put its (hostname, ip) pairs on the queue."""
    result = set()
    # Stream the file straight out of HDFS instead of copying it locally first
    cat = subprocess.Popen(["hdfs", "dfs", "-cat", path], stdout=subprocess.PIPE)
    for record in ArchiveIterator(cat.stdout):
        if record.rec_type == 'metadata':
            # wat_json = spark.read.json(record.content_stream().read()).asDict()
            wat_json = json.loads(record.content_stream().read())
            payload = wat_json["Envelope"]["WARC-Header-Metadata"]
            if "WARC-IP-Address" in payload:
                url = payload["WARC-Target-URI"]
                ip = payload["WARC-IP-Address"]
                parsed_uri = urlparse(url)
                result.add((parsed_uri.netloc, ip))
    queue.put((path, list(result)))
def write_list_to_csv(queue, subset):
    """Consume one result per WAT file from the queue and write it to HDFS as CSV."""
    for i in range(len(subset)):
        path, result = queue.get()
        df = sc.parallelize(list(result)).toDF()
        df = df.withColumnRenamed("_1", "url") \
               .withColumnRenamed("_2", "ip")
        dir_name = path.split("/")[-1].split(".")[0]
        file_path = "/user/s2017873/url_ip/" + dir_name
        df.write.csv(file_path)
        print("{0} / {1}".format(i, len(subset)))
subset = wat_files[start:end]
processes = []
q = mp.Queue()
writer = mp.Process(target=write_list_to_csv, args=(q, subset, ))
writer.start()
print("Started")
for path in subset:
    p = mp.Process(target=get_dicts_for_path, args=(path, q))
    processes.append(p)
    p.start()
writer.join()
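# Hypothetical follow-up (not part of the committed script): the per-WAT-file
# CSVs written under /user/s2017873/url_ip/ can later be combined into a single
# DataFrame; the DDL schema string below matches the column names used above.
combined = spark.read.csv("/user/s2017873/url_ip/*", schema="url STRING, ip STRING")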