Skip to content

Commit

Permalink
feat: create job run before run background jobs
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Zhang <[email protected]>
  • Loading branch information
zwpaper committed Jan 13, 2025
1 parent 6bd3b02 commit 5238b31
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 82 deletions.
45 changes: 19 additions & 26 deletions ee/tabby-webserver/src/service/background_job/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tabby_db::DbConn;
use tabby_schema::{context::ContextService, CoreError};
use tracing::warn;

use super::helper::Job;
use super::helper::{Job, JobLogger};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DbMaintainanceJob;
Expand All @@ -15,32 +14,27 @@ impl Job for DbMaintainanceJob {
const NAME: &'static str = "db_maintainance";
}

macro_rules! append_error {
($errors:expr, $e:expr, $($arg:tt)*) => {
{
let msg = format!($($arg)*);
warn!("{}: {}", msg, $e);
$errors.push(msg);
}
};
}

impl DbMaintainanceJob {
pub async fn cron(
now: DateTime<Utc>,
context: Arc<dyn ContextService>,
db: DbConn,
job_id: i64,
) -> tabby_schema::Result<()> {
let mut errors = vec![];
let logger = JobLogger::new(db.clone(), job_id);
let mut exit_code = 0;

if let Err(e) = db.delete_expired_token().await {
append_error!(errors, e, "Failed to delete expired tokens");
exit_code = -1;
logkit::warn!(exit_code = exit_code; "Failed to delete expired tokens: {}", e);
};
if let Err(e) = db.delete_expired_password_resets().await {
append_error!(errors, e, "Failed to delete expired password resets");
exit_code = -1;
logkit::warn!(exit_code = exit_code; "Failed to delete expired password resets: {}", e);
};
if let Err(e) = db.delete_expired_ephemeral_threads().await {
append_error!(errors, e, "Failed to delete expired ephemeral threads");
exit_code = -1;
logkit::warn!(exit_code = exit_code; "Failed to delete expired ephemeral threads: {}", e);
};

// Read all active sources
Expand All @@ -55,28 +49,27 @@ impl DbMaintainanceJob {
.delete_unused_source_id_read_access_policy(&active_source_ids)
.await
{
append_error!(
errors,
e,
"Failed to delete unused source id read access policy"
);
exit_code = -1;
logkit::warn!(exit_code = exit_code; "Failed to delete unused source id read access policy: {}", e);
};
}
Err(e) => {
append_error!(errors, e, "Failed to read active sources");
exit_code = -1;
logkit::warn!(exit_code = exit_code; "Failed to read active sources: {}", e);
}
}

if let Err(e) = Self::data_retention(now, &db).await {
append_error!(errors, e, "Failed to run data retention job");
exit_code = -1;
logkit::warn!(exit_code = exit_code; "Failed to run data retention job: {}", e);
}

if errors.is_empty() {
logger.finalize().await;
if exit_code == 0 {
Ok(())
} else {
Err(CoreError::Other(anyhow::anyhow!(
"Failed to run db maintenance job:\n\n{}",
errors.join(";\n\n")
"Failed to run db maintenance job"
)))
}
}
Expand Down
19 changes: 14 additions & 5 deletions ee/tabby-webserver/src/service/background_job/git.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tabby_index::public::CodeIndexer;
use tabby_inference::Embedding;
use tabby_schema::{job::JobService, repository::GitRepositoryService};

use super::{helper::Job, BackgroundJobEvent};
use super::{helper::Job, BackgroundJobEvent, JobLogger};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SchedulerGitJob {
Expand Down Expand Up @@ -41,11 +41,18 @@ impl SchedulerGitJob {
_now: DateTime<Utc>,
git_repository: Arc<dyn GitRepositoryService>,
job: Arc<dyn JobService>,
db: tabby_db::DbConn,
job_id: i64,
) -> tabby_schema::Result<()> {
let repositories = git_repository
.repository_list()
.await
.context("Must be able to retrieve repositories for sync")?;
let logger = JobLogger::new(db.clone(), job_id);
let repositories = match git_repository.repository_list().await {
Ok(repos) => repos,
Err(err) => {
logkit::warn!(exit_code = -1; "Failed to list repositories: {}", err);
logger.finalize().await;
return Err(err);
}
};

let repositories: Vec<_> = repositories
.into_iter()
Expand All @@ -57,6 +64,8 @@ impl SchedulerGitJob {
.trigger(BackgroundJobEvent::SchedulerGitRepository(repository).to_command())
.await;
}

logger.finalize().await;
Ok(())
}
}
9 changes: 8 additions & 1 deletion ee/tabby-webserver/src/service/background_job/helper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ mod logger;
pub use cron::CronStream;
pub use logger::JobLogger;

pub trait Job {
pub trait Job: serde::Serialize {
const NAME: &'static str;

fn name(&self) -> &'static str {
Self::NAME
}
fn to_command(&self) -> String {
serde_json::to_string(self).unwrap()
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::sync::Arc;

use serde::Serialize;
use tabby_index::public::{run_index_garbage_collection, CodeIndexer};
use tabby_schema::{context::ContextService, repository::RepositoryService};

use super::helper::Job;
use super::helper::{Job, JobLogger};

#[derive(Serialize)]
pub struct IndexGarbageCollection;

impl Job for IndexGarbageCollection {
Expand All @@ -16,22 +18,45 @@ impl IndexGarbageCollection {
self,
repository: Arc<dyn RepositoryService>,
context: Arc<dyn ContextService>,
db: tabby_db::DbConn,
job_id: i64,
) -> tabby_schema::Result<()> {
let logger = JobLogger::new(db.clone(), job_id);

// Run garbage collection on the index
let sources = context
.read(None)
.await?
let sources = match context.read(None).await {
Ok(sources) => sources,
Err(err) => {
logkit::warn!(exit_code = -1; "Failed to list sources: {}", err);
logger.finalize().await;
return Err(err);
}
};
let sources = sources
.sources
.into_iter()
.map(|x| x.source_id())
.collect::<Vec<_>>();
run_index_garbage_collection(sources)?;

if let Err(e) = run_index_garbage_collection(sources) {
logkit::warn!(exit_code = -1; "Failed to run index garbage collection: {}", e);
logger.finalize().await;
return Err(e.into());
}

// Run garbage collection on the code repositories (cloned directories)
let repositories = repository.list_all_code_repository().await?;
let repositories = match repository.list_all_code_repository().await {
Ok(repos) => repos,
Err(err) => {
logkit::warn!(exit_code = -1; "Failed to list repositories: {}", err);
logger.finalize().await;
return Err(err);
}
};
let mut code = CodeIndexer::default();
code.garbage_collection(&repositories).await;

logger.finalize().await;
Ok(())
}
}
27 changes: 22 additions & 5 deletions ee/tabby-webserver/src/service/background_job/license_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tabby_schema::{
context::ContextService,
license::{LicenseService, LicenseType},
notification::{NotificationRecipient, NotificationService},
};

use super::helper::Job;
use super::helper::{Job, JobLogger};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LicenseCheckJob;
Expand All @@ -22,21 +21,39 @@ impl LicenseCheckJob {
_now: DateTime<Utc>,
license_service: Arc<dyn LicenseService>,
notification_service: Arc<dyn NotificationService>,
db: tabby_db::DbConn,
job_id: i64,
) -> tabby_schema::Result<()> {
let license = license_service.read().await?;
let logger = JobLogger::new(db.clone(), job_id);

let license = match license_service.read().await {
Ok(license) => license,
Err(err) => {
logkit::warn!(exit_code = -1; "Failed to read license: {}", err);
logger.finalize().await;
return Err(err);
}
};
if license.r#type == LicenseType::Community {
return Ok(());
}
if let Some(expire_in_days) = license.expire_in_days() {
if expire_in_days < 7 && expire_in_days > 0 {
notification_service
if let Err(e) = notification_service
.create(
NotificationRecipient::Admin,
&make_expring_message(expire_in_days),
)
.await?;
.await
{
logkit::warn!(exit_code = -1; "Failed to create notification: {}", e);
logger.finalize().await;
return Err(e);
}
}
}

logger.finalize().await;
Ok(())
}
}
Expand Down
Loading

0 comments on commit 5238b31

Please sign in to comment.