mirror of
https://github.com/caddyserver/caddy.git
synced 2024-12-27 22:23:48 +03:00
0950ba4f0b
This could lead to bugs if handlers are not careful, but it is surely useful. We'll see how it goes, what the feedback is like, etc.
390 lines
12 KiB
Go
390 lines
12 KiB
Go
// Copyright 2015 Matthew Holt and The Caddy Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package caddyevents
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/caddyserver/caddy/v2"
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func init() {
|
|
caddy.RegisterModule(App{})
|
|
}
|
|
|
|
// App implements a global eventing system within Caddy.
|
|
// Modules can emit and subscribe to events, providing
|
|
// hooks into deep parts of the code base that aren't
|
|
// otherwise accessible. Events provide information about
|
|
// what and when things are happening, and this facility
|
|
// allows handlers to take action when events occur,
|
|
// add information to the event's metadata, and even
|
|
// control program flow in some cases.
|
|
//
|
|
// Events are propagated in a DOM-like fashion. An event
|
|
// emitted from module `a.b.c` (the "origin") will first
|
|
// invoke handlers listening to `a.b.c`, then `a.b`,
|
|
// then `a`, then those listening regardless of origin.
|
|
// If a handler returns the special error Aborted, then
|
|
// propagation immediately stops and the event is marked
|
|
// as aborted. Emitters may optionally choose to adjust
|
|
// program flow based on an abort.
|
|
//
|
|
// Modules can subscribe to events by origin and/or name.
|
|
// A handler is invoked only if it is subscribed to the
|
|
// event by name and origin. Subscriptions should be
|
|
// registered during the provisioning phase, before apps
|
|
// are started.
|
|
//
|
|
// Event handlers are fired synchronously as part of the
|
|
// regular flow of the program. This allows event handlers
|
|
// to control the flow of the program if the origin permits
|
|
// it and also allows handlers to convey new information
|
|
// back into the origin module before it continues.
|
|
// In essence, event handlers are similar to HTTP
|
|
// middleware handlers.
|
|
//
|
|
// Event bindings/subscribers are unordered; i.e.
|
|
// event handlers are invoked in an arbitrary order.
|
|
// Event handlers should not rely on the logic of other
|
|
// handlers to succeed.
|
|
//
|
|
// The entirety of this app module is EXPERIMENTAL and
|
|
// subject to change. Pay attention to release notes.
|
|
type App struct {
|
|
// Subscriptions bind handlers to one or more events
|
|
// either globally or scoped to specific modules or module
|
|
// namespaces.
|
|
Subscriptions []*Subscription `json:"subscriptions,omitempty"`
|
|
|
|
// Map of event name to map of module ID/namespace to handlers
|
|
subscriptions map[string]map[caddy.ModuleID][]Handler
|
|
|
|
logger *zap.Logger
|
|
started bool
|
|
}
|
|
|
|
// Subscription represents binding of one or more handlers to
|
|
// one or more events.
|
|
type Subscription struct {
|
|
// The name(s) of the event(s) to bind to. Default: all events.
|
|
Events []string `json:"events,omitempty"`
|
|
|
|
// The ID or namespace of the module(s) from which events
|
|
// originate to listen to for events. Default: all modules.
|
|
//
|
|
// Events propagate up, so events emitted by module "a.b.c"
|
|
// will also trigger the event for "a.b" and "a". Thus, to
|
|
// receive all events from "a.b.c" and "a.b.d", for example,
|
|
// one can subscribe to either "a.b" or all of "a" entirely.
|
|
Modules []caddy.ModuleID `json:"modules,omitempty"`
|
|
|
|
// The event handler modules. These implement the actual
|
|
// behavior to invoke when an event occurs. At least one
|
|
// handler is required.
|
|
HandlersRaw []json.RawMessage `json:"handlers,omitempty" caddy:"namespace=events.handlers inline_key=handler"`
|
|
|
|
// The decoded handlers; Go code that is subscribing to
|
|
// an event should set this field directly; HandlersRaw
|
|
// is meant for JSON configuration to fill out this field.
|
|
Handlers []Handler `json:"-"`
|
|
}
|
|
|
|
// CaddyModule returns the Caddy module information.
|
|
func (App) CaddyModule() caddy.ModuleInfo {
|
|
return caddy.ModuleInfo{
|
|
ID: "events",
|
|
New: func() caddy.Module { return new(App) },
|
|
}
|
|
}
|
|
|
|
// Provision sets up the app.
|
|
func (app *App) Provision(ctx caddy.Context) error {
|
|
app.logger = ctx.Logger()
|
|
app.subscriptions = make(map[string]map[caddy.ModuleID][]Handler)
|
|
|
|
for _, sub := range app.Subscriptions {
|
|
if sub.HandlersRaw != nil {
|
|
handlersIface, err := ctx.LoadModule(sub, "HandlersRaw")
|
|
if err != nil {
|
|
return fmt.Errorf("loading event subscriber modules: %v", err)
|
|
}
|
|
for _, h := range handlersIface.([]any) {
|
|
sub.Handlers = append(sub.Handlers, h.(Handler))
|
|
}
|
|
if len(sub.Handlers) == 0 {
|
|
// pointless to bind without any handlers
|
|
return fmt.Errorf("no handlers defined")
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Start runs the app.
|
|
func (app *App) Start() error {
|
|
for _, sub := range app.Subscriptions {
|
|
if err := app.Subscribe(sub); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
app.started = true
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully shuts down the app.
|
|
func (app *App) Stop() error {
|
|
return nil
|
|
}
|
|
|
|
// Subscribe binds one or more event handlers to one or more events
|
|
// according to the subscription s. For now, subscriptions can only
|
|
// be created during the provision phase; new bindings cannot be
|
|
// created after the events app has started.
|
|
func (app *App) Subscribe(s *Subscription) error {
|
|
if app.started {
|
|
return fmt.Errorf("events already started; new subscriptions closed")
|
|
}
|
|
|
|
// handle special case of catch-alls (omission of event name or module space implies all)
|
|
if len(s.Events) == 0 {
|
|
s.Events = []string{""}
|
|
}
|
|
if len(s.Modules) == 0 {
|
|
s.Modules = []caddy.ModuleID{""}
|
|
}
|
|
|
|
for _, eventName := range s.Events {
|
|
if app.subscriptions[eventName] == nil {
|
|
app.subscriptions[eventName] = make(map[caddy.ModuleID][]Handler)
|
|
}
|
|
for _, originModule := range s.Modules {
|
|
app.subscriptions[eventName][originModule] = append(app.subscriptions[eventName][originModule], s.Handlers...)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// On is syntactic sugar for Subscribe() that binds a single handler
|
|
// to a single event from any module. If the eventName is empty string,
|
|
// it counts for all events.
|
|
func (app *App) On(eventName string, handler Handler) error {
|
|
return app.Subscribe(&Subscription{
|
|
Events: []string{eventName},
|
|
Handlers: []Handler{handler},
|
|
})
|
|
}
|
|
|
|
// Emit creates and dispatches an event named eventName to all relevant handlers with
|
|
// the metadata data. Events are emitted and propagated synchronously. The returned Event
|
|
// value will have any additional information from the invoked handlers.
|
|
//
|
|
// Note that the data map is not copied, for efficiency. After Emit() is called, the
|
|
// data passed in should not be changed in other goroutines.
|
|
func (app *App) Emit(ctx caddy.Context, eventName string, data map[string]any) Event {
|
|
logger := app.logger.With(zap.String("name", eventName))
|
|
|
|
id, err := uuid.NewRandom()
|
|
if err != nil {
|
|
logger.Error("failed generating new event ID", zap.Error(err))
|
|
}
|
|
|
|
eventName = strings.ToLower(eventName)
|
|
|
|
e := Event{
|
|
Data: data,
|
|
id: id,
|
|
ts: time.Now(),
|
|
name: eventName,
|
|
origin: ctx.Module(),
|
|
}
|
|
|
|
logger = logger.With(
|
|
zap.String("id", e.id.String()),
|
|
zap.String("origin", e.origin.CaddyModule().String()))
|
|
|
|
// add event info to replacer, make sure it's in the context
|
|
repl, ok := ctx.Context.Value(caddy.ReplacerCtxKey).(*caddy.Replacer)
|
|
if !ok {
|
|
repl = caddy.NewReplacer()
|
|
ctx.Context = context.WithValue(ctx.Context, caddy.ReplacerCtxKey, repl)
|
|
}
|
|
repl.Map(func(key string) (any, bool) {
|
|
switch key {
|
|
case "event":
|
|
return e, true
|
|
case "event.id":
|
|
return e.id, true
|
|
case "event.name":
|
|
return e.name, true
|
|
case "event.time":
|
|
return e.ts, true
|
|
case "event.time_unix":
|
|
return e.ts.UnixMilli(), true
|
|
case "event.module":
|
|
return e.origin.CaddyModule().ID, true
|
|
case "event.data":
|
|
return e.Data, true
|
|
}
|
|
|
|
if strings.HasPrefix(key, "event.data.") {
|
|
key = strings.TrimPrefix(key, "event.data.")
|
|
if val, ok := e.Data[key]; ok {
|
|
return val, true
|
|
}
|
|
}
|
|
|
|
return nil, false
|
|
})
|
|
|
|
logger.Debug("event", zap.Any("data", e.Data))
|
|
|
|
// invoke handlers bound to the event by name and also all events; this for loop
|
|
// iterates twice at most: once for the event name, once for "" (all events)
|
|
for {
|
|
moduleID := e.origin.CaddyModule().ID
|
|
|
|
// implement propagation up the module tree (i.e. start with "a.b.c" then "a.b" then "a" then "")
|
|
for {
|
|
if app.subscriptions[eventName] == nil {
|
|
break // shortcut if event not bound at all
|
|
}
|
|
|
|
for _, handler := range app.subscriptions[eventName][moduleID] {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Error("context canceled; event handling stopped")
|
|
return e
|
|
default:
|
|
}
|
|
|
|
if err := handler.Handle(ctx, e); err != nil {
|
|
aborted := errors.Is(err, ErrAborted)
|
|
|
|
logger.Error("handler error",
|
|
zap.Error(err),
|
|
zap.Bool("aborted", aborted))
|
|
|
|
if aborted {
|
|
e.Aborted = err
|
|
return e
|
|
}
|
|
}
|
|
}
|
|
|
|
if moduleID == "" {
|
|
break
|
|
}
|
|
lastDot := strings.LastIndex(string(moduleID), ".")
|
|
if lastDot < 0 {
|
|
moduleID = "" // include handlers bound to events regardless of module
|
|
} else {
|
|
moduleID = moduleID[:lastDot]
|
|
}
|
|
}
|
|
|
|
// include handlers listening to all events
|
|
if eventName == "" {
|
|
break
|
|
}
|
|
eventName = ""
|
|
}
|
|
|
|
return e
|
|
}
|
|
|
|
// Event represents something that has happened or is happening.
|
|
// An Event value is not synchronized, so it should be copied if
|
|
// being used in goroutines.
|
|
//
|
|
// EXPERIMENTAL: As with the rest of this package, events are
|
|
// subject to change.
|
|
type Event struct {
|
|
// If non-nil, the event has been aborted, meaning
|
|
// propagation has stopped to other handlers and
|
|
// the code should stop what it was doing. Emitters
|
|
// may choose to use this as a signal to adjust their
|
|
// code path appropriately.
|
|
Aborted error
|
|
|
|
// The data associated with the event. Usually the
|
|
// original emitter will be the only one to set or
|
|
// change these values, but the field is exported
|
|
// so handlers can have full access if needed.
|
|
// However, this map is not synchronized, so
|
|
// handlers must not use this map directly in new
|
|
// goroutines; instead, copy the map to use it in a
|
|
// goroutine.
|
|
Data map[string]any
|
|
|
|
id uuid.UUID
|
|
ts time.Time
|
|
name string
|
|
origin caddy.Module
|
|
}
|
|
|
|
// CloudEvent exports event e as a structure that, when
|
|
// serialized as JSON, is compatible with the
|
|
// CloudEvents spec.
|
|
func (e Event) CloudEvent() CloudEvent {
|
|
dataJSON, _ := json.Marshal(e.Data)
|
|
return CloudEvent{
|
|
ID: e.id.String(),
|
|
Source: e.origin.CaddyModule().String(),
|
|
SpecVersion: "1.0",
|
|
Type: e.name,
|
|
Time: e.ts,
|
|
DataContentType: "application/json",
|
|
Data: dataJSON,
|
|
}
|
|
}
|
|
|
|
// CloudEvent is a structure which, when serialized as JSON,
// is compatible with the CloudEvents specification. The
// datacontenttype and data members are omitted from the
// JSON when empty.
// See https://cloudevents.io.
type CloudEvent struct {
	ID              string          `json:"id"`
	Source          string          `json:"source"`
	SpecVersion     string          `json:"specversion"`
	Type            string          `json:"type"`
	Time            time.Time       `json:"time"`
	DataContentType string          `json:"datacontenttype,omitempty"`
	Data            json.RawMessage `json:"data,omitempty"`
}
|
|
|
|
// ErrAborted is the sentinel error a handler returns to
// cancel an event.
var ErrAborted = errors.New("event aborted")
|
|
|
|
// Handler is a type that can handle events.
|
|
type Handler interface {
|
|
Handle(context.Context, Event) error
|
|
}
|
|
|
|
// Interface guards
|
|
var (
|
|
_ caddy.App = (*App)(nil)
|
|
_ caddy.Provisioner = (*App)(nil)
|
|
)
|