~comcloudway/builds.sr.ht

0a2552096c94306623b2b0daf4fefe8157e1ff48 — Adnan Maolood 2 years ago 73d7801
api/graph: Implement GraphQL-native user webhooks

Implement GraphQL-native user webhooks for job creation events.
A api/graph/model/webhooks.go => api/graph/model/webhooks.go +233 -0
@@ 0,0 1,233 @@
package model

import (
	"context"
	"database/sql"
	"fmt"
	"strconv"
	"time"

	"git.sr.ht/~sircmpwn/core-go/database"
	"git.sr.ht/~sircmpwn/core-go/model"
	sq "github.com/Masterminds/squirrel"
	"github.com/lib/pq"
)

type WebhookDelivery struct {
	UUID            string       `json:"uuid"`
	Date            time.Time    `json:"date"`
	Event           WebhookEvent `json:"event"`
	RequestBody     string       `json:"requestBody"`
	ResponseBody    *string      `json:"responseBody"`
	ResponseHeaders *string      `json:"responseHeaders"`
	ResponseStatus  *int         `json:"responseStatus"`

	ID             int
	SubscriptionID int
	Name           string

	alias  string
	fields *database.ModelFields
}

func (whd *WebhookDelivery) WithName(name string) *WebhookDelivery {
	whd.Name = name
	return whd
}

func (whd *WebhookDelivery) As(alias string) *WebhookDelivery {
	whd.alias = alias
	return whd
}

func (whd *WebhookDelivery) Alias() string {
	return whd.alias
}

func (whd *WebhookDelivery) Table() string {
	return "gql_" + whd.Name + "_wh_delivery"
}

func (whd *WebhookDelivery) Fields() *database.ModelFields {
	if whd.fields != nil {
		return whd.fields
	}
	whd.fields = &database.ModelFields{
		Fields: []*database.FieldMap{
			{"uuid", "uuid", &whd.UUID},
			{"date", "date", &whd.Date},
			{"event", "event", &whd.Event},
			{"request_body", "requestBody", &whd.RequestBody},
			{"response_body", "responseBody", &whd.ResponseBody},
			{"response_headers", "responseHeaders", &whd.ResponseHeaders},
			{"response_status", "responseStatus", &whd.ResponseStatus},

			// Always fetch:
			{"id", "", &whd.ID},
			{"subscription_id", "", &whd.SubscriptionID},
		},
	}
	return whd.fields
}

func (whd *WebhookDelivery) QueryWithCursor(ctx context.Context,
	runner sq.BaseRunner, q sq.SelectBuilder,
	cur *model.Cursor) ([]*WebhookDelivery, *model.Cursor) {
	var (
		err  error
		rows *sql.Rows
	)

	if cur.Next != "" {
		next, _ := strconv.ParseInt(cur.Next, 10, 64)
		q = q.Where(database.WithAlias(whd.alias, "id")+"<= ?", next)
	}
	q = q.
		OrderBy(database.WithAlias(whd.alias, "id") + " DESC").
		Limit(uint64(cur.Count + 1))

	if rows, err = q.RunWith(runner).QueryContext(ctx); err != nil {
		panic(err)
	}
	defer rows.Close()

	var deliveries []*WebhookDelivery
	for rows.Next() {
		var delivery WebhookDelivery
		if err := rows.Scan(database.Scan(ctx, &delivery)...); err != nil {
			panic(err)
		}
		delivery.Name = whd.Name
		deliveries = append(deliveries, &delivery)
	}

	if len(deliveries) > cur.Count {
		cur = &model.Cursor{
			Count:  cur.Count,
			Next:   strconv.Itoa(deliveries[len(deliveries)-1].ID),
			Search: cur.Search,
		}
		deliveries = deliveries[:cur.Count]
	} else {
		cur = nil
	}

	return deliveries, cur
}

