@@ -12,10 +12,12 @@ import (
12
12
"runtime/pprof"
13
13
"sync"
14
14
15
+ "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
15
16
"github.com/sirupsen/logrus"
16
17
"github.com/spf13/cobra"
17
18
"google.golang.org/grpc"
18
19
health "google.golang.org/grpc/health/grpc_health_v1"
20
+ "google.golang.org/grpc/metadata"
19
21
"google.golang.org/grpc/reflection"
20
22
21
23
"github.com/operator-framework/operator-registry/pkg/api"
@@ -89,7 +91,8 @@ will not be reflected in the served content.
89
91
}
90
92
91
93
func (s * serve ) run (ctx context.Context ) error {
92
- p := newProfilerInterface (s .pprofAddr , s .logger )
94
+ mainLogger := s .logger .Dup ()
95
+ p := newProfilerInterface (s .pprofAddr , mainLogger )
93
96
if err := p .startEndpoint (); err != nil {
94
97
return fmt .Errorf ("could not start pprof endpoint: %v" , err )
95
98
}
@@ -102,12 +105,12 @@ func (s *serve) run(ctx context.Context) error {
102
105
// Immediately set up termination log
103
106
err := log .AddDefaultWriterHooks (s .terminationLog )
104
107
if err != nil {
105
- s . logger .WithError (err ).Warn ("unable to set termination log path" )
108
+ mainLogger .WithError (err ).Warn ("unable to set termination log path" )
106
109
}
107
110
108
111
// Ensure there is a default nsswitch config
109
112
if err := dns .EnsureNsswitch (); err != nil {
110
- s . logger .WithError (err ).Warn ("unable to write default nsswitch config" )
113
+ mainLogger .WithError (err ).Warn ("unable to write default nsswitch config" )
111
114
}
112
115
113
116
if s .cacheDir == "" && s .cacheEnforceIntegrity {
@@ -121,12 +124,12 @@ func (s *serve) run(ctx context.Context) error {
121
124
}
122
125
defer os .RemoveAll (s .cacheDir )
123
126
}
124
- s . logger = s . logger .WithFields (logrus.Fields {
127
+ mainLogger = mainLogger .WithFields (logrus.Fields {
125
128
"configs" : s .configDir ,
126
129
"cache" : s .cacheDir ,
127
130
})
128
131
129
- store , err := cache .New (s .cacheDir , cache .WithLog (s . logger ))
132
+ store , err := cache .New (s .cacheDir , cache .WithLog (mainLogger ))
130
133
if err != nil {
131
134
return err
132
135
}
@@ -148,26 +151,30 @@ func (s *serve) run(ctx context.Context) error {
148
151
return nil
149
152
}
150
153
151
- s . logger = s . logger .WithFields (logrus.Fields {"port" : s .port })
154
+ mainLogger = mainLogger .WithFields (logrus.Fields {"port" : s .port })
152
155
153
156
lis , err := net .Listen ("tcp" , ":" + s .port )
154
157
if err != nil {
155
158
return fmt .Errorf ("failed to listen: %s" , err )
156
159
}
157
160
158
- grpcServer := grpc .NewServer ()
161
+ streamLogger , unaryLogger := loggingInterceptors (s .logger .Dup ())
162
+ grpcServer := grpc .NewServer (
163
+ grpc .ChainStreamInterceptor (streamLogger ),
164
+ grpc .ChainUnaryInterceptor (unaryLogger ),
165
+ )
159
166
api .RegisterRegistryServer (grpcServer , server .NewRegistryServer (store ))
160
167
health .RegisterHealthServer (grpcServer , server .NewHealthServer ())
161
168
reflection .Register (grpcServer )
162
- s . logger .Info ("serving registry" )
169
+ mainLogger .Info ("serving registry" )
163
170
p .stopCpuProfileCache ()
164
171
165
172
return graceful .Shutdown (s .logger , func () error {
166
173
return grpcServer .Serve (lis )
167
174
}, func () {
168
175
grpcServer .GracefulStop ()
169
176
if err := p .stopEndpoint (ctx ); err != nil {
170
- s . logger .Warnf ("error shutting down pprof server: %v" , err )
177
+ mainLogger .Warnf ("error shutting down pprof server: %v" , err )
171
178
}
172
179
})
173
180
@@ -293,3 +300,48 @@ func (p *profilerInterface) setCacheReady() {
293
300
p .cacheReady = true
294
301
p .cacheLock .Unlock ()
295
302
}
303
+
304
+ func loggingInterceptors (logger * logrus.Entry ) (grpc.StreamServerInterceptor , grpc.UnaryServerInterceptor ) {
305
+ requestLogger := logger .Dup ()
306
+ requestLoggerOpts := []logging.Option {
307
+ logging .WithLogOnEvents (logging .StartCall , logging .FinishCall ),
308
+ logging .WithFieldsFromContext (func (ctx context.Context ) logging.Fields {
309
+ fields := logging .ExtractFields (ctx )
310
+ metadataFields := logging.Fields {}
311
+ if md , ok := metadata .FromIncomingContext (ctx ); ok {
312
+ for k , v := range md {
313
+ metadataFields = append (metadataFields , k , v )
314
+ }
315
+ fields = fields .AppendUnique (metadataFields )
316
+ }
317
+ return fields
318
+ }),
319
+ }
320
+ return logging .StreamServerInterceptor (interceptorLogger (requestLogger ), requestLoggerOpts ... ),
321
+ logging .UnaryServerInterceptor (interceptorLogger (requestLogger ), requestLoggerOpts ... )
322
+ }
323
+
324
+ func interceptorLogger (l * logrus.Entry ) logging.Logger {
325
+ return logging .LoggerFunc (func (_ context.Context , lvl logging.Level , msg string , fields ... any ) {
326
+ f := make (map [string ]any , len (fields )/ 2 )
327
+ i := logging .Fields (fields ).Iterator ()
328
+ for i .Next () {
329
+ k , v := i .At ()
330
+ f [k ] = v
331
+ }
332
+ l := l .WithFields (f )
333
+
334
+ switch lvl {
335
+ case logging .LevelDebug :
336
+ l .Debug (msg )
337
+ case logging .LevelInfo :
338
+ l .Info (msg )
339
+ case logging .LevelWarn :
340
+ l .Warn (msg )
341
+ case logging .LevelError :
342
+ l .Error (msg )
343
+ default :
344
+ panic (fmt .Sprintf ("unknown level %v" , lvl ))
345
+ }
346
+ })
347
+ }
0 commit comments