package main import ( "context" "flag" "fmt" "net/http" "os" "os/signal" "sync/atomic" "syscall" "time" "go.opentelemetry.io/contrib/bridges/otelslog" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/log/global" "go.opentelemetry.io/otel/metric" sdklog "go.opentelemetry.io/otel/sdk/log" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" ) var ( metricsEndpoint = flag.String("vm.endpoint", "http://localhost:8428/opentelemetry/v1/metrics", "VictoriaMetrics endpoint") logsEndpoint = flag.String("vl.endpoint", "http://localhost:9428/insert/opentelemetry/v1/logs", "VictoriaLogs endpoint") pushInterval = flag.Duration("vm.pushInterval", 10*time.Second, "How often to push collected metrics samples to vm.endpoint") ) var ( logger = otelslog.NewLogger("demo-app") ) func main() { flag.Parse() fmt.Println("Starting web server...") ctx, cancel := context.WithCancel(context.Background()) defer cancel() mux := http.NewServeMux() mux.HandleFunc("/api/fast", func(writer http.ResponseWriter, request *http.Request) { logger.InfoContext(ctx, "Anonymous access to fast endpoint") writer.WriteHeader(http.StatusOK) writer.Write([]byte(`fast ok`)) }) mux.HandleFunc("/api/slow", func(writer http.ResponseWriter, request *http.Request) { time.Sleep(time.Second * 2) logger.InfoContext(ctx, "Anonymous access to slow endpoint") writer.WriteHeader(http.StatusOK) writer.Write([]byte(`slow ok`)) }) mw, err := newMiddleware(ctx, mux) if err != nil { panic(fmt.Sprintf("cannot build middleware: %q", err)) } mustStop := make(chan os.Signal, 1) signal.Notify(mustStop, os.Interrupt, syscall.SIGTERM) go func() { err := http.ListenAndServe("localhost:8081", mw) if err != nil { panic(err) } }() fmt.Println("web server started at http://localhost:8081") <-mustStop fmt.Println("receive shutdown signal, stopping webserver") for _, shutdown := range mw.onShutdown { if err := shutdown(ctx); err != nil { fmt.Printf("cannot shutdown metric provider: %s", err) } } fmt.Println("Done!") } func newMeterProvider(ctx context.Context) (*sdkmetric.MeterProvider, error) { exporter, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithEndpointURL(*metricsEndpoint)) if err != nil { return nil, fmt.Errorf("cannot create otlphttp exporter: %w", err) } res, err := resource.New(ctx, resource.WithAttributes( attribute.String("job", "otlp"), attribute.String("instance", "localhost"), ), ) if err != nil { return nil, fmt.Errorf("cannot create meter resource: %w", err) } // define histograms for measuring requests latency expView := sdkmetric.NewView( // classic histogram sdkmetric.Instrument{ Name: "http.requests.latency", Kind: sdkmetric.InstrumentKindHistogram, }, // exponential histogram sdkmetric.Stream{ Name: "http.requests.latency.exp", Aggregation: sdkmetric.AggregationBase2ExponentialHistogram{ MaxSize: 160, MaxScale: 20, }, }, ) return sdkmetric.NewMeterProvider( sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(*pushInterval))), sdkmetric.WithResource(res), sdkmetric.WithView(expView), ), nil } func newLoggerProvider(ctx context.Context) (*sdklog.LoggerProvider, error) { exporter, err := otlploghttp.New(ctx, otlploghttp.WithEndpointURL(*logsEndpoint)) if err != nil { return nil, err } provider := sdklog.NewLoggerProvider( sdklog.WithProcessor(sdklog.NewBatchProcessor(exporter)), ) return provider, nil } func newMiddleware(ctx context.Context, h http.Handler) (*middleware, error) { mw := &middleware{ ctx: ctx, h: h, } lp, err := newLoggerProvider(ctx) if err != nil { return nil, fmt.Errorf("cannot create logs provider: %w", err) } global.SetLoggerProvider(lp) mp, err := newMeterProvider(ctx) if err != nil { return nil, fmt.Errorf("cannot create metrics provider: %w", err) } otel.SetMeterProvider(mp) meter := mp.Meter("") mw.requestsLatency, err = meter.Float64Histogram("http.requests.latency") if err != nil { return nil, fmt.Errorf("cannot create Float64Histogram: %w", err) } mw.requestsCount, err = meter.Int64Counter("http.requests") if err != nil { return nil, fmt.Errorf("cannot create Int64Counter: %w", err) } cb := func(c context.Context, o metric.Int64Observer) error { o.Observe(atomic.LoadInt64(&mw.activeRequests)) return nil } _, err = meter.Int64ObservableGauge("http.requests.active", metric.WithInt64Callback(cb)) if err != nil { return nil, fmt.Errorf("cannot create Int64ObservableGauge: %w", err) } mw.onShutdown = append(mw.onShutdown, mp.Shutdown, lp.Shutdown) return mw, nil } type middleware struct { ctx context.Context h http.Handler requestsCount metric.Int64Counter requestsLatency metric.Float64Histogram activeRequests int64 onShutdown []func(ctx context.Context) error } func (m *middleware) ServeHTTP(w http.ResponseWriter, r *http.Request) { t := time.Now() path := r.URL.Path m.requestsCount.Add(m.ctx, 1, metric.WithAttributes(attribute.String("path", path))) atomic.AddInt64(&m.activeRequests, 1) defer func() { atomic.AddInt64(&m.activeRequests, -1) m.requestsLatency.Record(m.ctx, time.Since(t).Seconds(), metric.WithAttributes(attribute.String("path", path))) }() m.h.ServeHTTP(w, r) }