Skip to content

Commit

Permalink
refactor(structured_doc): split pre-sync stage to optimize pull reque…
Browse files Browse the repository at this point in the history
…st indexing speed (#3538)

* refactor(structured_doc): support pre sync

chore(structured_doc): presync pull state before indexing

Signed-off-by: Wei Zhang <[email protected]>

chore: skip indexing gitlab pulls (#3515)

Signed-off-by: Wei Zhang <[email protected]>
# Conflicts:
#	ee/tabby-webserver/src/service/background_job/third_party_integration.rs

chore: use a todo id fn to create pull id for state

Signed-off-by: Wei Zhang <[email protected]>

chore: extract sync issue function

Signed-off-by: Wei Zhang <[email protected]>

Update ee/tabby-webserver/src/service/background_job/third_party_integration/pulls.rs

Co-authored-by: Wei Zhang <[email protected]>

* [autofix.ci] apply automated fixes

* chore: save raw doc in structured_doc_state to reduce unnecessary requests

Signed-off-by: Wei Zhang <[email protected]>

* chore: return pull when pre sync

Signed-off-by: Wei Zhang <[email protected]>

* chore: reduce unnecessary backtrace

Signed-off-by: Wei Zhang <[email protected]>

* [autofix.ci] apply automated fixes

* chore: add changie log

Signed-off-by: Wei Zhang <[email protected]>

---------

Signed-off-by: Wei Zhang <[email protected]>
Co-authored-by: Meng Zhang <[email protected]>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 12, 2024
1 parent 0603c8e commit 860522a
Show file tree
Hide file tree
Showing 12 changed files with 278 additions and 166 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Fixed and Improvements
body: Refactors the pull request indexing process to enhance the speed of incremental indexing for pull docs.
time: 2024-12-12T09:57:38.860665+08:00
45 changes: 17 additions & 28 deletions crates/tabby-index/src/indexer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,13 @@ mod structured_doc_tests {
let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
let updated = indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await;
.presync(&StructuredDocState {
id: doc.id().to_string(),
updated_at,
deleted: false,
})
.await
&& indexer.sync(doc).await;
println!("{}", updated);
updated
});
Expand Down Expand Up @@ -119,14 +118,13 @@ mod structured_doc_tests {
let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
let updated = indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await;
.presync(&StructuredDocState {
id: doc.id().to_string(),
updated_at,
deleted: false,
})
.await
&& indexer.sync(doc).await;
println!("{}", updated);
updated
});
Expand Down Expand Up @@ -163,18 +161,9 @@ mod structured_doc_tests {
}),
};

let updated_at = chrono::Utc::now();
let res = tokio::runtime::Runtime::new().unwrap().block_on(async {
indexer
.sync(
StructuredDocState {
updated_at,
deleted: false,
},
doc,
)
.await
});
let res = tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { indexer.sync(doc).await });
assert!(res);
indexer.commit();

