"""Run or export a small CI pipeline described by headers in ``*.sh`` scripts.

Every ``*.sh`` file below the working directory may start with a block of
``#`` comment lines carrying its job configuration:

    #image: <docker image>           (required, otherwise the script is ignored)
    #depends: ["other.sh", ...]      (Python list literal, relative to the script)
    #stage: <stage name>
    #artifacts: <path relative to the script>
    #enable: false                   (skip this script)

Without arguments the jobs are executed one after another in Docker
containers; with ``--ci`` a ``gitlab-ci-generated.yml`` file is written instead.
"""
from pathlib import Path
from typing import Optional
import ast
import os
import subprocess
import sys

# The project root is taken from the current working directory (a workaround
# for running from a Visual Studio Code workspace), so the script has to be
# started from the directory that should be scanned and mounted.
dir_path = os.getcwd()


def get_all_sh() -> list[str]:
    """Return the resolved paths of all ``*.sh`` files below ``dir_path``.

    The list is sorted so that job order (and the generated YAML) is stable
    between runs; ``rglob`` itself gives no ordering guarantee.
    """
    return sorted(str(p.resolve()) for p in Path(dir_path).rglob(pattern="*.sh"))


def normalize_depends(file_path: str, depends: list[str]) -> list[str]:
    """Convert dependency paths relative to a script into project-relative paths.

    Args:
        file_path: Path of the script that declares the dependencies.
        depends: Dependency paths, relative to the directory of ``file_path``.

    Returns:
        The dependencies joined onto the script's directory, normalized, with
        the ``dir_path`` prefix stripped, i.e. in the same form as a job's
        ``"location"``.
    """
    file_dir = os.path.dirname(file_path)
    root_prefix = dir_path + os.path.sep
    # normpath collapses "./" and "../" segments (and mixed separators on
    # Windows); without it such a dependency could never match a job location.
    return [
        os.path.normpath(os.path.join(file_dir, depend)).removeprefix(root_prefix)
        for depend in depends
    ]


def extract_info(file_path: str) -> Optional[dict]:
    """Parse the leading ``#`` header of a script into a job description.

    Args:
        file_path: Path of the shell script to inspect.

    Returns:
        A dict with ``"location"`` and ``"image"`` plus, when present in the
        header, ``"depends"``, ``"stage"`` and ``"artifacts"``. ``None`` is
        returned when the script is disabled with ``#enable: false``, has no
        ``#image:`` line, or has a ``#depends:`` line that cannot be parsed.
    """
    info = {}
    info["location"] = file_path.removeprefix(dir_path + os.path.sep)
    with open(file_path, 'r') as content:
        # Iterate lazily: only the leading comment block is of interest.
        for line in content:
            line = line.removesuffix("\n")
            if not line.startswith("#"):
                break
            if line.startswith("#image: "):
                info["image"] = line.removeprefix("#image: ")
            elif line.startswith("#depends: "):
                try:
                    depends = ast.literal_eval(line.removeprefix("#depends: "))
                except (ValueError, SyntaxError, TypeError):
                    # A broken header in one script should not abort the run.
                    print(f"{file_path} has an invalid #depends header")
                    return None
                info["depends"] = normalize_depends(file_path, depends)
            elif line.startswith("#stage: "):
                info["stage"] = line.removeprefix("#stage: ")
            elif line.startswith("#artifacts: "):
                info["artifacts"] = os.path.join(
                    os.path.dirname(info["location"]),
                    line.removeprefix("#artifacts: ").replace('/', os.path.sep),
                )
            elif line.startswith("#enable: false"):
                return None
    if "image" not in info:
        print(f"{file_path} does not have a valid config")
        return None
    return info


def process_sh(files_sh: list[str]) -> list:
    """Return the job descriptions of all scripts that have a usable config."""
    extracted = (extract_info(file) for file in files_sh)
    return [info for info in extracted if info is not None]


def can_run_jon(info: list[dict]):
    """Return True when at least one job has no remaining dependencies.

    A job without a ``"depends"`` key counts as having no dependencies.
    """
    return any(not job.get("depends") for job in info)


