Introduce Kibana task to deploy agentless connectors for 9.0 #203973
Conversation
```ts
  fetchAllAgentPolicies: agentPolicyService.fetchAllAgentPolicies,
  fetchAllAgentPolicyIds: agentPolicyService.fetchAllAgentPolicyIds,
},
agentPolicyService,
```
This was done because I needed a `create` method that depends on a lot of other private/internal methods. I had to either make those methods public and add them here, or pass the service itself. There might be another way, but I'm not familiar enough with Kibana development yet to know; please tell me if there's a better approach :)
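For context, a minimal sketch of the two options being weighed (the contract shape here is assumed for illustration, not taken verbatim from the PR):

```ts
// Option A: re-export each internal method on the plugin's start contract;
// every new dependency of `create` would have to be made public and listed here.
const agentPolicyContract = {
  fetchAllAgentPolicies: agentPolicyService.fetchAllAgentPolicies,
  fetchAllAgentPolicyIds: agentPolicyService.fetchAllAgentPolicyIds,
  // create: agentPolicyService.create, // plus all of its private helpers...
};

// Option B (what this change does): hand consumers the whole service object.
const startContract = { agentPolicyService };
```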
Pinging @elastic/fleet (Team:Fleet)
```diff
@@ -196,7 +196,7 @@ export const bulkGetAgentPoliciesHandler: FleetRequestHandler<
       'full query parameter require agent policies read permissions'
     );
   }
-  let items = await agentPolicyService.getByIDs(soClient, ids, {
+  let items = await agentPolicyService.getByIds(soClient, ids, {
```
Side-effect of removing the usage of `AgentPolicyServiceInterface`: the interface had `getByIDs` and the implementation has `getByIds`. I chose to keep the latter, but it's easy to rename the implementation to `getByIDs` instead. This was mostly done to avoid pinging other code owners that might have used the interface method name.
```ts
if (policy.supports_agentless !== true) {
  this.logger.debug(`Policy ${policy.id} does not support agentless, skipping`);
  continue;
```
For some reason this doesn't work: I never get a policy that has a `supports_agentless` field.
```ts
  throw new Error(`Connector ${connector.id} service_type is null or empty`);
}

if (NATIVE_CONNECTOR_DEFINITIONS[connector.service_type] == null) {
```
Using our regular `NATIVE_CONNECTOR_DEFINITIONS` as the source of truth for the connectors we support. I could theoretically list the integrations that are branched off `connectors-py` instead; is that possible/better?
```ts
const AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_ID = 'search:agentless-connectors-sync-task';
const AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_TYPE = 'search:agentless-connectors-sync';

const SCHEDULE = { interval: '1m' };
```
@elastic/fleet - what's the minimal interval at which we could query fleet package policies? (We narrow them with a kuery that only returns our package, `elastic_connectors`.) Can we do 10 seconds? 30 seconds?
If we only query for a certain package, there shouldn't be too many results, so scale shouldn't be a problem. I think 30s sounds fine too; 10s might be too frequent.
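For illustration, a minimal sketch of such a narrowed query, assuming Fleet's `packagePolicyService.list` API is used; the exact kuery string is an assumption:

```ts
// Fetch only the package policies belonging to the elastic_connectors package,
// so the result set stays small regardless of how many policies exist overall.
const { items: connectorPolicies } = await packagePolicyService.list(savedObjectsClient, {
  page: 1,
  perPage: 100,
  kuery: 'ingest-package-policies.package.name:elastic_connectors',
});
```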
```ts
description:
  'This task periodically checks native connectors, agent policies and syncs them if they are out of sync',
timeout: '1m',
maxAttempts: 3,
```
Do we even need to retry, since we run pretty often?
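If not, the registration could presumably drop to a single attempt. A hedged sketch (the field shape follows Task Manager's `registerTaskDefinitions`; the runner body is elided and the values are assumptions):

```ts
taskManager.registerTaskDefinitions({
  [AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_TYPE]: {
    title: 'Agentless connector deployments sync',
    description:
      'This task periodically checks native connectors, agent policies and syncs them if they are out of sync',
    timeout: '1m',
    maxAttempts: 1, // no retries; the next scheduled run picks up any failure
    createTaskRunner: ({ taskInstance }) => ({
      // The PR's actual runner (infraSyncTaskRunner) would go here.
      run: async () => {},
      cancel: async () => {},
    }),
  },
});
```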
Fleet changes LGTM
Great stuff! The changes in the `search_connectors` plugin LGTM. I have a couple of minor comments regarding naming and one question about hardcoding the package version in the task manager logic.
I'll defer reviewing the changes in the fleet plugin to the fleet team. EDIT: I see they just approved 🚀
```ts
const connectorsInputName = 'connectors-py';
const pkgName = 'elastic_connectors';
const pkgVersion = '0.0.4';
```
Do we need this version hardcoded here? The current (latest) version in the integration registry should definitely be tracked somewhere by fleet; can we look it up in the package registry dynamically?
Context: `0.0.4` is already outdated.
Maybe the code edited in this PR will help? https://github.com/elastic/kibana/pull/192081/files - here I was able to access package info and adjust permissions dynamically.
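A minimal sketch of such a dynamic lookup, assuming Fleet's package client (`packageService.asInternalUser`) and its `fetchFindLatestPackage` method are reachable from this plugin:

```ts
// Assumed access path: Fleet's start contract exposes a package client.
const packageClient = fleetStart.packageService.asInternalUser;

// Resolve the latest published version of the integration instead of
// hardcoding it; replaces the literal '0.0.4' above.
const latest = await packageClient.fetchFindLatestPackage(pkgName);
const pkgVersion = latest.version;
```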
```ts
const SCHEDULE = { interval: '1m' };

export function infraSyncTaskRunner(
```
Potential follow-up: it would be cool if we could force-schedule this when e.g. a user creates a new connector. This would limit the wait time for the infrastructure to get deployed.
Yes! Should be easy, because I actually already schedule this task during plugin startup; doing it again will require only minor refactoring.
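A minimal sketch, assuming Task Manager's `runSoon` API is used for the nudge:

```ts
// After a connector is created via the Kibana UI, trigger an immediate run
// of the sync task that was already scheduled at plugin startup.
await taskManager.runSoon(AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_ID);
```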
🚢
```ts
const taskInstance = await taskManager.ensureScheduled({
  id: AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_ID,
  taskType: AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_TYPE,
  schedule: SCHEDULE,
```
Taking a quick look here from Response Ops. I was reading the PR description and was wondering if we need to have this task run every 30s indefinitely or if it would be possible to make it event based so it runs after a user creates or deletes a connector? Or perhaps a combo of the two but the schedule runs less frequently?
was thinking the same ...
For now this seemed to us like the best way to move forward:
The task runs and checks whether any agentless policies need to be created for our connector records. Connector records can be created in multiple ways:
- User creates a connector via UI
- Connector is created automatically by an already running agentless connector deployment
- User creates a connector via API/CLI
Scenario #1 can easily be handled with an event triggered by the Kibana UI. Scenario #2 does not need this logic. Scenario #3 really needs this task: our CLI doesn't have access to Task Manager, and our API is hosted in Elasticsearch, which also has no way to affect when this task runs.
So we've taken the current approach of polling every 30 seconds (a minute should be fine too); the task itself queries a reasonably small amount of data, I believe, so it hopefully won't be too problematic. A sketch of the loop is below.
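A hedged sketch of that polling loop; the helper names are hypothetical stand-ins for the PR's actual data access, and only the overall diff-and-reconcile shape is implied by the PR:

```ts
interface ConnectorRecord {
  id: string;
}
interface ConnectorPackagePolicy {
  id: string;
  connectorId: string;
}

// Hypothetical helpers standing in for the real connector/policy queries.
declare function fetchNativeConnectors(): Promise<ConnectorRecord[]>;
declare function fetchConnectorPackagePolicies(): Promise<ConnectorPackagePolicy[]>;
declare function createPackagePolicyFor(c: ConnectorRecord): Promise<void>;
declare function deletePackagePolicy(p: ConnectorPackagePolicy): Promise<void>;

async function syncAgentlessDeployments(): Promise<void> {
  const connectors = await fetchNativeConnectors();
  const policies = await fetchConnectorPackagePolicies();

  const policyConnectorIds = new Set(policies.map((p) => p.connectorId));
  const connectorIds = new Set(connectors.map((c) => c.id));

  // Connector exists but has no agentless package policy yet: create one.
  for (const connector of connectors) {
    if (!policyConnectorIds.has(connector.id)) {
      await createPackagePolicyFor(connector);
    }
  }

  // Package policy exists but its connector record is gone: clean it up.
  for (const policy of policies) {
    if (!connectorIds.has(policy.connectorId)) {
      await deletePackagePolicy(policy);
    }
  }
}
```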
The GenAI connectors have a similar sort of constraint, where something in Kibana wants to know when connectors get created / updated / deleted. Added in #189027.
That PR originally contained some connector logic for the new "hooks", but we extracted that and restructured it into a stand-alone PR, #194081, rather than ship the two pieces together.
So, in theory, case 3 can be handled this way.
Looking at those PRs, I'm also wondering if you need to handle the case of connectors being updated / deleted ...
I've skimmed through the change but don't understand how it handles case 3: we have customers calling the Elasticsearch API directly, and Kibana is not involved in this.
So we cannot attach hooks to that call; all we can do is poll the content of a couple of indices to see if changes were made. Am I missing some detail in the mentioned PR that works around this limitation?
Connector updates are not important for us, but deletion is also handled in this PR.
Oh, these aren't alerting connectors, these are "search" connectors? If so, you're correct; the "connector" framework I was talking about (the alerting connectors) is completely different.
ResponseOps code LGTM, left a few comments
```ts
const AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_ID = 'search:agentless-connectors-manager-task';
const AGENTLESS_CONNECTOR_DEPLOYMENTS_SYNC_TASK_TYPE = 'search:agentless-connectors-manager';

const SCHEDULE = { interval: '30s' };
```
Setting this to the largest value you are willing to live with will be helpful to Kibana's task throughput :-)
I believe a comment in the PR indicated it could be set to "1m", which would cut down the executions by 50% (useful!).
```ts
      };
    }
  },
  cancel: async () => {
```
Note that if you want `cancel` to actually stop the task from running, you'll have to do a bit more. This function is invoked when TM decides the task needs to be cancelled (running longer than its time limit). The basic idea is that you set a local flag indicating you've been cancelled, and then check it in the `run()` method. Example here:
Lines 250 to 279 in 0e13d86:

```ts
const createTaskRunnerFactory =
  ({
    logger,
    telemetry,
    executeEnrichPolicy,
    getStoreSize,
  }: {
    logger: Logger;
    telemetry: AnalyticsServiceSetup;
    executeEnrichPolicy: ExecuteEnrichPolicy;
    getStoreSize: GetStoreSize;
  }) =>
  ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
    let cancelled = false;
    const isCancelled = () => cancelled;
    return {
      run: async () =>
        runTask({
          executeEnrichPolicy,
          getStoreSize,
          isCancelled,
          logger,
          taskInstance,
          telemetry,
        }),
      cancel: async () => {
        cancelled = true;
      },
    };
  };
```
(It doesn't look like `runTask()` uses the `isCancelled()` local function they created - I think it did at one point, must have been removed in another PR ...)
Closes https://github.com/elastic/search-team/issues/8508
Closes https://github.com/elastic/search-team/issues/8465
Summary
This PR adds a background task for the search_connectors plugin. The task checks connector records against agentless package policies, detects whether a new connector was added or an old one was deleted, and then creates/deletes package policies for those connectors accordingly.
Scenario 1: a new connector was added by a user/API call
User creates an Elastic-managed connector:
Screen.Recording.2024-12-25.at.12.59.14.mov
When the user is done, a package policy is created by this background task:
Screen.Recording.2024-12-25.at.13.00.14.mov
Scenario 2: a connector was deleted by a user/API call
User deletes an Elastic-managed connector:
Screen.Recording.2024-12-25.at.13.21.13.mov
Checklist
Check that the PR satisfies the following conditions. Reviewers should verify this PR satisfies this list as well.
- The `release_note:breaking` label should be applied in these situations.
- The `release_note:*` label is applied per the guidelines.