Expand Down
38 changes: 25 additions & 13 deletions crates/tabby-index/src/structured_doc/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ use crate::{indexer::TantivyDocBuilder, Indexer};
/// StructuredDocState tracks the state of the document source.
/// It helps determine whether the document should be updated or deleted.
pub struct StructuredDocState {
// id is the unique identifier of the document.
// It is used to track the document in the indexer.
pub id: String,

// updated_at is the time when the document was last updated.
// when the updated_at is earlier than the document's index time,
// the update will be skipped.
pub updated_at: DateTime<Utc>,

// deleted indicates whether the document should be removed from the indexer.
// For instance, a closed pull request will be marked as deleted,
// prompting the indexer to remove it from the index.
Expand All @@ -39,21 +44,34 @@ impl StructuredDocIndexer {
Self { indexer, builder }
}

// Runs pre-sync checks to determine if the document needs to be updated.
// Returns false if `sync` is not required to be called.
pub async fn presync(&self, state: &StructuredDocState) -> bool {
if state.deleted {
self.indexer.delete(&state.id);
return false;
}

if self.indexer.is_indexed_after(&state.id, state.updated_at)
&& !self.indexer.has_failed_chunks(&state.id)
{
return false;
};

true
}

// The sync process updates the document in the indexer incrementally.
// It first determines whether the document requires an update.
//
// If an update is needed, it checks the deletion state of the document.
// If the document is marked as deleted, it will be removed.
// Next, the document is rebuilt, the original is deleted, and the newly indexed document is added.
pub async fn sync(&self, state: StructuredDocState, document: StructuredDoc) -> bool {
if !self.require_updates(state.updated_at, &document) {
pub async fn sync(&self, document: StructuredDoc) -> bool {
if !self.require_updates(&document) {
return false;
}

if state.deleted {
return self.delete(document.id()).await;
}

stream! {
let (id, s) = self.builder.build(document).await;
self.indexer.delete(&id);
Expand All @@ -80,7 +98,7 @@ impl StructuredDocIndexer {
self.indexer.commit();
}

fn require_updates(&self, updated_at: DateTime<Utc>, document: &StructuredDoc) -> bool {
fn require_updates(&self, document: &StructuredDoc) -> bool {
if document.should_skip() {
return false;
}
Expand All @@ -89,12 +107,6 @@ impl StructuredDocIndexer {
return true;
}

if self.indexer.is_indexed_after(document.id(), updated_at)
&& !self.indexer.has_failed_chunks(document.id())
{
return false;
};

true
}

Expand Down
2 changes: 1 addition & 1 deletion ee/tabby-schema/src/schema/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,5 @@ pub trait IntegrationService: Send + Sync {
) -> Result<Vec<Integration>>;

async fn get_integration(&self, id: ID) -> Result<Integration>;
async fn update_integration_sync_status(&self, id: ID, error: Option<String>) -> Result<()>;
async fn update_integration_sync_status(&self, id: &ID, error: Option<String>) -> Result<()>;
}
2 changes: 1 addition & 1 deletion ee/tabby-schema/src/schema/repository/third_party.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub trait ThirdPartyRepositoryService: Send + Sync + RepositoryProvider {
last: Option<usize>,
) -> Result<Vec<ProvidedRepository>>;

async fn get_provided_repository(&self, id: ID) -> Result<ProvidedRepository>;
async fn get_provided_repository(&self, id: &ID) -> Result<ProvidedRepository>;

async fn update_repository_active(&self, id: ID, active: bool) -> Result<()>;
async fn upsert_repository(
Expand Down
139 changes: 107 additions & 32 deletions ee/tabby-webserver/src/service/background_job/third_party_integration.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::sync::Arc;

use anyhow::Result;
use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use issues::{list_github_issues, list_gitlab_issues};
use juniper::ID;
use pulls::list_github_pulls;
use pulls::{get_github_pull_doc, list_github_pull_states};
use serde::{Deserialize, Serialize};
use tabby_common::config::CodeRepository;
use tabby_index::public::{CodeIndexer, StructuredDoc, StructuredDocIndexer, StructuredDocState};
Expand Down Expand Up @@ -90,7 +91,7 @@ impl SchedulerGithubGitlabJob {
integration_service: Arc<dyn IntegrationService>,
) -> tabby_schema::Result<()> {
let repository = repository_service
.get_provided_repository(self.repository_id)
.get_provided_repository(&self.repository_id)
.await?;
let integration = integration_service
.get_integration(repository.integration_id.clone())
Expand All @@ -116,50 +117,105 @@ impl SchedulerGithubGitlabJob {
"Indexing documents for repository {}",
repository.display_name
);
let index = StructuredDocIndexer::new(embedding);
let issue_stream = match fetch_all_issues(&integration, &repository).await {

self.sync_issues(
&integration,
integration_service.clone(),
&repository,
embedding.clone(),
)
.await?;

self.sync_pulls(&integration, integration_service, &repository, embedding)
.await?;

Ok(())
}

async fn sync_pulls(
&self,
integration: &Integration,
integration_service: Arc<dyn IntegrationService>,
repository: &ProvidedRepository,
embedding: Arc<dyn Embedding>,
) -> tabby_schema::Result<()> {
let mut pull_state_stream = match fetch_all_pull_states(integration, repository).await {
Ok(s) => s,
Err(e) => {
integration_service
.update_integration_sync_status(integration.id, Some(e.to_string()))
.update_integration_sync_status(&integration.id, Some(e.to_string()))
.await?;
logkit::error!("Failed to fetch issues: {}", e);
return Err(e);
logkit::error!("Failed to fetch pulls: {}", e);
return Ok(());
}
};

let pull_stream = match fetch_all_pulls(&integration, &repository).await {
Ok(s) => Some(s),
let mut count = 0;
let mut num_updated = 0;

let index = StructuredDocIndexer::new(embedding);
while let Some((pull, state)) = pull_state_stream.next().await {
count += 1;
if count % 100 == 0 {
logkit::info!(
"{} pull docs seen, {} pull docs updated",
count,
num_updated
);
}

if !index.presync(&state).await {
continue;
}

let pull_doc = fetch_pull_structured_doc(integration, repository, pull).await?;

index.sync(pull_doc).await;
num_updated += 1;
}
logkit::info!(
"{} pull docs seen, {} pull docs updated",
count,
num_updated
);
index.commit();

Ok(())
}

async fn sync_issues(
&self,
integration: &Integration,
integration_service: Arc<dyn IntegrationService>,
repository: &ProvidedRepository,
embedding: Arc<dyn Embedding>,
) -> tabby_schema::Result<()> {
let issue_stream = match fetch_all_issues(integration, repository).await {
Ok(s) => s,
Err(e) => {
integration_service
.update_integration_sync_status(integration.id, Some(e.to_string()))
.update_integration_sync_status(&integration.id, Some(e.to_string()))
.await?;
logkit::warn!("Failed to fetch pulls: {}", e);
None
logkit::error!("Failed to fetch issues: {}", e);
return Err(e);
}
};

let index = StructuredDocIndexer::new(embedding);
stream! {
let mut count = 0;
let mut num_updated = 0;
let combined_stream = if let Some(pull_stream) = pull_stream {
issue_stream.chain(pull_stream).boxed()
} else {
issue_stream.boxed()
};

for await (state, doc) in combined_stream {
if index.sync(state, doc).await {
num_updated += 1
for await (state, doc) in issue_stream {
if index.presync(&state).await && index.sync(doc).await {
num_updated += 1
}
count += 1;
if count % 100 == 0 {
logkit::info!("{} issue docs seen, {} issue docs updated", count, num_updated);
};
}

count += 1;
if count % 100 == 0 {
logkit::info!("{} docs seen, {} docs updated", count, num_updated);
};
}

logkit::info!("{} docs seen, {} docs updated", count, num_updated);
logkit::info!("{} issue docs seen, {} issue docs updated", count, num_updated);
index.commit();
}
.count()
Expand Down Expand Up @@ -212,13 +268,13 @@ async fn fetch_all_issues(

Ok(s)
}
async fn fetch_all_pulls(

async fn fetch_all_pull_states(
integration: &Integration,
repository: &ProvidedRepository,
) -> tabby_schema::Result<BoxStream<'static, (StructuredDocState, StructuredDoc)>> {
) -> tabby_schema::Result<BoxStream<'static, (pulls::Pull, StructuredDocState)>> {
match &integration.kind {
IntegrationKind::Github | IntegrationKind::GithubSelfHosted => Ok(list_github_pulls(
&repository.source_id(),
IntegrationKind::Github | IntegrationKind::GithubSelfHosted => Ok(list_github_pull_states(
integration.api_base(),
&repository.display_name,
&integration.access_token,
Expand All @@ -230,3 +286,22 @@ async fn fetch_all_pulls(
)),
}
}

async fn fetch_pull_structured_doc(
integration: &Integration,
repository: &ProvidedRepository,
pull: pulls::Pull,
) -> Result<StructuredDoc> {
match pull {
pulls::Pull::GitHub(pull) => {
get_github_pull_doc(
&repository.source_id(),
pull,
integration.api_base(),
&repository.display_name,
&integration.access_token,
)
.await
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub async fn list_github_issues(
})
};
yield (StructuredDocState {
id: doc.id().to_string(),
updated_at: issue.updated_at,
deleted: false,
}, doc);
Expand Down Expand Up @@ -133,6 +134,7 @@ pub async fn list_gitlab_issues(
closed: issue.state == "closed",
})};
yield (StructuredDocState {
id: doc.id().to_string(),
updated_at: issue.updated_at,
deleted: false,
}, doc);
Expand Down
Loading

0 comments on commit 860522a

Please sign in to comment.