Skip to content

Commit 2c08c86

Browse files
committed
internal/frontend: add queue
The frontend server is not initiated with a queue. The frontend task queue will be used to support frontend fetches. frontend.FetchAndUpdateState is added, which is a copy of worker.FetchAndUpdateState for use in testing and locally. Updates golang/go#36811 Updates golang/go#37002 Updates golang/go#37106 Change-Id: I41922d30462d2623a061aa1f207bb2b39f7b54e2 Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/743102 Reviewed-by: Jonathan Amsterdam <[email protected]>
1 parent d94b545 commit 2c08c86

File tree

6 files changed

+103
-27
lines changed

6 files changed

+103
-27
lines changed

cmd/frontend/main.go

+32-20
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright 2019 The Go Authors. All rights reserved.
22
// Use of this source code is governed by a BSD-style
33
// license that can be found in the LICENSE file.
4-
54
package main
65

76
import (
@@ -14,6 +13,7 @@ import (
1413
"strings"
1514
"time"
1615

16+
cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
1717
"cloud.google.com/go/profiler"
1818
"contrib.go.opencensus.io/integrations/ocsql"
1919
"github.com/go-redis/redis/v7"
@@ -28,42 +28,44 @@ import (
2828
"golang.org/x/pkgsite/internal/postgres"
2929
"golang.org/x/pkgsite/internal/proxy"
3030
"golang.org/x/pkgsite/internal/proxydatasource"
31+
"golang.org/x/pkgsite/internal/queue"
32+
"golang.org/x/pkgsite/internal/source"
3133
)
3234

3335
var (
36+
queueName = config.GetEnv("GO_DISCOVERY_FRONTEND_TASK_QUEUE", "")
3437
staticPath = flag.String("static", "content/static", "path to folder containing static files served")
3538
thirdPartyPath = flag.String("third_party", "third_party", "path to folder containing third-party libraries")
3639
reloadTemplates = flag.Bool("reload_templates", false, "reload templates on each page load (to be used during development)")
37-
directProxy = flag.String("direct_proxy", "", "if set to a valid URL, uses the module proxy referred to by this URL "+
40+
proxyURL = flag.String("proxy_url", "https://proxy.golang.org", "Uses the module proxy referred to by this URL "+
41+
"for direct proxy mode and frontend fetches")
42+
directProxy = flag.Bool("direct_proxy", false, "if set to true, uses the module proxy referred to by this URL "+
3843
"as a direct backend, bypassing the database")
3944
)
4045

4146
func main() {
4247
flag.Parse()
43-
4448
ctx := context.Background()
45-
4649
cfg, err := config.Init(ctx)
4750
if err != nil {
4851
log.Fatal(ctx, err)
4952
}
5053
cfg.Dump(os.Stderr)
51-
5254
if cfg.UseProfiler {
5355
if err := profiler.Start(profiler.Config{}); err != nil {
5456
log.Fatalf(ctx, "profiler.Start: %v", err)
5557
}
5658
}
57-
5859
var (
59-
ds internal.DataSource
60-
exp internal.ExperimentSource
60+
ds internal.DataSource
61+
exp internal.ExperimentSource
62+
fetchQueue queue.Queue
6163
)
62-
if *directProxy != "" {
63-
proxyClient, err := proxy.New(*directProxy)
64-
if err != nil {
65-
log.Fatal(ctx, err)
66-
}
64+
proxyClient, err := proxy.New(*proxyURL)
65+
if err != nil {
66+
log.Fatal(ctx, err)
67+
}
68+
if *directProxy {
6769
ds = proxydatasource.New(proxyClient)
6870
exp = internal.NewLocalExperimentSource(readLocalExperiments(ctx))
6971
} else {
@@ -80,14 +82,16 @@ func main() {
8082
defer db.Close()
8183
ds = db
8284
exp = db
85+
sourceClient := source.NewClient(config.SourceTimeout)
86+
fetchQueue = newQueue(ctx, cfg, proxyClient, sourceClient, db)
8387
}
8488
var haClient *redis.Client
8589
if cfg.RedisHAHost != "" {
8690
haClient = redis.NewClient(&redis.Options{
8791
Addr: cfg.RedisHAHost + ":" + cfg.RedisHAPort,
8892
})
8993
}
90-
server, err := frontend.NewServer(ds, haClient, *staticPath, *thirdPartyPath, *reloadTemplates)
94+
server, err := frontend.NewServer(ds, fetchQueue, haClient, *staticPath, *thirdPartyPath, *reloadTemplates)
9195
if err != nil {
9296
log.Fatalf(ctx, "frontend.NewServer: %v", err)
9397
}
@@ -99,7 +103,6 @@ func main() {
99103
})
100104
}
101105
server.Install(router.Handle, cacheClient)
102-
103106
views := append(dcensus.ServerViews,
104107
postgres.SearchLatencyDistribution,
105108
postgres.SearchResponseCount,
@@ -119,7 +122,6 @@ func main() {
119122
}
120123
go http.ListenAndServe(cfg.DebugAddr("localhost:8081"), dcensusServer)
121124
}
122-
123125
panicHandler, err := server.PanicHandler()
124126
if err != nil {
125127
log.Fatal(ctx, err)
@@ -129,7 +131,6 @@ func main() {
129131
if err != nil {
130132
log.Fatal(ctx, err)
131133
}
132-
133134
mw := middleware.Chain(
134135
middleware.RequestLog(requestLogger),
135136
middleware.Quota(cfg.Quota),
@@ -140,19 +141,31 @@ func main() {
140141
middleware.Timeout(54*time.Second),
141142
middleware.Experiment(experimenter),
142143
)
143-
144144
addr := cfg.HostAddr("localhost:8080")
145145
log.Infof(ctx, "Listening on addr %s", addr)
146146
log.Fatal(ctx, http.ListenAndServe(addr, mw(router)))
147147
}
148148

149+
func newQueue(ctx context.Context, cfg *config.Config, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB) queue.Queue {
150+
if !cfg.OnAppEngine() {
151+
return queue.NewInMemory(ctx, proxyClient, sourceClient, db, 10, frontend.FetchAndUpdateState)
152+
}
153+
client, err := cloudtasks.NewClient(ctx)
154+
if err != nil {
155+
log.Fatal(ctx, err)
156+
}
157+
if queueName == "" {
158+
log.Fatalf(ctx, "queueName cannot be empty")
159+
}
160+
return queue.NewGCP(cfg, client, queueName)
161+
}
162+
149163
// openDB opens a connection to a database with the given driver, using connection info from
150164
// the given config.
151165
// It first tries the main connection info (DBConnInfo), and if that fails, it uses backup
152166
// connection info it if exists (DBSecondaryConnInfo).
153167
func openDB(ctx context.Context, cfg *config.Config, driver string) (_ *database.DB, err error) {
154168
derrors.Wrap(&err, "openDB(ctx, cfg, %q)", driver)
155-
156169
log.Infof(ctx, "opening database on host %s", cfg.DBHost)
157170
ddb, err := database.Open(driver, cfg.DBConnInfo())
158171
if err == nil {
@@ -167,7 +180,6 @@ func openDB(ctx context.Context, cfg *config.Config, driver string) (_ *database
167180
cfg.DBHost, err, cfg.DBSecondaryHost)
168181
return database.Open(driver, ci)
169182
}
170-
171183
func getLogger(ctx context.Context, cfg *config.Config) middleware.Logger {
172184
if cfg.OnAppEngine() {
173185
logger, err := log.UseStackdriver(ctx, cfg, "frontend-log")

internal/frontend/fetch.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2020 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package frontend
6+
7+
import (
8+
"context"
9+
"net/http"
10+
11+
"golang.org/x/pkgsite/internal"
12+
"golang.org/x/pkgsite/internal/derrors"
13+
"golang.org/x/pkgsite/internal/fetch"
14+
"golang.org/x/pkgsite/internal/log"
15+
"golang.org/x/pkgsite/internal/postgres"
16+
"golang.org/x/pkgsite/internal/proxy"
17+
"golang.org/x/pkgsite/internal/source"
18+
)
19+
20+
// FetchAndUpdateState is used by the InMemory queue for testing in
21+
// internal/frontend and running cmd/frontend locally. It is a copy of
22+
// worker.FetchAndUpdateState that does not update module_version_states, so that
23+
// we don't have to import internal/worker here. It is not meant to be used
24+
// when running on AppEngine.
25+
func FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion string, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB) (_ int, err error) {
26+
defer func() {
27+
if err != nil {
28+
log.Infof(ctx, "FetchAndUpdateState(%q, %q) completed with err: %v. ", modulePath, requestedVersion, err)
29+
} else {
30+
log.Infof(ctx, "FetchAndUpdateState(%q, %q) succeeded", modulePath, requestedVersion)
31+
}
32+
derrors.Wrap(&err, "FetchAndUpdateState(%q, %q)", modulePath, requestedVersion)
33+
}()
34+
35+
fr := fetch.FetchModule(ctx, modulePath, requestedVersion, proxyClient, sourceClient)
36+
if fr.Error == nil {
37+
// Only attempt to insert the module into module_version_states if the
38+
// fetch process was successful.
39+
if err := db.InsertModule(ctx, fr.Module); err != nil {
40+
return http.StatusInternalServerError, err
41+
}
42+
}
43+
var errMsg string
44+
if fr.Error != nil {
45+
errMsg = fr.Error.Error()
46+
}
47+
vm := &internal.VersionMap{
48+
ModulePath: fr.ModulePath,
49+
RequestedVersion: fr.RequestedVersion,
50+
ResolvedVersion: fr.ResolvedVersion,
51+
Status: fr.Status,
52+
Error: errMsg,
53+
}
54+
if err := db.UpsertVersionMap(ctx, vm); err != nil {
55+
return http.StatusInternalServerError, err
56+
}
57+
if fr.Error != nil {
58+
return fr.Status, fr.Error
59+
}
60+
return http.StatusOK, nil
61+
}

internal/frontend/server.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ import (
2424
"golang.org/x/pkgsite/internal/licenses"
2525
"golang.org/x/pkgsite/internal/log"
2626
"golang.org/x/pkgsite/internal/middleware"
27+
"golang.org/x/pkgsite/internal/queue"
2728
)
2829

2930
// Server can be installed to serve the go discovery frontend.
3031
type Server struct {
31-
ds internal.DataSource
32+
ds internal.DataSource
33+
queue queue.Queue
3234
// cmplClient is a redis client that has access to the "completions" sorted
3335
// set.
3436
cmplClient *redis.Client
@@ -45,14 +47,15 @@ type Server struct {
4547
// NewServer creates a new Server for the given database and template directory.
4648
// reloadTemplates should be used during development when it can be helpful to
4749
// reload templates from disk each time a page is loaded.
48-
func NewServer(ds internal.DataSource, cmplClient *redis.Client, staticPath string, thirdPartyPath string, reloadTemplates bool) (*Server, error) {
50+
func NewServer(ds internal.DataSource, q queue.Queue, cmplClient *redis.Client, staticPath string, thirdPartyPath string, reloadTemplates bool) (*Server, error) {
4951
templateDir := filepath.Join(staticPath, "html")
5052
ts, err := parsePageTemplates(templateDir)
5153
if err != nil {
5254
return nil, fmt.Errorf("error parsing templates: %v", err)
5355
}
5456
s := &Server{
5557
ds: ds,
58+
queue: q,
5659
cmplClient: cmplClient,
5760
staticPath: staticPath,
5861
thirdPartyPath: thirdPartyPath,

internal/frontend/server_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestMain(m *testing.M) {
3535
}
3636

3737
func TestHTMLInjection(t *testing.T) {
38-
s, err := NewServer(testDB, nil, "../../content/static", "../../third_party", false)
38+
s, err := NewServer(testDB, nil, nil, "../../content/static", "../../third_party", false)
3939
if err != nil {
4040
t.Fatalf("NewServer: %v", err)
4141
}
@@ -201,7 +201,7 @@ func testServer(t *testing.T, experimentNames ...string) {
201201
ctx = experiment.NewContext(ctx, experiment.NewSet(expmap))
202202
insertTestModules(ctx, t, testModules)
203203

204-
s, err := NewServer(testDB, nil, "../../content/static", "../../third_party", false)
204+
s, err := NewServer(testDB, nil, nil, "../../content/static", "../../third_party", false)
205205
if err != nil {
206206
t.Fatalf("NewServer: %v", err)
207207
}
@@ -860,7 +860,7 @@ func TestServerErrors(t *testing.T) {
860860
t.Fatal(err)
861861
}
862862

863-
s, err := NewServer(testDB, nil, "../../content/static", "../../third_party", false)
863+
s, err := NewServer(testDB, nil, nil, "../../content/static", "../../third_party", false)
864864
if err != nil {
865865
t.Fatalf("NewServer: %v", err)
866866
}

internal/testing/integration/frontend_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func TestModulePackageDirectoryResolution(t *testing.T) {
152152
in(".DetailsContent", hasText("I'm a package"))),
153153
},
154154
}
155-
s, err := frontend.NewServer(testDB, nil, "../../../content/static", "../../../third_party", false)
155+
s, err := frontend.NewServer(testDB, nil, nil, "../../../content/static", "../../../third_party", false)
156156
if err != nil {
157157
t.Fatal(err)
158158
}

internal/testing/integration/integration_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestEndToEndProcessing(t *testing.T) {
8686
workerServer.Install(workerMux.Handle)
8787
workerHTTP := httptest.NewServer(workerMux)
8888

89-
frontendServer, err := frontend.NewServer(testDB, redisHAClient, "../../../content/static", "../../../third_party", false)
89+
frontendServer, err := frontend.NewServer(testDB, queue, redisHAClient, "../../../content/static", "../../../third_party", false)
9090
if err != nil {
9191
t.Fatal(err)
9292
}

0 commit comments

Comments
 (0)