Skip to content

move remaining state to db, allow multiple build servers #1785

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 4, 2022
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ prometheus = { version = "0.13.0", default-features = false }
rustwide = "0.15.0"
mime_guess = "2"
zstd = "0.11.0"
git2 = { version = "0.14.0", default-features = false }
hostname = "0.3.1"
git2 = { version = "0.14.4", default-features = false }
path-slash = "0.2.0"
once_cell = { version = "1.4.0", features = ["parking_lot"] }
base64 = "0.13"
Expand Down
34 changes: 30 additions & 4 deletions src/bin/cratesfyi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use std::sync::Arc;
use anyhow::{anyhow, Context as _, Error, Result};
use docs_rs::db::{self, add_path_into_database, Pool, PoolClient};
use docs_rs::repositories::RepositoryStatsUpdater;
use docs_rs::utils::{remove_crate_priority, set_crate_priority};
use docs_rs::utils::{
get_config, queue_builder, remove_crate_priority, set_crate_priority, ConfigName,
};
use docs_rs::{
BuildQueue, Config, Context, Index, Metrics, PackageKind, RustwideBuilder, Server, Storage,
};
Expand Down Expand Up @@ -93,6 +95,18 @@ enum CommandLine {
socket_addr: String,
},

StartRegistryWatcher {
/// Enable or disable the repository stats updater
#[structopt(
long = "repository-stats-updater",
default_value = "disabled",
possible_values(Toggle::VARIANTS)
)]
repository_stats_updater: Toggle,
},

StartBuildServer,

