From aaf5ed4694df1eebae7682f4fa957787be0e16c9 Mon Sep 17 00:00:00 2001 From: zacYL <100330102+zacYL@users.noreply.github.com> Date: Mon, 28 Aug 2023 14:42:32 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E7=9B=AE=E5=BD=95?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=20#1012=20(#1065)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: v1 #1012 * feat: 统计目录size大小 #1012 * feat: 存储调整 #1012 * feat: 数据刷新调整 #1012 * feat: 数据刷新调整 #1012 * feat: 缓存数据存入调整 #1012 * feat: 目录节点大小统计调整 #1012 * feat: 目录统计表中增加节点数 #1012 * feat: 增加统计以及下载接口 #1012 * feat: 执行计划变更为每周6执行一次 #1012 * feat: 修复目录层级不全导致数据不准确的问题 #1012 * feat: 修复目录层级不全导致数据不准确的问题 #1012 * feat: 目录size存储回node表中 #1012 * feat: 删除多余代码 #1012 * feat: 删除多余代码 #1012 * feat: job增加回滚操作 #1012 * feat: 类名调整 #1012 * feat: 转换报错修复 #1012 * feat: 修改时间不更新 #1012 * feat: move节点更新逻辑调整 #1012 * feat: move节点更新逻辑调整 #1012 * feat: 没有根目录这个节点,所以忽略path为/的节点 #1012 * feat: 增加子job执行策略 #1012 * feat: 去除多余代码 #1012 * feat: 将实时计算结果更新到db中 #1012 * feat: 计算逻辑调整 #1012 * feat: 测试用例调整 #1012 * feat: 目录统计缓存逻辑调整 #1012 * feat: 使用redis存储缓存记录 #1012 * feat: 修复ERR unknown command 'KEYS' #1012 * feat: redis存储逻辑调整,避免使用keys或者scan命令报错问题 #1012 * feat: 代码调整 #1012 * feat: 批量操作 #1012 * feat: redis key逻辑调整 #1012 * feat: redis存储逻辑调整;执行计划支持每天执行部分表 #1012 * feat: 配置调整 #1012 * feat: 空列表判断 #1012 * feat: 写redis前,先写内存缓存,内存缓存满后再更新到redis #1012 * feat: nodenum取值调整 #1012 * feat: redis设置值方式调整 #1012 * feat: 存储数据读取redis调整 #1012 * feat: 新增节点下非目录节点数量字段 #1012 --- .../bkrepo/analyst/utils/EasyExcelUtils.kt | 27 + .../artifact/constant/ArtifactConstants.kt | 6 + .../bkrepo/common/artifact/path/PathUtils.kt | 13 + src/backend/job/biz-job/build.gradle.kts | 1 + .../com/tencent/bkrepo/job/Constants.kt | 8 + .../batch/base/CompositeMongoDbBatchJob.kt | 6 + .../bkrepo/job/batch/base/MongoDbBatchJob.kt | 7 + .../job/batch/context/FolderChildContext.kt | 52 ++ .../batch/context/ProjectRepoChildContext.kt | 8 +- .../job/batch/node/FolderStatChildJob.kt | 549 ++++++++++++++++++ .../node/NodeStatCompositeMongoDbBatchJob.kt | 9 +- .../job/batch/node/ProjectRepoStatChildJob.kt | 4 +- ...eStatCompositeMongoDbBatchJobProperties.kt | 15 +- .../bkrepo/opdata/constant/Constants.kt | 2 + .../opdata/controller/NodeController.kt | 2 +- .../bkrepo/opdata/service/NodeService.kt | 2 +- .../bkrepo/repository/pojo/node/NodeDetail.kt | 2 + .../bkrepo/repository/pojo/node/NodeInfo.kt | 2 + .../repository/pojo/node/NodeSizeInfo.kt | 2 + .../tencent/bkrepo/repository/dao/NodeDao.kt | 35 ++ .../listener/NodeModifyEventListener.kt | 331 +++++++++++ .../tencent/bkrepo/repository/model/TNode.kt | 1 + .../service/node/impl/NodeBaseService.kt | 2 + .../service/node/impl/NodeStatsSupport.kt | 18 +- .../bkrepo/repository/util/NodeQueryHelper.kt | 9 + .../repository/service/NodeServiceTest.kt | 2 + 26 files changed, 1103 insertions(+), 12 deletions(-) create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/context/FolderChildContext.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/FolderStatChildJob.kt create mode 100644 src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeModifyEventListener.kt diff --git a/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/utils/EasyExcelUtils.kt b/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/utils/EasyExcelUtils.kt index 19e1e6c95c..7dd41de3c9 100644 --- a/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/utils/EasyExcelUtils.kt +++ b/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/utils/EasyExcelUtils.kt @@ -1,3 +1,30 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + package com.tencent.bkrepo.analyst.utils import com.alibaba.excel.EasyExcel diff --git a/src/backend/common/common-artifact/artifact-api/src/main/kotlin/com/tencent/bkrepo/common/artifact/constant/ArtifactConstants.kt b/src/backend/common/common-artifact/artifact-api/src/main/kotlin/com/tencent/bkrepo/common/artifact/constant/ArtifactConstants.kt index 8681d0ffbb..63ab74ca67 100644 --- a/src/backend/common/common-artifact/artifact-api/src/main/kotlin/com/tencent/bkrepo/common/artifact/constant/ArtifactConstants.kt +++ b/src/backend/common/common-artifact/artifact-api/src/main/kotlin/com/tencent/bkrepo/common/artifact/constant/ArtifactConstants.kt @@ -112,6 +112,12 @@ const val PIPELINE = "pipeline" */ const val REPORT = "report" + +/** + * 日志仓库 + */ +const val LOG = "log" + /** * 文件访问请求是否为直接下载 */ diff --git a/src/backend/common/common-artifact/artifact-api/src/main/kotlin/com/tencent/bkrepo/common/artifact/path/PathUtils.kt b/src/backend/common/common-artifact/artifact-api/src/main/kotlin/com/tencent/bkrepo/common/artifact/path/PathUtils.kt index 0e5b8096bc..ade82cd301 100644 --- a/src/backend/common/common-artifact/artifact-api/src/main/kotlin/com/tencent/bkrepo/common/artifact/path/PathUtils.kt +++ b/src/backend/common/common-artifact/artifact-api/src/main/kotlin/com/tencent/bkrepo/common/artifact/path/PathUtils.kt @@ -207,6 +207,19 @@ object PathUtils { return result } + /** + * 获取当前路径的所有上级目录列表 + */ + fun resolveAncestorFolder(fullPath: String): List { + return resolveAncestor(fullPath).map { + if (it != ROOT) { + it.removeSuffix(StringPool.SLASH) + } else { + it + } + } + } + /** * 根据fullPath解析文件名称,返回格式abc.txt * diff --git a/src/backend/job/biz-job/build.gradle.kts b/src/backend/job/biz-job/build.gradle.kts index 0829851b94..ac28ba2d20 100644 --- a/src/backend/job/biz-job/build.gradle.kts +++ b/src/backend/job/biz-job/build.gradle.kts @@ -31,6 +31,7 @@ dependencies { implementation(project(":common:common-security")) implementation(project(":common:common-storage:storage-service")) implementation(project(":common:common-stream")) + implementation(project(":common:common-redis")) implementation(project(":repository:api-repository")) implementation(project(":helm:api-helm")) implementation(project(":oci:api-oci")) diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt index dba5b7dee7..ba9cc162c4 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/Constants.kt @@ -51,6 +51,14 @@ const val CATEGORY = "category" const val CREATED_DATE = "createdDate" const val LAST_MODIFIED_DATE = "lastModifiedDate" const val DELETED_DATE = "deleted" +const val FULLPATH = "fullPath" +const val PATH = "path" + +/** + * 缓存类型 + */ +const val REDIS_CACHE_TYPE = "redis" +const val MEMORY_CACHE_TYPE = "memory" /** * metrics diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/CompositeMongoDbBatchJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/CompositeMongoDbBatchJob.kt index 593c9ed1cd..4db1b45d6e 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/CompositeMongoDbBatchJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/CompositeMongoDbBatchJob.kt @@ -44,6 +44,12 @@ abstract class CompositeMongoDbBatchJob( return CompositeJobContext(enabledChildJobs) } + override fun onRunCollectionFinished(collectionName: String, context: CompositeJobContext) { + context.childJobs.forEach { + logException { it.onRunCollectionFinished(collectionName, context.childContext(it.getJobName())) } + } + } + protected abstract fun createChildJobs(): List> @Suppress("TooGenericExceptionCaught") diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt index 04a197b5da..6efb9fced6 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/base/MongoDbBatchJob.kt @@ -76,6 +76,12 @@ abstract class MongoDbBatchJob( abstract fun entityClass(): Class + + /** + * 表执行结束回调 + * */ + open fun onRunCollectionFinished(collectionName: String, context: Context) {} + private val batchSize: Int get() = properties.batchSize @@ -162,6 +168,7 @@ abstract class MongoDbBatchJob( } while (querySize == pageSize && isRunning()) }.apply { val elapsedTime = HumanReadable.time(this) + onRunCollectionFinished(collectionName, context) logger.info("Job[${getJobName()}]: collection $collectionName run completed,sum [$sum] elapse $elapsedTime") } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/context/FolderChildContext.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/context/FolderChildContext.kt new file mode 100644 index 0000000000..3ecbe6e762 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/context/FolderChildContext.kt @@ -0,0 +1,52 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.job.batch.context + +import com.tencent.bkrepo.job.MEMORY_CACHE_TYPE +import com.tencent.bkrepo.job.batch.base.ChildJobContext +import com.tencent.bkrepo.job.batch.base.JobContext +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.LongAdder + +class FolderChildContext( + parentContent: JobContext, + // 是否执行任务 + var runFlag: Boolean = false, + // 缓存类型redis和内存:数据量级大的建议使用redis + var cacheType: String = MEMORY_CACHE_TYPE, + // 表对应项目记录: 主要用于redis缓存生成key使用 + var projectMap: ConcurrentHashMap> = ConcurrentHashMap(), + // 用于内存缓存下存储目录统计信息 + var folderCache: ConcurrentHashMap = ConcurrentHashMap() +) : ChildJobContext(parentContent) { + + data class FolderMetrics( + var nodeNum: LongAdder = LongAdder(), + var capSize: LongAdder = LongAdder() + ) +} \ No newline at end of file diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/context/ProjectRepoChildContext.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/context/ProjectRepoChildContext.kt index 816fa7f51e..60954cb1ec 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/context/ProjectRepoChildContext.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/context/ProjectRepoChildContext.kt @@ -41,7 +41,9 @@ class ProjectRepoChildContext( .storageCredentials ?.key ?: "default" val repo = repoMetrics.getOrPut(row.repoName) { RepoMetrics(row.repoName, credentialsKey) } - repo.size.add(row.size) + if (!row.folder) { + repo.size.add(row.size) + } repo.num.increment() repo.addFolderMetrics(row) repo.addExtensionMetrics(row) @@ -94,7 +96,9 @@ class ProjectRepoChildContext( } val metric = folderMetrics.getOrPut(firstLevelPath) { FolderMetric(firstLevelPath) } metric.nodeNum.increment() - metric.capSize.add(row.size) + if (!row.folder) { + metric.capSize.add(row.size) + } } fun addExtensionMetrics(row: NodeStatCompositeMongoDbBatchJob.Node) { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/FolderStatChildJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/FolderStatChildJob.kt new file mode 100644 index 0000000000..f1653bdb49 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/FolderStatChildJob.kt @@ -0,0 +1,549 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.job.batch.node + +import com.tencent.bkrepo.common.api.constant.StringPool +import com.tencent.bkrepo.common.artifact.constant.LOG +import com.tencent.bkrepo.common.artifact.constant.REPORT +import com.tencent.bkrepo.common.artifact.path.PathUtils +import com.tencent.bkrepo.common.service.log.LoggerHolder +import com.tencent.bkrepo.job.DELETED_DATE +import com.tencent.bkrepo.job.FOLDER +import com.tencent.bkrepo.job.FULLPATH +import com.tencent.bkrepo.job.MEMORY_CACHE_TYPE +import com.tencent.bkrepo.job.PROJECT +import com.tencent.bkrepo.job.REDIS_CACHE_TYPE +import com.tencent.bkrepo.job.REPO +import com.tencent.bkrepo.job.batch.base.ChildJobContext +import com.tencent.bkrepo.job.batch.base.ChildMongoDbBatchJob +import com.tencent.bkrepo.job.batch.base.JobContext +import com.tencent.bkrepo.job.batch.context.FolderChildContext +import com.tencent.bkrepo.job.config.properties.CompositeJobProperties +import com.tencent.bkrepo.job.config.properties.NodeStatCompositeMongoDbBatchJobProperties +import org.springframework.data.mongodb.core.BulkOperations.BulkMode +import org.springframework.data.mongodb.core.MongoTemplate +import org.springframework.data.mongodb.core.query.Criteria +import org.springframework.data.mongodb.core.query.Query +import org.springframework.data.mongodb.core.query.Update +import org.springframework.data.mongodb.core.query.isEqualTo +import org.springframework.data.redis.core.HashOperations +import org.springframework.data.redis.core.RedisTemplate +import org.springframework.data.redis.core.ScanOptions +import java.time.DayOfWeek +import java.time.LocalDateTime +import kotlin.system.measureTimeMillis +import kotlin.text.toLongOrNull as toLongOrNull1 + + +/** + * 目录大小以及文件个数统计 + */ +class FolderStatChildJob( + val properties: CompositeJobProperties, + private val mongoTemplate: MongoTemplate, + private val redisTemplate: RedisTemplate +) : ChildMongoDbBatchJob(properties) { + + override fun onParentJobStart(context: ChildJobContext) { + require(context is FolderChildContext) + runTaskCheck(context) + logger.info("start to stat the size of folder, run flag is ${context.runFlag}") + } + + override fun run(row: NodeStatCompositeMongoDbBatchJob.Node, collectionName: String, context: JobContext) { + require(context is FolderChildContext) + if (!context.runFlag) return + if (!collectionRunCheck(collectionName)) return + if (row.deleted != null) return + // 判断是否在不统计项目或者仓库列表中 + if (ignoreProjectOrRepoCheck(row.projectId, row.repoName)) return + //只统计非目录类节点;没有根目录这个节点,不需要统计 + if (row.folder || row.path == PathUtils.ROOT) { + return + } + + // 更新当前节点所有上级目录(排除根目录)统计信息 + val folderFullPaths = PathUtils.resolveAncestorFolder(row.fullPath) + for (fullPath in folderFullPaths) { + if (fullPath == PathUtils.ROOT) continue + updateCache( + collectionName = collectionName, + projectId = row.projectId, + repoName = row.repoName, + fullPath = fullPath, + size = row.size, + context = context + ) + } + } + + override fun onParentJobFinished(context: ChildJobContext) { + require(context is FolderChildContext) + logger.info("stat size of folder done") + } + + + override fun createChildJobContext(parentJobContext: JobContext): ChildJobContext { + val cacheType = try { + redisTemplate.execute { null } + REDIS_CACHE_TYPE + } catch (e: Exception) { + MEMORY_CACHE_TYPE + } + return FolderChildContext(parentJobContext, cacheType = cacheType) + } + + override fun onRunCollectionFinished(collectionName: String, context: JobContext) { + super.onRunCollectionFinished(collectionName, context) + require(context is FolderChildContext) + // 如果使用的是redis作为缓存,将内存中的临时记录写入redis + updateRedisCache(context, force = true) + // 当表执行完成后,将属于该表的所有记录写入数据库 + storeCacheToDB(collectionName, context) + context.projectMap.remove(collectionName) + } + + /** + * 判断项目或者仓库是否不需要进行目录统计 + */ + private fun ignoreProjectOrRepoCheck(projectId: String, repoName: String): Boolean { + return IGNORE_PROJECT_PREFIX_LIST.firstOrNull { projectId.startsWith(it) } != null + || IGNORE_REPO_LIST.contains(repoName) + } + + /** + * 更新缓存中的size和nodeNum + */ + private fun updateCache( + collectionName: String, + projectId: String, + repoName: String, + fullPath: String, + size: Long, + context: FolderChildContext + ) { + val elapsedTime = measureTimeMillis { + if (context.cacheType == REDIS_CACHE_TYPE) { + updateRedisCache( + collectionName = collectionName, + projectId = projectId, + repoName = repoName, + fullPath = fullPath, + size = size, + context = context + ) + } else { + updateMemoryCache( + collectionName = collectionName, + projectId = projectId, + repoName = repoName, + fullPath = fullPath, + size = size, + context = context + ) + } + context.projectMap.putIfAbsent(collectionName, mutableSetOf()) + context.projectMap[collectionName]!!.add(projectId) + } + logger.debug("updateCache, elapse: $elapsedTime") + } + + /** + * 更新redis缓存中对应key下将新增的size和nodeNum + */ + private fun updateRedisCache( + collectionName: String, + projectId: String, + repoName: String, + fullPath: String, + size: Long, + context: FolderChildContext + ) { + // 避免每次请求都去请求redis, 先将数据缓存在本地cache中,到达上限后更新到redis + updateMemoryCache( + collectionName = collectionName, + projectId = projectId, + repoName = repoName, + fullPath = fullPath, + size = size, + context = context + ) + updateRedisCache(context) + } + + /** + * 将存储在内存中的临时记录更新到redis + */ + private fun updateRedisCache( + context: FolderChildContext, + force: Boolean = false + ) { + if (context.cacheType != REDIS_CACHE_TYPE) return + if (!force && context.folderCache.size < 10000) return + // 避免每次设置值都创建一个 Redis 连接 + redisTemplate.execute { connection -> + val hashCommands = connection.hashCommands() + for (entry in context.folderCache) { + val folderInfo = extractFolderInfoFromCacheKey(entry.key) ?: continue + val cName = extractCollectionNameFromCacheKey(entry.key) + if (!cName.isNullOrEmpty()) { + val sizeHKey = buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = SIZE + ) + val nodeNumHKey = buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = NODE_NUM + ) + val key = buildCacheKey(collectionName = cName, projectId = folderInfo.projectId) + hashCommands.hIncrBy(key.toByteArray(), sizeHKey.toByteArray(), entry.value.capSize.toLong()) + hashCommands.hIncrBy(key.toByteArray(), nodeNumHKey.toByteArray(), entry.value.nodeNum.toLong()) + } + } + null + } + context.folderCache.clear() + } + + /** + * 更新内存缓存中对应key下将新增的size和nodeNum + */ + private fun updateMemoryCache( + collectionName: String, + projectId: String, + repoName: String, + fullPath: String, + size: Long, + context: FolderChildContext + ) { + val key = buildCacheKey( + collectionName = collectionName, projectId = projectId, repoName = repoName, fullPath = fullPath + ) + val folderMetrics = context.folderCache.getOrPut(key) { FolderChildContext.FolderMetrics() } + folderMetrics.capSize.add(size) + folderMetrics.nodeNum.increment() + } + + private fun runTaskCheck(context: FolderChildContext) { + require(properties is NodeStatCompositeMongoDbBatchJobProperties) + // 当值小于 1 时,任务不执行 + if (properties.runPolicy <= 0) return + context.runFlag = true + } + + /** + * 判断该collectionName是否允许执行 + */ + private fun collectionRunCheck(collectionName: String): Boolean { + require(properties is NodeStatCompositeMongoDbBatchJobProperties) + if (!properties.multipleExecutions) { + return true + } + val collectionNum = collectionName.removePrefix(COLLECTION_NAME_PREFIX).toIntOrNull() ?: return false + val remainder = collectionNum % 7 + 1 + + // 当值为1 - 7时,优先执行node_num%7 +1 == runPolicy对应的node表 + if (properties.runPolicy in 1..7) { + return properties.runPolicy == remainder + } + return DayOfWeek.of(remainder) == LocalDateTime.now().dayOfWeek + } + + /** + * 将缓存中的数据更新到DB中 + */ + private fun storeCacheToDB(collectionName: String, context: FolderChildContext) { + if (context.cacheType == REDIS_CACHE_TYPE) { + storeRedisCacheToDB(collectionName, context) + } else { + storeMemoryCacheToDB(collectionName, context) + } + } + + + /** + * 将redis缓存中属于collectionName下的记录写入DB中 + */ + private fun storeRedisCacheToDB(collectionName: String, context: FolderChildContext) { + logger.info("store redis cache to db withe table $collectionName") + val hashOps = redisTemplate.opsForHash() + context.projectMap[collectionName]?.forEach { + storeFolderOfProject(collectionName, it, hashOps) + } + } + + + /** + * 存储对应项目下缓存在redis下的folder记录 + */ + private fun storeFolderOfProject( + collectionName: String, + projectId: String, + hashOps: HashOperations + ) { + val projectKey = buildCacheKey(collectionName = collectionName, projectId = projectId) + val storedProjectIdKey = buildCacheKey(collectionName = collectionName, projectId = projectId, tag = STORED) + val updateList = ArrayList>() + + val options = ScanOptions.scanOptions().build() + redisTemplate.execute { connection -> + val hashCommands = connection.hashCommands() + val cursor = hashCommands.hScan(projectKey.toByteArray(), options) + while (cursor.hasNext()) { + val entry: Map.Entry = cursor.next() + val folderInfo = extractFolderInfoFromRedisKey(String(entry.key)) ?: continue + // 由于可能KEYS或者SCAN命令会被禁用,调整redis存储格式,key为collectionName, + // hkey为projectId:repoName:fullPath:size或者nodenum, hvalue为对应值, + // 为了避免遍历时删除,用一个额外的key去记录当前collectionName+project下已经存储到db的目录记录 + val storedFolderHkey = buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, fullPath = folderInfo.fullPath + ) + val storedHkeyExist = hashCommands.hExists( + storedProjectIdKey.toByteArray(), storedFolderHkey.toByteArray() + ) + if (storedHkeyExist == null || !storedHkeyExist) { + val statInfo = getFolderStatInfo( + collectionName, entry, folderInfo, hashOps + ) + updateList.add(buildUpdateClausesForFolder( + projectId = folderInfo.projectId, + repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, + size = statInfo.size, + nodeNum = statInfo.nodeNum + )) + if (updateList.size >= BATCH_LIMIT) { + mongoTemplate.bulkOps(BulkMode.UNORDERED,collectionName) + .updateOne(updateList) + .execute() + updateList.clear() + Thread.sleep(200) + } + hashCommands.hSet( + storedProjectIdKey.toByteArray(), storedFolderHkey.toByteArray(), STORED.toByteArray() + ) + } + } + } + if (updateList.isNotEmpty()) { + mongoTemplate.bulkOps(BulkMode.UNORDERED,collectionName) + .updateOne(updateList) + .execute() + updateList.clear() + } + redisTemplate.delete(projectKey) + redisTemplate.delete(storedProjectIdKey) + } + + + /** + * 从redis中获取对应目录的统计信息 + */ + private fun getFolderStatInfo( + collectionName: String, + entry: Map.Entry, + folderInfo: FolderInfo, + hashOps: HashOperations + ): StatInfo { + val size: Long + val nodeNum: Long + val key = buildCacheKey(collectionName = collectionName, projectId = folderInfo.projectId) + if (String(entry.key).endsWith(SIZE)) { + val nodeNumKey = buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = NODE_NUM + ) + size = String(entry.value).toLongOrNull1() ?: 0 + nodeNum = hashOps.get(key, nodeNumKey)?.toLongOrNull1() ?: 0 + } else { + val sizeKey = buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = SIZE + ) + nodeNum = String(entry.value).toLongOrNull1() ?: 0 + size = hashOps.get(key, sizeKey)?.toLongOrNull1() ?: 0 + } + return StatInfo(size, nodeNum) + } + + /** + * 将memory缓存中属于collectionName下的记录写入DB中 + */ + private fun storeMemoryCacheToDB(collectionName: String, context: FolderChildContext) { + logger.info("store memory cache to db withe table $collectionName") + if (context.folderCache.isEmpty()) { + return + } + val updateList = ArrayList>() + val prefix = buildCacheKey(collectionName = collectionName, projectId = StringPool.EMPTY) + for(entry in context.folderCache) { + if (!entry.key.startsWith(prefix)) continue + extractFolderInfoFromCacheKey(entry.key)?.let { + updateList.add(buildUpdateClausesForFolder( + projectId = it.projectId, + repoName = it.repoName, + fullPath = it.fullPath, + size = entry.value.capSize.toLong(), + nodeNum = entry.value.nodeNum.toLong() + )) + } + if (updateList.size >= BATCH_LIMIT) { + mongoTemplate.bulkOps(BulkMode.UNORDERED,collectionName) + .updateOne(updateList) + .execute() + updateList.clear() + try { + Thread.sleep(500) + } catch (_: Exception) { + } + } + } + if (updateList.isEmpty()) return + mongoTemplate.bulkOps(BulkMode.UNORDERED,collectionName) + .updateOne(updateList) + .execute() + updateList.clear() + } + + /** + * 生成db更新语句 + */ + private fun buildUpdateClausesForFolder( + projectId: String, + repoName: String, + fullPath: String, + size: Long, + nodeNum: Long + ): org.springframework.data.util.Pair { + val query = Query( + Criteria.where(PROJECT).isEqualTo(projectId) + .and(REPO).isEqualTo(repoName) + .and(FULLPATH).isEqualTo(fullPath) + .and(DELETED_DATE).isEqualTo(null) + .and(FOLDER).isEqualTo(true) + ) + val update = Update().set(SIZE, size) + .set(NODE_NUM, nodeNum) + return org.springframework.data.util.Pair.of(query, update) + } + + + /** + * 生成缓存key + */ + private fun buildCacheKey( + projectId: String, + repoName: String? = null, + fullPath: String? = null, + collectionName: String? = null, + tag: String? = null, + ): String { + return StringBuilder().apply { + collectionName?.let { + this.append(it).append(StringPool.COLON) + } + this.append(projectId) + repoName?.let { + this.append(StringPool.COLON).append(repoName) + } + fullPath?.let { + this.append(StringPool.COLON).append(fullPath) + } + tag?.let { + this.append(StringPool.COLON).append(tag) + } + }.toString() + } + + private fun extractFolderInfoFromRedisKey(key: String): FolderInfo? { + val values = key.split(StringPool.COLON) + return try { + FolderInfo( + projectId = values[0], + repoName = values[1], + fullPath = values[2] + ) + } catch (e: Exception) { + null + } + } + + + + /** + * 从缓存key中解析出目录信息 + */ + private fun extractFolderInfoFromCacheKey(key: String): FolderInfo? { + val values = key.split(StringPool.COLON) + return try { + FolderInfo( + projectId = values[1], + repoName = values[2], + fullPath = values[3] + ) + } catch (e: Exception) { + null + } + } + + /** + * 从缓存key中解析出collectionName + */ + private fun extractCollectionNameFromCacheKey(key: String): String? { + val values = key.split(StringPool.COLON) + return try { + values.firstOrNull() + } catch (e: Exception) { + null + } + } + + + data class FolderInfo( + var projectId: String, + var repoName: String, + var fullPath: String + ) + + data class StatInfo( + var size: Long, + var nodeNum: Long + ) + + + companion object { + private val logger = LoggerHolder.jobLogger + private const val SIZE = "size" + private const val NODE_NUM = "nodeNum" + private val IGNORE_PROJECT_PREFIX_LIST = listOf("CODE_", "CLOSED_SOURCE_", "git_") + private val IGNORE_REPO_LIST = listOf(REPORT, LOG) + private const val STORED = "stored" + private const val BATCH_LIMIT = 250 + private const val COLLECTION_NAME_PREFIX = "node_" + } +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/NodeStatCompositeMongoDbBatchJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/NodeStatCompositeMongoDbBatchJob.kt index aaec1a1b82..78b69e3eac 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/NodeStatCompositeMongoDbBatchJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/NodeStatCompositeMongoDbBatchJob.kt @@ -7,14 +7,16 @@ import com.tencent.bkrepo.job.config.properties.NodeStatCompositeMongoDbBatchJob import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.data.mongodb.core.MongoTemplate import org.springframework.data.mongodb.core.query.Query +import org.springframework.data.redis.core.RedisTemplate import org.springframework.stereotype.Component import java.util.Date @Component @EnableConfigurationProperties(NodeStatCompositeMongoDbBatchJobProperties::class) class NodeStatCompositeMongoDbBatchJob( - private val properties: NodeStatCompositeMongoDbBatchJobProperties, + val properties: NodeStatCompositeMongoDbBatchJobProperties, private val mongoTemplate: MongoTemplate, + private val redisTemplate: RedisTemplate ) : CompositeMongoDbBatchJob(properties) { override fun collectionNames(): List { @@ -29,7 +31,8 @@ class NodeStatCompositeMongoDbBatchJob( override fun createChildJobs(): List> { return listOf( - ProjectRepoStatChildJob(properties, mongoTemplate) + ProjectRepoStatChildJob(properties, mongoTemplate), + FolderStatChildJob(properties, mongoTemplate, redisTemplate) ) } @@ -68,7 +71,7 @@ class NodeStatCompositeMongoDbBatchJob( path = map[Node::path.name] as String fullPath = map[Node::fullPath.name] as String name = map[Node::name.name] as String - size = map[Node::size.name] as Long + size = map[Node::size.name].toString().toLong() // 查询出的deleted默认为Date类型 deleted = map[Node::deleted.name] as Date? projectId = map[Node::projectId.name] as String diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/ProjectRepoStatChildJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/ProjectRepoStatChildJob.kt index ee8d8b647f..5634e5f990 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/ProjectRepoStatChildJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/node/ProjectRepoStatChildJob.kt @@ -25,7 +25,9 @@ class ProjectRepoStatChildJob( return } val metric = context.metrics.getOrPut(row.projectId) { ProjectRepoChildContext.ProjectMetrics(row.projectId) } - metric.capSize.add(row.size) + if (!row.folder) { + metric.capSize.add(row.size) + } metric.nodeNum.increment() metric.addRepoMetrics(row) } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/NodeStatCompositeMongoDbBatchJobProperties.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/NodeStatCompositeMongoDbBatchJobProperties.kt index d7960d0769..8604e07e68 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/NodeStatCompositeMongoDbBatchJobProperties.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/NodeStatCompositeMongoDbBatchJobProperties.kt @@ -4,5 +4,18 @@ import org.springframework.boot.context.properties.ConfigurationProperties @ConfigurationProperties("job.node-stat-composite-mongodb-batch") class NodeStatCompositeMongoDbBatchJobProperties ( - override var cron: String = "0 0 15 * * ?" + override var cron: String = "0 0 15 * * ?", + /** + * 可用用于控制任务不执行或者执行执行哪些表数据 + * 当值小于 1 时,任务不执行 + * 当值大于 7 时,不特定指定执行哪些表 + * 当值为1 - 7时,优先执行node_num%7 +1 == runPolicy对应的node表 + */ + var runPolicy: Int = 8, + /** + * 是否分多次执行 + * false: 一次性将所有表执行完 + * true: 分7天去执行所有表,每天执行 node_num%7 +1 == 当天的node表 + */ + var multipleExecutions: Boolean = false ): CompositeJobProperties() diff --git a/src/backend/opdata/api-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/constant/Constants.kt b/src/backend/opdata/api-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/constant/Constants.kt index 024dfbf245..6a8285940d 100644 --- a/src/backend/opdata/api-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/constant/Constants.kt +++ b/src/backend/opdata/api-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/constant/Constants.kt @@ -60,6 +60,8 @@ const val OPDATA_REPOSITORY = "repository" const val OPDATA_REPO_NAME = "repoName" const val OPDATA_PATH = "path" + + const val OPDATA_FILE_EXTENSION_METRICS = "file_extension_metrics" const val B_0 = "0" diff --git a/src/backend/opdata/biz-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/controller/NodeController.kt b/src/backend/opdata/biz-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/controller/NodeController.kt index b6ec7c6b97..08790fdfad 100644 --- a/src/backend/opdata/biz-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/controller/NodeController.kt +++ b/src/backend/opdata/biz-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/controller/NodeController.kt @@ -65,7 +65,7 @@ class NodeController( } /** - * 获取当前仓库目录下的空目录列表 + * 删除当前仓库目录下的空目录列表 */ @DeleteMapping("/emptyFolders/{projectId}/{repoName}") @LogOperate(type = "EMPTY_FOLDER_DELETE") diff --git a/src/backend/opdata/biz-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/service/NodeService.kt b/src/backend/opdata/biz-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/service/NodeService.kt index 6e48fa0109..a5dca54863 100644 --- a/src/backend/opdata/biz-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/service/NodeService.kt +++ b/src/backend/opdata/biz-opdata/src/main/kotlin/com/tencent/bkrepo/opdata/service/NodeService.kt @@ -41,7 +41,7 @@ import org.springframework.stereotype.Service class NodeService( private val emptyFolderService: EmptyFolderStatJob, private val folderMetricsRepository: FolderMetricsRepository -) { + ) { fun getEmptyFolder( projectId: String, diff --git a/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeDetail.kt b/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeDetail.kt index 5093af688c..f8702c171d 100644 --- a/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeDetail.kt +++ b/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeDetail.kt @@ -67,6 +67,8 @@ data class NodeDetail( val fullPath: String = nodeInfo.fullPath, @ApiModelProperty("文件大小,单位byte") val size: Long = nodeInfo.size, + @ApiModelProperty("文件节点个数") + val nodeNum: Long? = nodeInfo.nodeNum, @ApiModelProperty("文件sha256") val sha256: String? = nodeInfo.sha256, @ApiModelProperty("文件md5") diff --git a/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeInfo.kt b/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeInfo.kt index c21e071906..640d6e716d 100644 --- a/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeInfo.kt +++ b/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeInfo.kt @@ -62,6 +62,8 @@ data class NodeInfo( val fullPath: String, @ApiModelProperty("文件大小,单位byte") val size: Long, + @ApiModelProperty("文件节点个数") + val nodeNum: Long? = null, @ApiModelProperty("文件sha256") val sha256: String? = null, @ApiModelProperty("文件md5") diff --git a/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeSizeInfo.kt b/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeSizeInfo.kt index a8f8acc610..400ad08e06 100644 --- a/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeSizeInfo.kt +++ b/src/backend/repository/api-repository/src/main/kotlin/com/tencent/bkrepo/repository/pojo/node/NodeSizeInfo.kt @@ -41,6 +41,8 @@ import io.swagger.annotations.ApiModelProperty data class NodeSizeInfo( @ApiModelProperty("子节点数量, 包含文件夹") val subNodeCount: Long = 0, + @ApiModelProperty("子节点数量, 不包含文件夹") + val subNodeWithoutFolderCount: Long = 0, @ApiModelProperty("文件大小总和") val size: Long ) diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/dao/NodeDao.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/dao/NodeDao.kt index 48af2ad1da..ba2685f84f 100644 --- a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/dao/NodeDao.kt +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/dao/NodeDao.kt @@ -40,6 +40,7 @@ import com.tencent.bkrepo.repository.pojo.node.NodeListOption import com.tencent.bkrepo.repository.util.NodeQueryHelper import org.springframework.data.domain.Page import org.springframework.data.mongodb.core.query.Query +import org.springframework.data.mongodb.core.query.Update import org.springframework.data.mongodb.core.query.and import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.data.mongodb.core.query.where @@ -70,6 +71,40 @@ class NodeDao : HashShardingMongoDao() { return this.exists(NodeQueryHelper.nodeQuery(projectId, repoName, fullPath)) } + /** + * 更新目录下变更的文件数量以及涉及的文件大小 + */ + fun incSizeAndNodeNumOfFolder( + projectId: String, + repoName: String, + fullPath: String, + size: Long, + nodeNum: Long + ) { + val query = NodeQueryHelper.nodeFolderQuery(projectId, repoName, fullPath) + val update = Update().inc(TNode::size.name, size) + .inc(TNode::nodeNum.name, nodeNum) + .set(TNode::lastModifiedDate.name, LocalDateTime.now()) + this.updateFirst(query, update) + } + + /** + * 设置目录下的文件数量以及文件大小 + */ + fun setSizeAndNodeNumOfFolder( + projectId: String, + repoName: String, + fullPath: String, + size: Long, + nodeNum: Long + ) { + val query = NodeQueryHelper.nodeFolderQuery(projectId, repoName, fullPath) + val update = Update().set(TNode::size.name, size) + .set(TNode::nodeNum.name, nodeNum) + this.updateFirst(query, update) + } + + /** * 根据[sha256]分页查询节点,需要遍历所有分表 * diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeModifyEventListener.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeModifyEventListener.kt new file mode 100644 index 0000000000..05e9f26645 --- /dev/null +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeModifyEventListener.kt @@ -0,0 +1,331 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT + * LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package com.tencent.bkrepo.repository.listener + +import com.google.common.cache.CacheBuilder +import com.google.common.cache.RemovalCause +import com.tencent.bkrepo.common.artifact.api.ArtifactInfo +import com.tencent.bkrepo.common.artifact.constant.LOG +import com.tencent.bkrepo.common.artifact.constant.REPORT +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.common.artifact.event.base.EventType +import com.tencent.bkrepo.common.artifact.event.node.NodeCopiedEvent +import com.tencent.bkrepo.common.artifact.event.node.NodeCreatedEvent +import com.tencent.bkrepo.common.artifact.event.node.NodeDeletedEvent +import com.tencent.bkrepo.common.artifact.event.node.NodeMovedEvent +import com.tencent.bkrepo.common.artifact.path.PathUtils +import com.tencent.bkrepo.repository.dao.NodeDao +import com.tencent.bkrepo.repository.model.TNode +import com.tencent.bkrepo.repository.service.node.NodeService +import org.slf4j.LoggerFactory +import org.springframework.context.event.EventListener +import org.springframework.data.mongodb.core.query.Query +import org.springframework.data.mongodb.core.query.and +import org.springframework.data.mongodb.core.query.isEqualTo +import org.springframework.data.mongodb.core.query.where +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Component +import java.time.LocalDateTime +import java.util.concurrent.TimeUnit + + +/** + * 节点事件监听,用户统计目录size以及目录下文件个数 + */ +@Component +class NodeModifyEventListener( + private val nodeService: NodeService, + private val nodeDao: NodeDao + ) { + + private val cache = CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterWrite(1, TimeUnit.MINUTES) + .removalListener, Pair> { + if (it.cause == RemovalCause.REPLACED) return@removalListener + logger.info("remove ${it.key}, ${it.value}, cause ${it.cause}, Thread ${Thread.currentThread().name}") + nodeDao.incSizeAndNodeNumOfFolder( + projectId = it.key!!.first, + repoName = it.key!!.second, + fullPath = it.key!!.third, + size = it.value.first, + nodeNum = it.value.second + ) + } + .build, Pair>() + + /** + * 允许接收的事件类型 + */ + private val acceptTypes = setOf( + EventType.NODE_COPIED, + EventType.NODE_CREATED, + EventType.NODE_DELETED, + EventType.NODE_MOVED + ) + + + @EventListener(ArtifactEvent::class) + fun handle(event: ArtifactEvent) { + if (!acceptTypes.contains(event.type)) { + return + } + if (ignoreProjectOrRepoCheck(event.projectId, event.repoName)) return + try { + updateCacheOfModifiedFolder(event) + } catch (ignore: Exception) { + logger.warn("update folder cache error: ${ignore.message}") + } + } + + + /** + * 定时将缓存中的数据更新到db中 + */ + @Scheduled(fixedDelay = FIXED_DELAY, initialDelay = FIXED_DELAY) + fun storeFolderData() { + cache.invalidateAll() + } + + /** + * 判断项目或者仓库是否不需要进行目录统计 + */ + private fun ignoreProjectOrRepoCheck(projectId: String, repoName: String): Boolean { + IGNORE_PROJECT_PREFIX_LIST.forEach { + if (projectId.startsWith(it)){ + return true + } + } + return IGNORE_REPO_LIST.contains(repoName) + } + + + + /** + * 将变更的目录节点数据存放在缓存中 + */ + private fun updateCacheOfModifiedFolder(event: ArtifactEvent) { + logger.info("event type ${event.type}") + val modifiedNodeList = mutableListOf() + when (event.type) { + EventType.NODE_MOVED -> { + require(event is NodeMovedEvent) + val dstFullPath = buildDstFullPath(event.dstFullPath, event.resourceKey) + val createdNode = ModifiedNodeInfo( + projectId = event.dstProjectId, + repoName = event.dstRepoName, + fullPath = dstFullPath + ) + val deletedNode = ModifiedNodeInfo( + projectId = event.projectId, + repoName = event.repoName, + fullPath = event.resourceKey, + deleted = true + ) + modifiedNodeList.add(createdNode) + modifiedNodeList.add(deletedNode) + } + EventType.NODE_DELETED -> { + require(event is NodeDeletedEvent) + val deletedNode = ModifiedNodeInfo( + projectId = event.projectId, + repoName = event.repoName, + fullPath = event.resourceKey, + deleted = true + ) + modifiedNodeList.add(deletedNode) + } + EventType.NODE_COPIED -> { + require(event is NodeCopiedEvent) + val dstFullPath = buildDstFullPath(event.dstFullPath, event.resourceKey) + val createdNode = ModifiedNodeInfo( + projectId = event.dstProjectId, + repoName = event.dstRepoName, + fullPath = dstFullPath + ) + modifiedNodeList.add(createdNode) + } + EventType.NODE_CREATED -> { + require(event is NodeCreatedEvent) + val createdNode = ModifiedNodeInfo( + projectId = event.projectId, + repoName = event.repoName, + fullPath = event.resourceKey + ) + modifiedNodeList.add(createdNode) + } + else -> throw UnsupportedOperationException() + } + modifiedNodeList.forEach { + findFoldersAndUpdateCache(it) + } + } + + private fun findFoldersAndUpdateCache(modifiedNode: ModifiedNodeInfo) { + val artifactInfo = ArtifactInfo( + projectId = modifiedNode.projectId, + repoName = modifiedNode.repoName, + artifactUri = modifiedNode.fullPath + ) + val node = if (modifiedNode.deleted) { + nodeService.getDeletedNodeDetail(artifactInfo).firstOrNull() ?: return + } else { + // 查询节点信息,当节点新增,然后删除后可能会找不到节点 + nodeService.getNodeDetail(artifactInfo) + ?: nodeService.getDeletedNodeDetail(artifactInfo).firstOrNull() ?: return + } + + logger.info("start to stat modified node size with fullPath ${node.fullPath}" + + " in repo ${node.projectId}|${node.repoName}") + if (node.folder) { + findAndCacheSubFolders( + artifactInfo = artifactInfo, + deleted = node.nodeInfo.deleted, + deletedFlag = modifiedNode.deleted + ) + } else { + updateCache( + projectId = artifactInfo.projectId, + repoName = artifactInfo.repoName, + fullPath = artifactInfo.getArtifactFullPath(), + size = node.size, + deleted = modifiedNode.deleted + ) + } + } + + private fun updateCache( + projectId: String, + repoName: String, + fullPath: String, + size: Long, + deleted: Boolean = false + ) { + + // 更新当前节点所有上级目录统计信息 + PathUtils.resolveAncestorFolder(fullPath).forEach{ + if (it != PathUtils.ROOT) { + val key = Triple(projectId, repoName, it) + var (cachedSize, nodeNum) = cache.getIfPresent(key) ?: Pair(0L, 0L) + if (deleted) { + cachedSize -= size + nodeNum -= 1 + } else { + cachedSize += size + nodeNum += 1 + } + cache.put(key, Pair(cachedSize, nodeNum)) + } + } + } + + private fun findAndCacheSubFolders( + artifactInfo: ArtifactInfo, + deleted: String? = null, + deletedFlag: Boolean = false + ) { + findAllNodesUnderFolder( + artifactInfo.projectId, + artifactInfo.repoName, + artifactInfo.getArtifactFullPath(), + deleted = deleted + ).forEach { + updateCache( + projectId = artifactInfo.projectId, + repoName = artifactInfo.repoName, + fullPath = it.fullPath.getFolderPath(), + size = it.size, + deleted = deletedFlag + ) + } + } + + + private fun findAllNodesUnderFolder( + projectId: String, + repoName: String, + fullPath: String, + deleted: String? = null + ): List { + val srcRootNodePath = PathUtils.toPath(fullPath) + val query = buildNodeQuery(projectId, repoName, srcRootNodePath, deleted) + return nodeDao.find(query) + } + + + /** + * 查询目录下的节点,排除path为"/"的节点 + */ + private fun buildNodeQuery( + projectId: String, + repoName: String, + srcRootNodePath: String, + deleted: String? = null + ): Query { + val criteria = where(TNode::projectId).isEqualTo(projectId) + .and(TNode::repoName).isEqualTo(repoName) + .apply { + if (deleted.isNullOrEmpty()) { + this.and(TNode::deleted).isEqualTo(null) + } else { + // 节点删除时其下所有节点的deleted值是一致的,但是节点move时其下所有节点的deleted是不一致的 + this.and(TNode::deleted).gte(LocalDateTime.parse(deleted)) + } + } + .and(TNode::fullPath).regex("^${PathUtils.escapeRegex(srcRootNodePath)}") + .and(TNode::folder).isEqualTo(false) + .and(TNode::path).ne(PathUtils.ROOT) + return Query(criteria).withHint(TNode.FULL_PATH_IDX) + } + + + private fun buildDstFullPath(dstFullPath: String, srcFullPath: String): String { + val path = PathUtils.toPath(dstFullPath) + val name = PathUtils.resolveName(srcFullPath) + return PathUtils.combineFullPath(path, name) + } + + private fun String.getFolderPath(): String { + val path = PathUtils.resolveParent(this) + return PathUtils.normalizeFullPath(path) + } + + private data class ModifiedNodeInfo( + var projectId: String, + var repoName: String, + var fullPath: String, + var deleted: Boolean = false + ) + + companion object { + private val logger = LoggerFactory.getLogger(NodeModifyEventListener::class.java) + private const val FIXED_DELAY = 30000L + private val IGNORE_PROJECT_PREFIX_LIST = listOf("CODE_", "CLOSED_SOURCE_", "git_") + private val IGNORE_REPO_LIST = listOf(REPORT, LOG) + } +} \ No newline at end of file diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/model/TNode.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/model/TNode.kt index 191b6263d4..d772bd72bb 100644 --- a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/model/TNode.kt +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/model/TNode.kt @@ -86,6 +86,7 @@ data class TNode( var copyIntoCredentialsKey: String? = null, var metadata: MutableList? = null, var clusterNames: Set? = null, + var nodeNum: Long? = null, @ShardingKey(count = SHARDING_COUNT) var projectId: String, diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/node/impl/NodeBaseService.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/node/impl/NodeBaseService.kt index d97cf63ff0..0cf8f15b9f 100644 --- a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/node/impl/NodeBaseService.kt +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/node/impl/NodeBaseService.kt @@ -176,6 +176,7 @@ abstract class NodeBaseService( size = if (folder) 0 else size ?: 0, sha256 = if (folder) null else sha256, md5 = if (folder) null else md5, + nodeNum = null, metadata = MetadataUtils.compatibleConvertAndCheck(metadata, nodeMetadata), createdBy = createdBy ?: operator, createdDate = createdDate ?: LocalDateTime.now(), @@ -422,6 +423,7 @@ abstract class NodeBaseService( name = it.name, fullPath = it.fullPath, size = it.size, + nodeNum = it.nodeNum, sha256 = it.sha256, md5 = it.md5, metadata = metadata, diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/node/impl/NodeStatsSupport.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/node/impl/NodeStatsSupport.kt index 38821c257e..ebf637a5b4 100644 --- a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/node/impl/NodeStatsSupport.kt +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/service/node/impl/NodeStatsSupport.kt @@ -63,13 +63,25 @@ open class NodeStatsSupport( ?: throw ErrorCodeException(ArtifactMessageCode.NODE_NOT_FOUND, fullPath) // 节点为文件直接返回 if (!node.folder) { - return NodeSizeInfo(subNodeCount = 0, size = node.size) + return NodeSizeInfo(subNodeCount = 0, subNodeWithoutFolderCount = 0, size = node.size) } val listOption = NodeListOption(includeFolder = true, deep = true) val criteria = NodeQueryHelper.nodeListCriteria(projectId, repoName, node.fullPath, listOption) val count = nodeDao.count(Query(criteria)) - val size = aggregateComputeSize(criteria) - return NodeSizeInfo(subNodeCount = count, size = size) + val listOptionWithOutFolder = NodeListOption(includeFolder = false, deep = true) + val criteriaWithOutFolder = NodeQueryHelper.nodeListCriteria( + projectId, repoName, node.fullPath, listOptionWithOutFolder + ) + val countWithOutFolder = nodeDao.count(Query(criteriaWithOutFolder)) + val size = aggregateComputeSize(criteriaWithOutFolder) + nodeDao.setSizeAndNodeNumOfFolder( + projectId = projectId, + repoName = repoName, + fullPath = fullPath, + size = size, + nodeNum = countWithOutFolder + ) + return NodeSizeInfo(subNodeCount = count, subNodeWithoutFolderCount = countWithOutFolder, size = size) } override fun countFileNode(artifact: ArtifactInfo): Long { diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/util/NodeQueryHelper.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/util/NodeQueryHelper.kt index f370ad00b9..a489df1bb6 100644 --- a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/util/NodeQueryHelper.kt +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/util/NodeQueryHelper.kt @@ -63,6 +63,15 @@ object NodeQueryHelper { return Query(criteria) } + fun nodeFolderQuery(projectId: String, repoName: String, fullPath: String? = null): Query { + val criteria = where(TNode::projectId).isEqualTo(projectId) + .and(TNode::repoName).isEqualTo(repoName) + .and(TNode::deleted).isEqualTo(null) + .apply { fullPath?.run { and(TNode::fullPath).isEqualTo(fullPath) } } + .and(TNode::folder).isEqualTo(true) + return Query(criteria) + } + fun nodeListCriteria(projectId: String, repoName: String, path: String, option: NodeListOption): Criteria { val nodePath = toPath(path) val criteria = where(TNode::projectId).isEqualTo(projectId) diff --git a/src/backend/repository/biz-repository/src/test/kotlin/com/tencent/bkrepo/repository/service/NodeServiceTest.kt b/src/backend/repository/biz-repository/src/test/kotlin/com/tencent/bkrepo/repository/service/NodeServiceTest.kt index 43d5c0cf3b..98f818d2f0 100644 --- a/src/backend/repository/biz-repository/src/test/kotlin/com/tencent/bkrepo/repository/service/NodeServiceTest.kt +++ b/src/backend/repository/biz-repository/src/test/kotlin/com/tencent/bkrepo/repository/service/NodeServiceTest.kt @@ -325,6 +325,7 @@ class NodeServiceTest @Autowired constructor( val pathSizeInfo = nodeService.computeSize(node("/a/b")) assertEquals(42, pathSizeInfo.subNodeCount) + assertEquals(40, pathSizeInfo.subNodeWithoutFolderCount) assertEquals(40, pathSizeInfo.size) } @@ -348,6 +349,7 @@ class NodeServiceTest @Autowired constructor( val pathSizeInfo = nodeService.computeSize(node("/")) assertEquals(42, pathSizeInfo.subNodeCount) + assertEquals(40, pathSizeInfo.subNodeWithoutFolderCount) assertEquals(40, pathSizeInfo.size) }