Commit fc7a1a26
seperated to class

parent 65de8b37
from typing import Tuple
import os
import ast
class job_data:
def __init__(self, location : str, image : str, depends : list[str], stage : str, artifact : list[str], cache : list[str]) -> None:
self.location: str = os.path.relpath(location,os.getcwd()).replace("\\", os.sep).replace("/", os.sep)
self.image: str = image
self.depends: list[str] = depends
self.stage: str = stage
self.artifact: list[str] = artifact
self.cache: list[str] = cache
self.finished = False
self.failed = False
def can_run(self, jobs : list["job_data"]) -> bool:
if self.finished == True:
return False
dep = self.depends.copy()
if len(dep) == 0:
return True
job_suc = 0
for d in dep:
for job in jobs:
rel_path = os.path.relpath(job.location, os.getcwd())
if rel_path != d:
if job.finished == True:
if job.failed == True:
self.failed = True
self.finished = True
job_suc = job_suc + 1
return len(dep) == job_suc
def construct_from_string(file_path : str, file_string : str) -> "job_data":
extracted_data = job_data.extract_data(file_path, file_string)
if extracted_data is None:
raise FileNotFoundError
(image, depends, stage, artifact, cache) = extracted_data
return job_data(file_path, image, depends, stage, artifact, cache)
def construct_from_file(file_path : str) -> "job_data":
with open(file=file_path, mode='r') as file:
return job_data.construct_from_string(file_path,
def normalize_path(file_path, depend) -> str:
file_dir = os.path.dirname(file_path)
dir_path = os.getcwd()
file_location = os.path.join(file_dir, depend)
relative_file = file_location.removeprefix(dir_path + os.path.sep)
return (relative_file.replace("\\", os.path.sep).replace("/", os.path.sep))
def extract_data(file_path : str, file_string : str) -> None | Tuple[str, list[str], str, list[str], list[str]]:
if "parse-ci" in file_path:
return None
image = None
depends = []
stage = "unknown"
artifacts = []
cache = []
for line in file_string.splitlines():
line = line.removesuffix("\n")
if not line.startswith("#"):
if line.startswith("#image: "):
image = line.removeprefix("#image: ")
if line.startswith("#depends: "):
text = line.removeprefix("#depends: ")
if "[]" in text:
depends = []
if "[" in text:
depends = [job_data.normalize_path(file_path, depend) for depend in ast.literal_eval(text)]
depends = [job_data.normalize_path(file_path, text)]
if line.startswith("#image: "):
stage = line.removeprefix("#image: ")
if line.startswith("#artifacts: "):
text = line.removeprefix("#artifacts: ")
if "[]" in text:
artifacts = []
if "[" in text:
artifacts = [job_data.normalize_path(file_path, depend) for depend in ast.literal_eval(text)]
artifacts = [job_data.normalize_path(file_path, text)]
if line.startswith("#cache: "):
text = line.removeprefix("#cache: ")
if "[]" in text:
cache = []
if "[" in text:
cache = [job_data.normalize_path(file_path, depend) for depend in ast.literal_eval(text)]
cache = [job_data.normalize_path(file_path, text)]
if line.startswith("#enable: false"):
return None
if image is None:
print("does not have a valid config")
return None
return (image, depends, stage, artifacts, cache)
def __str__(self) -> str:
name : str = self.location.replace(os.path.sep, '-')
stage : str = self.stage
needs = [ r.replace(os.path.sep, '-') for r in self.depends]
if len(self.artifact) != 0:
artifacts = self.artifact
artifacts = "untracked"
if len(self.cache) != 0:
cache = self.artifact
cache = None
image : str = self.image
script : str = "chmod +x " + self.location + " && " + "./" + self.location
output: str = "\n"
output = output + name + ":\n"
output = output + '\t' + "stage: " + stage + '\n'
output = output + '\t' + "needs: " + str(needs) + '\n'
output = output + '\t' + "image:\n\t\tname: " + image + "\n\t\tentrypoint: [\"\"]" + '\n'
output = output + '\t' + 'script:\n\t\t- ' + script + '\n'
if artifacts == "untracked":
output = output + '\t' + 'artifacts:\n\t\tuntracked: true\n'
output = output + '\t' + 'artifacts:\n\t\tpaths:\n'
for artifact in artifacts:
output = output + "\t\t\t- \"" + artifact + '\"\n'
if cache is not None:
output = output + '\t' + 'cache:\n\t\tpaths:\n'
for c in cache:
output = output + "\t\t\t- \"" + c + '\"\n'
return output
\ No newline at end of file
from pathlib import Path
import ast
import os
import subprocess
import sys
from job_data import job_data
#I hate this but i work from visual studio code workspace so this fix is needed
dir_path = os.getcwd()
......@@ -11,63 +12,18 @@ def get_all_sh() -> list[str]:
result = list(Path(dir_path).rglob(pattern="*.sh"))
result: list[str] = [str(r.resolve()) for r in result]
return result
def normalize_depends(file_path, depends):
file_dir = os.path.dirname(file_path)
result = []
for depend in depends:
result.append(os.path.join(file_dir, depend).removeprefix(dir_path + os.path.sep).replace("\\", os.path.sep).replace("/", os.path.sep))
return result
def extract_info(file_path : str) -> dict:
info = {}
info["location"] = file_path.removeprefix(dir_path + os.path.sep)
with open(file_path, 'r') as content:
for line in content.readlines():
line = line.removesuffix("\n")
if not line.startswith("#"):
if line.startswith("#image: "):
info["image"] = line.removeprefix("#image: ")
if line.startswith("#depends: "):
info["depends"] = normalize_depends(file_path,ast.literal_eval(line.removeprefix("#depends: ")))
if line.startswith("#stage: "):
info["stage"] = line.removeprefix("#stage: ")
if line.startswith("#artifacts: "):
if '[' in line:
info["artifacts"] = normalize_depends(file_path,ast.literal_eval(line.removeprefix("#artifacts: ")))
info["artifacts"] = [os.path.join(os.path.dirname(info["location"]), line.removeprefix("#artifacts: ").replace('/', os.path.sep))]
if line.startswith("#cache: "):
if '[' in line:
info["cache"] = normalize_depends(file_path,ast.literal_eval(line.removeprefix("#cache: ")))
info["cache"] = [os.path.join(os.path.dirname(info["location"]), line.removeprefix("#cache: ").replace('/', os.path.sep))]
if line.startswith("#enable: false"):
return None
if "image" not in info:
print(f"{file_path} does not have a valid config")
return None
if "parse-ci" in info["location"]:
return None
return info
def process_sh(files_sh : list[str]) -> list:
compile_info = []
def process_sh(files_sh : list[str]) -> list[job_data]:
compile_info : list[job_data] = []
for file in files_sh:
info = extract_info(file)
if info is None:
info: job_data = job_data.construct_from_file(file)
except FileNotFoundError:
return compile_info
def can_run_jon(info : list[dict]):
for i in info:
if len(i["depends"]) == 0:
return True
return False
def get_new_job(info : list[dict]) -> dict:
for i in info:
if len(i["depends"]) == 0:
......@@ -88,10 +44,11 @@ def run_cli_sequential(job : dict) -> int:
job['image'], \
def run_cli_parallel(job : dict) -> dict:
exec_location = os.path.join("/usr/src/project/",os.path.relpath(job['location'] , dir_path)).replace("\\", "/")
f = open(dir_path + os.sep + job['location'].replace("\\", os.sep).replace("/", os.sep) + ".log", 'w')
job["task"] = subprocess.Popen( \
def run_cli_parallel(job : job_data) -> dict:
exec_location = os.path.join("/usr/src/project/",os.path.relpath(job.location , dir_path)).replace("\\", "/")
logfile= job.location.replace("\\", os.sep).replace("/", '') + ".log"
f = open(logfile, 'w')
task = subprocess.Popen( \
args= \
[ "docker", \
"run", \
......@@ -100,140 +57,104 @@ def run_cli_parallel(job : dict) -> dict:
f"{dir_path}:/usr/src/project", \
"--entrypoint", "sh", \
"-it", \
job['image'], \
job.image, \
job["output"] = f
return job
output = f
return (task, output)
def remove_from_depends(process_info : dict, jobname : str) -> dict:
if jobname in process_info["depends"]:
return process_info
def can_run_job(compile_info : list[job_data], jobs):
for j in compile_info:
if j.can_run(compile_info):
if j in [job[0] for job in jobs]:
return True
return False
def get_runnable_job(compile_info : list[job_data], jobs) -> job_data | None:
for j in compile_info:
if j.can_run(compile_info):
if j in [job[0] for job in jobs]:
return j
return None
def run_ci(compile_info : list[dict]) -> bool:
def run_ci(compile_info : list[job_data]) -> bool:
jobs = []
failed_jobs = []
current_jobs = []
waiting_count = len(compile_info)
running_count = 0
completed_count = 0
failed_count = 0
while len(jobs) != 0 or can_run_jon(compile_info) or len(current_jobs) != 0:
removed_job = None
for job in current_jobs:
poll = job["task"].poll()
if poll is not None:
running_count = running_count - 1
if job["task"].returncode != 0:
failed_count = failed_count + 1
print("failed: " + job['location'])
removed_job = job
completed_count = completed_count + 1
print("completed: " + job['location'])
compile_info = [remove_from_depends(r, job['location']) for r in compile_info]
removed_job = job
if removed_job is not None:
print("\U0001f7e9" * (completed_count) + "\U0001f7e5" * failed_count + "\U0001f7ea" * running_count + "\u2B1C" * waiting_count)
for location in [j['location'] for j in current_jobs]:
print("\t" + location)
if can_run_jon(compile_info):
waiting_count = waiting_count - 1
running_count = running_count + 1
new_job = get_new_job(compile_info)
code = run_cli_parallel(new_job)
while can_run_job(compile_info, jobs) or len(jobs) != 0:
change = False
remove_able_jobs = []
for job in jobs:
(job_info,task, output) = job
if task.poll() is not None:
job_info.finished = True
job_info.failed = (task.returncode != 0)
change = True
for r_job in remove_able_jobs:
runnable_job = get_runnable_job(compile_info, jobs)
if runnable_job is not None:
(task, output) = run_cli_parallel(runnable_job)
jobs.append((runnable_job, task, output))
change = True
if change:
completed_count = 0
failed_count = 0
for j in compile_info:
if j.finished == True:
if j.failed == True:
failed_count = failed_count + 1
completed_count = completed_count + 1
running_count = len(jobs)
waiting_count = len(compile_info) - failed_count - completed_count - running_count
print("\U0001f7e9" * (completed_count) + "\U0001f7e5" * failed_count + "\U0001f7ea" * running_count + "\u2B1C" * waiting_count)
for location in [j['location'] for j in current_jobs]:
print("\t" + location)
if len(failed_jobs) != 0:
print(f"failed {len(failed_jobs)} jobs:\n\t{failed_jobs}")
return False
for job in jobs:
(job_info,task, output) = job
print("\t" + job_info.location)
print("failed: ")
for i in compile_info:
if i.failed == True:
print("\t" + i.location)
return True
def job_to_string(job : list[dict]):
name : str = job['location'].replace(os.path.sep, '-')
if "stage" in job:
stage : str = job["stage"]
stage = "generic"
if "depends" in job:
needs : list[str] = [ r.replace(os.path.sep, '-') for r in job["depends"]]
needs = []
if "artifacts" in job:
artifacts = job["artifacts"]
artifacts = "untracked"
if "cache" in job:
cache = job["cache"]
cache = None
image : str = job["image"]
script : str = "chmod +x " + job['location'] + " && " + "./" + job['location']
output: str = "\n"
output = output + name + ":\n"
output = output + '\t' + "stage: " + stage + '\n'
output = output + '\t' + "needs: " + str(needs) + '\n'
output = output + '\t' + "image:\n\t\tname: " + image + "\n\t\tentrypoint: [\"\"]" + '\n'
output = output + '\t' + 'script:\n\t\t- ' + script + '\n'
if artifacts == "untracked":
output = output + '\t' + 'artifacts:\n\t\tuntracked: true\n'
output = output + '\t' + 'artifacts:\n\t\tpaths:\n'
for artifact in artifacts:
output = output + "\t\t\t- \"" + artifact + '\"\n'
if cache is not None:
output = output + '\t' + 'cache:\n\t\tpaths:\n'
for c in cache:
output = output + "\t\t\t- \"" + c + '\"\n'
return output
def get_stages(compile_info : list[dict]) -> list[str]:
def get_stages(compile_info : list[job_data]) -> list[str]:
stages = []
for i in compile_info:
if "stage" not in i:
if "generic" in stages:
stages.insert(0, "generic")
if i["stage"] not in stages:
if i.stage not in stages:
return sorted(stages)
def generate_ci(compile_info : list[dict]) -> str:
def generate_ci(compile_info : list[job_data]) -> str:
output = ""
stages = get_stages(compile_info)
output = output + "stages: \n"
for stage in stages:
output = output + "\t- " + stage + '\n'
for job in compile_info:
output = output + job_to_string(job)
output = output + str(job) + "\n"
return output
info = process_sh(files_sh=get_all_sh())
if "--ci" in sys.argv:
with open(os.path.join(dir_path, "gitlab-ci-generated.yml"), 'w') as ci:
ci.write(generate_ci(compile_info=process_sh(files_sh=get_all_sh())).replace("\t", " "))
ci.write(generate_ci(info).replace("\t", " ").replace("\\", "\\\\"))
if run_ci(compile_info=process_sh(files_sh=get_all_sh())) != True:
if not run_ci(compile_info=info):
