~comcloudway/builds.sr.ht

c49170d0b2598c954fe2031955f7c2159f319339 — Drew DeVault 5 years ago f8edadb
Implement tailing build logs over SSH
3 files changed, 92 insertions(+), 17 deletions(-)

M buildsrht-shell
M worker/context.go
M worker/http.go
M buildsrht-shell => buildsrht-shell +73 -16
@@ 1,9 1,15 @@
#!/usr/bin/env python3
from buildsrht.manifest import Manifest
from datetime import datetime
from humanize import naturaltime
from srht.config import cfg, get_origin
import os
import requests
import shlex
import subprocess
import sys
import time
import yaml

def fail(reason):
    owner = cfg("sr.ht", "owner-name")


@@ 19,34 25,85 @@ cmd = shlex.split(cmd)
if len(cmd) != 2:
    fail("Usage: ssh ... connect <job ID>")
op = cmd[0]
if op not in ["connect"]:
if op not in ["connect", "tail"]:
    fail("Usage: ssh ... connect <job ID>")
job_id = int(cmd[1])

r = requests.get(f"http://localhost:8080/job/{job_id}/info")
if r.status_code != 200:
def get_info(job_id):
    # Fetch job metadata from the local worker's HTTP API.
    # Returns the decoded JSON object, or None when the worker does not
    # know the job (any non-200 response).
    r = requests.get(f"http://localhost:8080/job/{job_id}/info")
    if r.status_code != 200:
        return None
    return r.json()

info = get_info(job_id)
if not info:
    fail("No such job found.")
info = r.json()

meta_origin = get_origin("meta.sr.ht")
r = requests.get(f"{meta_origin}/api/ssh-key/{b64key}")
if r.status_code == 200:
    username = r.json()["owner"]["name"]
elif r.status_code == 404:
    fail("We don't recognize your SSH key. Make sure you've added it to " +
        f"your account.\n{get_origin('meta.sr.ht', external=True)}/keys")
else:
    fail("Temporary authentication failure. Try again later.")

if username != info["username"]:
    fail("You are not permitted to connect to this job.")

sys.stdout.flush()
sys.stderr.flush()
tty = os.open("/dev/tty", os.O_RDWR)
os.dup2(0, tty)
os.execvp("ssh", [
    "ssh", "-qt",
    "-p", str(info["port"]),
    "-o", "UserKnownHostsFile=/dev/null",
    "-o", "StrictHostKeyChecking=no",
    "-o", "LogLevel=quiet",
    "build@localhost", "bash"
])
url = f"{get_origin('builds.sr.ht', external=True)}/jobs/~{username}/{job_id}"
print(f"Connected to build job #{job_id} ({info['status']}): {url}")
deadline = datetime.utcfromtimestamp(info["deadline"])

manifest = Manifest(yaml.safe_load(info["manifest"]))

def connect(job_id, info):
    # Attach the caller's terminal to the build VM: print the VM's
    # remaining lifetime, then replace this process with an ssh client
    # (execvp does not return on success).
    # `deadline` is a module-level global parsed from the job info.
    # NOTE(review): naturaltime(utcnow() - deadline) passes a negative
    # delta for a future deadline — confirm humanize renders this as
    # "in X" rather than "X ago".
    print("Your VM will be terminated "
            + naturaltime(datetime.utcnow() - deadline))
    print()
    sys.stdout.flush()
    sys.stderr.flush()
    # NOTE(review): dup2(0, tty) copies fd 0 over the freshly opened
    # tty fd; the usual direction for reattaching stdin would be
    # dup2(tty, 0) — verify the argument order is intended.
    tty = os.open("/dev/tty", os.O_RDWR)
    os.dup2(0, tty)
    os.execvp("ssh", [
        "ssh", "-qt",
        "-p", str(info["port"]),
        # The build VM is ephemeral, so skip host-key verification.
        "-o", "UserKnownHostsFile=/dev/null",
        "-o", "StrictHostKeyChecking=no",
        "-o", "LogLevel=quiet",
        "build@localhost", "bash"
    ])