def get_new_job(info: list[dict]) -> Optional[dict]:
    """Return the first job without remaining dependencies, or None."""
    for job in info:
        if not job.get("depends"):
            return job
    return None


def run_cli_sequential(job: dict) -> int:
    """Run a job's script with bash inside its Docker image.

    ``dir_path`` is mounted at ``/usr/src/project`` and the script is addressed
    through that mount using forward slashes.

    Returns:
        The exit code reported by ``subprocess.call``.
    """
    script_in_container = os.path.join(
        "/usr/src/project/", os.path.relpath(job['location'], dir_path)
    ).replace("\\", "/")
    return subprocess.call(args=[
        "docker", "run", "--rm",
        "-v", f"{dir_path}:/usr/src/project",
        "-it", job['image'],
        "bash", script_in_container,
    ])


def remove_from_depends(process_info: dict, jobname: str) -> dict:
    """Drop ``jobname`` from a job's dependency list (in place) and return the job."""
    depends = process_info.get("depends")
    if depends and jobname in depends:
        depends.remove(jobname)
    return process_info


def run_ci(compile_info: list[dict]) -> None:
    """Run all jobs sequentially, honouring their dependencies.

    Repeatedly picks a job with no open dependencies, runs it, and removes it
    from the dependency lists of the remaining jobs. Jobs whose dependencies
    never become satisfied (unknown or cyclic) are reported and not run.
    """
    print(compile_info)
    while can_run_jon(compile_info):
        new_job = get_new_job(compile_info)
        print(f"compiling: {new_job['location']}")
        run_cli_sequential(new_job)
        compile_info.remove(new_job)
        compile_info = [remove_from_depends(r, new_job['location']) for r in compile_info]
    # Anything left here still waits on a dependency that was never run.
    for job in compile_info:
        print(f"skipped {job['location']}: unresolved depends {job.get('depends')}")


def job_to_string(job: dict):
    """Render one job description as a tab-indented GitLab CI job entry.

    Missing ``"stage"`` defaults to ``generic``, missing ``"depends"`` to an
    empty ``needs`` list, and missing ``"artifacts"`` to ``untracked: true``.
    Path separators in job names are replaced by ``-``.
    """
    name: str = job['location'].replace(os.path.sep, '-')
    stage: str = job.get("stage", "generic")
    needs: list[str] = [r.replace(os.path.sep, '-') for r in job.get("depends", [])]
    artifacts = job.get("artifacts", "untracked")
    image: str = job["image"]
    script: str = "bash " + job['location']

    lines = [
        "",
        f"{name}:",
        f"\tstage: {stage}",
        f"\tneeds: {needs}",
        "\timage:",
        f"\t\tname: {image}",
        "\t\tentrypoint: [\"\"]",
        "\tscript:",
        f"\t\t- {script}",
        "\tartifacts:",
    ]
    if artifacts == "untracked":
        lines.append("\t\tuntracked: true")
    else:
        lines.append("\t\tpaths:")
        lines.append(f"\t\t\t- {artifacts}")
    return "\n".join(lines) + "\n"


def get_stages(compile_info: list[dict]) -> list[str]:
    """Return the sorted, de-duplicated stage names of all jobs.

    Jobs without a ``"stage"`` contribute the default stage ``generic``.
    """
    return sorted({job.get("stage", "generic") for job in compile_info})


def generate_ci(compile_info: list[dict]) -> str:
    """Build the tab-indented GitLab CI document: stage list, then every job."""
    parts = ["stages: \n"]
    parts.extend(f"\t- {stage}\n" for stage in get_stages(compile_info))
    parts.extend(job_to_string(job) for job in compile_info)
    return "".join(parts)


if "--ci" in sys.argv:
    with open(os.path.join(dir_path, "gitlab-ci-generated.yml"), 'w') as ci:
        # YAML does not allow tab indentation, so tabs are swapped out on write.
        ci.write(generate_ci(compile_info=process_sh(files_sh=get_all_sh())).replace("\t", " "))
else:
    run_ci(compile_info=process_sh(files_sh=get_all_sh()))