From c49170d0b2598c954fe2031955f7c2159f319339 Mon Sep 17 00:00:00 2001 From: Drew DeVault Date: Wed, 14 Aug 2019 14:24:12 +0900 Subject: [PATCH] Implement tailing build logs over SSH --- buildsrht-shell | 89 ++++++++++++++++++++++++++++++++++++++--------- worker/context.go | 10 +++++- worker/http.go | 10 ++++++ 3 files changed, 92 insertions(+), 17 deletions(-) diff --git a/buildsrht-shell b/buildsrht-shell index c74c406..56b9e21 100755 --- a/buildsrht-shell +++ b/buildsrht-shell @@ -1,9 +1,15 @@ #!/usr/bin/env python3 +from buildsrht.manifest import Manifest +from datetime import datetime +from humanize import naturaltime from srht.config import cfg, get_origin import os import requests import shlex +import subprocess import sys +import time +import yaml def fail(reason): owner = cfg("sr.ht", "owner-name") @@ -19,34 +25,85 @@ cmd = shlex.split(cmd) if len(cmd) != 2: fail("Usage: ssh ... connect ") op = cmd[0] -if op not in ["connect"]: +if op not in ["connect", "tail"]: fail("Usage: ssh ... connect ") job_id = int(cmd[1]) -r = requests.get(f"http://localhost:8080/job/{job_id}/info") -if r.status_code != 200: +def get_info(job_id): + r = requests.get(f"http://localhost:8080/job/{job_id}/info") + if r.status_code != 200: + return None + return r.json() + +info = get_info(job_id) +if not info: fail("No such job found.") -info = r.json() meta_origin = get_origin("meta.sr.ht") r = requests.get(f"{meta_origin}/api/ssh-key/{b64key}") if r.status_code == 200: username = r.json()["owner"]["name"] +elif r.status_code == 404: + fail("We don't recognize your SSH key. Make sure you've added it to " + + f"your account.\n{get_origin('meta.sr.ht', external=True)}/keys") else: fail("Temporary authentication failure. Try again later.") if username != info["username"]: fail("You are not permitted to connect to this job.") -sys.stdout.flush() -sys.stderr.flush() -tty = os.open("/dev/tty", os.O_RDWR) -os.dup2(0, tty) -os.execvp("ssh", [ - "ssh", "-qt", - "-p", str(info["port"]), - "-o", "UserKnownHostsFile=/dev/null", - "-o", "StrictHostKeyChecking=no", - "-o", "LogLevel=quiet", - "build@localhost", "bash" -]) +url = f"{get_origin('builds.sr.ht', external=True)}/jobs/~{username}/{job_id}" +print(f"Connected to build job #{job_id} ({info['status']}): {url}") +deadline = datetime.utcfromtimestamp(info["deadline"]) + +manifest = Manifest(yaml.safe_load(info["manifest"])) + +def connect(job_id, info): + print("Your VM will be terminated " + + naturaltime(datetime.utcnow() - deadline)) + print() + sys.stdout.flush() + sys.stderr.flush() + tty = os.open("/dev/tty", os.O_RDWR) + os.dup2(0, tty) + os.execvp("ssh", [ + "ssh", "-qt", + "-p", str(info["port"]), + "-o", "UserKnownHostsFile=/dev/null", + "-o", "StrictHostKeyChecking=no", + "-o", "LogLevel=quiet", + "build@localhost", "bash" + ]) + +def tail(job_id, info): + logs = os.path.join(cfg("builds.sr.ht::worker", "buildlogs"), str(job_id)) + p = subprocess.Popen(["tail", "-f", os.path.join(logs, "log")]) + tasks = set() + procs = [p] + # holy bejeezus this is hacky + while True: + for task in manifest.tasks: + if task.name in tasks: + continue + path = os.path.join(logs, task.name, "log") + if os.path.exists(path): + procs.append(subprocess.Popen( + f"tail -c +1 -f {shlex.quote(path)} | " + + "awk '{ print \"[" + shlex.quote(task.name) + "] \" $0 }'", + shell=True)) + tasks.update({ task.name }) + info = get_info(job_id) + if not info: + break + if info["task"] == info["tasks"]: + for p in procs: + p.kill() + break + time.sleep(3) + +if op == "connect": + if info["task"] != info["tasks"]: + tail(job_id, info) + connect(job_id, info) +elif op == "tail": + tail(job_id, info) diff --git a/worker/context.go b/worker/context.go index ab4ccd3..a2bd3c3 100644 --- a/worker/context.go +++ b/worker/context.go @@ -59,6 +59,7 @@ type JobContext struct { Conf func(section, key string) string Context context.Context Db *sql.DB + Deadline time.Time Job *Job LogDir string LogFile *os.File @@ -66,6 +67,9 @@ type JobContext struct { Manifest *Manifest Port int + NTasks int + Task int + ProcessedTriggers bool } @@ -135,6 +139,7 @@ func (wctx *WorkerContext) RunBuild( Conf: wctx.Conf, Context: goctx, Db: wctx.Db, + Deadline: time.Now().UTC().Add(timeout), Job: job, Manifest: &manifest, } @@ -168,11 +173,14 @@ func (wctx *WorkerContext) RunBuild( ctx.CloneRepos, ctx.RunTasks, } - for _, task := range tasks { + ctx.NTasks = len(tasks) + for i, task := range tasks { + ctx.Task = i if err = task(); err != nil { panic(err) } } + ctx.Task = ctx.NTasks if manifest.Shell { ctx.Log.Println() diff --git a/worker/http.go b/worker/http.go index 3d9d9bd..f8ba97a 100644 --- a/worker/http.go +++ b/worker/http.go @@ -30,14 +30,24 @@ func HttpServer() { if job, ok := jobs[jobId]; ok { w.WriteHeader(200) bytes, _ := json.Marshal(struct { + Deadline int64 `json:"deadline"` + Manifest string `json:"manifest"` Note *string `json:"note"` OwnerId int `json:"owner_id"` Port int `json:"port"` + Status string `json:"status"` + Task int `json:"task"` + Tasks int `json:"tasks"` Username string `json:"username"` } { + Deadline: job.Deadline.Unix(), + Manifest: job.Job.Manifest, Note: job.Job.Note, OwnerId: job.Job.OwnerId, Port: job.Port, + Status: job.Job.Status, + Task: job.Task, + Tasks: job.NTasks, Username: job.Job.Username, }) w.Write(bytes) -- 2.38.5