def tail(job_id, info):
    # Stream the job's build logs to the terminal until the job
    # finishes or disappears. Tails the top-level log immediately and
    # picks up each per-task log file as it appears, prefixing those
    # lines with "[task name]".
    # `manifest` is a module-level global built from the job manifest.
    logs = os.path.join(cfg("builds.sr.ht::worker", "buildlogs"), str(job_id))
    p = subprocess.Popen(["tail", "-f", os.path.join(logs, "log")])
    tasks = set()  # names of tasks already being tailed
    procs = [p]  # every spawned tail process, so all can be killed at the end
    # holy bejeezus this is hacky
    while True:
        for task in manifest.tasks:
            if task.name in tasks:
                continue
            path = os.path.join(logs, task.name, "log")
            if os.path.exists(path):
                # tail -c +1 -f streams the file from its first byte;
                # awk prefixes each line with the task name.
                procs.append(subprocess.Popen(
                    f"tail -c +1 -f {shlex.quote(path)} | " +
                    "awk '{ print \"[" + shlex.quote(task.name) + "] \" $0 }'",
                    shell=True))
                tasks.update({ task.name })
        # Poll the worker every 3s; stop when the job vanishes or when
        # the current task index equals the task count (all tasks done).
        info = get_info(job_id)
        if not info:
            break
        if info["task"] == info["tasks"]:
            for p in procs:
                p.kill()
            break
        time.sleep(3)

# Dispatch on the requested operation. For "connect", stream logs
# first if tasks are still running (tail blocks until the job is
# ready), then exec into an interactive shell on the VM.
if op == "connect":
    if info["task"] != info["tasks"]:
        tail(job_id, info)
    connect(job_id, info)
elif op == "tail":
    tail(job_id, info)

M worker/context.go => worker/context.go +9 -1
@@ 59,6 59,7 @@ type JobContext struct {
	Conf     func(section, key string) string
	Context  context.Context
	Db       *sql.DB
	Deadline time.Time
	Job      *Job
	LogDir   string
	LogFile  *os.File


@@ 66,6 67,9 @@ type JobContext struct {
	Manifest *Manifest
	Port     int

	NTasks int
	Task   int

	ProcessedTriggers bool
}



@@ 135,6 139,7 @@ func (wctx *WorkerContext) RunBuild(
		Conf:     wctx.Conf,
		Context:  goctx,
		Db:       wctx.Db,
		Deadline: time.Now().UTC().Add(timeout),
		Job:      job,
		Manifest: &manifest,
	}


@@ 168,11 173,14 @@ func (wctx *WorkerContext) RunBuild(
		ctx.CloneRepos,
		ctx.RunTasks,
	}
	for _, task := range tasks {
	ctx.NTasks = len(tasks)
	for i, task := range tasks {
		ctx.Task = i
		if err = task(); err != nil {
			panic(err)
		}
	}
	ctx.Task = ctx.NTasks

	if manifest.Shell {
		ctx.Log.Println()

M worker/http.go => worker/http.go +10 -0
@@ 30,14 30,24 @@ func HttpServer() {
			if job, ok := jobs[jobId]; ok {
				w.WriteHeader(200)
				bytes, _ := json.Marshal(struct {
					Deadline int64   `json:"deadline"`
					Manifest string  `json:"manifest"`
					Note     *string `json:"note"`
					OwnerId  int     `json:"owner_id"`
					Port     int     `json:"port"`
					Status   string  `json:"status"`
					Task     int     `json:"task"`
					Tasks    int     `json:"tasks"`
					Username string  `json:"username"`
				} {
					Deadline: job.Deadline.Unix(),
					Manifest: job.Job.Manifest,
					Note:     job.Job.Note,
					OwnerId:  job.Job.OwnerId,
					Port:     job.Port,
					Status:   job.Job.Status,
					Task:     job.Task,
					Tasks:    job.NTasks,
					Username: job.Job.Username,
				})
				w.Write(bytes)