type UserWebhookSubscription struct {
	ID     int            `json:"id"`
	Events []WebhookEvent `json:"events"`
	Query  string         `json:"query"`
	URL    string         `json:"url"`

	UserID     int
	AuthMethod string
	ClientID   *string
	TokenHash  *string
	Expires    *time.Time
	Grants     *string
	NodeID     *string

	alias  string
	fields *database.ModelFields
}

func (we *WebhookEvent) Scan(src interface{}) error {
	bytes, ok := src.([]uint8)
	if !ok {
		return fmt.Errorf("Unable to scan from %T into WebhookEvent", src)
	}
	*we = WebhookEvent(string(bytes))
	if !we.IsValid() {
		return fmt.Errorf("%s is not a valid WebhookEvent", string(bytes))
	}
	return nil
}

func (UserWebhookSubscription) IsWebhookSubscription() {}

func (sub *UserWebhookSubscription) As(alias string) *UserWebhookSubscription {
	sub.alias = alias
	return sub
}

func (sub *UserWebhookSubscription) Alias() string {
	return sub.alias
}

func (sub *UserWebhookSubscription) Table() string {
	return "gql_user_wh_sub"
}

func (sub *UserWebhookSubscription) Fields() *database.ModelFields {
	if sub.fields != nil {
		return sub.fields
	}
	sub.fields = &database.ModelFields{
		Fields: []*database.FieldMap{
			{"events", "events", pq.Array(&sub.Events)},
			{"url", "url", &sub.URL},

			// Always fetch:
			{"id", "", &sub.ID},
			{"query", "", &sub.Query},
			{"user_id", "", &sub.UserID},
			{"auth_method", "", &sub.AuthMethod},
			{"token_hash", "", &sub.TokenHash},
			{"client_id", "", &sub.ClientID},
			{"grants", "", &sub.Grants},
			{"expires", "", &sub.Expires},
			{"node_id", "", &sub.NodeID},
		},
	}
	return sub.fields
}

func (sub *UserWebhookSubscription) QueryWithCursor(ctx context.Context,
	runner sq.BaseRunner, q sq.SelectBuilder,
	cur *model.Cursor) ([]WebhookSubscription, *model.Cursor) {
	var (
		err  error
		rows *sql.Rows
	)

	if cur.Next != "" {
		next, _ := strconv.ParseInt(cur.Next, 10, 64)
		q = q.Where(database.WithAlias(sub.alias, "id")+"<= ?", next)
	}
	q = q.
		OrderBy(database.WithAlias(sub.alias, "id")).
		Limit(uint64(cur.Count + 1))

	if rows, err = q.RunWith(runner).QueryContext(ctx); err != nil {
		panic(err)
	}
	defer rows.Close()

	var (
		subs   []WebhookSubscription
		lastID int
	)
	for rows.Next() {
		var sub UserWebhookSubscription
		if err := rows.Scan(database.Scan(ctx, &sub)...); err != nil {
			panic(err)
		}
		subs = append(subs, &sub)
		lastID = sub.ID
	}

	if len(subs) > cur.Count {
		cur = &model.Cursor{
			Count:  cur.Count,
			Next:   strconv.Itoa(lastID),
			Search: cur.Search,
		}
		subs = subs[:cur.Count]
	} else {
		cur = nil
	}

	return subs, cur
}

M api/graph/schema.graphqls => api/graph/schema.graphqls +147 -0
@@ 8,6 8,12 @@ scalar File
"Used to provide a human-friendly description of an access scope"
directive @scopehelp(details: String!) on ENUM_VALUE

"""
This is used to decorate fields which are only accessible with a personal
access token, and are not available to clients using OAuth 2.0 access tokens.
"""
directive @private on FIELD_DEFINITION

enum AccessScope {
  PROFILE @scopehelp(details: "profile information")
  JOBS    @scopehelp(details: "build jobs")


@@ 253,6 259,99 @@ type SecretFile implements Secret {
  data: Binary! @worker
}

type OAuthClient {
  uuid: String!
}

enum WebhookEvent {
  JOB_CREATED @access(scope: JOBS, kind: RO)
}

interface WebhookSubscription {
  id: Int!
  events: [WebhookEvent!]!
  query: String!
  url: String!