/// Starts the daemon
Daemon {
/// Enable or disable the registry watcher to automatically enqueue newly published crates
Expand Down Expand Up @@ -123,6 +137,20 @@ impl CommandLine {

match self {
Self::Build(build) => build.handle_args(ctx)?,
Self::StartRegistryWatcher {
repository_stats_updater,
} => {
if repository_stats_updater == Toggle::Enabled {
docs_rs::utils::daemon::start_background_repository_stats_updater(&ctx)?;
}

docs_rs::utils::watch_registry(ctx.build_queue()?, ctx.config()?, ctx.index()?)?;
}
Self::StartBuildServer => {
let build_queue = ctx.build_queue()?;
let rustwide_builder = RustwideBuilder::init(&ctx)?;
queue_builder(rustwide_builder, build_queue)?;
}
Self::StartWebServer { socket_addr } => {
// Blocks indefinitely
let _ = Server::start(Some(&socket_addr), &ctx)?;
Expand Down Expand Up @@ -336,10 +364,8 @@ impl BuildSubcommand {
.pool()?
.get()
.context("failed to get a database connection")?;
let res =
conn.query("SELECT * FROM config WHERE name = 'rustc_version';", &[])?;

if !res.is_empty() {
if get_config::<String>(&mut conn, ConfigName::RustcVersion)?.is_some() {
println!("update-toolchain was already called in the past, exiting");
return Ok(());
}
Expand Down
155 changes: 122 additions & 33 deletions src/build_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ use crate::db::{delete_crate, Pool};
use crate::docbuilder::PackageKind;
use crate::error::Result;
use crate::storage::Storage;
use crate::utils::{get_crate_priority, report_error};
use crate::utils::{get_config, get_crate_priority, report_error, set_config, ConfigName};
use crate::{Config, Index, Metrics, RustwideBuilder};
use anyhow::Context;

use crates_index_diff::Change;
use log::{debug, info};

use std::fs;
use std::path::PathBuf;
use git2::Oid;
use std::sync::Arc;

#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize)]
Expand Down Expand Up @@ -48,6 +47,24 @@ impl BuildQueue {
}
}

pub fn last_seen_reference(&self) -> Result<Option<Oid>> {
let mut conn = self.db.get()?;
if let Some(value) = get_config::<String>(&mut conn, ConfigName::LastSeenIndexReference)? {
return Ok(Some(Oid::from_str(&value)?));
}
Ok(None)
}

fn set_last_seen_reference(&self, oid: Oid) -> Result<()> {
let mut conn = self.db.get()?;
set_config(
&mut conn,
ConfigName::LastSeenIndexReference,
oid.to_string(),
)?;
Ok(())
}

pub fn add_crate(
&self,
name: &str,
Expand Down Expand Up @@ -118,14 +135,36 @@ impl BuildQueue {
f: impl FnOnce(&QueuedCrate) -> Result<()>,
) -> Result<()> {
let mut conn = self.db.get()?;

let queued = self.queued_crates()?;
let to_process = match queued.get(0) {
let mut transaction = conn.transaction()?;

// fetch the next available crate from the queue table.
// We are using `SELECT FOR UPDATE` inside a transaction so
// the QueuedCrate is locked until we are finished with it.
// `SKIP LOCKED` here will enable another build-server to just
// skip over taken (=locked) rows and start building the first
// available one.
let to_process = match transaction
.query_opt(
"SELECT id, name, version, priority, registry
FROM queue
WHERE attempt < $1
ORDER BY priority ASC, attempt ASC, id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED",
&[&self.max_attempts],
)?
.map(|row| QueuedCrate {
id: row.get("id"),
name: row.get("name"),
version: row.get("version"),
priority: row.get("priority"),
registry: row.get("registry"),
}) {
Some(krate) => krate,
None => return Ok(()),
};

let res = f(to_process).with_context(|| {
let res = f(&to_process).with_context(|| {
format!(
"Failed to build package {}-{} from queue",
to_process.name, to_process.version
Expand All @@ -134,15 +173,16 @@ impl BuildQueue {
self.metrics.total_builds.inc();
match res {
Ok(()) => {
conn.execute("DELETE FROM queue WHERE id = $1;", &[&to_process.id])?;
transaction.execute("DELETE FROM queue WHERE id = $1;", &[&to_process.id])?;
}
Err(e) => {
// Increase attempt count
let rows = conn.query(
"UPDATE queue SET attempt = attempt + 1 WHERE id = $1 RETURNING attempt;",
&[&to_process.id],
)?;
let attempt: i32 = rows[0].get(0);
let attempt: i32 = transaction
.query_one(
"UPDATE queue SET attempt = attempt + 1 WHERE id = $1 RETURNING attempt;",
&[&to_process.id],
)?
.get(0);

if attempt >= self.max_attempts {
self.metrics.failed_builds.inc();
Expand All @@ -152,39 +192,31 @@ impl BuildQueue {
}
}

transaction.commit()?;

Ok(())
}
}

/// Locking functions.
impl BuildQueue {
pub(crate) fn lock_path(&self) -> PathBuf {
self.config.prefix.join("docsrs.lock")
}
/// Checks for the lock and returns whether it currently exists.
pub fn is_locked(&self) -> Result<bool> {
let mut conn = self.db.get()?;

/// Checks for the lock file and returns whether it currently exists.
pub fn is_locked(&self) -> bool {
self.lock_path().exists()
Ok(get_config::<bool>(&mut conn, ConfigName::QueueLocked)?.unwrap_or(false))
}

/// Creates a lock file. Daemon will check this lock file and stop operating if it exists.
/// lock the queue. Daemon will check this lock and stop operating if it exists.
pub fn lock(&self) -> Result<()> {
let path = self.lock_path();
if !path.exists() {
fs::OpenOptions::new().write(true).create(true).open(path)?;
}

Ok(())
let mut conn = self.db.get()?;
set_config(&mut conn, ConfigName::QueueLocked, true)
}

/// Removes lock file.
/// unlock the queue.
pub fn unlock(&self) -> Result<()> {
let path = self.lock_path();
if path.exists() {
fs::remove_file(path)?;
}

Ok(())
let mut conn = self.db.get()?;
set_config(&mut conn, ConfigName::QueueLocked, false)
}
}

Expand Down Expand Up @@ -266,6 +298,13 @@ impl BuildQueue {
}
}

// additionally set the reference in the database
// so this survives recreating the registry watcher
// server.
self.set_last_seen_reference(oid)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It worries me that this isn't atomic. I worry that we'll:

  • update this in the local git repo
  • fail to update it in the database
  • end up building the same list of crates twice

I guess that's not the end of the world. But it would be nice to add some more error logging, and to update it in the database before updating the git repo.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the order of these, first setting the ref in the db, then in the repo.

What additional logging to you imagine?

At the call-site of get_new_crates ( now in watch_registry) all errors will be logged already.


// store the last seen reference as git reference in
// the local crates.io index repo.
diff.set_last_seen_reference(oid)?;

Ok(crates_added)
Expand Down Expand Up @@ -559,4 +598,54 @@ mod tests {
Ok(())
});
}

#[test]
fn test_last_seen_reference_in_db() {
crate::test::wrapper(|env| {
let queue = env.build_queue();
queue.unlock()?;
assert!(!queue.is_locked()?);
// initial db ref is empty
assert_eq!(queue.last_seen_reference()?, None);
assert!(!queue.is_locked()?);

let oid = git2::Oid::from_str("ffffffff")?;
queue.set_last_seen_reference(oid)?;

assert_eq!(queue.last_seen_reference()?, Some(oid));
assert!(!queue.is_locked()?);

Ok(())
});
}

#[test]
fn test_broken_db_reference_breaks() {
crate::test::wrapper(|env| {
let mut conn = env.db().conn();
set_config(&mut conn, ConfigName::LastSeenIndexReference, "invalid")?;

let queue = env.build_queue();
assert!(queue.last_seen_reference().is_err());

Ok(())
});
}

#[test]
fn test_queue_lock() {
crate::test::wrapper(|env| {
let queue = env.build_queue();
// unlocked without config
assert!(!queue.is_locked()?);

queue.lock()?;
assert!(queue.is_locked()?);

queue.unlock()?;
assert!(!queue.is_locked()?);

Ok(())
});
}
}
5 changes: 3 additions & 2 deletions src/db/add_package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,15 @@ pub(crate) fn add_build_into_database(
) -> Result<i32> {
debug!("Adding build into database");
let rows = conn.query(
"INSERT INTO builds (rid, rustc_version, docsrs_version, build_status)
VALUES ($1, $2, $3, $4)
"INSERT INTO builds (rid, rustc_version, docsrs_version, build_status, build_server)
VALUES ($1, $2, $3, $4, $5)
RETURNING id",
&[
&release_id,
&res.rustc_version,
&res.docsrs_version,
&res.successful,
&hostname::get()?.to_str().unwrap_or(""),
],
)?;
Ok(rows[0].get(0))
Expand Down
8 changes: 7 additions & 1 deletion src/db/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,13 @@ pub fn migrate(version: Option<Version>, conn: &mut Client) -> crate::error::Res
)
.map(|_| ())
}
)
),
sql_migration!(
context, 33, "add hostname to build-table",
"ALTER TABLE builds ADD COLUMN build_server TEXT NOT NULL DEFAULT '';",
"ALTER TABLE builds DROP COLUMN build_server;",
),

];

for migration in migrations {
Expand Down
16 changes: 9 additions & 7 deletions src/docbuilder/rustwide_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use crate::error::Result;
use crate::index::api::ReleaseData;
use crate::repositories::RepositoryStatsUpdater;
use crate::storage::{rustdoc_archive_path, source_archive_path};
use crate::utils::{copy_dir_all, parse_rustc_version, queue_builder, CargoMetadata};
use crate::utils::{
copy_dir_all, parse_rustc_version, queue_builder, set_config, CargoMetadata, ConfigName,
};
use crate::{db::blacklist::is_blacklisted, utils::MetadataPackage};
use crate::{Config, Context, Index, Metrics, Storage};
use anyhow::{anyhow, bail, Error};
Expand All @@ -20,7 +22,6 @@ use rustwide::cmd::{Command, CommandError, SandboxBuilder, SandboxImage};
use rustwide::logging::{self, LogStorage};
use rustwide::toolchain::ToolchainError;
use rustwide::{AlternativeRegistry, Build, Crate, Toolchain, Workspace, WorkspaceBuilder};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
Expand Down Expand Up @@ -225,12 +226,12 @@ impl RustwideBuilder {
.tempdir()?;
copy_dir_all(source, &dest)?;
add_path_into_database(&self.storage, "", &dest)?;
conn.query(
"INSERT INTO config (name, value) VALUES ('rustc_version', $1) \
ON CONFLICT (name) DO UPDATE SET value = $1;",
&[&Value::String(self.rustc_version.clone())],
)?;

set_config(
&mut conn,
ConfigName::RustcVersion,
self.rustc_version.clone(),
)?;
Ok(())
})()
.map_err(|e| failure::Error::from_boxed_compat(e.into()))
Expand Down Expand Up @@ -806,6 +807,7 @@ pub(crate) struct BuildResult {
mod tests {
use super::*;
use crate::test::{assert_redirect, assert_success, wrapper};
use serde_json::Value;

#[test]
#[ignore]
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl Metrics {
self.idle_db_connections.set(pool.idle_connections() as i64);
self.used_db_connections.set(pool.used_connections() as i64);
self.max_db_connections.set(pool.max_size() as i64);
self.queue_is_locked.set(queue.is_locked() as i64);
self.queue_is_locked.set(queue.is_locked()? as i64);

self.queued_crates_count.set(queue.pending_count()? as i64);
self.prioritized_crates_count
Expand Down
Loading