From 0a2552096c94306623b2b0daf4fefe8157e1ff48 Mon Sep 17 00:00:00 2001 From: Adnan Maolood Date: Mon, 13 Jun 2022 16:52:21 -0400 Subject: [PATCH] api/graph: Implement GraphQL-native user webhooks Implement GraphQL-native user webhooks for job creation events. --- api/graph/model/webhooks.go | 233 +++++++++++++ api/graph/schema.graphqls | 147 ++++++++ api/graph/schema.resolvers.go | 323 ++++++++++++++++++ api/server.go | 7 +- api/webhooks/webhooks.go | 37 ++ ...6791a25_add_graphql_user_webhook_tables.py | 71 ++++ 6 files changed, 817 insertions(+), 1 deletion(-) create mode 100644 api/graph/model/webhooks.go create mode 100644 api/webhooks/webhooks.go create mode 100644 buildsrht/alembic/versions/f79186791a25_add_graphql_user_webhook_tables.py diff --git a/api/graph/model/webhooks.go b/api/graph/model/webhooks.go new file mode 100644 index 0000000..8bd08d1 --- /dev/null +++ b/api/graph/model/webhooks.go @@ -0,0 +1,233 @@ +package model + +import ( + "context" + "database/sql" + "fmt" + "strconv" + "time" + + "git.sr.ht/~sircmpwn/core-go/database" + "git.sr.ht/~sircmpwn/core-go/model" + sq "github.com/Masterminds/squirrel" + "github.com/lib/pq" +) + +type WebhookDelivery struct { + UUID string `json:"uuid"` + Date time.Time `json:"date"` + Event WebhookEvent `json:"event"` + RequestBody string `json:"requestBody"` + ResponseBody *string `json:"responseBody"` + ResponseHeaders *string `json:"responseHeaders"` + ResponseStatus *int `json:"responseStatus"` + + ID int + SubscriptionID int + Name string + + alias string + fields *database.ModelFields +} + +func (whd *WebhookDelivery) WithName(name string) *WebhookDelivery { + whd.Name = name + return whd +} + +func (whd *WebhookDelivery) As(alias string) *WebhookDelivery { + whd.alias = alias + return whd +} + +func (whd *WebhookDelivery) Alias() string { + return whd.alias +} + +func (whd *WebhookDelivery) Table() string { + return "gql_" + whd.Name + "_wh_delivery" +} + +func (whd *WebhookDelivery) Fields() *database.ModelFields { + if whd.fields != nil { + return whd.fields + } + whd.fields = &database.ModelFields{ + Fields: []*database.FieldMap{ + {"uuid", "uuid", &whd.UUID}, + {"date", "date", &whd.Date}, + {"event", "event", &whd.Event}, + {"request_body", "requestBody", &whd.RequestBody}, + {"response_body", "responseBody", &whd.ResponseBody}, + {"response_headers", "responseHeaders", &whd.ResponseHeaders}, + {"response_status", "responseStatus", &whd.ResponseStatus}, + + // Always fetch: + {"id", "", &whd.ID}, + {"subscription_id", "", &whd.SubscriptionID}, + }, + } + return whd.fields +} + +func (whd *WebhookDelivery) QueryWithCursor(ctx context.Context, + runner sq.BaseRunner, q sq.SelectBuilder, + cur *model.Cursor) ([]*WebhookDelivery, *model.Cursor) { + var ( + err error + rows *sql.Rows + ) + + if cur.Next != "" { + next, _ := strconv.ParseInt(cur.Next, 10, 64) + q = q.Where(database.WithAlias(whd.alias, "id")+"<= ?", next) + } + q = q. + OrderBy(database.WithAlias(whd.alias, "id") + " DESC"). + Limit(uint64(cur.Count + 1)) + + if rows, err = q.RunWith(runner).QueryContext(ctx); err != nil { + panic(err) + } + defer rows.Close() + + var deliveries []*WebhookDelivery + for rows.Next() { + var delivery WebhookDelivery + if err := rows.Scan(database.Scan(ctx, &delivery)...); err != nil { + panic(err) + } + delivery.Name = whd.Name + deliveries = append(deliveries, &delivery) + } + + if len(deliveries) > cur.Count { + cur = &model.Cursor{ + Count: cur.Count, + Next: strconv.Itoa(deliveries[len(deliveries)-1].ID), + Search: cur.Search, + } + deliveries = deliveries[:cur.Count] + } else { + cur = nil + } + + return deliveries, cur +} + +type UserWebhookSubscription struct { + ID int `json:"id"` + Events []WebhookEvent `json:"events"` + Query string `json:"query"` + URL string `json:"url"` + + UserID int + AuthMethod string + ClientID *string + TokenHash *string + Expires *time.Time + Grants *string + NodeID *string + + alias string + fields *database.ModelFields +} + +func (we *WebhookEvent) Scan(src interface{}) error { + bytes, ok := src.([]uint8) + if !ok { + return fmt.Errorf("Unable to scan from %T into WebhookEvent", src) + } + *we = WebhookEvent(string(bytes)) + if !we.IsValid() { + return fmt.Errorf("%s is not a valid WebhookEvent", string(bytes)) + } + return nil +} + +func (UserWebhookSubscription) IsWebhookSubscription() {} + +func (sub *UserWebhookSubscription) As(alias string) *UserWebhookSubscription { + sub.alias = alias + return sub +} + +func (sub *UserWebhookSubscription) Alias() string { + return sub.alias +} + +func (sub *UserWebhookSubscription) Table() string { + return "gql_user_wh_sub" +} + +func (sub *UserWebhookSubscription) Fields() *database.ModelFields { + if sub.fields != nil { + return sub.fields + } + sub.fields = &database.ModelFields{ + Fields: []*database.FieldMap{ + {"events", "events", pq.Array(&sub.Events)}, + {"url", "url", &sub.URL}, + + // Always fetch: + {"id", "", &sub.ID}, + {"query", "", &sub.Query}, + {"user_id", "", &sub.UserID}, + {"auth_method", "", &sub.AuthMethod}, + {"token_hash", "", &sub.TokenHash}, + {"client_id", "", &sub.ClientID}, + {"grants", "", &sub.Grants}, + {"expires", "", &sub.Expires}, + {"node_id", "", &sub.NodeID}, + }, + } + return sub.fields +} + +func (sub *UserWebhookSubscription) QueryWithCursor(ctx context.Context, + runner sq.BaseRunner, q sq.SelectBuilder, + cur *model.Cursor) ([]WebhookSubscription, *model.Cursor) { + var ( + err error + rows *sql.Rows + ) + + if cur.Next != "" { + next, _ := strconv.ParseInt(cur.Next, 10, 64) + q = q.Where(database.WithAlias(sub.alias, "id")+"<= ?", next) + } + q = q. + OrderBy(database.WithAlias(sub.alias, "id")). + Limit(uint64(cur.Count + 1)) + + if rows, err = q.RunWith(runner).QueryContext(ctx); err != nil { + panic(err) + } + defer rows.Close() + + var ( + subs []WebhookSubscription + lastID int + ) + for rows.Next() { + var sub UserWebhookSubscription + if err := rows.Scan(database.Scan(ctx, &sub)...); err != nil { + panic(err) + } + subs = append(subs, &sub) + lastID = sub.ID + } + + if len(subs) > cur.Count { + cur = &model.Cursor{ + Count: cur.Count, + Next: strconv.Itoa(lastID), + Search: cur.Search, + } + subs = subs[:cur.Count] + } else { + cur = nil + } + + return subs, cur +} diff --git a/api/graph/schema.graphqls b/api/graph/schema.graphqls index 6481693..6f429fe 100644 --- a/api/graph/schema.graphqls +++ b/api/graph/schema.graphqls @@ -8,6 +8,12 @@ scalar File "Used to provide a human-friendly description of an access scope" directive @scopehelp(details: String!) on ENUM_VALUE +""" +This is used to decorate fields which are only accessible with a personal +access token, and are not available to clients using OAuth 2.0 access tokens. +""" +directive @private on FIELD_DEFINITION + enum AccessScope { PROFILE @scopehelp(details: "profile information") JOBS @scopehelp(details: "build jobs") @@ -253,6 +259,99 @@ type SecretFile implements Secret { data: Binary! @worker } +type OAuthClient { + uuid: String! +} + +enum WebhookEvent { + JOB_CREATED @access(scope: JOBS, kind: RO) +} + +interface WebhookSubscription { + id: Int! + events: [WebhookEvent!]! + query: String! + url: String! + + """ + If this webhook was registered by an authorized OAuth 2.0 client, this + field is non-null. + """ + client: OAuthClient @private + + "All deliveries which have been sent to this webhook." + deliveries(cursor: Cursor): WebhookDeliveryCursor! + + "Returns a sample payload for this subscription, for testing purposes" + sample(event: WebhookEvent!): String! +} + +type UserWebhookSubscription implements WebhookSubscription { + id: Int! + events: [WebhookEvent!]! + query: String! + url: String! + client: OAuthClient @private + deliveries(cursor: Cursor): WebhookDeliveryCursor! + sample(event: WebhookEvent!): String! +} + +type WebhookDelivery { + uuid: String! + date: Time! + event: WebhookEvent! + subscription: WebhookSubscription! + requestBody: String! + + """ + These details are provided only after a response is received from the + remote server. If a response is sent whose Content-Type is not text/*, or + cannot be decoded as UTF-8, the response body will be null. It will be + truncated after 64 KiB. + """ + responseBody: String + responseHeaders: String + responseStatus: Int +} + +interface WebhookPayload { + uuid: String! + event: WebhookEvent! + date: Time! +} + +type JobEvent implements WebhookPayload { + uuid: String! + event: WebhookEvent! + date: Time! + + job: Job! +} + +""" +A cursor for enumerating a list of webhook deliveries + +If there are additional results available, the cursor object may be passed +back into the same endpoint to retrieve another page. If the cursor is null, +there are no remaining results to return. +""" +type WebhookDeliveryCursor { + results: [WebhookDelivery!]! + cursor: Cursor +} + +""" +A cursor for enumerating a list of webhook subscriptions + +If there are additional results available, the cursor object may be passed +back into the same endpoint to retrieve another page. If the cursor is null, +there are no remaining results to return. +""" +type WebhookSubscriptionCursor { + results: [WebhookSubscription!]! + cursor: Cursor +} + type Query { "Returns API version information." version: Version! @@ -272,6 +371,25 @@ type Query { "Returns secrets owned by the authenticated user." secrets(cursor: Cursor): SecretCursor! @access(scope: SECRETS, kind: RO) + + """ + Returns a list of user webhook subscriptions. For clients + authenticated with a personal access token, this returns all webhooks + configured by all GraphQL clients for your account. For clients + authenticated with an OAuth 2.0 access token, this returns only webhooks + registered for your client. + """ + userWebhooks(cursor: Cursor): WebhookSubscriptionCursor! + + "Returns details of a user webhook subscription by its ID." + userWebhook(id: Int!): WebhookSubscription + + """ + Returns information about the webhook currently being processed. This is + not valid during normal queries over HTTP, and will return an error if used + outside of a webhook context. + """ + webhook: WebhookPayload! } enum TriggerType { @@ -296,6 +414,12 @@ input TriggerInput { webhook: WebhookTriggerInput } +input UserWebhookInput { + url: String! + events: [WebhookEvent!]! + query: String! +} + type Mutation { """ Submits a new job to the queue. @@ -341,4 +465,27 @@ type Mutation { "Uploads a build artifact." createArtifact(jobId: Int!, path: String!, contents: File!): Artifact @worker + + """ + Creates a new user webhook subscription. When an event from the + provided list of events occurs, the 'query' parameter (a GraphQL query) + will be evaluated and the results will be sent to the provided URL as the + body of an HTTP POST request. The list of events must include at least one + event, and no duplicates. + + This query is evaluated in the webhook context, such that query { webhook } + may be used to access details of the event which trigged the webhook. The + query may not make any mutations. + """ + createUserWebhook(config: UserWebhookInput!): WebhookSubscription! + + """ + Deletes a user webhook. Any events already queued may still be + delivered after this request completes. Clients authenticated with a + personal access token may delete any webhook registered for their account, + but authorized OAuth 2.0 clients may only delete their own webhooks. + Manually deleting a webhook configured by a third-party client may cause + unexpected behavior with the third-party integration. + """ + deleteUserWebhook(id: Int!): WebhookSubscription! } diff --git a/api/graph/schema.resolvers.go b/api/graph/schema.resolvers.go index c3495e1..54cb0ad 100644 --- a/api/graph/schema.resolvers.go +++ b/api/graph/schema.resolvers.go @@ -10,16 +10,22 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "strings" + "time" "git.sr.ht/~sircmpwn/builds.sr.ht/api/graph/api" "git.sr.ht/~sircmpwn/builds.sr.ht/api/graph/model" "git.sr.ht/~sircmpwn/builds.sr.ht/api/loaders" + "git.sr.ht/~sircmpwn/builds.sr.ht/api/webhooks" "git.sr.ht/~sircmpwn/core-go/auth" "git.sr.ht/~sircmpwn/core-go/config" "git.sr.ht/~sircmpwn/core-go/database" coremodel "git.sr.ht/~sircmpwn/core-go/model" + "git.sr.ht/~sircmpwn/core-go/server" + corewebhooks "git.sr.ht/~sircmpwn/core-go/webhooks" sq "github.com/Masterminds/squirrel" + "github.com/google/uuid" "github.com/lib/pq" yaml "gopkg.in/yaml.v2" ) @@ -313,6 +319,7 @@ func (r *mutationResolver) Submit(ctx context.Context, manifest string, tags []s } } + webhooks.DeliverUserJobEvent(ctx, model.WebhookEventJobCreated, &job) return &job, nil } @@ -563,6 +570,109 @@ func (r *mutationResolver) CreateArtifact(ctx context.Context, jobID int, path s panic(fmt.Errorf("not implemented")) } +func (r *mutationResolver) CreateUserWebhook(ctx context.Context, config model.UserWebhookInput) (model.WebhookSubscription, error) { + schema := server.ForContext(ctx).Schema + if err := corewebhooks.Validate(schema, config.Query); err != nil { + return nil, err + } + + user := auth.ForContext(ctx) + ac, err := corewebhooks.NewAuthConfig(ctx) + if err != nil { + return nil, err + } + + var sub model.UserWebhookSubscription + if len(config.Events) == 0 { + return nil, fmt.Errorf("Must specify at least one event") + } + events := make([]string, len(config.Events)) + for i, ev := range config.Events { + events[i] = ev.String() + // TODO: gqlgen does not support doing anything useful with directives + // on enums at the time of writing, so we have to do a little bit of + // manual fuckery + var access string + switch ev { + case model.WebhookEventJobCreated: + access = "JOBS" + default: + return nil, fmt.Errorf("Unsupported event %s", ev.String()) + } + if !user.Grants.Has(access, auth.RO) { + return nil, fmt.Errorf("Insufficient access granted for webhook event %s", ev.String()) + } + } + + u, err := url.Parse(config.URL) + if err != nil { + return nil, err + } else if u.Host == "" { + return nil, fmt.Errorf("Cannot use URL without host") + } else if u.Scheme != "http" && u.Scheme != "https" { + return nil, fmt.Errorf("Cannot use non-HTTP or HTTPS URL") + } + + if err := database.WithTx(ctx, nil, func(tx *sql.Tx) error { + row := tx.QueryRowContext(ctx, ` + INSERT INTO gql_user_wh_sub ( + created, events, url, query, + auth_method, + token_hash, grants, client_id, expires, + node_id, + user_id + ) VALUES ( + NOW() at time zone 'utc', + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 + ) RETURNING id, url, query, events, user_id;`, + pq.Array(events), config.URL, config.Query, + ac.AuthMethod, + ac.TokenHash, ac.Grants, ac.ClientID, ac.Expires, // OAUTH2 + ac.NodeID, // INTERNAL + user.UserID) + + if err := row.Scan(&sub.ID, &sub.URL, + &sub.Query, pq.Array(&sub.Events), &sub.UserID); err != nil { + return err + } + return nil + }); err != nil { + return nil, err + } + + return &sub, nil +} + +func (r *mutationResolver) DeleteUserWebhook(ctx context.Context, id int) (model.WebhookSubscription, error) { + var sub model.UserWebhookSubscription + + filter, err := corewebhooks.FilterWebhooks(ctx) + if err != nil { + return nil, err + } + + if err := database.WithTx(ctx, nil, func(tx *sql.Tx) error { + row := sq.Delete(`gql_user_wh_sub`). + PlaceholderFormat(sq.Dollar). + Where(sq.And{sq.Expr(`id = ?`, id), filter}). + Suffix(`RETURNING id, url, query, events, user_id`). + RunWith(tx). + QueryRowContext(ctx) + if err := row.Scan(&sub.ID, &sub.URL, + &sub.Query, pq.Array(&sub.Events), &sub.UserID); err != nil { + return err + } + return nil + }); err != nil { + if err == sql.ErrNoRows { + return nil, fmt.Errorf("No user webhook by ID %d found for this user", id) + } + return nil, err + } + + return &sub, nil +} + func (r *pGPKeyResolver) PrivateKey(ctx context.Context, obj *model.PGPKey) (string, error) { // TODO: This is simple to implement, but I'm not going to rig it up until // we need it @@ -662,6 +772,79 @@ func (r *queryResolver) Secrets(ctx context.Context, cursor *coremodel.Cursor) ( return &model.SecretCursor{secrets, cursor}, nil } +func (r *queryResolver) UserWebhooks(ctx context.Context, cursor *coremodel.Cursor) (*model.WebhookSubscriptionCursor, error) { + if cursor == nil { + cursor = coremodel.NewCursor(nil) + } + + filter, err := corewebhooks.FilterWebhooks(ctx) + if err != nil { + return nil, err + } + + var subs []model.WebhookSubscription + if err := database.WithTx(ctx, &sql.TxOptions{ + Isolation: 0, + ReadOnly: true, + }, func(tx *sql.Tx) error { + sub := (&model.UserWebhookSubscription{}).As(`sub`) + query := database. + Select(ctx, sub). + From(`gql_user_wh_sub sub`). + Where(filter) + subs, cursor = sub.QueryWithCursor(ctx, tx, query, cursor) + return nil + }); err != nil { + return nil, err + } + + return &model.WebhookSubscriptionCursor{subs, cursor}, nil +} + +func (r *queryResolver) UserWebhook(ctx context.Context, id int) (model.WebhookSubscription, error) { + var sub model.UserWebhookSubscription + + filter, err := corewebhooks.FilterWebhooks(ctx) + if err != nil { + return nil, err + } + + if err := database.WithTx(ctx, &sql.TxOptions{ + Isolation: 0, + ReadOnly: true, + }, func(tx *sql.Tx) error { + row := database. + Select(ctx, &sub). + From(`gql_user_wh_sub`). + Where(sq.And{sq.Expr(`id = ?`, id), filter}). + RunWith(tx). + QueryRowContext(ctx) + if err := row.Scan(database.Scan(ctx, &sub)...); err != nil { + return err + } + return nil + }); err != nil { + if err == sql.ErrNoRows { + return nil, fmt.Errorf("No user webhook by ID %d found for this user", id) + } + return nil, err + } + + return &sub, nil +} + +func (r *queryResolver) Webhook(ctx context.Context) (model.WebhookPayload, error) { + raw, err := corewebhooks.Payload(ctx) + if err != nil { + return nil, err + } + payload, ok := raw.(model.WebhookPayload) + if !ok { + panic("Invalid webhook payload context") + } + return payload, nil +} + func (r *sSHKeyResolver) PrivateKey(ctx context.Context, obj *model.SSHKey) (string, error) { // TODO: This is simple to implement, but I'm not going to rig it up until // we need it @@ -710,6 +893,136 @@ func (r *userResolver) Jobs(ctx context.Context, obj *model.User, cursor *coremo return &model.JobCursor{jobs, cursor}, nil } +func (r *userWebhookSubscriptionResolver) Client(ctx context.Context, obj *model.UserWebhookSubscription) (*model.OAuthClient, error) { + if obj.ClientID == nil { + return nil, nil + } + return &model.OAuthClient{ + UUID: *obj.ClientID, + }, nil +} + +func (r *userWebhookSubscriptionResolver) Deliveries(ctx context.Context, obj *model.UserWebhookSubscription, cursor *coremodel.Cursor) (*model.WebhookDeliveryCursor, error) { + if cursor == nil { + cursor = coremodel.NewCursor(nil) + } + + var deliveries []*model.WebhookDelivery + if err := database.WithTx(ctx, &sql.TxOptions{ + Isolation: 0, + ReadOnly: true, + }, func(tx *sql.Tx) error { + d := (&model.WebhookDelivery{}). + WithName(`user`). + As(`delivery`) + query := database. + Select(ctx, d). + From(`gql_user_wh_delivery delivery`). + Where(`delivery.subscription_id = ?`, obj.ID) + deliveries, cursor = d.QueryWithCursor(ctx, tx, query, cursor) + return nil + }); err != nil { + return nil, err + } + + return &model.WebhookDeliveryCursor{deliveries, cursor}, nil +} + +func (r *userWebhookSubscriptionResolver) Sample(ctx context.Context, obj *model.UserWebhookSubscription, event model.WebhookEvent) (string, error) { + payloadUUID := uuid.New() + webhook := corewebhooks.WebhookContext{ + User: auth.ForContext(ctx), + PayloadUUID: payloadUUID, + Name: "user", + Event: event.String(), + Subscription: &corewebhooks.WebhookSubscription{ + ID: obj.ID, + URL: obj.URL, + Query: obj.Query, + AuthMethod: obj.AuthMethod, + TokenHash: obj.TokenHash, + Grants: obj.Grants, + ClientID: obj.ClientID, + Expires: obj.Expires, + NodeID: obj.NodeID, + }, + } + + auth := auth.ForContext(ctx) + switch event { + case model.WebhookEventJobCreated: + note := "Sample job" + webhook.Payload = &model.JobEvent{ + UUID: payloadUUID.String(), + Event: event, + Date: time.Now().UTC(), + Job: &model.Job{ + ID: -1, + Created: time.Now().UTC(), + Updated: time.Now().UTC(), + Manifest: "image: alpine/latest\ntasks:\n - hello: echo hello world", + Note: ¬e, + Image: "alpine/latest", + Runner: nil, + OwnerID: auth.UserID, + JobGroupID: nil, + RawTags: nil, + RawStatus: "success", + }, + } + default: + return "", fmt.Errorf("Unsupported event %s", event.String()) + } + + subctx := corewebhooks.Context(ctx, webhook.Payload) + bytes, err := webhook.Exec(subctx, server.ForContext(ctx).Schema) + if err != nil { + return "", err + } + return string(bytes), nil +} + +func (r *webhookDeliveryResolver) Subscription(ctx context.Context, obj *model.WebhookDelivery) (model.WebhookSubscription, error) { + if obj.Name == "" { + panic("WebhookDelivery without name") + } + + // XXX: This could use a loader but it's unlikely to be a bottleneck + var sub model.WebhookSubscription + if err := database.WithTx(ctx, &sql.TxOptions{ + Isolation: 0, + ReadOnly: true, + }, func(tx *sql.Tx) error { + // XXX: This needs some work to generalize to other kinds of webhooks + var subscription interface { + model.WebhookSubscription + database.Model + } = nil + switch obj.Name { + case "user": + subscription = (&model.UserWebhookSubscription{}).As(`sub`) + default: + panic(fmt.Errorf("unknown webhook name %q", obj.Name)) + } + // Note: No filter needed because, if we have access to the delivery, + // we also have access to the subscription. + row := database. + Select(ctx, subscription). + From(`gql_`+obj.Name+`_wh_sub sub`). + Where(`sub.id = ?`, obj.SubscriptionID). + RunWith(tx). + QueryRowContext(ctx) + if err := row.Scan(database.Scan(ctx, subscription)...); err != nil { + return err + } + sub = subscription + return nil + }); err != nil { + return nil, err + } + return sub, nil +} + // Job returns api.JobResolver implementation. func (r *Resolver) Job() api.JobResolver { return &jobResolver{r} } @@ -737,6 +1050,14 @@ func (r *Resolver) Task() api.TaskResolver { return &taskResolver{r} } // User returns api.UserResolver implementation. func (r *Resolver) User() api.UserResolver { return &userResolver{r} } +// UserWebhookSubscription returns api.UserWebhookSubscriptionResolver implementation. +func (r *Resolver) UserWebhookSubscription() api.UserWebhookSubscriptionResolver { + return &userWebhookSubscriptionResolver{r} +} + +// WebhookDelivery returns api.WebhookDeliveryResolver implementation. +func (r *Resolver) WebhookDelivery() api.WebhookDeliveryResolver { return &webhookDeliveryResolver{r} } + type jobResolver struct{ *Resolver } type jobGroupResolver struct{ *Resolver } type mutationResolver struct{ *Resolver } @@ -746,3 +1067,5 @@ type sSHKeyResolver struct{ *Resolver } type secretFileResolver struct{ *Resolver } type taskResolver struct{ *Resolver } type userResolver struct{ *Resolver } +type userWebhookSubscriptionResolver struct{ *Resolver } +type webhookDeliveryResolver struct{ *Resolver } diff --git a/api/server.go b/api/server.go index 41890ac..f7e9061 100644 --- a/api/server.go +++ b/api/server.go @@ -6,6 +6,7 @@ import ( "git.sr.ht/~sircmpwn/core-go/config" "git.sr.ht/~sircmpwn/core-go/server" + "git.sr.ht/~sircmpwn/core-go/webhooks" "github.com/99designs/gqlgen/graphql" "git.sr.ht/~sircmpwn/builds.sr.ht/api/graph" @@ -18,6 +19,7 @@ func main() { appConfig := config.LoadConfig(":5102") gqlConfig := api.Config{Resolvers: &graph.Resolver{}} + gqlConfig.Directives.Private = server.Private gqlConfig.Directives.Access = func(ctx context.Context, obj interface{}, next graphql.Resolver, scope model.AccessScope, kind model.AccessKind) (interface{}, error) { @@ -34,9 +36,12 @@ func main() { scopes[i] = s.String() } + webhookQueue := webhooks.NewQueue(schema) + server.NewServer("builds.sr.ht", appConfig). WithDefaultMiddleware(). - WithMiddleware(loaders.Middleware). + WithMiddleware(loaders.Middleware, webhooks.Middleware(webhookQueue)). WithSchema(schema, scopes). + WithQueues(webhookQueue.Queue). Run() } diff --git a/api/webhooks/webhooks.go b/api/webhooks/webhooks.go new file mode 100644 index 0000000..34893fc --- /dev/null +++ b/api/webhooks/webhooks.go @@ -0,0 +1,37 @@ +package webhooks + +import ( + "context" + "time" + + "git.sr.ht/~sircmpwn/core-go/auth" + "git.sr.ht/~sircmpwn/core-go/webhooks" + sq "github.com/Masterminds/squirrel" + "github.com/google/uuid" + + "git.sr.ht/~sircmpwn/builds.sr.ht/api/graph/model" +) + +func deliverUserWebhook(ctx context.Context, event model.WebhookEvent, + payload model.WebhookPayload, payloadUUID uuid.UUID) { + q := webhooks.ForContext(ctx) + userID := auth.ForContext(ctx).UserID + query := sq. + Select(). + From("gql_user_wh_sub sub"). + Where("sub.user_id = ?", userID) + q.Schedule(ctx, query, "user", event.String(), + payloadUUID, payload) +} + +func DeliverUserJobEvent(ctx context.Context, + event model.WebhookEvent, job *model.Job) { + payloadUUID := uuid.New() + payload := model.JobEvent{ + UUID: payloadUUID.String(), + Event: event, + Date: time.Now().UTC(), + Job: job, + } + deliverUserWebhook(ctx, event, &payload, payloadUUID) +} diff --git a/buildsrht/alembic/versions/f79186791a25_add_graphql_user_webhook_tables.py b/buildsrht/alembic/versions/f79186791a25_add_graphql_user_webhook_tables.py new file mode 100644 index 0000000..7a3cdb2 --- /dev/null +++ b/buildsrht/alembic/versions/f79186791a25_add_graphql_user_webhook_tables.py @@ -0,0 +1,71 @@ +"""Add GraphQL user webhook tables + +Revision ID: f79186791a25 +Revises: c0dd67b07f2b +Create Date: 2022-06-13 13:17:54.559113 + +""" + +# revision identifiers, used by Alembic. +revision = 'f79186791a25' +down_revision = 'c0dd67b07f2b' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.execute(""" + CREATE TYPE webhook_event AS ENUM ( + 'JOB_CREATED' + ); + + CREATE TYPE auth_method AS ENUM ( + 'OAUTH_LEGACY', + 'OAUTH2', + 'COOKIE', + 'INTERNAL', + 'WEBHOOK' + ); + + CREATE TABLE gql_user_wh_sub ( + id serial PRIMARY KEY, + created timestamp NOT NULL, + events webhook_event[] NOT NULL check (array_length(events, 1) > 0), + url varchar NOT NULL, + query varchar NOT NULL, + + auth_method auth_method NOT NULL check (auth_method in ('OAUTH2', 'INTERNAL')), + token_hash varchar(128) check ((auth_method = 'OAUTH2') = (token_hash IS NOT NULL)), + grants varchar, + client_id uuid, + expires timestamp check ((auth_method = 'OAUTH2') = (expires IS NOT NULL)), + node_id varchar check ((auth_method = 'INTERNAL') = (node_id IS NOT NULL)), + + user_id integer NOT NULL references "user"(id) + ); + + CREATE INDEX gql_user_wh_sub_token_hash_idx ON gql_user_wh_sub (token_hash); + + CREATE TABLE gql_user_wh_delivery ( + id serial PRIMARY KEY, + uuid uuid NOT NULL, + date timestamp NOT NULL, + event webhook_event NOT NULL, + subscription_id integer NOT NULL references gql_user_wh_sub(id) ON DELETE CASCADE, + request_body varchar NOT NULL, + response_body varchar, + response_headers varchar, + response_status integer + ); + """) + + +def downgrade(): + op.execute(""" + DROP TABLE gql_user_wh_delivery; + DROP INDEX gql_user_wh_sub_token_hash_idx; + DROP TABLE gql_user_wh_sub; + DROP TYPE auth_method; + DROP TYPE webhook_event; + """) -- 2.38.5