  """
  If this webhook was registered by an authorized OAuth 2.0 client, this
  field is non-null.
  """
  client: OAuthClient @private

  "All deliveries which have been sent to this webhook."
  deliveries(cursor: Cursor): WebhookDeliveryCursor!

  "Returns a sample payload for this subscription, for testing purposes"
  sample(event: WebhookEvent!): String!
}

type UserWebhookSubscription implements WebhookSubscription {
  id: Int!
  events: [WebhookEvent!]!
  query: String!
  url: String!
  client: OAuthClient @private
  deliveries(cursor: Cursor): WebhookDeliveryCursor!
  sample(event: WebhookEvent!): String!
}

type WebhookDelivery {
  uuid: String!
  date: Time!
  event: WebhookEvent!
  subscription: WebhookSubscription!
  requestBody: String!

  """
  These details are provided only after a response is received from the
  remote server. If a response is sent whose Content-Type is not text/*, or
  cannot be decoded as UTF-8, the response body will be null. It will be
  truncated after 64 KiB.
  """
  responseBody: String
  responseHeaders: String
  responseStatus: Int
}

interface WebhookPayload {
  uuid: String!
  event: WebhookEvent!
  date: Time!
}

type JobEvent implements WebhookPayload {
  uuid: String!
  event: WebhookEvent!
  date: Time!

  job: Job!
}

"""
A cursor for enumerating a list of webhook deliveries

If there are additional results available, the cursor object may be passed
back into the same endpoint to retrieve another page. If the cursor is null,
there are no remaining results to return.
"""
type WebhookDeliveryCursor {
  results: [WebhookDelivery!]!
  cursor: Cursor
}

"""
A cursor for enumerating a list of webhook subscriptions

If there are additional results available, the cursor object may be passed
back into the same endpoint to retrieve another page. If the cursor is null,
there are no remaining results to return.
"""
type WebhookSubscriptionCursor {
  results: [WebhookSubscription!]!
  cursor: Cursor
}

type Query {
  "Returns API version information."
  version: Version!


@@ 272,6 371,25 @@ type Query {

  "Returns secrets owned by the authenticated user."
  secrets(cursor: Cursor): SecretCursor! @access(scope: SECRETS, kind: RO)

  """
  Returns a list of user webhook subscriptions. For clients
  authenticated with a personal access token, this returns all webhooks
  configured by all GraphQL clients for your account. For clients
  authenticated with an OAuth 2.0 access token, this returns only webhooks
  registered for your client.
  """
  userWebhooks(cursor: Cursor): WebhookSubscriptionCursor!

  "Returns details of a user webhook subscription by its ID."
  userWebhook(id: Int!): WebhookSubscription

  """
  Returns information about the webhook currently being processed. This is
  not valid during normal queries over HTTP, and will return an error if used
  outside of a webhook context.
  """
  webhook: WebhookPayload!
}

enum TriggerType {


@@ 296,6 414,12 @@ input TriggerInput {
  webhook: WebhookTriggerInput
}

input UserWebhookInput {
  url: String!
  events: [WebhookEvent!]!
  query: String!
}

type Mutation {
  """
  Submits a new job to the queue.


@@ 341,4 465,27 @@ type Mutation {

  "Uploads a build artifact."
  createArtifact(jobId: Int!, path: String!, contents: File!): Artifact @worker

  """
  Creates a new user webhook subscription. When an event from the
  provided list of events occurs, the 'query' parameter (a GraphQL query)
  will be evaluated and the results will be sent to the provided URL as the
  body of an HTTP POST request. The list of events must include at least one
  event, and no duplicates.

  This query is evaluated in the webhook context, such that query { webhook }
  may be used to access details of the event which trigged the webhook. The
  query may not make any mutations.
  """
  createUserWebhook(config: UserWebhookInput!): WebhookSubscription!

  """
  Deletes a user webhook. Any events already queued may still be
  delivered after this request completes. Clients authenticated with a
  personal access token may delete any webhook registered for their account,
  but authorized OAuth 2.0 clients may only delete their own webhooks.
  Manually deleting a webhook configured by a third-party client may cause
  unexpected behavior with the third-party integration.
  """
  deleteUserWebhook(id: Int!): WebhookSubscription!
}

M api/graph/schema.resolvers.go => api/graph/schema.resolvers.go +323 -0
@@ 10,16 10,22 @@ import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"time"

	"git.sr.ht/~sircmpwn/builds.sr.ht/api/graph/api"
	"git.sr.ht/~sircmpwn/builds.sr.ht/api/graph/model"
	"git.sr.ht/~sircmpwn/builds.sr.ht/api/loaders"
	"git.sr.ht/~sircmpwn/builds.sr.ht/api/webhooks"
	"git.sr.ht/~sircmpwn/core-go/auth"
	"git.sr.ht/~sircmpwn/core-go/config"
	"git.sr.ht/~sircmpwn/core-go/database"
	coremodel "git.sr.ht/~sircmpwn/core-go/model"
	"git.sr.ht/~sircmpwn/core-go/server"
	corewebhooks "git.sr.ht/~sircmpwn/core-go/webhooks"
	sq "github.com/Masterminds/squirrel"
	"github.com/google/uuid"
	"github.com/lib/pq"
	yaml "gopkg.in/yaml.v2"
)


@@ 313,6 319,7 @@ func (r *mutationResolver) Submit(ctx context.Context, manifest string, tags []s
		}
	}

	webhooks.DeliverUserJobEvent(ctx, model.WebhookEventJobCreated, &job)
	return &job, nil
}



@@ 563,6 570,109 @@ func (r *mutationResolver) CreateArtifact(ctx context.Context, jobID int, path s
	panic(fmt.Errorf("not implemented"))
}

func (r *mutationResolver) CreateUserWebhook(ctx context.Context, config model.UserWebhookInput) (model.WebhookSubscription, error) {
	schema := server.ForContext(ctx).Schema
	if err := corewebhooks.Validate(schema, config.Query); err != nil {
		return nil, err
	}

	user := auth.ForContext(ctx)
	ac, err := corewebhooks.NewAuthConfig(ctx)
	if err != nil {
		return nil, err
	}

	var sub model.UserWebhookSubscription
	if len(config.Events) == 0 {
		return nil, fmt.Errorf("Must specify at least one event")
	}
	events := make([]string, len(config.Events))
	for i, ev := range config.Events {
		events[i] = ev.String()
		// TODO: gqlgen does not support doing anything useful with directives
		// on enums at the time of writing, so we have to do a little bit of
		// manual fuckery
		var access string
		switch ev {
		case model.WebhookEventJobCreated:
			access = "JOBS"
		default:
			return nil, fmt.Errorf("Unsupported event %s", ev.String())
		}
		if !user.Grants.Has(access, auth.RO) {
			return nil, fmt.Errorf("Insufficient access granted for webhook event %s", ev.String())
		}
	}

	u, err := url.Parse(config.URL)
	if err != nil {
		return nil, err
	} else if u.Host == "" {
		return nil, fmt.Errorf("Cannot use URL without host")
	} else if u.Scheme != "http" && u.Scheme != "https" {
		return nil, fmt.Errorf("Cannot use non-HTTP or HTTPS URL")
	}

	if err := database.WithTx(ctx, nil, func(tx *sql.Tx) error {
		row := tx.QueryRowContext(ctx, `
			INSERT INTO gql_user_wh_sub (
				created, events, url, query,
				auth_method,
				token_hash, grants, client_id, expires,
				node_id,
				user_id
			) VALUES (
				NOW() at time zone 'utc',
				$1, $2, $3, $4, $5, $6, $7, $8, $9, $10
			) RETURNING id, url, query, events, user_id;`,
			pq.Array(events), config.URL, config.Query,
			ac.AuthMethod,
			ac.TokenHash, ac.Grants, ac.ClientID, ac.Expires, // OAUTH2
			ac.NodeID, // INTERNAL
			user.UserID)

		if err := row.Scan(&sub.ID, &sub.URL,
			&sub.Query, pq.Array(&sub.Events), &sub.UserID); err != nil {
			return err
		}
		return nil
	}); err != nil {
		return nil, err
	}

	return &sub, nil
}

func (r *mutationResolver) DeleteUserWebhook(ctx context.Context, id int) (model.WebhookSubscription, error) {
	var sub model.UserWebhookSubscription

	filter, err := corewebhooks.FilterWebhooks(ctx)
	if err != nil {
		return nil, err
	}

	if err := database.WithTx(ctx, nil, func(tx *sql.Tx) error {
		row := sq.Delete(`gql_user_wh_sub`).
			PlaceholderFormat(sq.Dollar).
			Where(sq.And{sq.Expr(`id = ?`, id), filter}).
			Suffix(`RETURNING id, url, query, events, user_id`).
			RunWith(tx).
			QueryRowContext(ctx)
		if err := row.Scan(&sub.ID, &sub.URL,
			&sub.Query, pq.Array(&sub.Events), &sub.UserID); err != nil {
			return err
		}
		return nil
	}); err != nil {
		if err == sql.ErrNoRows {
			return nil, fmt.Errorf("No user webhook by ID %d found for this user", id)
		}
		return nil, err
	}

	return &sub, nil
}

func (r *pGPKeyResolver) PrivateKey(ctx context.Context, obj *model.PGPKey) (string, error) {
	// TODO: This is simple to implement, but I'm not going to rig it up until
	// we need it


@@ 662,6 772,79 @@ func (r *queryResolver) Secrets(ctx context.Context, cursor *coremodel.Cursor) (
	return &model.SecretCursor{secrets, cursor}, nil
}

func (r *queryResolver) UserWebhooks(ctx context.Context, cursor *coremodel.Cursor) (*model.WebhookSubscriptionCursor, error) {
	if cursor == nil {
		cursor = coremodel.NewCursor(nil)
	}

	filter, err := corewebhooks.FilterWebhooks(ctx)
	if err != nil {
		return nil, err
	}

	var subs []model.WebhookSubscription
	if err := database.WithTx(ctx, &sql.TxOptions{
		Isolation: 0,
		ReadOnly:  true,
	}, func(tx *sql.Tx) error {
		sub := (&model.UserWebhookSubscription{}).As(`sub`)
		query := database.
			Select(ctx, sub).
			From(`gql_user_wh_sub sub`).
			Where(filter)
		subs, cursor = sub.QueryWithCursor(ctx, tx, query, cursor)
		return nil
	}); err != nil {
		return nil, err
	}

	return &model.WebhookSubscriptionCursor{subs, cursor}, nil
}

func (r *queryResolver) UserWebhook(ctx context.Context, id int) (model.WebhookSubscription, error) {
	var sub model.UserWebhookSubscription

	filter, err := corewebhooks.FilterWebhooks(ctx)
	if err != nil {
		return nil, err
	}

	if err := database.WithTx(ctx, &sql.TxOptions{
		Isolation: 0,
		ReadOnly:  true,
	}, func(tx *sql.Tx) error {
		row := database.
			Select(ctx, &sub).
			From(`gql_user_wh_sub`).
			Where(sq.And{sq.Expr(`id = ?`, id), filter}).
			RunWith(tx).
			QueryRowContext(ctx)
		if err := row.Scan(database.Scan(ctx, &sub)...); err != nil {
			return err
		}
		return nil
	}); err != nil {
		if err == sql.ErrNoRows {
			return nil, fmt.Errorf("No user webhook by ID %d found for this user", id)
		}
		return nil, err
	}

	return &sub, nil
}

func (r *queryResolver) Webhook(ctx context.Context) (model.WebhookPayload, error) {
	raw, err := corewebhooks.Payload(ctx)
	if err != nil {
		return nil, err
	}
	payload, ok := raw.(model.WebhookPayload)
	if !ok {
		panic("Invalid webhook payload context")
	}
	return payload, nil
}

func (r *sSHKeyResolver) PrivateKey(ctx context.Context, obj *model.SSHKey) (string, error) {
	// TODO: This is simple to implement, but I'm not going to rig it up until
	// we need it


@@ 710,6 893,136 @@ func (r *userResolver) Jobs(ctx context.Context, obj *model.User, cursor *coremo
	return &model.JobCursor{jobs, cursor}, nil
}

func (r *userWebhookSubscriptionResolver) Client(ctx context.Context, obj *model.UserWebhookSubscription) (*model.OAuthClient, error) {
	if obj.ClientID == nil {
		return nil, nil
	}
	return &model.OAuthClient{
		UUID: *obj.ClientID,
	}, nil
}

func (r *userWebhookSubscriptionResolver) Deliveries(ctx context.Context, obj *model.UserWebhookSubscription, cursor *coremodel.Cursor) (*model.WebhookDeliveryCursor, error) {
	if cursor == nil {
		cursor = coremodel.NewCursor(nil)
	}

	var deliveries []*model.WebhookDelivery
	if err := database.WithTx(ctx, &sql.TxOptions{
		Isolation: 0,
		ReadOnly:  true,
	}, func(tx *sql.Tx) error {
		d := (&model.WebhookDelivery{}).
			WithName(`user`).
			As(`delivery`)
		query := database.
			Select(ctx, d).
			From(`gql_user_wh_delivery delivery`).
			Where(`delivery.subscription_id = ?`, obj.ID)
		deliveries, cursor = d.QueryWithCursor(ctx, tx, query, cursor)
		return nil
	}); err != nil {
		return nil, err
	}

	return &model.WebhookDeliveryCursor{deliveries, cursor}, nil
}

func (r *userWebhookSubscriptionResolver) Sample(ctx context.Context, obj *model.UserWebhookSubscription, event model.WebhookEvent) (string, error) {
	payloadUUID := uuid.New()
	webhook := corewebhooks.WebhookContext{
		User:        auth.ForContext(ctx),
		PayloadUUID: payloadUUID,
		Name:        "user",
		Event:       event.String(),
		Subscription: &corewebhooks.WebhookSubscription{
			ID:         obj.ID,
			URL:        obj.URL,
			Query:      obj.Query,
			AuthMethod: obj.AuthMethod,
			TokenHash:  obj.TokenHash,
			Grants:     obj.Grants,
			ClientID:   obj.ClientID,
			Expires:    obj.Expires,
			NodeID:     obj.NodeID,
		},
	}

	auth := auth.ForContext(ctx)
	switch event {
	case model.WebhookEventJobCreated:
		note := "Sample job"
		webhook.Payload = &model.JobEvent{
			UUID:  payloadUUID.String(),
			Event: event,
			Date:  time.Now().UTC(),
			Job: &model.Job{
				ID:         -1,
				Created:    time.Now().UTC(),
				Updated:    time.Now().UTC(),
				Manifest:   "image: alpine/latest\ntasks:\n  - hello: echo hello world",
				Note:       &note,
				Image:      "alpine/latest",
				Runner:     nil,
				OwnerID:    auth.UserID,
				JobGroupID: nil,
				RawTags:    nil,
				RawStatus:  "success",
			},
		}
	default:
		return "", fmt.Errorf("Unsupported event %s", event.String())
	}

	subctx := corewebhooks.Context(ctx, webhook.Payload)
	bytes, err := webhook.Exec(subctx, server.ForContext(ctx).Schema)
	if err != nil {
		return "", err
	}
	return string(bytes), nil
}

func (r *webhookDeliveryResolver) Subscription(ctx context.Context, obj *model.WebhookDelivery) (model.WebhookSubscription, error) {
	if obj.Name == "" {
		panic("WebhookDelivery without name")
	}

	// XXX: This could use a loader but it's unlikely to be a bottleneck
	var sub model.WebhookSubscription
	if err := database.WithTx(ctx, &sql.TxOptions{
		Isolation: 0,
		ReadOnly:  true,
	}, func(tx *sql.Tx) error {
		// XXX: This needs some work to generalize to other kinds of webhooks
		var subscription interface {
			model.WebhookSubscription
			database.Model
		} = nil
		switch obj.Name {
		case "user":
			subscription = (&model.UserWebhookSubscription{}).As(`sub`)
		default:
			panic(fmt.Errorf("unknown webhook name %q", obj.Name))
		}
		// Note: No filter needed because, if we have access to the delivery,
		// we also have access to the subscription.
		row := database.
			Select(ctx, subscription).
			From(`gql_`+obj.Name+`_wh_sub sub`).
			Where(`sub.id = ?`, obj.SubscriptionID).
			RunWith(tx).
			QueryRowContext(ctx)
		if err := row.Scan(database.Scan(ctx, subscription)...); err != nil {
			return err
		}
		sub = subscription
		return nil
	}); err != nil {
		return nil, err
	}
	return sub, nil
}

// Job returns api.JobResolver implementation.
func (r *Resolver) Job() api.JobResolver { return &jobResolver{r} }



@@ 737,6 1050,14 @@ func (r *Resolver) Task() api.TaskResolver { return &taskResolver{r} }
// User returns api.UserResolver implementation.
func (r *Resolver) User() api.UserResolver { return &userResolver{r} }

// UserWebhookSubscription returns api.UserWebhookSubscriptionResolver implementation.
func (r *Resolver) UserWebhookSubscription() api.UserWebhookSubscriptionResolver {
	return &userWebhookSubscriptionResolver{r}
}

// WebhookDelivery returns api.WebhookDeliveryResolver implementation.
func (r *Resolver) WebhookDelivery() api.WebhookDeliveryResolver { return &webhookDeliveryResolver{r} }

type jobResolver struct{ *Resolver }
type jobGroupResolver struct{ *Resolver }
type mutationResolver struct{ *Resolver }


@@ 746,3 1067,5 @@ type sSHKeyResolver struct{ *Resolver }
type secretFileResolver struct{ *Resolver }
type taskResolver struct{ *Resolver }
type userResolver struct{ *Resolver }
type userWebhookSubscriptionResolver struct{ *Resolver }
type webhookDeliveryResolver struct{ *Resolver }

M api/server.go => api/server.go +6 -1
@@ 6,6 6,7 @@ import (

	"git.sr.ht/~sircmpwn/core-go/config"
	"git.sr.ht/~sircmpwn/core-go/server"
	"git.sr.ht/~sircmpwn/core-go/webhooks"
	"github.com/99designs/gqlgen/graphql"

	"git.sr.ht/~sircmpwn/builds.sr.ht/api/graph"


@@ 18,6 19,7 @@ func main() {
	appConfig := config.LoadConfig(":5102")

	gqlConfig := api.Config{Resolvers: &graph.Resolver{}}
	gqlConfig.Directives.Private = server.Private
	gqlConfig.Directives.Access = func(ctx context.Context, obj interface{},
		next graphql.Resolver, scope model.AccessScope,
		kind model.AccessKind) (interface{}, error) {


@@ 34,9 36,12 @@ func main() {
		scopes[i] = s.String()
	}

	webhookQueue := webhooks.NewQueue(schema)

	server.NewServer("builds.sr.ht", appConfig).
		WithDefaultMiddleware().
		WithMiddleware(loaders.Middleware).
		WithMiddleware(loaders.Middleware, webhooks.Middleware(webhookQueue)).
		WithSchema(schema, scopes).
		WithQueues(webhookQueue.Queue).
		Run()
}

A api/webhooks/webhooks.go => api/webhooks/webhooks.go +37 -0
@@ 0,0 1,37 @@
package webhooks

import (
	"context"
	"time"

	"git.sr.ht/~sircmpwn/core-go/auth"
	"git.sr.ht/~sircmpwn/core-go/webhooks"
	sq "github.com/Masterminds/squirrel"
	"github.com/google/uuid"

	"git.sr.ht/~sircmpwn/builds.sr.ht/api/graph/model"
)

func deliverUserWebhook(ctx context.Context, event model.WebhookEvent,
	payload model.WebhookPayload, payloadUUID uuid.UUID) {
	q := webhooks.ForContext(ctx)
	userID := auth.ForContext(ctx).UserID
	query := sq.
		Select().
		From("gql_user_wh_sub sub").
		Where("sub.user_id = ?", userID)
	q.Schedule(ctx, query, "user", event.String(),
		payloadUUID, payload)
}

func DeliverUserJobEvent(ctx context.Context,
	event model.WebhookEvent, job *model.Job) {
	payloadUUID := uuid.New()
	payload := model.JobEvent{
		UUID:  payloadUUID.String(),
		Event: event,
		Date:  time.Now().UTC(),
		Job:   job,
	}
	deliverUserWebhook(ctx, event, &payload, payloadUUID)
}

A buildsrht/alembic/versions/f79186791a25_add_graphql_user_webhook_tables.py => buildsrht/alembic/versions/f79186791a25_add_graphql_user_webhook_tables.py +71 -0
@@ 0,0 1,71 @@
"""Add GraphQL user webhook tables

Revision ID: f79186791a25
Revises: c0dd67b07f2b
Create Date: 2022-06-13 13:17:54.559113

"""

# revision identifiers, used by Alembic.
revision = 'f79186791a25'
down_revision = 'c0dd67b07f2b'

from alembic import op
import sqlalchemy as sa


def upgrade():
    op.execute("""
    CREATE TYPE webhook_event AS ENUM (
        'JOB_CREATED'
    );

    CREATE TYPE auth_method AS ENUM (
        'OAUTH_LEGACY',
        'OAUTH2',
        'COOKIE',
        'INTERNAL',
        'WEBHOOK'
    );

    CREATE TABLE gql_user_wh_sub (
        id serial PRIMARY KEY,
        created timestamp NOT NULL,
        events webhook_event[] NOT NULL check (array_length(events, 1) > 0),
        url varchar NOT NULL,
        query varchar NOT NULL,

        auth_method auth_method NOT NULL check (auth_method in ('OAUTH2', 'INTERNAL')),
        token_hash varchar(128) check ((auth_method = 'OAUTH2') = (token_hash IS NOT NULL)),
        grants varchar,
        client_id uuid,
        expires timestamp check ((auth_method = 'OAUTH2') = (expires IS NOT NULL)),
        node_id varchar check ((auth_method = 'INTERNAL') = (node_id IS NOT NULL)),

        user_id integer NOT NULL references "user"(id)
    );

    CREATE INDEX gql_user_wh_sub_token_hash_idx ON gql_user_wh_sub (token_hash);

    CREATE TABLE gql_user_wh_delivery (
        id serial PRIMARY KEY,
        uuid uuid NOT NULL,
        date timestamp NOT NULL,
        event webhook_event NOT NULL,
        subscription_id integer NOT NULL references gql_user_wh_sub(id) ON DELETE CASCADE,
        request_body varchar NOT NULL,
        response_body varchar,
        response_headers varchar,
        response_status integer
    );
    """)


def downgrade():
    op.execute("""
    DROP TABLE gql_user_wh_delivery;
    DROP INDEX gql_user_wh_sub_token_hash_idx;
    DROP TABLE gql_user_wh_sub;
    DROP TYPE auth_method;
    DROP TYPE webhook_event;
    """)